Daily Reactive: Pagination

This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.

In one of the previous Daily Reactive posts, we talked about polling.

Today we will focus on an interesting special case of polling: pagination.

Example

Let’s say we have the following API:

interface Page extends Iterable<Item> {
    Optional<String> nextPageId();
}

interface Client {
    CompletionStage<Page> fetchPage(@Nullable String pageId);
}

As you can see, each fetchPage call returns a single page (pageId == null means “the first page”).

The API is stateless, so we have to keep track of the state (the next pageId) ourselves.

Each Page contains 0..n items, which can be iterated via the Iterable interface.

You may find a similar API in libraries like the AWS SDK v1.
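To make “keeping track of the state ourselves” concrete, here is a minimal blocking sketch in plain Java (no Reactor), assuming the Page and Client interfaces above. The Item class, the ManualPaging class and its page() helper are hypothetical. The caller carries the nextPageId cursor from one call to the next:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;

// Hypothetical Item: the post never shows its definition.
class Item {
    final int id;

    Item(int id) {
        this.id = id;
    }
}

interface Page extends Iterable<Item> {
    Optional<String> nextPageId();
}

interface Client {
    CompletionStage<Page> fetchPage(String pageId); // null means "the first page"
}

class ManualPaging {
    // Fetches every page in order, blocking on each one. The cursor
    // (nextPageId) is state that we own and must update ourselves.
    static List<Item> fetchAll(Client client) {
        List<Item> result = new ArrayList<>();
        String nextPageId = null; // start with the first page
        do {
            Page page = client.fetchPage(nextPageId).toCompletableFuture().join();
            page.forEach(result::add);
            nextPageId = page.nextPageId().orElse(null);
        } while (nextPageId != null);
        return result;
    }

    // Hypothetical helper that wraps a fixed list of items as a Page.
    static Page page(List<Item> items, String next) {
        return new Page() {
            @Override
            public Optional<String> nextPageId() {
                return Optional.ofNullable(next);
            }

            @Override
            public Iterator<Item> iterator() {
                return items.iterator();
            }
        };
    }
}
```

The reactive versions in this post aim to express the same loop without blocking a thread on each page.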

We can test it like this:

var client = new PaginatingClient(
        IntStream.range(0, 10).mapToObj(Item::new).collect(toList())
);

Flux<Item> flux = ...; // TODO

assertThat(flux.count().block()).isEqualTo(10);

assertThat(flux.collectList().block()).extracting(Item::getId).containsExactly(
        0, 1, 2,
        3, 4, 5,
        6, 7, 8,
        9
);
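The post does not show PaginatingClient or Item. Here is a hypothetical in-memory sketch that would satisfy the test, assuming 3 items per page (matching the grouping in the assertion) and page IDs that are simply page indices ("0", "1", ...). The @Nullable annotation is dropped to keep it dependency-free:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical Item with the getId() accessor the test relies on.
class Item {
    private final int id;

    Item(int id) {
        this.id = id;
    }

    int getId() {
        return id;
    }
}

interface Page extends Iterable<Item> {
    Optional<String> nextPageId();
}

interface Client {
    CompletionStage<Page> fetchPage(String pageId); // null means "the first page"
}

// Hypothetical in-memory client. Assumptions: 3 items per page and
// page IDs that are page indices.
class PaginatingClient implements Client {
    static final int PAGE_SIZE = 3;

    private final List<Item> items;

    PaginatingClient(List<Item> items) {
        this.items = new ArrayList<>(items);
    }

    @Override
    public CompletionStage<Page> fetchPage(String pageId) {
        int index = pageId == null ? 0 : Integer.parseInt(pageId);
        int from = Math.min(index * PAGE_SIZE, items.size());
        int to = Math.min(from + PAGE_SIZE, items.size());
        List<Item> slice = items.subList(from, to);
        // Advertise a next page only while items remain after this slice.
        Optional<String> next = to < items.size()
                ? Optional.of(String.valueOf(index + 1))
                : Optional.empty();
        Page page = new Page() {
            @Override
            public Optional<String> nextPageId() {
                return next;
            }

            @Override
            public Iterator<Item> iterator() {
                return slice.iterator();
            }
        };
        // Already-completed stage: no real I/O happens in this sketch.
        return CompletableFuture.completedFuture(page);
    }
}
```

With this client, the pages are [0, 1, 2], [3, 4, 5], [6, 7, 8] and [9], and only the last one (ID "3") has no nextPageId.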

Learning by repetition

We already know how to handle pollable sources, so why not treat this one as pollable too?

var nextPage = new AtomicReference<String>(null);

Flux<Item> flux = Mono
        .fromCompletionStage(() -> client.fetchPage(nextPage.get()))
        .repeat()
        .takeUntil(page -> {
            page.nextPageId().ifPresent(nextPage::set);
            return !page.nextPageId().isPresent();
        })
        .flatMapIterable(it -> it);

If we run the test, it will… fail!

java.lang.AssertionError: 
Expecting:
  <[9]>
to contain exactly (and in same order):
  <[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]>
but could not find the following elements:
  <[0, 1, 2, 3, 4, 5, 6, 7, 8]>

The test is wrong, I should delete it
If we take a closer look at the test, we will find that it subscribes to flux twice: once for count() and another for collectList().

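But, since nextPage is shared between subscriptions, the second subscription does not start from null: it starts with the value the first one left behind (nextPage(3), the ID of the last page), so it only fetches the last page. That is exactly the <[9]> in the error above.

Even with a single subscription, we would hit the same issue if we, for example, retry() it!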

To fix that, we need to create a new AtomicReference for every subscription. The easiest way to do so is to use Flux.defer():

Flux<Item> flux = Flux.defer(() -> {
    AtomicReference<String> nextPage = new AtomicReference<>(null);

    return Mono
            .fromCompletionStage(() -> client.fetchPage(nextPage.get()))
            .repeat()
            .takeUntil(page -> {
                page.nextPageId().ifPresent(nextPage::set);
                return !page.nextPageId().isPresent();
            })
            .flatMapIterable(it -> it);
});

Now the test is passing!
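The difference between the two versions is not specific to Reactor. In the following plain-Java analog, a Supplier stands in for the Flux and each get() call stands in for a subscription. The SharedStateDemo class and its hard-coded pages are hypothetical: shared() keeps the cursor outside the recipe, like the first attempt, while deferred() creates a fresh cursor on every call, like Flux.defer():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SharedStateDemo {
    // Hypothetical pages with the same 3-3-3-1 split as in the test.
    static final List<List<Integer>> PAGES = List.of(
            List.of(0, 1, 2), List.of(3, 4, 5), List.of(6, 7, 8), List.of(9));

    // Like the first attempt: one cursor, captured once and shared by every run.
    static Supplier<List<Integer>> shared() {
        AtomicInteger nextPage = new AtomicInteger(0);
        return () -> run(nextPage);
    }

    // Like Flux.defer(): each run creates its own cursor.
    static Supplier<List<Integer>> deferred() {
        return () -> run(new AtomicInteger(0));
    }

    // Reads pages starting at the cursor. Like the takeUntil() callback,
    // it only moves the cursor forward when a next page exists.
    private static List<Integer> run(AtomicInteger nextPage) {
        List<Integer> result = new ArrayList<>();
        while (true) {
            int current = nextPage.get();
            result.addAll(PAGES.get(current));
            if (current + 1 >= PAGES.size()) {
                return result; // last page: the cursor stays on it
            }
            nextPage.set(current + 1);
        }
    }
}
```

Calling get() twice on shared() returns all ten items the first time and only [9] the second time, while deferred() returns all ten items on every call.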

Expanding our horizons

Although the previous approach works and there is nothing “wrong” about it, it is a bit… imperative.

And, as we have seen, it is stateful (the nextPage reference that we need to take care of).

But there is an operator in Project Reactor that suits this use case very well (I must admit that, despite working on reactive code for years, I had mostly been using the first approach)!

Please welcome Mono#expand():

Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>> expander)

Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.

As you can see, it turns this Mono into a Flux.
“Recursively” here means that, unlike with concatMap, the same function is also applied to every element it produces, and the expansion only stops once the function returns an empty Publisher.
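To make “recursively” and “breadth-first” concrete, here is a synchronous plain-Java model of the traversal. It only illustrates the emission order and is not how Reactor implements expand(). The Expand class is hypothetical, and lists stand in for Publishers (an empty list plays the role of an empty Publisher):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

class Expand {
    // Emits the seed, then applies the expander to every emitted element,
    // level by level (breadth-first), until no element produces children.
    // Elements must be non-null (ArrayDeque rejects null, and so do Reactive Streams).
    static <T> List<T> expand(T seed, Function<? super T, ? extends List<? extends T>> expander) {
        List<T> emitted = new ArrayList<>();
        Deque<T> queue = new ArrayDeque<>();
        queue.add(seed);
        while (!queue.isEmpty()) {
            T next = queue.poll();
            emitted.add(next);
            queue.addAll(expander.apply(next)); // the children get expanded too
        }
        return emitted;
    }
}
```

In the pagination case, every page expands into at most one next page, so the “graph” is a simple chain and breadth-first order is just page order.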

We can easily apply it to our use case:

Flux<Item> flux = Mono
        .fromCompletionStage(() -> client.fetchPage(null)) // the first page
        .expand(page -> Mono
                // an absent nextPageId gives an empty Mono, which ends the expansion
                .justOrEmpty(page.nextPageId())
                .flatMap(nextPageId -> Mono.fromCompletionStage(
                        client.fetchPage(nextPageId)
                ))
        )
        .flatMapIterable(it -> it);

The downside is a bit of code duplication (client.fetchPage is called in two places), but this may get improved in Project Reactor.
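One way to reduce the duplication is to give the fetch call a name and reuse it both for the first page and inside the expander. A sketch, assuming reactor-core on the classpath and the Page/Client interfaces from the post; Item, ExpandPagination and the fetch variable are illustrative names, not part of any API:

```java
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical Item: the post never shows its definition.
class Item {
    final int id;

    Item(int id) {
        this.id = id;
    }
}

interface Page extends Iterable<Item> {
    Optional<String> nextPageId();
}

interface Client {
    CompletionStage<Page> fetchPage(String pageId); // null means "the first page"
}

class ExpandPagination {
    static Flux<Item> allItems(Client client) {
        // The single place that calls client.fetchPage. The Supplier overload
        // of fromCompletionStage delays the call until subscription.
        Function<String, Mono<Page>> fetch =
                pageId -> Mono.fromCompletionStage(() -> client.fetchPage(pageId));

        return fetch.apply(null) // null requests the first page
                .expand(page -> Mono.justOrEmpty(page.nextPageId()).flatMap(fetch))
                .flatMapIterable(page -> page);
    }
}
```

Because the first fetch is also lazy, every subscription (including a retry) starts again from the first page, so no Flux.defer() is needed here.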
