Daily Reactive: Splitting a stream

This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.

Although it is not a common requirement, sometimes there is a need to split one Flux into several. And one has to be careful to do it correctly!

Let’s say we have a stream of persons and we want to return a Tuple of names and ids:

Flux<Person> personFlux;
Flux<String> namesFlux = ...;
Flux<Integer> idsFlux = ...;
return Tuples.of(namesFlux, idsFlux);

The easiest would be to use .map on personFlux, right?

Flux<Person> personFlux;
Flux<String> namesFlux = personFlux.map(Person::getName);
Flux<Integer> idsFlux = personFlux.map(Person::getId);
return Tuples.of(namesFlux, idsFlux);

I hope you have tests, and proper ones, because we have just introduced a serious issue!
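Here is a test that demonstrates the problem. The Person class below is a hypothetical minimal sketch, just enough to back getName and getId:

// Hypothetical minimal Person, just enough for the example
record Person(int id) {
	String getName() { return "Person " + id; }
	int getId() { return id; }
}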

var counter = new AtomicInteger(0);
var flux = Flux.<Person>generate(sink -> {
	int id = counter.getAndIncrement();
	sink.next(new Person(id));
});

var personFlux = flux.take(3);

var namesFlux = personFlux.map(Person::getName);
var idsFlux = personFlux.map(Person::getId);
var tuple = Tuples.of(namesFlux, idsFlux);

// Print the names
tuple.getT1().subscribe(System.out::println);

assertThat(tuple.getT2().collectList().block()).containsExactly(0, 1, 2);

If we run this test, we will get:

java.lang.AssertionError: 
Expecting:
  <[3, 4, 5]>
to contain exactly (and in same order):
  <[0, 1, 2]>
but some elements were not found:
  <[0, 1, 2]>
and others were not expected:
  <[3, 4, 5]>

Wait… what? It looks like our Flux… was subscribed to twice and generated twice as many items!
Now imagine it was a database query instead of this simple Flux.generate!
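To make the risk concrete, here is a hedged sketch; the repository and its findAll() call are hypothetical:

// Hypothetical: a cold Flux wrapping an expensive call.
// Every subscription re-runs the supplier, i.e. hits the database again.
Flux<Person> personFlux = Flux.defer(() -> Flux.fromIterable(repository.findAll()));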

So, this is what happened:

  1. tuple.getT1().subscribe(System.out::println) subscribes to personFlux.map(Person::getName)
  2. … which subscribes to personFlux
  3. … which, as per the Reactive Streams specification, results in a fresh new subscription
  4. Then, tuple.getT2().collectList().block() subscribes to personFlux.map(Person::getId)
  5. … which subscribes to personFlux
  6. … and, once again, as per the Reactive Streams specification, a brand new subscription is created.

And because the counter lives outside the Flux, the second subscription does not start over at 0; it simply keeps counting from 3.

At this stage, your reaction may look like this: ಠ_ಠ

It would be mine too, if not for the exhaustive specification that dictates this behavior!

Who reads the docs, anyway? Well, I recommend that you do, to avoid many common pitfalls like this one.

Stay connected!

How do we solve the problem, then? We can’t simply hand the same Flux instance to both consumers, because every subscriber triggers its own subscription to the source.

But of course there is an operator for that! In fact, there is a whole family of operators.

What they have in common is ConnectableFlux - a special type in Project Reactor that lets subscribers pile up before they connect to their data source.
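Here is a hedged sketch of that family, reusing the hypothetical Person from the test above; the operator names are real Reactor API, but the wiring is just an illustration:

Flux<Person> personFlux = Flux.range(0, 3).map(Person::new);

// publish() parks subscribers on a ConnectableFlux until it is connected
ConnectableFlux<Person> connectable = personFlux.publish();

// ...and there are several ways to trigger that connection:
Flux<Person> refCounted = personFlux.publish().refCount(2);       // connect at 2 subscribers, disconnect when they all leave
Flux<Person> autoConnected = personFlux.publish().autoConnect(2); // connect at 2 subscribers, stay connected
Flux<Person> shared = personFlux.share();                         // shorthand for publish().refCount()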

So, let’s try one of them, Flux#share():

var personFlux = flux.take(3).share();

Nope!

java.lang.AssertionError: 
Expecting:
  <[3, 4, 5]>
to contain exactly (and in same order):
  <[0, 1, 2]>
but some elements were not found:
  <[0, 1, 2]>
and others were not expected:
  <[3, 4, 5]>

Why so?
Well, Flux#share() is a shortcut for publish().refCount(), and refCount() defaults to 1, so the source is subscribed as soon as the first subscriber arrives (in our case, the T1 one)!
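In other words (a minimal sketch, reusing the personFlux name from above):

// These two lines are equivalent:
Flux<Person> shared = personFlux.share();
Flux<Person> sameThing = personFlux.publish().refCount(); // refCount() == refCount(1)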

And, since our source is synchronous, tuple.getT1().subscribe(System.out::println) subscribes, consumes all three items, and completes before the tuple.getT2().collectList().block() call even starts.

So, unlike cache() (which would store every item in memory, probably not something we want), share() disconnects as soon as its subscribers are gone, and the next subscriber triggers a brand new subscription to the source!
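For completeness: if buffering is acceptable, cache() would also make our test pass, because it replays the stored items to late subscribers. A minimal sketch:

// cache() stores the emitted items and replays them to late subscribers,
// so both T1 and T2 would see 0, 1, 2, at the cost of keeping everything in memory.
var personFlux = flux.take(3).cache();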

It depends on the use case, but if we know for sure that the consumer of our Tuple will always subscribe to both results, we can give Reactor a hint:

var personFlux = flux.take(3).publish().refCount(2);

Now our test is green. We reuse the original Flux, and the source is subscribed to only once both the names and the ids consumers have subscribed.
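Putting it all together, a sketch of the now-green test (note that nothing is emitted until the second subscriber arrives):

var personFlux = flux.take(3).publish().refCount(2);

var tuple = Tuples.of(
		personFlux.map(Person::getName),
		personFlux.map(Person::getId));

// The first subscriber only registers its interest, the source is not subscribed yet
tuple.getT1().subscribe(System.out::println);

// The second subscriber triggers the connection, and both receive 0, 1, 2
assertThat(tuple.getT2().collectList().block()).containsExactly(0, 1, 2);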

Conclusion

ConnectableFlux is a very powerful tool for sharing access to the same Flux instance!

And, if none of the ConnectableFlux-based operators fits your use case, you can always call ConnectableFlux#connect() manually.
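For example, a minimal sketch of driving the connection by hand:

ConnectableFlux<Person> connectable = flux.take(3).publish();

// Subscribers pile up here, nothing is emitted yet
connectable.map(Person::getName).subscribe(System.out::println);
connectable.map(Person::getId).subscribe(System.out::println);

// ...until we decide that everyone is in place:
connectable.connect();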
