Daily Reactive: Pollable sources
This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.
Although streaming sources are very efficient with Reactive Streams, there are still many poll-based APIs. Kafka/Pulsar, REST, SQS/PubSub… You name it!
Let’s say we have the following blocking legacy API provided by the database:
// SDK-provided class
class Client {
    // Blocks for up to the given timeout; returns null if no item arrived
    public Item poll(long timeout, TimeUnit unit) {
        // ...
    }
}
// and then:
while (true) {
    var item = client.poll(5, SECONDS);
    if (item == null) {
        TimeUnit.SECONDS.sleep(1);
    } else {
        System.out.println("Received item: " + item);
    }
}
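To try the loop locally without the real SDK, here is a minimal sketch of a stand-in client. Everything in it is an assumption made for illustration: `Item`, `InMemoryClient`, and `PollLoopDemo` are hypothetical names, the queue-backed `poll` only mimics the "return null on timeout" behaviour described above, and the loop is bounded and uses millisecond timeouts so that it terminates quickly.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the SDK's Item type, which this post does not show.
final class Item {
    final String id;
    Item(String id) { this.id = id; }
    @Override public String toString() { return "Item(" + id + ")"; }
}

// Hypothetical in-memory replacement for the SDK-provided Client.
class InMemoryClient {
    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<>();

    InMemoryClient(List<Item> items) {
        queue.addAll(items);
    }

    // Waits up to the timeout for an item; returns null if none arrived in time.
    public Item poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }
}

class PollLoopDemo {
    // The loop from above, bounded to maxPolls iterations so that it terminates.
    static int drain(InMemoryClient client, int maxPolls) {
        int received = 0;
        try {
            for (int i = 0; i < maxPolls; i++) {
                Item item = client.poll(10, TimeUnit.MILLISECONDS);
                if (item == null) {
                    TimeUnit.MILLISECONDS.sleep(10); // back off before the next poll
                } else {
                    System.out.println("Received item: " + item);
                    received++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return received;
    }

    public static void main(String[] args) {
        InMemoryClient client = new InMemoryClient(List.of(new Item("a"), new Item("b")));
        System.out.println("Total received: " + drain(client, 4));
    }
}
```

With two queued items and four polls, the last two polls time out and return null, which is exactly the case the reactive version has to handle later on.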
Let’s say we want to consume it with Project Reactor.
We cannot change Client#poll since it is provided by the SDK. However, we can wrap it into a Flux<Item> for later consumption.
The question is… how do we create a Flux from such an API? Okay, we can easily wrap poll with Mono:
public Mono<Item> pollAsync() {
    return Mono.fromCallable(() -> client.poll(5, SECONDS))
        .subscribeOn(Schedulers.boundedElastic());
}
Note that since we perform a blocking call, we use subscribeOn with Schedulers.boundedElastic().
⚠️ WARNING!
It is very important to not block the non-blocking threads, and you should treat every thread as non-blocking unless explicitly stated otherwise.
Tools like BlockHound help to detect it, btw.
So, we have a reactive pollAsync() that returns Mono<Item>, and we need… Flux<Item>!
Turns out, we can easily transform our pollAsync() into a Flux by… repeating it with Mono#repeat!
public Flux<Item> streamItems() {
    return pollAsync()
        .repeat();
}
If we leave it like that, we will introduce a regression (and I find it very easy to spot such things with Reactive Programming):
When Client#poll returns null, we should wait 1 second before retrying.
But, in Reactive Streams, nulls are not allowed! This is why Mono.fromCallable returns an empty Mono if the Callable returns null.
Given this knowledge, we will use another repeating operator, Mono#repeatWhenEmpty:
public Flux<Item> streamItems() {
    return pollAsync()
        .repeatWhenEmpty(it -> it.delayElements(Duration.ofSeconds(1)))
        .repeat();
}
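If you want to see the empty-then-retry behaviour in isolation, the following is a rough, self-contained sketch rather than the SDK-backed version. It assumes reactor-core is on the classpath. `RepeatWhenEmptyDemo`, the scripted queue, and `firstItems` are hypothetical helpers: an empty string in the script stands in for a poll that timed out and returned null, and the delay is a parameter so the demo does not have to wait a full second.

```java
import java.time.Duration;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class RepeatWhenEmptyDemo {
    // Hypothetical source: "" (or an exhausted queue) plays the role of a null poll result.
    static Flux<String> streamItems(Queue<String> script, Duration emptyDelay) {
        return Mono.fromCallable(() -> {
                    String next = script.poll();
                    // Returning null from the Callable produces an empty Mono
                    return (next == null || next.isEmpty()) ? null : next;
                })
                .subscribeOn(Schedulers.boundedElastic())
                // Empty result: wait, then poll again
                .repeatWhenEmpty(it -> it.delayElements(emptyDelay))
                // Item received: poll again immediately
                .repeat();
    }

    // Takes the first n items so that the otherwise infinite stream terminates.
    static List<String> firstItems(Queue<String> script, int n) {
        return streamItems(script, Duration.ofMillis(10))
                .take(n)
                .collectList()
                .block(Duration.ofSeconds(5)); // blocking is acceptable here: demo code on the main thread
    }

    public static void main(String[] args) {
        Queue<String> script = new ConcurrentLinkedQueue<>(List.of("a", "", "b", "", "", "c"));
        System.out.println(firstItems(script, 3));
    }
}
```

The empty polls never reach the subscriber: they only trigger the delayed re-subscription, while each real item triggers an immediate one through repeat().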
Now we have a stream of items that we can subscribe to:
streamItems().subscribe(item -> System.out.println("Received item: " + item));
⚠️ WARNING!
In a real-world scenario you will most probably perform some IO on every received item. Consider making it async too and use Flux#concatMap. If you can’t make it async, you can add publishOn(Schedulers.boundedElastic()) before subscribe, so that you’re explicitly moving the computation to the blocking-friendly scheduler.
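As a hedged illustration of the concatMap advice, here is a small sketch that assumes reactor-core is on the classpath. `ConcatMapDemo` and `saveAsync` are hypothetical names: `saveAsync` stands in for whatever per-item IO you perform, and a fixed `Flux.just(...)` replaces the infinite polled stream so that the example terminates.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ConcatMapDemo {
    // Hypothetical per-item IO, wrapped the same way as a blocking poll call.
    static Mono<String> saveAsync(String item) {
        return Mono.fromCallable(() -> "saved:" + item)
                .subscribeOn(Schedulers.boundedElastic());
    }

    static List<String> process(Flux<String> items) {
        return items
                // One item at a time, in order: the next save starts after the previous one completes
                .concatMap(ConcatMapDemo::saveAsync)
                .collectList()
                .block(Duration.ofSeconds(5)); // blocking is acceptable here: demo code on the main thread
    }

    public static void main(String[] args) {
        System.out.println(process(Flux.just("a", "b", "c")));
    }
}
```

concatMap is a reasonable default here because it keeps the processing order equal to the polling order; if ordering does not matter for your use case, other operators may give more concurrency.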