Daily Reactive: Splitting a stream
This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.
Although it is not common, sometimes you need to split a Flux into several.
And one has to be careful to do it correctly!
Let’s say we have a stream of persons and we want to return a Tuple of names and ids:
Flux<Person> personFlux;
Flux<String> namesFlux = ...;
Flux<Integer> idsFlux = ...;
return Tuples.of(namesFlux, idsFlux);
The easiest way would be to use .map on personFlux, right?
Flux<Person> personFlux;
Flux<String> namesFlux = personFlux.map(Person::getName);
Flux<Integer> idsFlux = personFlux.map(Person::getId);
return Tuples.of(namesFlux, idsFlux);
I hope you have tests, and proper ones, because we have just introduced a serious issue!
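import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
import static org.assertj.core.api.Assertions.assertThat;

// Inside a test method; Person is assumed to expose getName() and getId()
var counter = new AtomicInteger(0);
// A cold, synchronous source: every subscription runs this generator again
var flux = Flux.<Person>generate(sink -> {
    int id = counter.getAndIncrement();
    sink.next(new Person(id));
});
var personFlux = flux.take(3);
var namesFlux = personFlux.map(Person::getName);
var idsFlux = personFlux.map(Person::getId);
var tuple = Tuples.of(namesFlux, idsFlux);
// Print the names
tuple.getT1().subscribe(System.out::println);
// Collect the ids and check them
assertThat(tuple.getT2().collectList().block()).containsExactly(0, 1, 2);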
If we run this test, we will get:
java.lang.AssertionError:
Expecting:
<[3, 4, 5]>
to contain exactly (and in same order):
<[0, 1, 2]>
but some elements were not found:
<[0, 1, 2]>
and others were not expected:
<[3, 4, 5]>
Wait… what? It looks like our Flux was subscribed to twice and generated twice as many elements!
Imagine if this were a database query instead of a simple Flux.generate!
So, this is what happened:
- tuple.getT1().subscribe(System.out::println) subscribes to personFlux.map(Person::getName)
- … which subscribes to personFlux
- … which, as per the Reactive Streams specification, results in a fresh new subscription.
- Then, tuple.getT2().collectList().block() subscribes to personFlux.map(Person::getId)
- … which subscribes to personFlux
- … and, once again, as per the Reactive Streams specification, a new subscription is created.
Since the generator's counter lives outside the Flux, the second subscription picks up where the first one stopped and emits 3, 4 and 5.
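To see the double subscription directly, here is a minimal, self-contained sketch, assuming Reactor 3 is on the classpath. It places doOnSubscribe right after the generator to count how many times the source is actually subscribed to. The class name, the person-N naming and the use of plain integers instead of Person are illustrative choices, not part of the original example.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class ColdFluxDemo {

    // Subscribes to two views of the same cold Flux and returns how many
    // times the underlying generator was subscribed to.
    static int countSourceSubscriptions() {
        AtomicInteger subscriptions = new AtomicInteger();
        AtomicInteger counter = new AtomicInteger();

        Flux<Integer> personIds = Flux.<Integer>generate(sink -> sink.next(counter.getAndIncrement()))
                // Runs once for every subscription that reaches the generator
                .doOnSubscribe(s -> subscriptions.incrementAndGet())
                .take(3);

        Flux<String> names = personIds.map(id -> "person-" + id); // hypothetical naming
        Flux<Integer> ids = personIds.map(id -> id);               // stands in for Person::getId

        List<String> nameList = names.collectList().block(); // [person-0, person-1, person-2]
        List<Integer> idList = ids.collectList().block();     // [3, 4, 5]: the shared counter kept going
        System.out.println(nameList + " " + idList);
        return subscriptions.get();
    }

    public static void main(String[] args) {
        System.out.println("generator subscriptions: " + countSourceSubscriptions()); // 2
    }
}
```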
At this stage, your reaction may look like this: ಠ_ಠ
It would be mine too, if not for the exhaustive specification that dictates this behavior!
Who reads the docs, anyway? Still, I recommend reading the specification to avoid many common pitfalls like this one.
Stay connected!
How do we solve the problem, then? We can’t simply subscribe to the same cold Flux twice, because every subscription runs the source again.
But of course there is an operator for that! In fact, there is a set of operators:
- Flux#publish(): prepares a ConnectableFlux which shares this Flux sequence and dispatches values to subscribers in a backpressure-aware manner.
- Flux#share(): a shortcut to publish().refCount().
- Flux#replay(): turns this Flux into a hot source and caches last emitted signals.
- Flux#cache(): a shortcut to replay().autoConnect().
- and more, check Flux’s Javadoc.
What they have in common is ConnectableFlux, a special type in Project Reactor that lets subscribers pile up before it connects to its data source.
So, let’s try one of them, Flux#share():
var personFlux = flux.take(3).share();
Nope!
java.lang.AssertionError:
Expecting:
<[3, 4, 5]>
to contain exactly (and in same order):
<[0, 1, 2]>
but some elements were not found:
<[0, 1, 2]>
and others were not expected:
<[3, 4, 5]>
Why so?
Well, Flux#share() is a shortcut to publish().refCount(), where refCount() defaults to 1, so it connects to the source as soon as the first subscriber arrives (in our case, the T1 one)!
And, since our source is synchronous, tuple.getT1().subscribe(System.out::println); subscribes and completes before tuple.getT2().collectList().block() is even called.
So, unlike cache() (which stores every item in memory, probably not something we want), share() auto-disconnects: once the first subscriber has completed, the shared connection is gone, and the second subscriber triggers a brand new subscription to the source.
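The difference between share() and cache() can be checked with a small sketch, assuming Reactor 3 on the classpath. It counts, via doOnSubscribe, how often the upstream generator is subscribed to when two subscribers arrive one after the other; the class and method names are hypothetical. With share() we expect two upstream subscriptions (the second subscriber reconnects and sees 3, 4 and 5), with cache() a single one, both subscribers receiving 0, 1 and 2 replayed from memory.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import reactor.core.publisher.Flux;

public class ShareVsCacheDemo {

    // Builds a fresh synchronous source, applies the given sharing operator,
    // subscribes twice in a row and returns how often the source was subscribed to.
    static int sourceSubscriptions(UnaryOperator<Flux<Integer>> sharing) {
        AtomicInteger subscriptions = new AtomicInteger();
        AtomicInteger counter = new AtomicInteger();
        Flux<Integer> shared = sharing.apply(
                Flux.<Integer>generate(sink -> sink.next(counter.getAndIncrement()))
                    .doOnSubscribe(s -> subscriptions.incrementAndGet())
                    .take(3));

        // The first subscriber runs to completion before the second one arrives
        List<Integer> first = shared.collectList().block();
        List<Integer> second = shared.collectList().block();
        System.out.println(first + " " + second);
        return subscriptions.get();
    }

    public static void main(String[] args) {
        // share(): the connection ends with the first subscriber, so the second one reconnects
        System.out.println("share(): " + sourceSubscriptions(f -> f.share()));
        // cache(): the source runs once and every element is replayed from memory
        System.out.println("cache(): " + sourceSubscriptions(f -> f.cache()));
    }
}
```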
It depends on the use case, but if we know for sure that the consumer of our Tuple will always subscribe to both results, we can hint Reactor about that:
var personFlux = flux.take(3).publish().refCount(2);
Now our test is green. We reuse the original Flux and subscribe to it only after both the id and the name fluxes have been subscribed to.
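Putting it together, here is a self-contained sketch of the fixed example, assuming Reactor 3. The split helper, the run method and the Person class (with its person-N names) are illustrative stand-ins, not code from the original post.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class SplitWithRefCount {

    // Hypothetical Person type standing in for the one used in the post
    static final class Person {
        private final int id;
        Person(int id) { this.id = id; }
        int getId() { return id; }
        String getName() { return "person-" + id; }
    }

    static Tuple2<Flux<String>, Flux<Integer>> split(Flux<Person> source) {
        // Connect to the source only once both halves of the tuple are subscribed
        Flux<Person> personFlux = source.publish().refCount(2);
        return Tuples.of(personFlux.map(Person::getName), personFlux.map(Person::getId));
    }

    static List<Integer> run() {
        AtomicInteger counter = new AtomicInteger();
        Flux<Person> flux = Flux.<Person>generate(sink -> sink.next(new Person(counter.getAndIncrement())))
                .take(3);
        Tuple2<Flux<String>, Flux<Integer>> tuple = split(flux);

        // First subscriber: nothing is emitted yet, refCount(2) is still waiting
        tuple.getT1().subscribe(System.out::println);
        // Second subscriber: the count reaches 2, the source is subscribed to once,
        // and both subscribers receive the same three persons
        return tuple.getT2().collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [0, 1, 2]
    }
}
```

Keep the trade-off in mind: with refCount(2), if only one of the two fluxes is ever subscribed to, the source is never connected and that subscriber waits forever.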
Conclusion
ConnectableFlux is a very powerful tool for sharing access to the same Flux instance!
And, if none of the ConnectableFlux operators suits you, you can always call ConnectableFlux#connect() manually.
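For the manual route, a minimal sketch assuming Reactor 3: publish() returns a ConnectableFlux, subscribers are attached first, and nothing flows until connect() is called. The class, the run method and the person-N naming are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;

public class ManualConnectDemo {

    static String run() {
        AtomicInteger counter = new AtomicInteger();
        ConnectableFlux<Integer> ids =
                Flux.<Integer>generate(sink -> sink.next(counter.getAndIncrement()))
                    .take(3)
                    .publish(); // nothing is requested from the generator yet

        List<String> names = new ArrayList<>();
        List<Integer> collectedIds = new ArrayList<>();
        ids.map(id -> "person-" + id).subscribe(names::add); // hypothetical naming
        ids.subscribe(collectedIds::add);

        // Both subscribers have piled up, but nothing has been emitted so far
        int emittedBeforeConnect = names.size() + collectedIds.size();

        // Subscribe to the generator exactly once; being synchronous, it emits all
        // three elements to both subscribers before connect() returns
        ids.connect();

        return emittedBeforeConnect + " " + names + " " + collectedIds;
    }

    public static void main(String[] args) {
        System.out.println(run()); // 0 [person-0, person-1, person-2] [0, 1, 2]
    }
}
```

connect() also returns a Disposable, which can be used to cancel the shared upstream subscription.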