Daily Reactive: Where is my exception?!

This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.

Exceptions For Flow Control?

Throwing an error is a very easy way of terminating the processing. Consider the following example:

public Mono<Void> sendRequest() {
    return doSendAndReceiveResponse()
            .doOnNext(buffer -> {
                throw new StopProcessingException(buffer);
            })
            .onErrorResume(StopProcessingException.class, e -> {
                ResourceUtils.releaseResource(e.getBuffer());
                return Mono.empty();
            })
            .then();
}

Here, if we receive a response from the call, we throw an exception to stop the flow (an error is a terminal signal, so it ends the processing). Later, we handle the error with onErrorResume, where we release the buffer.

Looks simple, huh? What could possibly go wrong?

Meet onErrorContinue!

Since an error is a signal in Reactive Programming, some frameworks offer advanced error handling mechanisms. Unlike imperative programming, where we’re limited to try {} catch {} finally {}, the reactive approach gives more control!

One example is Project Reactor’s onErrorContinue():

public final Flux<T> onErrorContinue(BiConsumer<Throwable,Object> errorConsumer)

Let compatible operators upstream recover from errors by dropping the incriminating element from the sequence and continuing with subsequent elements.

This is how you can use it:

Flux.range(0, 5)
    .doOnNext(i -> {
        if (i % 2 == 1) {
            throw new IllegalStateException("Boom!");
        }
    })
    .onErrorContinue((e, i) -> {
        System.out.println("Value '" + i + "' have triggered an error " + e);
    })
    .log("values")
    .blockLast();

Running it will produce:

15:25:45.319 INFO  values - | onNext(0)
Value '1' have triggered an error java.lang.IllegalStateException: Boom!
15:25:45.325 INFO  values - | onNext(2)
Value '3' have triggered an error java.lang.IllegalStateException: Boom!
15:25:45.325 INFO  values - | onNext(4)
15:25:45.326 INFO  values - | onComplete()

As you can see, some Reactor operators (yes, some, not all; see the docs) allow you to “ignore” errors without cancelling the whole flow.

Now, if we apply this knowledge to our original code:

public Mono<Void> sendRequest() {
    return doSendAndReceiveResponse()
            .doOnNext(buffer -> {
                throw new StopProcessingException(buffer);
            })
            .onErrorResume(StopProcessingException.class, e -> {
                ResourceUtils.releaseResource(e.getBuffer());
                return Mono.empty();
            })
            .then();
}

// ...

sendRequest()
    .then(doSomething())
    .onErrorContinue((e, o) -> {});

StopProcessingException will never be processed by onErrorResume! This is how it works:

  1. doOnNext catches the exception thrown by its callback
  2. it checks whether it is in “onErrorContinue mode”
  3. if so, instead of propagating the error downstream, it calls the provided onErrorContinue callback
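
To make these three steps more tangible, here is a toy model in plain Java. It is only a sketch and not how Reactor is implemented: ContinueModeSketch, run and failOnOdd are made-up names, a plain List stands in for the source, and a nullable handler stands in for the "mode" that a downstream onErrorContinue would switch on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Toy model only: none of these names exist in Reactor.
class ContinueModeSketch {

    // Hypothetical stand-in for a doOnNext-like operator.
    // continueHandler == null models "no onErrorContinue downstream" (default stop mode).
    static <T> List<String> run(List<T> source,
                                Consumer<T> callback,
                                BiConsumer<Throwable, Object> continueHandler) {
        List<String> signals = new ArrayList<>();
        for (T value : source) {
            try {
                callback.accept(value);
                signals.add("onNext(" + value + ")");
            } catch (RuntimeException e) {             // step 1: catch the error
                if (continueHandler != null) {          // step 2: are we in "continue mode"?
                    continueHandler.accept(e, value);   // step 3: hand it to the callback
                    continue;                           // drop the element and keep going
                }
                signals.add("onError(" + e.getMessage() + ")");
                return signals;                         // default: the error terminates the flow
            }
        }
        signals.add("onComplete()");
        return signals;
    }

    // Same failing callback as in the Flux.range example above.
    static void failOnOdd(Integer i) {
        if (i % 2 == 1) {
            throw new IllegalStateException("Boom!");
        }
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(0, 1, 2);
        // Stop mode: the error is emitted as a signal.
        System.out.println(run(source, ContinueModeSketch::failOnOdd, null));
        // Continue mode: the error never becomes a signal.
        System.out.println(run(source, ContinueModeSketch::failOnOdd, (e, v) -> { }));
    }
}
```

In this model, the first call emits an onError signal that a later error handler could react to, while the second call never emits one. That is the situation our onErrorResume ends up in: there is simply no error signal left for it to handle.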

How to fix the unexpected behavior?

A good rule of Reactive Programming is to never assume anything and to be explicit.

Since onErrorContinue is a “mode”, you can… turn it off (but only for the upstream)!

return doSendAndReceiveResponse()
        .doOnNext(buffer -> ...)
        .onErrorResume(StopProcessingException.class, e -> ...)
        .onErrorStop() // <-- this "restores" the original "fail on error" behavior
        .then();

How to actually fix the problem

onErrorContinue is a very helpful operator and there is no issue in using it. Let’s have a look at our code again. This is the algorithm:

  1. Start receiving
  2. If we receive a buffer, we need to “release” it and terminate the flow

Sometimes, rethinking the code in a “Reactive Way” solves a bunch of problems at once. Throwing an exception is indeed one way of canceling the flow. But, in Reactive Programming, we have many more!

And, in most situations, there is an operator that matches the requirements really well. In our case, it is takeWhile:

return doSendAndReceiveResponse()
        .takeWhile(buffer -> {
            ResourceUtils.releaseResource(buffer);
            return false;
        })
        .then();

It is not only shorter, but also easier to understand! Not throwing an exception should improve the performance too!
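
If you want to play with the "release and stop" idea without Reactor on the classpath, the JDK's own Stream.takeWhile (Java 9+) behaves similarly on a sequential stream. The sketch below is only an analogy, not Reactor code: FakeBuffer and releaseFirstAndStop are hypothetical names, and a real buffer would of course come from your I/O layer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

// JDK-only analogy; FakeBuffer and releaseFirstAndStop are hypothetical names.
class TakeWhileSketch {

    // Stand-in for a pooled buffer that must be released.
    static final class FakeBuffer {
        final int id;
        boolean released;
        FakeBuffer(int id) { this.id = id; }
        void release() { released = true; }
    }

    // Releases the first buffer it sees, then ends the sequence without throwing.
    // Records every buffer that reached takeWhile in 'seen' and
    // returns how many buffers the source had to produce.
    static int releaseFirstAndStop(List<FakeBuffer> seen) {
        AtomicInteger produced = new AtomicInteger();
        Stream.iterate(1, i -> i + 1)                 // infinite, lazy source
              .map(i -> {
                  produced.incrementAndGet();
                  return new FakeBuffer(i);
              })
              .peek(seen::add)
              .takeWhile(buffer -> {
                  buffer.release();                   // side effect: free the resource
                  return false;                       // then end the sequence
              })
              .forEach(buffer -> {
                  throw new AssertionError("nothing should pass takeWhile");
              });
        return produced.get();
    }

    public static void main(String[] args) {
        List<FakeBuffer> seen = new ArrayList<>();
        int produced = releaseFirstAndStop(seen);
        System.out.println("produced=" + produced + ", released=" + seen.get(0).released);
    }
}
```

Even though the source is infinite, only one buffer is produced: it gets released, the predicate returns false, and the pipeline finishes normally. No exception is involved, so there is nothing for an error-handling "mode" to intercept.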

Conclusion

Reactive Programming would not be so successful without the great set of operators provided by the frameworks.

Do check out the ones from Project Reactor, and maybe you will find a better one for your use case!
