Daily Reactive: Let's talk about ThreadLocals.
This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.
ThreadLocals are one of the most common pain points in the reactive world, and a topic nobody wants to talk about.
Unlike in synchronous programming, asynchronous programming makes them hard to use due to a lot of context switching and thread pooling.
Consider the following example:
static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

@Test
public void testThreadLocals() {
    USER_ID.set("bsideup");

    Mono.just("Hello %s")
        .delayElement(Duration.ofSeconds(1))
        .doOnNext(greeting -> {
            // Will print "Hello null". Bummer!
            System.out.println(String.format(greeting, USER_ID.get()));
        })
        .block();
}
What happens here?
- We set the variable to some value, and this value “sticks” to the current (main) thread.
- Now we introduce a delay, and, since everything is non-blocking, the task will be submitted to the parallel scheduler.
- Once the delay completes, doOnNext will be called on the same (parallel) thread.
The catch here is that there is no logic to automagically move ThreadLocals from thread main to parallel-1, hence the variable is unset.
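The same effect can be reproduced without Reactor at all. Here is a minimal sketch that uses only a JDK ScheduledExecutorService as a stand-in for the parallel scheduler; the class and method names (ThreadLocalLossDemo, readAfterDelay) are made up for illustration:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class ThreadLocalLossDemo {

    static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

    // Sets USER_ID on the calling thread, then reads it from a task that a
    // single-threaded scheduler runs after a short delay.
    static String readAfterDelay(String userId) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        USER_ID.set(userId);
        try {
            // This task runs on the scheduler's worker thread, not on the caller's.
            Callable<String> readTask = USER_ID::get;
            return scheduler.schedule(readTask, 50, TimeUnit.MILLISECONDS).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            USER_ID.remove();
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        // The worker thread never saw the value, so this prints "Hello null".
        System.out.println("Hello " + readAfterDelay("bsideup"));
    }
}
```

Nothing here is specific to reactive libraries: a ThreadLocal is simply a per-thread slot, and the delayed task runs on a thread whose slot was never filled.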
We can of course use Reactor’s built-in instrumentation capabilities to “fix” it:
static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

static {
    Function<Runnable, Runnable> decorator = task -> {
        // Capture
        String userId = USER_ID.get();
        return () -> {
            String previous = USER_ID.get();
            // Restore
            USER_ID.set(userId);
            try {
                // Call the original task
                task.run();
            }
            finally {
                // Cleanup
                USER_ID.set(previous);
            }
        };
    };
    Schedulers.onScheduleHook("my-hook", decorator);
}
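To see what the decorator buys us in isolation, here is a rough JDK-only sketch of the same capture/restore pattern applied to a plain ExecutorService. It is an illustration under assumptions, not how Reactor wires the hook internally; CaptureRestoreDemo, decorate and readOnWorker are hypothetical names:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CaptureRestoreDemo {

    static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

    // Same shape as the hook above, but for Callable: capture when the task is
    // handed over, restore around the call on whatever thread ends up running it.
    static <T> Callable<T> decorate(Callable<T> task) {
        String captured = USER_ID.get();      // runs on the submitting thread
        return () -> {
            String previous = USER_ID.get();  // runs on the worker thread
            USER_ID.set(captured);
            try {
                return task.call();
            } finally {
                USER_ID.set(previous);
            }
        };
    }

    // Reads USER_ID on a worker thread, with or without the decorator.
    static String readOnWorker(String userId, boolean decorated) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        USER_ID.set(userId);
        try {
            Callable<String> task = USER_ID::get;
            Callable<String> submitted = decorated ? decorate(task) : task;
            return executor.submit(submitted).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            USER_ID.remove();
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readOnWorker("bsideup", false)); // prints "null"
        System.out.println(readOnWorker("bsideup", true));  // prints "bsideup"
    }
}
```

Note that it only works because every task passes through decorate before reaching the executor. Any task that is scheduled some other way is invisible to it.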
But don’t let me fool you, this will only partially solve the problem!
The problem
Let’s change our test a bit:
@Test
public void testThreadLocals() {
    USER_ID.set("bsideup");

    Mono.just("Hello %s")
        // Hello RxJava!
        .delaySubscription(Flowable.timer(1, TimeUnit.SECONDS))
        .doOnNext(greeting -> {
            // "Hello null" again. D'oh!
            System.out.println(String.format(greeting, USER_ID.get()));
        })
        .block();
}
Didn’t we “fix” it? Well, yes. For Project Reactor. But now we have RxJava in between, while our hook is Reactor-specific.
Can we add it to RxJava as well? Sure we can!
Function<Runnable, Runnable> decorator = task -> { ... };

// Don't forget to delegate to the previous one!
var handler = RxJavaPlugins.getScheduleHandler();
RxJavaPlugins.setScheduleHandler(task -> {
    if (handler != null) {
        task = handler.apply(task);
    }
    return decorator.apply(task);
});

Schedulers.onScheduleHook("my-hook", decorator);
But what happens if the library we have in the middle of the chain does not support task instrumentation yet?
Like… Reactor Netty?
You tell me!
There is a common expectation that libraries like Project Reactor, and not their users, should solve this problem.
And, while I understand the expectation, I would like to talk a little bit about why it cannot be easily achieved.
How do reactive libraries work? They have at least one thing in common: the Reactive Streams specification.
It defines a common set of interfaces (like Publisher, Subscriber, Subscription) that allow different frameworks to talk to each other.
What the spec does not define is scheduling and how frameworks should deal with threads.
Let’s imagine for a second that we’re Project Reactor:
- We define a Publisher.
- When somebody subscribes to us, we receive an instance of Subscriber.
- Later we can use that instance to call onNext/onError/onComplete.
- We also pass a Subscription to it via onSubscribe, so that our subscriber can request items and/or cancel everything.
That’s… it 😅 Not too much to control, actually.
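The JDK ships a copy of these interfaces as java.util.concurrent.Flow (Java 9+), so the contract can be sketched without any library. The JustPublisher below is a hypothetical, deliberately simplified stand-in for something like Mono.just; it skips most of the rules the specification imposes and is not how Reactor implements it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class JustPublisher<T> implements Flow.Publisher<T> {

    private final T value;

    JustPublisher(T value) {
        this.value = value;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // All we receive is a Subscriber; all we hand back is a Subscription.
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                if (n <= 0) {
                    subscriber.onError(new IllegalArgumentException("non-positive request"));
                    return;
                }
                // Emitted on whichever thread called request(); we have no say in that.
                subscriber.onNext(value);
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // Subscribes a recording Subscriber and returns the signals it received, in order.
    static List<String> signalsFor(String value) {
        List<String> signals = new ArrayList<>();
        new JustPublisher<>(value).subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                signals.add("onSubscribe");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                signals.add("onNext(" + item + ")");
            }

            @Override
            public void onError(Throwable error) {
                signals.add("onError");
            }

            @Override
            public void onComplete() {
                signals.add("onComplete");
            }
        });
        return signals;
    }
}
```

Calling JustPublisher.signalsFor("Hello %s") yields onSubscribe, onNext(Hello %s), onComplete. Notice that there is no thread, scheduler or executor anywhere in these signatures.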
Now, given that knowledge, let’s “trace” an execution of our first example:
Mono.just("Hello %s")
    .delayElement(...)
    .doOnNext(greeting -> { ... })
    .block();
- block() will subscribe to doOnNext
- …which will subscribe to delayElement
- …which will subscribe to our source, Mono.just
On the first request (Yeah! Backpressure!) just will pass the string to delayElement via an onNext call.
Upon receiving it, delayElement will do (very roughly!!!) something like:
public void onNext(T element) {
    scheduler.schedule(() -> {
        actual.onNext(element);
    }, delay.toMillis(), MILLISECONDS);
}
Where actual is the Subscriber we received when we got subscribed, and scheduler is Schedulers.parallel(), for example, which will submit the provided Runnable to a queue and execute it later (without blocking!), when the delay elapses.
As you can see, it does not report “back” that the execution will continue on another Thread.
This is where our decorator kicks in (yes, right on schedule, not after the delay) and captures the ThreadLocals.
But actual (of type Subscriber) can be anything, including 3rd party libraries that don’t have any task instrumentation capabilities 🤷‍♂️
So, from Mono.just’s point of view, calling onNext is where everything ends, and it can no longer control anything.
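Here is a small JDK-only sketch of that failure mode, with two hops: the first one goes through a decorated task (our "instrumented" scheduler), the second one is made by a stand-in for a third-party library that owns an executor we cannot hook into. All names (UninstrumentedHopDemo, ours, thirdParty) are hypothetical:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class UninstrumentedHopDemo {

    static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

    // Capture on the scheduling thread, restore around the task on the worker thread.
    static Runnable decorate(Runnable task) {
        String captured = USER_ID.get();
        return () -> {
            String previous = USER_ID.get();
            USER_ID.set(captured);
            try {
                task.run();
            } finally {
                USER_ID.set(previous);
            }
        };
    }

    // Returns what USER_ID looks like after our own hop and after the third-party hop.
    static List<String> run(String userId) {
        ExecutorService ours = Executors.newSingleThreadExecutor();
        ExecutorService thirdParty = Executors.newSingleThreadExecutor();
        CompletableFuture<String> afterOurHop = new CompletableFuture<>();
        CompletableFuture<String> afterThirdPartyHop = new CompletableFuture<>();
        USER_ID.set(userId);
        try {
            ours.execute(decorate(() -> {
                afterOurHop.complete(String.valueOf(USER_ID.get()));
                // Think "actual.onNext(...)": the downstream library reschedules the
                // work on its own thread, and nothing decorates this task.
                thirdParty.execute(() ->
                        afterThirdPartyHop.complete(String.valueOf(USER_ID.get())));
            }));
            return List.of(afterOurHop.get(), afterThirdPartyHop.get());
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            USER_ID.remove();
            ours.shutdown();
            thirdParty.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run("bsideup")); // prints "[bsideup, null]"
    }
}
```

The value survives exactly as far as our instrumentation reaches, and not one hop further.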
What can you do about it
The easiest would be… not to use ThreadLocals at all :D
In Project Reactor, for example, you can use the Context API:
Mono.just("Hello %s")
    .delaySubscription(Flowable.timer(1, TimeUnit.SECONDS))
    .transform(flux -> Mono.deferWithContext(ctx -> {
        return flux.doOnNext(greeting -> {
            // Get it from the Context
            String userId = ctx.get("userId");
            System.out.println(String.format(greeting, userId));
        });
    }))
    // Put something to the Context, e.g. in the web filter
    .subscriberContext(Context.of("userId", "bsideup"))
    .block();
Internally, it does not use any ThreadLocals and is not exposed to multi-threading issues.
However, this API is Reactor-specific and not defined in the Reactive Streams specification, and you won’t find it in libraries like RxJava.
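Why is this approach immune to thread hops? Roughly speaking, the Context travels with the subscriber rather than with the thread (in Reactor, a CoreSubscriber exposes it via currentContext()). The sketch below illustrates the idea with a stripped-down, hypothetical ContextualSubscriber interface and plain JDK classes; it is not Reactor's implementation:

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SubscriberContextDemo {

    // Hypothetical, stripped-down subscriber that carries an immutable context.
    interface ContextualSubscriber<T> {
        Map<String, String> context();

        void onNext(T item);
    }

    // Stand-in for an operator: emits from a worker thread and resolves the user
    // from the downstream subscriber instead of from a ThreadLocal.
    static void emitOnWorker(String template,
                             ContextualSubscriber<String> downstream,
                             ExecutorService worker) {
        worker.execute(() -> {
            // Different thread, same subscriber: the context is still reachable.
            String userId = downstream.context().get("userId");
            downstream.onNext(String.format(template, userId));
        });
    }

    static String greet(String template, String userId) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CompletableFuture<String> received = new CompletableFuture<>();
        Map<String, String> ctx = Map.of("userId", userId);
        try {
            emitOnWorker(template, new ContextualSubscriber<String>() {
                @Override
                public Map<String, String> context() {
                    return ctx;
                }

                @Override
                public void onNext(String item) {
                    received.complete(item);
                }
            }, worker);
            return received.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(greet("Hello %s", "bsideup")); // prints "Hello bsideup"
    }
}
```

Since the data hangs off an object that every stage already holds a reference to, it does not matter which thread ends up delivering the signal.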
What can we do about it
Well, not too much, but something.
As library developers, we can at least ensure that our scheduling can be instrumented.
Ideally, we could come up with a common Scheduler abstraction, so that the instrumentation hook would need to be set only once.
But until such an API appears in the JDK (which most probably won’t happen, at least not until Project Loom), it is exposed to problem #927.
Another alternative would be to always run everything on our threads (an idea similar to Netty’s), but that would be a major performance hit.
More links
- http://stevenskelton.ca/threadlocal-variables-scala-futures/
- https://github.com/ReactiveX/RxJava/issues/2885
- https://groups.google.com/forum/#!topic/netty/SF7XBakRWyE
- https://wiki.openjdk.java.net/display/loom/Main
- https://stackoverflow.com/a/14364794/1826422
- https://projectreactor.io/docs/core/release/reference/#faq.mdc
- https://ttddyy.github.io/mdc-with-webclient-in-webmvc/