Daily Reactive: Context loss detection in Project Reactor
This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.
One of the unique features of Project Reactor is the Context support. It allows you to associate an immutable context (represented by key/value pairs) and pass it to the upstream subscribers:
public Flux<Item> getItems() {
return Flux.deferWithContext(context -> {
String userId = context.get("userId");
Publisher<Item> items = fetchItemsByUserId(userId);
return items;
});
}
public void streamItems() {
getItems().subscriberContext(Context.of("userId", "bsideup"))
.subscribe(System.out::println);
}
As you can see, instead of passing userId
as an argument to getItems
, we were able to put it into the Context
and access it later.
It is a very powerful feature that is used by Spring Security, Spring Cloud Sleuth, R2DBC and other libraries.
But, with great power comes great responsibility!
Imagine losing the transactionId
half way throught the flow?
Or not being able to access the security context anymore?
Or a new span from your favourite distributed tracing library being created due to a missing current one?
Such issues may cause big troubles, and debugging them isn’t easy… unless you have tools for that!
How to lose Context
Let’s use our previous example and demonstrate how a wrongly implemented operator can break the Context
chain.
First, our “operator”:
class LoggingOperator<T> extends FluxOperator<T, T> {
LoggingOperator(Flux<T> flux) {
super(flux);
}
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
source.subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
actual.onSubscribe(s);
}
@Override
public void onNext(T s) {
System.out.println("Received: " + s);
actual.onNext(s);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
});
}
}
Here we simply forward all signals to the actual subscriber and log every onNext
.
And here is how we can apply it:
getItems()
.transform(LoggingOperator::new)
.subscriberContext(Context.of("userId", "bsideup"))
.subscribe(System.out::println);
But, if we run this code, we will get:
Caused by: java.util.NoSuchElementException: Context is empty
at reactor.util.context.Context0.get(Context0.java:41)
Pointing to this line:
String userId = context.get("userId");
See how easy it is to lose the why you should think twice before implementing a custom operator?Context
Debugging Context
-related issues
Despite being very performance focused, the development experience of Project Reactor is among our top priorities!
We’re constantly adding more features that help you debugging your reactive code that is otherwise uneasy to debug due to the asynchronous nature. And one such feature is the context loss detection.
as any other hook, you need to apply it before running your code.
ℹ️ It is a global flag and you only need to apply it once (e.g. in
@BeforeAll
in your tests).
But, for the sake of simplicity, I will enable it right before our test code.
Hooks.enableContextLossTracking();
getItems()
.transform(LoggingOperator::new)
.subscriberContext(Context.of("userId", "bsideup"))
.subscribe(System.out::println);
Now we get a different exception:
java.lang.IllegalStateException: Context loss after applying playground.ContextLossTest$$Lambda$30/0x000000080010ac40@62150f9e
Caused by: java.lang.IllegalStateException: Context loss after applying playground.ContextLossTest$$Lambda$30/0x000000080010ac40@62150f9e
at reactor.core.publisher.ContextTrackingFunctionWrapper.lambda$apply$0(ContextTrackingFunctionWrapper.java:50)
at reactor.core.publisher.FluxLift.subscribeOrReturn(FluxLift.java:50)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:55)
at playground.ContextLossTest$LoggingOperator.subscribe(ContextLossTest.java:44)
Nice! Now, the error happens when we lose the context and not when we try to access it.
Although, it would be nice to see the exact line where playground.ContextLossTest$$Lambda$30
got applied.
For that, we will use Hooks.onOperatorDebug()
:
Hooks.onOperatorDebug();
Hooks.enableContextLossTracking();
getItems()
.transform(LoggingOperator::new)
.subscriberContext(Context.of("userId", "bsideup"))
.subscribe(System.out::println);
java.lang.IllegalStateException: Context loss after applying playground.ContextLossTest$$Lambda$38/0x0000000800160440@67d48005
at reactor.core.publisher.ContextTrackingFunctionWrapper.lambda$apply$0(ContextTrackingFunctionWrapper.java:50)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.FluxSource] :
reactor.core.publisher.Flux.transform(Flux.java:9192)
playground.ContextLossTest.main(ContextLossTest.java:17)
Error has been observed at the following site(s):
|_ Flux.transform ⇢ at playground.ContextLossTest.main(ContextLossTest.java:17)
From the list of sites that observed the error, we can easily identify the problem, line 17 of ContextLossTest
:
.transform(LoggingOperator::new)
How to fix?
As you may have noticed, our subscriber isn’t just Subscriber
, but CoreSubscriber
- a special type in Project Reactor.
It extends Reactive Streams’ Subscriber
and provides more capabilities on top:
public interface CoreSubscriber<T> extends Subscriber<T> {
/**
* Request a {@link Context} from dependent components which can include downstream
* operators during subscribing or a terminal {@link org.reactivestreams.Subscriber}.
*
* @return a resolved context or {@link Context#empty()}
*/
default Context currentContext() {
return Context.empty();
}
/**
* Implementors should initialize any state used by {@link #onNext(Object)} before
* calling {@link Subscription#request(long)}. Should further {@code onNext} related
* state modification occur, thread-safety will be required.
* <p>
* Note that an invalid request {@code <= 0} will not produce an onError and
* will simply be ignored or reported through a debug-enabled
* {@link reactor.util.Logger}.
*
* {@inheritDoc}
*/
@Override
void onSubscribe(Subscription s);
A-ha! So, apparently, there is always an implicit Context
, and, by default, it is… empty.
Given this knowledge, we can easily fix our subscriber:
source.subscribe(new CoreSubscriber<T>() {
@Override
public void onSubscribe(Subscription s) {
actual.onSubscribe(s);
}
@Override
public void onNext(T s) {
System.out.println("Received: " + s);
actual.onNext(s);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public Context currentContext() {
// tada 🎉
return actual.currentContext();
}
});
Now everything works as expected! We can even return a new, enriched Context
:
@Override
public Context currentContext() {
return actual.currentContext()
.put("foo", "bar");
}
Note that put
WILL NOT mutate the current context, but return a new one instead.
⚠️ WARNING!
If you’re writing a custom operator, consider always using the current value.
Do I need this?
You may ask yourself “as a regular user of Project Reactor, do I need this at all?”.
Well, while library authors that write custom operators will benefit from the context loss detection the most, you, as a user, can protect yourself from incorrectly implemented 3rd party library.
Also, if you need to deal with multiple reactive libraries, it may happen that you apply non-Reactor Publisher
in between,
and, if applied incorrectly, the Context
will be lost as well.
It costs very little but may save you a few minutes or even hours debugging why your DB transaction isn’t working properly, or why you don’t see a span from Sleuth. Your call :)