Daily Reactive: Pagination
This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.
In one of the previous Daily Reactive posts we talked about the polling.
Today we will focus on an interesting example of the polling: Pagination.
Example
Let’s say we have the following API:
interface Page extends Iterable<Item> {
Optional<String> nextPageId();
}
interface Client {
CompletionStage<Page> fetchPage(@Nullable String pageId);
}
As you can see, each fetchPage
will get a page (where pageId == null
means “the first page”).
The API is stateless, so that we have to maintain the state (next pageId
) ourselves.
Each Page
will have 0..n
items that can be iterated via Iterable
interface.
You may find a similar API in libraries like the AWS SDK v1.
We can test it like this:
var client = new PaginatingClient(
IntStream.range(0, 10).mapToObj(Item::new).collect(toList())
);
Flux<Item> flux = ...; // TODO
assertThat(flux.count().block()).isEqualTo(10);
assertThat(flux.collectList().block()).extracting(Item::getId).containsExactly(
0, 1, 2,
3, 4, 5,
6, 7, 8,
9
);
Learning by repetition
We already know how to handle pollable sources, so why not to treat this one as pollable?
var nextPage = new AtomicReference<String>(null);
Flux<item> = Mono
.fromCompletionStage(() -> client.fetchPage(nextPage.get()))
.repeat()
.takeUntil(page -> {
page.nextPageId().ifPresent(nextPage::set);
return !page.nextPageId().isPresent();
})
.flatMapIterable(it -> it);
If we run the test, it will… fail!
java.lang.AssertionError:
Expecting:
<[9]>
to contain exactly (and in same order):
<[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]>
but could not find the following elements:
<[0, 1, 2, 3, 4, 5, 6, 7, 8]>
The test is wrong, I should delete it
If we take a closer look at the test, we will find that it subscribes to flux
twice: once for count()
and another for collectList()
.
But, since nextPage
is “reused”, our second subscription will start with value nextPage(3)
!
But, even if we only have one subscription, if we retry()
it, for example, we will get the same issue!
To fix that, we need to create a new AtomicReference
for every subscription.
The easiest way to do so is to use Flux.defer()
:
Flux<Item> flux = Flux.defer(() -> {
AtomicReference<String> nextPage = new AtomicReference<>(null);
return Mono
.fromCompletionStage(() -> client.fetchPage(nextPage.get()))
.repeat()
.takeUntil(page -> {
page.nextPageId().ifPresent(nextPage::set);
return !page.nextPageId().isPresent();
})
.flatMapIterable(it -> it);
});
Now the test is passing!
Expanding our horizons
Although the previous approach works and there is nothing “wrong” about it, it is a bit… imperative.
And, as we have seen, it is stateful (the nextPage
reference that we need to take care of).
But, there is apparently (I must admit that, despire working on Reactive code for years, I was mostly using the first approach) an operator in Project Reactor that suits the use case very well!
Please welcome, Mono#expand()
:
Flux<T> expand(Function<? super T, ? extends Publisher<? extends T>> expander)
Recursively expand elements into a graph and emit all the resulting element using a breadth-first traversal strategy.
As you can see, it turns this Mono
into a Flux
.
And “Recursively” here means that (unlike with concatMap
) it will attempt to apply
the same function on the returned result (until an empty Publisher
is returned).
We can easily apply it to our use case:
Flux<Item> flux = Mono
.fromCompletionStage(() -> client.fetchPage(null))
.expand(it -> {
return Mono
.justOrEmpty(it.nextPageId())
.flatMap(nextPageId -> Mono.fromCompletionStage(
client.fetchPage(nextPageId)
));
})
.flatMapIterable(it -> it);
The downside of it is that you need to have a bit of a code duplication (client.fetchPage
),
but it may get improved in Project Reactor.