Daily Reactive: Pollable sources

This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.

Although streaming sources map very efficiently onto Reactive Streams, many APIs are still poll-based: Kafka/Pulsar, REST, SQS/PubSub… You name it!

Let’s say the database SDK provides the following blocking legacy API:

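import java.util.concurrent.TimeUnit;
import static java.util.concurrent.TimeUnit.SECONDS;

// SDK-provided class
class Client {
    // Blocks for up to `timeout`; returns null if no item arrived in time
    public Item poll(long timeout, TimeUnit unit) {
        // ...
    }
}

// and then:
while (true) {
    var item = client.poll(5, SECONDS);
    if (item == null) {
        // nothing available: wait 1 second before polling again
        TimeUnit.SECONDS.sleep(1);
    } else {
        System.out.println("Received item: " + item);
    }
}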

Let’s say we want to consume it with Project Reactor.
We cannot change Client#poll since it is provided by the SDK. However, we can wrap it into a Flux<Item> for later consumption.

The question is… how do we create a Flux from such an API? Well, wrapping poll in a Mono is easy:

public Mono<Item> pollAsync() {
    return Mono.fromCallable(() -> client.poll(5, SECONDS))
               .subscribeOn(Schedulers.boundedElastic());
}

Note that since client.poll is a blocking call, we use subscribeOn with Schedulers.boundedElastic(), a scheduler intended for blocking work.
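To see what subscribeOn actually changes, here is a minimal sketch, assuming reactor-core 3.x on the classpath. Thread.sleep stands in for the blocking client.poll(5, SECONDS) call, and SubscribeOnDemo is a hypothetical name used only for this illustration. It records which thread ran the Callable:

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class SubscribeOnDemo {
    // Returns the name of the thread that executed the (simulated) blocking poll.
    static String pollThreadName() {
        AtomicReference<String> thread = new AtomicReference<>();
        Mono.fromCallable(() -> {
                    thread.set(Thread.currentThread().getName());
                    Thread.sleep(50); // stands in for client.poll(5, SECONDS)
                    return "item";
                })
                // the subscription, and therefore the Callable, runs on boundedElastic
                .subscribeOn(Schedulers.boundedElastic())
                .block(); // demo only: blocking the caller is fine on the main thread
        return thread.get();
    }
}
```

The returned name should look like boundedElastic-1 rather than the caller's thread. block() keeps the demo short; it must never be called from a non-blocking thread.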

⚠️ WARNING!
It is very important not to block non-blocking threads, and you should treat every thread as non-blocking unless explicitly stated otherwise.
Tools like BlockHound help detect such blocking calls, btw.

So, we have a reactive pollAsync() and it returns Mono<Item>, and we need… Flux<Item>!
Turns out, we can easily transform our pollAsync() into a Flux by… repeating it with Mono#repeat!

public Flux<Item> streamItems() {
    return pollAsync()
        .repeat();
}
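Mono#repeat works here because Mono.fromCallable is lazy: every resubscription invokes the Callable again. A minimal sketch, assuming reactor-core 3.x, with an AtomicInteger counter standing in for client.poll (RepeatDemo is a hypothetical name):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class RepeatDemo {
    static List<Integer> firstThree() {
        AtomicInteger calls = new AtomicInteger();
        // Each resubscription triggered by repeat() calls incrementAndGet again.
        Flux<Integer> stream = Mono.fromCallable(calls::incrementAndGet).repeat();
        // repeat() never completes on its own, so cap it for the demo.
        return stream.take(3).collectList().block();
    }
}
```

firstThree() returns [1, 2, 3], one value per subscription to the underlying Mono.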

If we leave it like that, we will introduce a regression (and I find it very easy to spot such things with Reactive Programming):
when Client#poll returns null, the original loop waits 1 second before polling again, whereas repeat() would resubscribe immediately.

But in Reactive Streams, nulls are not allowed! This is why Mono.fromCallable completes empty if the Callable returns null.
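A quick way to check this behavior, sketched under the assumption of reactor-core 3.x (NullToEmptyDemo is a hypothetical name): a Callable that returns null yields a Mono with no element, not a Mono emitting null.

```java
import reactor.core.publisher.Mono;

class NullToEmptyDemo {
    // Simulates client.poll(...) returning null because nothing was available.
    static boolean hasItem() {
        return Mono.fromCallable(() -> (String) null)
                .hasElement() // emits true only if the Mono produced a value
                .block();
    }
}
```

hasItem() returns false: the Mono simply completed without a value.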

Given this knowledge, we will use another repeating operator, Mono#repeatWhenEmpty:

public Flux<Item> streamItems() {
    return pollAsync()
        .repeatWhenEmpty(it -> it.delayElements(Duration.ofSeconds(1)))
        .repeat();
}
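To try the whole pipeline without a real SDK, here is a minimal, self-contained sketch assuming reactor-core 3.x. ScriptedClient is a hypothetical stand-in for the SDK's Client: it replays a fixed script in which null means "nothing available this time", and it returns immediately instead of blocking. The item type is simplified to String, and the 1-second backoff is a constructor parameter so the demo runs quickly:

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical scripted client: null entries mean "no item this time".
class ScriptedClient {
    private final List<String> script;
    private int next = 0;

    ScriptedClient(String... script) {
        this.script = new ArrayList<>(Arrays.asList(script));
    }

    synchronized String poll(long timeout, TimeUnit unit) {
        return next < script.size() ? script.get(next++) : null;
    }
}

class PollingStream {
    private final ScriptedClient client;
    private final Duration backoff;

    PollingStream(ScriptedClient client, Duration backoff) {
        this.client = client;
        this.backoff = backoff;
    }

    Mono<String> pollAsync() {
        return Mono.fromCallable(() -> client.poll(5, TimeUnit.SECONDS))
                .subscribeOn(Schedulers.boundedElastic());
    }

    Flux<String> streamItems() {
        return pollAsync()
                // empty result: wait `backoff`, then resubscribe to pollAsync()
                .repeatWhenEmpty(attempts -> attempts.delayElements(backoff))
                // got an item: emit it and poll again right away
                .repeat();
    }
}
```

For example, streamItems().take(2) on a client scripted with "a", null, null, "b" emits "a", then, after two empty polls each followed by a short pause, "b". The two operators split the work: repeatWhenEmpty only handles the empty case (with backoff), while the outer repeat keeps polling after every successful item.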

Now we have a stream of items that we can subscribe to:

streamItems().subscribe(item -> System.out.println("Received item: " + item));

⚠️ WARNING!
In a real-world scenario, you will most likely perform some IO on every received item.

Consider making that IO async too and processing items with Flux#concatMap.

If you can’t make it async, you can add publishOn(Schedulers.boundedElastic()) before subscribe, so that you’re explicitly moving the computation to the blocking-friendly scheduler.
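Both options can be sketched as follows, assuming reactor-core 3.x. saveAsync and saveBlocking are hypothetical placeholders for your per-item IO (say, a reactive driver call and a JDBC insert), simulated here with short delays; items are plain Strings:

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ItemProcessing {
    // Hypothetical async IO call, simulated with a non-blocking delay.
    static Mono<String> saveAsync(String item) {
        return Mono.delay(Duration.ofMillis(10)).thenReturn("saved:" + item);
    }

    // Option 1: async IO; concatMap runs one saveAsync at a time, in order.
    static Flux<String> processAsync(Flux<String> items) {
        return items.concatMap(ItemProcessing::saveAsync);
    }

    // Hypothetical blocking IO call, simulated with Thread.sleep.
    static String saveBlocking(String item) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "saved:" + item + "@" + Thread.currentThread().getName();
    }

    // Option 2: blocking IO; publishOn moves the downstream map onto boundedElastic.
    static Flux<String> processBlocking(Flux<String> items) {
        return items.publishOn(Schedulers.boundedElastic())
                .map(ItemProcessing::saveBlocking);
    }
}
```

concatMap subscribes to one inner Mono at a time, so items are saved sequentially and in their original order; flatMap would allow concurrent saves at the cost of ordering. In processBlocking, publishOn makes everything after it, including the blocking map, run on boundedElastic threads instead of whichever thread delivered the item.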
