Daily Reactive: Context loss detection in Project Reactor

This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.

One of the unique features of Project Reactor is the Context support. It allows you to associate an immutable context (represented by key/value pairs) and pass it to the upstream subscribers:

public Flux<Item> getItems() {
	return Flux.deferWithContext(context -> {
		String userId = context.get("userId");

		Publisher<Item> items = fetchItemsByUserId(userId);
		return items;
	});
}

public void streamItems() {
	getItems().subscriberContext(Context.of("userId", "bsideup"))
	          .subscribe(System.out::println);
}

As you can see, instead of passing userId as an argument to getItems, we were able to put it into the Context and access it later.
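Since the Context is propagated upstream, from the subscriber towards the source, only operators located above subscriberContext(...) can see what it holds. The sketch below illustrates this, assuming a Reactor 3.3/3.4 version where deferWithContext and subscriberContext (the APIs used throughout this post) are still available; the class and the readUserId helper are hypothetical:

```java
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextDirectionDemo {

    // Hypothetical helper: reads "userId" from whatever Context is visible
    // at this point of the chain, with a placeholder if the key is missing.
    static Mono<String> readUserId() {
        return Mono.deferWithContext(ctx -> Mono.just(ctx.getOrDefault("userId", "<none>")));
    }

    public static void main(String[] args) {
        // subscriberContext is below (downstream of) readUserId: the key is visible.
        String seen = readUserId()
                .subscriberContext(Context.of("userId", "bsideup"))
                .block();

        // Here subscriberContext only decorates Mono.just above it; the inner
        // readUserId() subscribed by flatMap sees the (empty) downstream Context.
        String notSeen = Mono.just("ignored")
                .subscriberContext(Context.of("userId", "bsideup"))
                .flatMap(ignored -> readUserId())
                .block();

        System.out.println(seen);    // bsideup
        System.out.println(notSeen); // <none>
    }
}
```

This is why subscriberContext(...) in streamItems sits at the very end of the chain, right before subscribe().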

It is a very powerful feature that is used by Spring Security, Spring Cloud Sleuth, R2DBC and other libraries. But with great power comes great responsibility!
What if you lost the transactionId halfway through the flow?
Or could no longer access the security context?
Or your favourite distributed tracing library created a new span because the current one was missing?

Such issues can cause serious trouble, and debugging them isn’t easy… unless you have the right tools!

How to lose Context

Let’s use our previous example and demonstrate how a wrongly implemented operator can break the Context chain.

First, our “operator”:

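import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;

class LoggingOperator<T> extends FluxOperator<T, T> {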

    LoggingOperator(Flux<T> flux) {
        super(flux);
    }

    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        source.subscribe(new CoreSubscriber<T>() {
            @Override
            public void onSubscribe(Subscription s) {
                actual.onSubscribe(s);
            }

            @Override
            public void onNext(T s) {
                System.out.println("Received: " + s);
                actual.onNext(s);
            }

            @Override
            public void onError(Throwable t) {
                actual.onError(t);
            }

            @Override
            public void onComplete() {
                actual.onComplete();
            }
        });
    }
}

Here we simply forward all signals to the actual subscriber and log every onNext.

And here is how we can apply it:

getItems()
        .transform(LoggingOperator::new)
        .subscriberContext(Context.of("userId", "bsideup"))
        .subscribe(System.out::println);

But if we run this code, we get:

Caused by: java.util.NoSuchElementException: Context is empty
	at reactor.util.context.Context0.get(Context0.java:41)

The error points to this line in getItems:

String userId = context.get("userId");

See how easy it is to lose the Context, and why you should think twice before implementing a custom operator?

Although Project Reactor is very performance-focused, developer experience is among our top priorities!

We’re constantly adding features that help you debug your reactive code, which is otherwise hard to debug due to its asynchronous nature. One such feature is context loss detection.

As with any other hook, you need to enable it before running your code.

ℹ️ It is a global flag and you only need to apply it once (e.g. in @BeforeAll in your tests).
But, for the sake of simplicity, I will enable it right before our test code.

Hooks.enableContextLossTracking();
getItems()
        .transform(LoggingOperator::new)
        .subscriberContext(Context.of("userId", "bsideup"))
        .subscribe(System.out::println);

Now we get a different exception:

java.lang.IllegalStateException: Context loss after applying playground.ContextLossTest$$Lambda$30/0x000000080010ac40@62150f9e
Caused by: java.lang.IllegalStateException: Context loss after applying playground.ContextLossTest$$Lambda$30/0x000000080010ac40@62150f9e
	at reactor.core.publisher.ContextTrackingFunctionWrapper.lambda$apply$0(ContextTrackingFunctionWrapper.java:50)
	at reactor.core.publisher.FluxLift.subscribeOrReturn(FluxLift.java:50)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:55)
	at playground.ContextLossTest$LoggingOperator.subscribe(ContextLossTest.java:44)

Nice! Now the error happens when we lose the Context, not when we try to access it.
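To compare both failure modes side by side, here is a self-contained sketch. It assumes a Reactor 3.3/3.4 version and uses a hypothetical getItems() that emits a single String instead of the post's Item stream; it runs the same broken chain twice, first without and then with Hooks.enableContextLossTracking():

```java
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Hooks;
import reactor.util.context.Context;

public class ContextLossDemo {

    // Same shape as the LoggingOperator above: forwards every signal but does
    // not override currentContext(), so its upstream sees Context.empty().
    static class LoggingOperator<T> extends FluxOperator<T, T> {
        LoggingOperator(Flux<T> flux) {
            super(flux);
        }

        @Override
        public void subscribe(CoreSubscriber<? super T> actual) {
            source.subscribe(new CoreSubscriber<T>() {
                @Override public void onSubscribe(Subscription s) { actual.onSubscribe(s); }
                @Override public void onNext(T t) {
                    System.out.println("Received: " + t);
                    actual.onNext(t);
                }
                @Override public void onError(Throwable e) { actual.onError(e); }
                @Override public void onComplete() { actual.onComplete(); }
            });
        }
    }

    // Hypothetical stand-in for getItems(): Context#get throws
    // NoSuchElementException when the key is missing.
    static Flux<String> getItems() {
        return Flux.deferWithContext(ctx -> Flux.just("items of " + ctx.<String>get("userId")));
    }

    // Runs the broken chain and describes the exception it fails with.
    static String failure() {
        try {
            getItems()
                    .transform(LoggingOperator::new)
                    .subscriberContext(Context.of("userId", "bsideup"))
                    .blockFirst();
            return "no error";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Without the hook: fails where the key is read.
        System.out.println(failure());
        Hooks.enableContextLossTracking();
        // With the hook: fails where the Context was dropped.
        System.out.println(failure());
    }
}
```

The first line should report a NoSuchElementException ("Context is empty") and the second an IllegalStateException starting with "Context loss after applying". Since the hook is a global flag, the order of the two calls matters.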

However, it would be nice to see the exact line where playground.ContextLossTest$$Lambda$30 was applied. For that, we will use Hooks.onOperatorDebug():

Hooks.onOperatorDebug();
Hooks.enableContextLossTracking();
getItems()
        .transform(LoggingOperator::new)
        .subscriberContext(Context.of("userId", "bsideup"))
        .subscribe(System.out::println);

This time, the exception also carries an assembly trace:

java.lang.IllegalStateException: Context loss after applying playground.ContextLossTest$$Lambda$38/0x0000000800160440@67d48005
	at reactor.core.publisher.ContextTrackingFunctionWrapper.lambda$apply$0(ContextTrackingFunctionWrapper.java:50)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxSource] :
	reactor.core.publisher.Flux.transform(Flux.java:9192)
	playground.ContextLossTest.main(ContextLossTest.java:17)
Error has been observed at the following site(s):
	|_ Flux.transform ⇢ at playground.ContextLossTest.main(ContextLossTest.java:17)

From the list of sites that observed the error, we can easily identify the problem at line 17 of ContextLossTest:

.transform(LoggingOperator::new)
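In a real test suite, the ℹ️ note above suggests enabling these global hooks once rather than before every chain. A minimal sketch of that setup, assuming JUnit 5 and a Reactor 3.3/3.4 version; the test class and method names are hypothetical:

```java
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.util.context.Context;

import static org.junit.jupiter.api.Assertions.assertEquals;

class ContextPropagationTest {

    // Both hooks are global flags, so enabling them once before the tests run is enough.
    @BeforeAll
    static void enableReactorDebugging() {
        Hooks.onOperatorDebug();           // record assembly traces for error reports
        Hooks.enableContextLossTracking(); // fail fast when a transform() drops the Context
    }

    @Test
    void userIdReachesTheSource() {
        String userId = Flux.<String>deferWithContext(ctx -> Flux.just(ctx.<String>get("userId")))
                // A well-behaved transformation: map keeps the Context intact.
                .transform(flux -> flux.map(String::toUpperCase))
                .subscriberContext(Context.of("userId", "bsideup"))
                .blockFirst();

        assertEquals("BSIDEUP", userId);
    }
}
```

A transform() whose function drops the Context should then fail such a test with the "Context loss after applying …" error, together with an assembly trace.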

How to fix?

As you may have noticed, our subscriber isn’t just a Subscriber, but a CoreSubscriber, a special type in Project Reactor. It extends the Reactive Streams Subscriber and adds more capabilities on top:

public interface CoreSubscriber<T> extends Subscriber<T> {

	/**
	 * Request a {@link Context} from dependent components which can include downstream
	 * operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
	 *
	 * @return a resolved context or {@link Context#empty()}
	 */
	default Context currentContext() {
		return Context.empty();
	}

	/**
	 * Implementors should initialize any state used by {@link #onNext(Object)} before
	 * calling {@link Subscription#request(long)}. Should further {@code onNext} related
	 * state modification occur, thread-safety will be required.
	 * <p>
	 *    Note that an invalid request {@code <= 0} will not produce an onError and
	 *    will simply be ignored or reported through a debug-enabled
	 *    {@link reactor.util.Logger}.
	 *
	 * {@inheritDoc}
	 */
	@Override
	void onSubscribe(Subscription s);
}

A-ha! So, apparently, there is always an implicit Context, and, by default, it is… empty.

Given this knowledge, we can easily fix our subscriber:

source.subscribe(new CoreSubscriber<T>() {
    @Override
    public void onSubscribe(Subscription s) {
        actual.onSubscribe(s);
    }

    @Override
    public void onNext(T s) {
        System.out.println("Received: " + s);
        actual.onNext(s);
    }

    @Override
    public void onError(Throwable t) {
        actual.onError(t);
    }

    @Override
    public void onComplete() {
        actual.onComplete();
    }

    @Override
    public Context currentContext() {
        // tada 🎉
        return actual.currentContext();
    }
});

Now everything works as expected! We can even return a new, enriched Context:

@Override
public Context currentContext() {
    return actual.currentContext()
            .put("foo", "bar");
}

Note that put does NOT mutate the current Context; it returns a new one instead.
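A quick way to check this yourself, assuming a Reactor 3.3/3.4 version (the class name is hypothetical):

```java
import reactor.util.context.Context;

public class ContextImmutabilityDemo {
    public static void main(String[] args) {
        Context original = Context.of("userId", "bsideup");

        // put() returns a NEW Context; `original` is left untouched.
        Context enriched = original.put("foo", "bar");

        System.out.println(original.hasKey("foo"));         // false
        System.out.println(enriched.hasKey("foo"));         // true
        System.out.println(enriched.<String>get("userId")); // bsideup
        System.out.println(original.size() + " vs " + enriched.size()); // 1 vs 2
    }
}
```

A common mistake is calling put(...) and ignoring its result: since nothing is mutated, the new entry is simply lost.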

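⚠️ WARNING!
If you’re writing a custom operator, always build on the current value, actual.currentContext(), instead of returning a brand-new Context. Otherwise, everything the downstream put into the Context is lost.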
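The sketch below contrasts the two approaches using a hypothetical ContextRewritingOperator that lets the caller decide how the upstream Context is computed. It assumes a Reactor 3.3/3.4 version; ContextRewritingOperator, readKeys and run are illustrative names, not Reactor APIs:

```java
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.util.context.Context;

import java.util.function.Function;

public class EnrichingOperatorDemo {

    // Hypothetical operator: forwards every signal unchanged and exposes to its
    // upstream a Context computed from the downstream one by `rewrite`.
    static class ContextRewritingOperator<T> extends FluxOperator<T, T> {
        private final Function<Context, Context> rewrite;

        ContextRewritingOperator(Flux<T> flux, Function<Context, Context> rewrite) {
            super(flux);
            this.rewrite = rewrite;
        }

        @Override
        public void subscribe(CoreSubscriber<? super T> actual) {
            // Contexts are immutable, so computing it once per subscription is enough.
            Context context = rewrite.apply(actual.currentContext());
            source.subscribe(new CoreSubscriber<T>() {
                @Override public void onSubscribe(Subscription s) { actual.onSubscribe(s); }
                @Override public void onNext(T t) { actual.onNext(t); }
                @Override public void onError(Throwable e) { actual.onError(e); }
                @Override public void onComplete() { actual.onComplete(); }
                @Override public Context currentContext() { return context; }
            });
        }
    }

    // Reports which keys are visible at the top of the chain.
    static Flux<String> readKeys() {
        return Flux.deferWithContext(ctx -> Flux.just(
                ctx.getOrDefault("userId", "<none>") + "/" + ctx.getOrDefault("foo", "<none>")));
    }

    static String run(Function<Context, Context> rewrite) {
        return readKeys()
                .transform(flux -> new ContextRewritingOperator<>(flux, rewrite))
                .subscriberContext(Context.of("userId", "bsideup"))
                .blockFirst();
    }

    public static void main(String[] args) {
        // Derived from actual.currentContext(): userId survives and foo is added.
        System.out.println(run(ctx -> ctx.put("foo", "bar")));    // bsideup/bar
        // Built from scratch: foo is visible, but userId from downstream is gone.
        System.out.println(run(ctx -> Context.of("foo", "bar"))); // <none>/bar
    }
}
```

With Hooks.enableContextLossTracking() enabled, the second variant should also be reported as a Context loss, since it discards everything the downstream put into the Context.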

Do I need this?

You may ask yourself “as a regular user of Project Reactor, do I need this at all?”.

Well, while library authors who write custom operators will benefit from context loss detection the most, you, as a user, can use it to protect yourself from incorrectly implemented third-party libraries.

Also, if you deal with multiple reactive libraries, you may end up applying a non-Reactor Publisher in between, and, if it is applied incorrectly, the Context will be lost as well.
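As a sketch of that scenario, assuming a Reactor 3.3/3.4 version, here is a hypothetical relay operator written against plain Reactive Streams interfaces. A plain Subscriber has no currentContext(), so when it subscribes to a Flux, Reactor adapts it into a CoreSubscriber whose Context is empty:

```java
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.util.context.Context;

public class ForeignPublisherDemo {

    static Flux<String> readUserId() {
        return Flux.deferWithContext(ctx -> Flux.just(ctx.getOrDefault("userId", "<none>")));
    }

    // Hypothetical "foreign" operator: relays every signal using only
    // org.reactivestreams types, so no Context is passed upstream.
    static <T> Publisher<T> relay(Publisher<T> upstream) {
        return downstream -> upstream.subscribe(new Subscriber<T>() {
            @Override public void onSubscribe(Subscription s) { downstream.onSubscribe(s); }
            @Override public void onNext(T t) { downstream.onNext(t); }
            @Override public void onError(Throwable e) { downstream.onError(e); }
            @Override public void onComplete() { downstream.onComplete(); }
        });
    }

    public static void main(String[] args) {
        String direct = readUserId()
                .subscriberContext(Context.of("userId", "bsideup"))
                .blockFirst();
        String relayed = Flux.from(relay(readUserId()))
                .subscriberContext(Context.of("userId", "bsideup"))
                .blockFirst();

        System.out.println(direct);  // bsideup
        System.out.println(relayed); // <none>
    }
}
```

Applying such a Publisher through transform(...) lets the context loss hook flag the problem at the point where it is applied.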

It costs very little but may save you a few minutes or even hours debugging why your DB transaction isn’t working properly, or why you don’t see a span from Sleuth. Your call :)
