Daily Reactive: Let's talk about ThreadLocals.

This post is a part of the “Daily Reactive” series of short posts about common situations with Project Reactor and Reactive Programming.

ThreadLocals is one of the most common pain points topics nobody wants to talk about in the reactive world. Unlike in synchronous programming, asynchronous programming makes them hard to use due to a lot of context switching and thread pooling.

Consider the following example:

static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

@Test
public void testThreadLocals() {
    USER_ID.set("bsideup");

    Mono.just("Hello %s")
        .delayElement(Duration.ofSeconds(1))
        .doOnNext(greeting -> {
            // WIll print "Hello null". Bummer!
            System.out.println(String.format(greeting, USER_ID.get()));
        })
        .block();
}

What happens here?

  1. We set the variable to some value, and this value “sticks” to the current (main) thread.
  2. Now we introduce a delay, and, since everything is non-blocking, the task will be submitted to the parallel scheduler.
  3. Once delay completes, doOnNext will be called on the same (parallel) thread.

The catch here is that there is no logic to automagically move ThreadLocals from thread main to parallel-1, hence the variable is unset.

We can of course use Reactor’s built-in instrumentation capabilities to “fix” it:

static final ThreadLocal<String> USER_ID = new ThreadLocal<>();

static {
    Function<Runnable, Runnable> decorator = task -> {
        // Capture
        String userId = USER_ID.get();

        return () -> {
            String previous = USER_ID.get();
            // Restore
            USER_ID.set(userId);
            try {
                // Call the original task
                task.run();
            }
            finally {
                // Cleanup
                USER_ID.set(previous);
            }
        };
    };
    Schedulers.onScheduleHook("my-hook", decorator);
}

But don’t let me fool you, this will only partially solve the problem!

The problem

Let’s change our test a bit:

@Test
public void testThreadLocals() {
    USER_ID.set("bsideup");

    Mono.just("Hello %s")
        // Hello RxJava!
		.delaySubscription(Flowable.timer(1, TimeUnit.SECONDS))
        .doOnNext(greeting -> {
            // "Hello null" again. D'oh!
            System.out.println(String.format(greeting, USER_ID.get()));
        })
        .block();
}

Didn’t we “fix” it? Well, yes. For Project Reactor. But now we have RxJava in between, while our hook is Reactor-specific.

Can we add it to RxJava as well? Sure we can!

Function<Runnable, Runnable> decorator = task -> { ... };

// Don't forget to delegate to the previous one!
var handler = RxJavaPlugins.getScheduleHandler();
RxJavaPlugins.setScheduleHandler(task -> {
    if (handler != null) {
        task = handler.apply(task);
    }
    return decorator.apply(task);
});

Schedulers.onScheduleHook("my-hook", decorator);

But what happens if the library we have in the middle of the chain does not support task instrumentation yet?
Like… Reactor Netty?

You tell me!

There is a common expectation that libraries like Project Reactor should solve this problem and not the users.

And, while I understand the expectation, I would like to talk a litlle bit why it cannot be easily achieved.

How do Reactive libraries work? They have at least one thing in common - Reactive Streams specification. It defines a common set of interfaces (like Publisher, Subscriber, Subscription) that allow different frameworks to talk to each other.

What spec does not define is scheduling and how frameworks should deal with threads.

Let’s imagine for a second that we’re Project Reactor:

  1. We define a Publisher.
  2. When somebody subscribes on us, we receive an instance of Subscriber.
  3. Later we can use that instance to call onNext/onError/onComplete.
  4. We also pass Subscription with onSubscribe to it, so that our subscriber can request items and/or cancel everything.

That’s… it 😅 Not to much to control, actually.

Now, given that knowledge, let’s “trace” an execution of our first example:

Mono.just("Hello %s")
    .delayElement(...)
    .doOnNext(greeting -> { ... })
    .block();
  1. block() will subscribe to doOnNext
  2. …which will subscribe to delayElement
  3. …which will subscribe to our source - Mono.just

On the first request (Yeah! Backpressure!) just will pass the string to delayElement via onNext call.
Upon receiveing, delayElement will do (very roughly!!!) something like:

public void onNext(T element) {
    scheduler.schedule(() -> {
        actual.onNext(element);
    }, delay.toMillis(), MILLISECONDS);
}

Where actual is the Subscriber we received when we got subscribed and scheduler is Schedulers.parallel(), for example, that will submit the provided Runnable to a queue and execute it later (without blocking!), when the delay elapses.

As you can see, it does not report “back” that the execution will continue on another Thread.

This is where our decorator kicks in (yes, right on schedule, not after the delay) and captures the ThreadLocals.

But actual (of type Subscriber) can be anything, including 3rd party libraries that don’t have any task instrumentation capabilities 🤷‍♂️

So, from Mono.just’s point of review, calling onNext is where everything ends and it no longer can control anything.

What can you do about it

The easiest would be… not to use ThreadLocals at all :D

In Project Reactor, for example, you can use the Context API:

Mono.just("Hello %s")
    .delaySubscription(Flowable.timer(1, TimeUnit.SECONDS))
    .transform(flux -> Mono.deferWithContext(ctx -> {
        return flux.doOnNext(greeting -> {
            // Get it from the Context
            String userId = ctx.get("userId");
            System.out.println(String.format(greeting, userId));
        });
    }))

    // Put something to the Context, e.g. in the web filter
    .subscriberContext(Context.of("userId", "bsideup"))
    .block();

Internally, it does not use any ThreadLocals and not exposed to the multi-threading issues.

However, this API is Reactor-specific and not defined in the Reactive Streams specification, and you won’t find it in libraries like RxJava.

What can we do about it

Well, not too much, but something.

As library developers, we can at least ensure that our scheduling can be instrumented.

Ideally, we could come up with a common Scheduler abstraction, so that the instrumentation hook will need to be set only once.
But, until such API appears in JDK (which most probably won’t happen, at least not until Project Loom), it is exposed to the problem #927:

Another alternative would be to always run everything on our threads (an idea similar to Netty’s), but that would be a major performance hit.

More links

comments powered by Disqus