Detecting blocking calls in Akka with BlockHound

Asynchronous and non-blocking programming on the JVM can deliver very impressive performance. Unlike the more classical blocking model, it runs on just a few threads from a pool. But to keep it performant, you must ensure that you never block these threads.
Just imagine having only 4 threads in total and blocking one of them for 100 milliseconds: a quarter of your capacity is gone for that whole time.
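To put a number on it: if every task blocks its thread for 100 ms, a pool of 4 threads cannot complete more than 4 / 0.1 s = 40 tasks per second, however fast the CPU is. The sketch below measures this with a plain JDK fixed thread pool (an illustrative stand-in, not Akka's dispatcher); `runBlockingTasks` is a hypothetical helper name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class BlockingPoolDemo {

    /** Runs `tasks` tasks that each sleep `blockMillis` on `threads` threads; returns elapsed millis. */
    static long runBlockingTasks(int threads, int tasks, long blockMillis) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(tasks);
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(blockMillis); // the blocking call: the pool thread can do nothing else meanwhile
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        done.await(); // wait until every task has finished
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        pool.shutdown();
        return elapsedMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        // 40 tasks on 4 threads means 10 "waves" of 100 ms each
        long elapsed = runBlockingTasks(4, 40, 100);
        System.out.println("40 blocking tasks on 4 threads took " + elapsed + " ms");
    }
}
```

On an otherwise idle machine this should report roughly one second: the CPU is barely used, yet throughput is capped by the sleeping threads.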

But how do you “protect” your application from the blocking calls?
Code reviews? Good, but they suffer from the human factor.
Thread dumps in production? A bit too late, isn’t it? :)
Starvation detectors? Probably one of the best solutions, but they only fire under sufficient load.

My choice is BlockHound: a Java agent that detects blocking calls from non-blocking threads.

But wait… The title says “Akka”, but BlockHound is from Project Reactor and claims to support Reactor & RxJava 2?
It turns out it can be used with any JVM technology thanks to custom integrations.

Sample app

I am no Akka expert, but I have enough knowledge to draft a “Hello World” in Akka (here I am using Akka Typed):

import akka.actor.typed.*;
import akka.actor.typed.javadsl.Behaviors;

public class AkkaWithBlockHound {

    static final Behavior<String> behavior = Behaviors.receive((ctx, who) -> {
        if ("Blocking call".equals(who)) {
            // Blocks the dispatcher thread: exactly what we want to detect
            Thread.sleep(100);
        }

        ctx.getSystem().log().info("Hello, " + who);

        return Behaviors.same();
    });

    public static void main(String[] args) throws Exception {
        var greeter = ActorSystem.create(behavior, "greeter");

        greeter.tell("Akka");
        greeter.tell("Java");
        greeter.tell("Blocking call");

        Thread.currentThread().join();
    }
}

Here we have a very simple Akka application, but it should be enough for our needs.
As you can see, we call a blocking method (Thread.sleep) when we receive the “Blocking call” message.

If I run this code, it prints:

[INFO] [19:15:50.393] [greeter-akka.actor.default-dispatcher-2] Hello, Akka
[INFO] [19:15:50.394] [greeter-akka.actor.default-dispatcher-2] Hello, Java
[INFO] [19:15:50.497] [greeter-akka.actor.default-dispatcher-2] Hello, Blocking call

As you can see, everything runs on the greeter-akka.actor.default-dispatcher-2 thread. This thread is started by Akka’s dispatcher: it is exactly the kind of thread we should not block, yet we do (note the ~100 ms gap before the last line).

Note that it runs fine: no exceptions, no warnings, nothing.

Adding BlockHound

What will happen if we add BlockHound as described in the docs?

public static void main(String[] args) throws Exception {
    BlockHound.install();

    var greeter = ActorSystem.create(behavior, "greeter");

    greeter.tell("Akka");
    greeter.tell("Java");
    greeter.tell("Blocking call");

    Thread.currentThread().join();
}

I run it, and…

[INFO] [19:22:57.563] [greeter-akka.actor.default-dispatcher-2] Hello, Akka
[INFO] [19:22:57.563] [greeter-akka.actor.default-dispatcher-2] Hello, Java
[INFO] [19:22:57.665] [greeter-akka.actor.default-dispatcher-2] Hello, Blocking call

Huh? Nothing happens… Wait. The docs mention a concept of “integrations”, and Akka is not among the supported ones.

Can we “integrate” it with Akka?

Akka integration

After reading the custom integrations section of the docs, it becomes clear that we need to tell BlockHound which threads are considered non-blocking and “blacklist” a method on the stack, so that once execution enters that method, any blocking call inside it is reported.
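To make the idea concrete, here is a deliberately simplified toy model in plain Java. It is not BlockHound's implementation (the real agent instruments bytecode and tracks this state itself). It assumes that, on a non-blocking thread, the innermost marked method on the stack decides whether a blocking call is reported, and that a call with no marked frame is not reported. `MarkerModel` and `isReported` are hypothetical names; the two markers mirror the disallow/allow builder calls used in this post.

```java
import java.util.List;
import java.util.Map;

public class MarkerModel {

    /**
     * Toy decision rule (an assumption, not BlockHound's code).
     * `stack` lists frames as "Class#method", innermost first.
     * `markers` maps a frame to true (blocking allowed inside) or false (disallowed).
     * Returns true if a blocking call at the top of this stack would be reported.
     */
    static boolean isReported(boolean nonBlockingThread, List<String> stack, Map<String, Boolean> markers) {
        if (!nonBlockingThread) {
            return false; // threads outside the predicate may block freely
        }
        for (String frame : stack) {
            Boolean allowed = markers.get(frame);
            if (allowed != null) {
                return !allowed; // the innermost marked frame wins
            }
        }
        return false; // assumption of this model: no marker, no report
    }

    public static void main(String[] args) {
        Map<String, Boolean> markers = Map.of(
                "akka.dispatch.forkjoin.ForkJoinTask#doExec", false, // like disallowBlockingCallsInside
                "scala.Console$#println", true);                    // like allowBlockingCallsInside

        List<String> sleepInActor = List.of(
                "java.lang.Thread#sleep",
                "akka.actor.ActorCell#invoke",
                "akka.dispatch.forkjoin.ForkJoinTask#doExec");
        List<String> printInLogger = List.of(
                "java.io.FileOutputStream#writeBytes",
                "scala.Console$#println",
                "akka.dispatch.forkjoin.ForkJoinTask#doExec");

        System.out.println(isReported(true, sleepInActor, markers));  // true
        System.out.println(isReported(true, printInLogger, markers)); // false
    }
}
```

The second stack shows why an "allow" marker nested inside a "disallow" one can carve out an exception: the walk stops at println before it ever reaches doExec.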

Since BlockHound#install accepts integrations, I will add mine inline:

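BlockHound.install(builder -> {
    // Treat Akka's fork-join worker threads as non-blocking
    builder.nonBlockingThreadPredicate(p -> p.or(AkkaForkJoinWorkerThread.class::isInstance));
    // Report blocking calls made while a task is executing on such a thread
    builder.disallowBlockingCallsInside(
        "akka.dispatch.forkjoin.ForkJoinTask",
        "doExec"
    );
});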

Here we say that every instance of akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread should be considered non-blocking.

⚠️ WARNING!
I am not sure whether this always holds, but I am not using any custom dispatcher or thread pool here, so I assume it does.
Please reach out to the Akka team for proper guidance on Akka’s internals and which thread does what.
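The predicate composition passed to nonBlockingThreadPredicate is plain java.util.function.Predicate. Here is a JDK-only sketch of the same `p -> p.or(SomeThread.class::isInstance)` pattern. It uses java.util.concurrent.ForkJoinWorkerThread as a stand-in for Akka's AkkaForkJoinWorkerThread, and a hypothetical starting predicate that matches no thread. Both are assumptions for illustration; BlockHound hands you its own current predicate to extend.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.function.Function;
import java.util.function.Predicate;

public class ThreadPredicateDemo {

    // Same shape as the lambda given to nonBlockingThreadPredicate: extend the existing predicate
    static final Function<Predicate<Thread>, Predicate<Thread>> EXTEND =
            p -> p.or(ForkJoinWorkerThread.class::isInstance);

    // Hypothetical starting point: no thread is considered non-blocking
    static final Predicate<Thread> NON_BLOCKING = EXTEND.apply(t -> false);

    /** Evaluates NON_BLOCKING on a worker thread of a fresh ForkJoinPool. */
    static boolean testOnForkJoinWorker() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            CompletableFuture<Boolean> result = new CompletableFuture<>();
            // complete the future from inside the worker so the check really runs there
            pool.execute(() -> result.complete(NON_BLOCKING.test(Thread.currentThread())));
            return result.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("main thread: " + NON_BLOCKING.test(Thread.currentThread())); // false
        System.out.println("fork-join worker: " + testOnForkJoinWorker());              // true
    }
}
```

Because `or` keeps the original predicate, threads that were already considered non-blocking stay that way; the integration only adds to the set.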

Also, we mark akka.dispatch.forkjoin.ForkJoinTask#doExec as the entry point (I found it by putting a breakpoint on Thread.sleep and analysing the stack trace).
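A breakpoint is one way to find such an entry point; another is to capture the stack at the spot where the blocking call would sit and print it. A JDK-only sketch follows. It runs on a plain java.util.concurrent.ForkJoinPool, not the akka.dispatch.forkjoin copy seen in the stack traces here, so the frame names differ from Akka's and can vary between JDK versions. `EntryPointFinder` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class EntryPointFinder {

    /** Runs a task on a fork-join worker and returns its stack as "Class#method", innermost first. */
    static List<String> framesFromWorker() {
        ForkJoinPool pool = new ForkJoinPool(1);
        try {
            CompletableFuture<List<String>> frames = new CompletableFuture<>();
            pool.execute(() -> frames.complete(
                    // a blocking call such as Thread.sleep would sit right here
                    Arrays.stream(new Throwable().getStackTrace())
                            .map(f -> f.getClassName() + "#" + f.getMethodName())
                            .collect(Collectors.toList())));
            return frames.join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The outer frames show how the pool runs every task; look for a method that wraps each one
        framesFromWorker().forEach(System.out::println);
    }
}
```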

Now, if we run it, we get… Nothing! 😱 Like… Really nothing :D

TBH I was a bit puzzled, so I added my favourite “WTH is going on” trick to debug it:

builder.blockingMethodCallback(m -> {
    // Print where each blocking call happened instead of using the default handling
    new Exception(m.toString()).printStackTrace();
});

(yes, you can override the built-in handling of blocking calls, which by default throws an Error)

Now I see a few exceptions, but there is one that made me laugh:

java.lang.Exception: java.io.FileOutputStream#writeBytes
	at com.lightbend.akka.sample.AkkaWithBlockHound.lambda$main$0(AkkaWithBlockHound.java:25)
	at reactor.blockhound.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
	at reactor.blockhound.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
	at java.base/java.io.FileOutputStream.writeBytes(FileOutputStream.java)
	at java.base/java.io.FileOutputStream.write(FileOutputStream.java:348)
	at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
	at java.base/java.io.BufferedOutputStream.flush(BufferedOutputStream.java:142)
	at java.base/java.io.PrintStream.write(PrintStream.java:561)
	at java.base/sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:233)
	at java.base/sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:312)
	at java.base/sun.nio.cs.StreamEncoder.flushBuffer(StreamEncoder.java:104)
	at java.base/java.io.OutputStreamWriter.flushBuffer(OutputStreamWriter.java:184)
	at java.base/java.io.PrintStream.write(PrintStream.java:606)
	at java.base/java.io.PrintStream.print(PrintStream.java:745)
	at java.base/java.io.PrintStream.println(PrintStream.java:899)
	at scala.Console$.println(Console.scala:271)
	at scala.Predef$.println(Predef.scala:397)
	at akka.event.Logging$StdOutLogger.info(Logging.scala:1044)
	at akka.event.Logging$StdOutLogger.info$(Logging.scala:1031)
	at akka.event.Logging$DefaultLogger.info(Logging.scala:1126)
	at akka.event.Logging$StdOutLogger.print(Logging.scala:985)
	at akka.event.Logging$StdOutLogger.print$(Logging.scala:982)
	at akka.event.Logging$DefaultLogger.print(Logging.scala:1126)
	at akka.event.Logging$DefaultLogger$$anonfun$receive$2.applyOrElse(Logging.scala:1129)
	at akka.actor.Actor.aroundReceive(Actor.scala:539)
	at akka.actor.Actor.aroundReceive$(Actor.scala:537)
	at akka.event.Logging$DefaultLogger.aroundReceive(Logging.scala:1126)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
	at akka.actor.ActorCell.invoke(ActorCell.scala:581)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Do you know how System.out.println works? Well, it writes to the standard output through a FileOutputStream (see the frames above), and writing to a file is… a blocking operation!

What happens is:

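  1. BlockHound detects a blocking call (Thread.sleep) and throws an Error
  2. Akka handles the error and attempts to print it to the console; its default logger is itself an actor running on a fork-join worker thread (see the DefaultLogger frames above), which we have just declared non-blocking
  3. Printing is a blocking operation… so even the “Hello” lines never reach the console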

You get it, right?
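If not, a toy simulation may help. This is plain Java with hypothetical names, not Akka or BlockHound code. A fake detector throws an Error on every console write unless writes are allowed, and a fake logger reports a failed write by logging it, which needs another write. The recursion is capped only so the demo terminates.

```java
import java.util.ArrayList;
import java.util.List;

public class LoggingLoopDemo {

    // Plays the role of allowBlockingCallsInside("scala.Console$", "println")
    private final boolean writesAllowed;
    private final List<String> console = new ArrayList<>();
    private int attempts = 0;

    LoggingLoopDemo(boolean writesAllowed) {
        this.writesAllowed = writesAllowed;
    }

    // Stand-in for a console write that the fake detector considers blocking
    private void write(String line) {
        attempts++;
        if (!writesAllowed) {
            throw new Error("Blocking call! write");
        }
        console.add(line);
    }

    // Stand-in for the logger: when a write fails, it tries to log the failure, which writes again
    void log(String line, int depth) {
        try {
            write(line);
        } catch (Error e) { // mimics the framework catching the detector's Error
            if (depth < 5) { // cap the recursion so the demo terminates
                log("[ERROR] " + e.getMessage(), depth + 1);
            }
        }
    }

    List<String> console() { return console; }

    int attempts() { return attempts; }

    public static void main(String[] args) {
        for (boolean allowed : new boolean[] {false, true}) {
            LoggingLoopDemo demo = new LoggingLoopDemo(allowed);
            demo.log("Hello, Akka", 0);
            System.out.println("writes allowed=" + allowed + ": printed " + demo.console()
                    + " after " + demo.attempts() + " attempt(s)");
        }
    }
}
```

With writes disallowed, every attempt fails and nothing is printed, matching the “really nothing” result; allowing the write path breaks the loop.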

That’s where whitelisting comes in handy!

builder.allowBlockingCallsInside("scala.Console$", "println");

Now we get a much better result:

[INFO] [19:46:53.815] [greeter-akka.actor.default-dispatcher-2] Hello, Akka
[INFO] [19:46:53.816] [greeter-akka.actor.default-dispatcher-2] Hello, Java
[ERROR] [19:46:53.825] [greeter-akka.actor.default-dispatcher-2] Blocking call! java.lang.Thread.sleep
java.lang.Error: Blocking call! java.lang.Thread.sleep
	at reactor.blockhound.BlockHound$Builder.lambda$new$0(BlockHound.java:154)
	at reactor.blockhound.BlockHound$Builder.lambda$install$8(BlockHound.java:254)
	at reactor.blockhound.BlockHoundRuntime.checkBlocking(BlockHoundRuntime.java:43)
	at java.base/java.lang.Thread.sleep(Thread.java)
	at com.lightbend.akka.sample.AkkaWithBlockHound.lambda$static$4922afa4$1(AkkaWithBlockHound.java:14)
	at akka.actor.typed.javadsl.Behaviors$.$anonfun$receive$1(Behaviors.scala:106)
	at akka.actor.typed.internal.BehaviorImpl$ReceiveBehavior.receive(BehaviorImpl.scala:37)
	at akka.actor.typed.Behavior$.interpret(Behavior.scala:437)
	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:393)
	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:121)
	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:102)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:612)
	at akka.actor.ActorCell.invoke(ActorCell.scala:581)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:268)
	at akka.dispatch.Mailbox.run(Mailbox.scala:229)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

A-ha! The blocking call is detected and properly logged.

Conclusion

Dealing with blocking calls can be tricky. Project Loom should greatly improve the situation, but it will take time to land in the JVM world.

Until that happens, tools like BlockHound can save you a lot of the time you would otherwise spend investigating a performance issue, or even a deadlock, in production.
