Is your Blocking Queue... Blocking?
Java’s BlockingQueue hierarchy is widely used for coordinating work between different producer and consumer threads.
When set up with a maximum capacity (i.e. a bounded queue), no more elements can be added by producers to the queue once it is full, until a consumer has taken at least one element.
For scenarios where new work may arrive more quickly than it can be consumed, this applies a form of back-pressure,
ensuring the application doesn’t eventually run out of memory while enqueuing more and more work items.
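To make the bounded behaviour concrete, here is a minimal sketch (the class and method names are made up for illustration) which fills an ArrayBlockingQueue via the non-blocking offer() method until the queue rejects an item, and then shows that there is room again once an element has been taken:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class BoundedQueueSketch {

    // Offers items until the queue rejects one and returns how many were accepted.
    // Only meaningful for bounded queues; an unbounded queue would never reject.
    static int fillUntilRejected(BlockingQueue<Integer> queue) {
        int accepted = 0;
        while (queue.offer(accepted)) {
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
        System.out.println("accepted before full: " + fillUntilRejected(queue));

        // A consumer takes one element, so the next offer() can succeed again
        queue.poll();
        System.out.println("offer after poll: " + queue.offer(42));
    }
}
```

A producer calling put() instead of offer() would simply block at the point where offer() returns false here.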
One interesting usage of blocking queues is to buffer writes to a database. Let’s take SQLite, an embedded RDBMS, as an example: SQLite only allows a single writer at any given time, and it tends to yield sub-optimal write throughput when executing many small transactions. A blocking queue can be used to mitigate that situation: all threads that wish to perform an update to the database, for instance the worker threads of a web application, submit work items with their write tasks to a blocking queue. Another thread fetches items in batches from that queue, executing one single transaction for all work items of a batch. This results in much better performance compared to each thread executing its own individual write transaction, in particular when keeping those open for the entire duration of web requests, as is commonly the case with most web frameworks. More on that architecture, in particular with regard to failure handling, in a future blog post.
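The batching consumer described here could be sketched roughly as follows. This is not tied to SQLite or any database API: the transaction callback is a stand-in for "execute all items of the batch in one transaction", and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

class BatchingWriterSketch {

    // Takes up to maxBatchSize items from the queue and hands them to the
    // given callback in one go; returns the size of the batch.
    static <T> int writeNextBatch(BlockingQueue<T> queue, int maxBatchSize,
            Consumer<List<T>> transaction) throws InterruptedException {
        List<T> batch = new ArrayList<>(maxBatchSize);

        // Block until at least one work item is available ...
        batch.add(queue.take());
        // ... then grab whatever else is already queued, without blocking
        queue.drainTo(batch, maxBatchSize - 1);

        // Placeholder for running one database transaction for the whole batch
        transaction.accept(batch);
        return batch.size();
    }
}
```

The single writer thread would call writeNextBatch() in a loop; producers only ever interact with the queue.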
How do you find out, though, when a producer is actually blocked while trying to add items to a BlockingQueue?
After all, this is an indicator that the throughput of your system isn’t as high as it would need to be in order to fully satisfy the workload submitted by the producers.
If you have the means of running a profiler against the system, then for instance async-profiler with its wall-clock profiling option will come in handy for this task; unlike CPU profiling which only profiles running threads, wall-clock profiling will also tell you about the time spent by threads in blocked and waiting states, as is the case here.
But what if connecting with a wall-clock profiler is not an option?
In this case, JDK Flight Recorder, Java’s go-to tool for all kinds of performance analyses,
and its accompanying client, JDK Mission Control (JMC),
can be of help to you.
JFR specifically has been designed as an "always-on" event recording engine for use in production environments.
It doesn’t provide bespoke support for identifying blocked queue producers, though.
BlockingQueue implementations such as ArrayBlockingQueue don’t use Java intrinsic locks (i.e. what you’d get when using the synchronized keyword), but rather locks based on the LockSupport primitives.
These don’t show up in the "Lock Instances" view in JMC at this point.
Emitting Custom Events
One possible solution is to emit custom JFR events from within your own code whenever you’re trying to submit an item to a bounded queue at its maximum capacity.
For this, you couldn’t use the put() method of the BlockingQueue interface, though, as it actually is blocking and you’d have no way to react to that.
Instead, you’d have to rely on either offer() (which returns false when it cannot submit an item) or add() (which raises an exception).
When the queue is full and you can’t submit another item, you’d instantiate your custom JFR event type, retry submitting the item for as long as needed, and finally commit the JFR event.
Needless to say, this kind of busy waiting is rather inefficient; you’d also have to remember to implement this pattern in every blocking queue producer of your program.
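For illustration, here is a minimal sketch of that pattern. The event name demo.QueueFull, its retries field and the helper putWithEvent() are all made up for this example; the JFR annotations and the Event base class are part of the jdk.jfr API. The retry loop deliberately shows the busy-waiting approach discussed above, inefficiency included:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import jdk.jfr.Category;
import jdk.jfr.Event;
import jdk.jfr.Label;
import jdk.jfr.Name;

class QueueFullSketch {

    @Name("demo.QueueFull") // hypothetical event name
    @Label("Queue Full")
    @Category("Queues")
    static class QueueFullEvent extends Event {
        @Label("Retries")
        int retries;
    }

    // Submits the item, emitting a JFR event only if the queue was full.
    // Returns the number of failed retries (0 if the first offer() succeeded).
    static <T> int putWithEvent(BlockingQueue<T> queue, T item) {
        if (queue.offer(item)) {
            return 0;
        }
        QueueFullEvent event = new QueueFullEvent();
        event.begin();
        int retries = 0;
        while (!queue.offer(item)) { // busy waiting until a consumer makes room
            retries++;
            Thread.onSpinWait();
        }
        event.retries = retries;
        event.commit(); // the event duration covers the time spent retrying
        return retries;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.offer("first");
        new Thread(() -> {
            try {
                Thread.sleep(200); // simulate a slow consumer
                queue.poll();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();
        System.out.println("retries while full: " + putWithEvent(queue, "second"));
    }
}
```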
A better option, at least in theory, would be to use the JMC Agent.
Part of the JDK Mission Control project, this Java agent allows you to instrument the byte code of existing methods,
so that a JFR event will be emitted whenever they are invoked.
The configuration of JMC Agent happens via an XML file and is rather straightforward.
Here’s what instrumenting the put() method of the ArrayBlockingQueue type would look like:
<?xml version="1.0" encoding="UTF-8"?>
<jfragent>
  <config>
    <classprefix>__JFREvent</classprefix>
    <allowtostring>true</allowtostring>
    <allowconverter>true</allowconverter>
  </config>
  <events>
    <event id="queue.Put">
      <label>Put</label>
      <description>Queue Put</description>
      <path>Queues</path>
      <stacktrace>true</stacktrace>
      <class>java.util.concurrent.ArrayBlockingQueue</class>
      <method>
        <name>put</name>
        <descriptor>(Ljava/lang/Object;)V</descriptor>
      </method>
      <location>WRAP</location>
    </event>
  </events>
</jfragent>
With this agent configuration in place, you’d get an event for every invocation of put(), though, no matter whether it actually is blocking or not.
While you might be able to make some educated guess based on the duration of these events,
that’s not totally reliable.
For instance, you couldn’t be quite sure whether a "long" event actually is caused by blocking on the queue or by some GC activity.
So how about going one level deeper then?
If you look at the implementation of ArrayBlockingQueue::put(), you’ll find that the actual blocking call happens through the await() method on the notFull Condition object.
You could use JMC Agent to instrument that await() method, but this would give you events for every Condition instance, including those not used by BlockingQueue implementations.
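To see where exactly a producer ends up waiting, here is a heavily simplified bounded buffer built on a ReentrantLock and two Condition objects. It is a sketch written for this post, not the JDK source; the real ArrayBlockingQueue is more elaborate, but the put() path has the same basic shape:

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class TinyBoundedBuffer<E> {

    private final ArrayDeque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    TinyBoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void put(E item) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.size() == capacity) {
                notFull.await(); // this is where a producer waits while the buffer is full
            }
            items.addLast(item);
            notEmpty.signal(); // wake up a waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    E take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E item = items.removeFirst();
            notFull.signal(); // wake up a waiting producer, if any
            return item;
        } finally {
            lock.unlock();
        }
    }

    int size() {
        lock.lock();
        try {
            return items.size();
        } finally {
            lock.unlock();
        }
    }
}
```

Note that the same await() method serves both conditions, which is why instrumenting it alone cannot tell blocked producers apart from idle consumers.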
Filtering "Thread Park" Events
But this finally points us in the right direction:
await() is implemented on top of LockSupport::park(), and the JVM itself emits a JFR event whenever a thread is parked.
But how do you identify those "Java Thread Park" events actually triggered by blocking on a queue?
If only there were a way to query and filter JFR events in a structured query language!
Turns out there is.
JFR Analytics lets you do exactly that:
analysing JFR recording files using standard SQL.
I haven’t worked that much on this project over the last year,
but extending it for the use case at hand was easy enough.
By means of the new HAS_MATCHING_FRAME() function it becomes trivial to identify the relevant events.
JFR Analytics hasn’t been released to Maven Central yet, so you need to check out its source code and build it yourself. You can then use the SQLLine command line interface for examining your recordings:
java --class-path "target/lib/*:target/jfr-analytics-1.0.0-SNAPSHOT.jar" \
sqlline.SqlLine
Then, within the CLI tool, "connect" to a recording file and change the output format to "vertical" for better readability of stack traces:
sqlline> !connect jdbc:calcite:schemaFactory=org.moditect.jfranalytics.JfrSchemaFactory;schema.file=path/to/lock-recording.jfr dummy dummy
sqlline> !outputformat vertical
If you need a recording file to play with, check out this example project.
It has a very simple main class with two threads:
a producer thread which inserts 20 items per second to a blocking queue, and a consumer thread, which takes those items at a rate of ten items per second.
Once the queue’s capacity has been reached, the producer will regularly block, as it can only insert ten items per second instead of 20.
With JFR Analytics, the affected put() calls can be identified via the following query:
SELECT
  "startTime",
  "duration" / 1000000 AS "duration",
  "eventThread",
  TRUNCATE_STACKTRACE("stackTrace", 8) AS "stack trace"
FROM "jdk.ThreadPark"
WHERE
  HAS_MATCHING_FRAME("stackTrace", '.*ArrayBlockingQueue\.put.*');
Et voilà, the query returns exactly those thread park events emitted for any blocked put() call:
...
startTime 2023-01-02 18:42:57.594
duration 455
eventThread pool-1-thread-1
stack trace jdk.internal.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.park():371
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block():506
java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool$ManagedBlocker):3744
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool$ManagedBlocker):3689
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await():1625
java.util.concurrent.ArrayBlockingQueue.put(Object):370
dev.morling.demos.BlockingQueueExample$1.run():35
startTime 2023-01-02 18:42:58.097
duration 954
eventThread pool-1-thread-1
stack trace jdk.internal.misc.Unsafe.park(boolean, long)
java.util.concurrent.locks.LockSupport.park():371
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block():506
java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool$ManagedBlocker):3744
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool$ManagedBlocker):3689
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await():1625
java.util.concurrent.ArrayBlockingQueue.put(Object):370
dev.morling.demos.BlockingQueueExample$1.run():35
...
Note how the stack traces are truncated so you can see the immediate caller, in this case the producer thread of the aforementioned example application. One thing to be aware of is that JFR applies a minimum threshold for capturing thread park events: 20 ms with the default configuration and 10 ms with the profile configuration. That is, you would not learn about any calls blocking for a shorter time than that. You can adjust the threshold in your JFR configuration, but be aware of the potential overhead.
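Besides editing a JFR settings file, the threshold can also be set programmatically through the jdk.jfr.Recording API. The following is just a sketch: the class name, the 1 ms threshold, the five-second sleep standing in for the actual workload, and the output file name are arbitrary choices for illustration.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import jdk.jfr.Recording;

class ParkThresholdSketch {

    // Creates a (not yet started) recording which captures thread park events
    // lasting at least the given duration, including their stack traces.
    static Recording newParkRecording(Duration threshold) {
        Recording recording = new Recording();
        recording.enable("jdk.ThreadPark")
                .withThreshold(threshold)
                .withStackTrace();
        return recording;
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        try (Recording recording = newParkRecording(Duration.ofMillis(1))) {
            recording.start();
            Thread.sleep(5_000); // placeholder for the workload to be examined
            recording.stop();
            recording.dump(Path.of("lock-recording.jfr"));
        }
    }
}
```

A lower threshold means more events and thus more overhead, so a value like this is better suited for a targeted investigation than for an always-on recording.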
Equipped with the information about any blocked invocations of put(), you could now take appropriate action;
depending on the specific workload and its characteristics,
you might for instance look into tuning your queue consumers,
adding more of them (when not in a sequencer scenario as with SQLite above),
or maybe sharing the load across multiple machines.
You might also increase the size of the queue,
providing more wiggle room to accommodate short load spikes.
Towards Real-Time Analysis of JFR Events
All this happens after the fact though, through offline analysis of JFR recording files. An alternative would be to run this kind of analysis in real time on live JFR data. The foundation for this is JFR event streaming, which provides low-latency access to the JFR events of a running JVM.
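As a rough idea of what this could look like when using the JDK’s RecordingStream API (Java 14+) directly, without JFR Analytics, here is a sketch which subscribes to thread park events of the current JVM and reports those whose stack trace contains ArrayBlockingQueue.put(). The class and helper names are made up, and printing to standard out is just a placeholder for whatever alerting you’d actually want:

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import jdk.jfr.consumer.RecordedEvent;
import jdk.jfr.consumer.RecordedFrame;
import jdk.jfr.consumer.RecordedStackTrace;
import jdk.jfr.consumer.RecordingStream;

class BlockedPutStreamSketch {

    static boolean isQueuePut(String typeName, String methodName) {
        return typeName.equals("java.util.concurrent.ArrayBlockingQueue")
                && methodName.equals("put");
    }

    // True if any frame of the event's stack trace is ArrayBlockingQueue.put()
    static boolean parkedInQueuePut(RecordedEvent event) {
        RecordedStackTrace stackTrace = event.getStackTrace();
        if (stackTrace == null) {
            return false;
        }
        for (RecordedFrame frame : stackTrace.getFrames()) {
            if (isQueuePut(frame.getMethod().getType().getName(), frame.getMethod().getName())) {
                return true;
            }
        }
        return false;
    }

    // Starts streaming in the background; the caller is responsible for closing the stream
    static RecordingStream startMonitoring() {
        RecordingStream stream = new RecordingStream();
        stream.enable("jdk.ThreadPark").withThreshold(Duration.ofMillis(20)).withStackTrace();
        stream.onEvent("jdk.ThreadPark", event -> {
            if (parkedInQueuePut(event)) {
                String thread = event.getThread() != null ? event.getThread().getJavaName() : "unknown";
                System.out.println(thread + " blocked in put() for " + event.getDuration().toMillis() + " ms");
            }
        });
        stream.startAsync();
        return stream;
    }

    public static void main(String[] args) throws InterruptedException {
        try (RecordingStream stream = startMonitoring()) {
            ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
            queue.put("first");
            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(500); // slow consumer, keeps the queue full for a while
                    queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            queue.put("second"); // blocks until the consumer has taken "first"
            consumer.join();
            Thread.sleep(2_000); // give the stream some time to deliver the event
        }
    }
}
```

Matching on the frame’s type and method name plays the same role here as HAS_MATCHING_FRAME() does in the SQL query above.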
Expanding JFR Analytics in this direction is one of my goals for this year: complementing its current pull query capabilities (based on Apache Calcite) with push queries, leveraging Apache Flink as a stream processing engine. That way, blocked queue producers could trigger some kind of alert in a live production environment, for instance raised when the overall duration of blocked calls exceeds a given threshold in a given time window, indicating the need for intervention with a much lower delay than possible with offline analysis.
Taking things even further, streaming queries could even enable predictive analytics;
Flink’s pattern matching capabilities and the MATCH_RECOGNIZE clause could be used for instance to identify specific sequences of events which indicate that a full garbage collection is going to happen very soon.
This information could be exposed via a health check,
signalling to the load balancer in front of a clustered web application that affected nodes should not receive any more requests for some time,
so as to shield users from long GC-induced response times.
If this sounds interesting to you, please let me know; I’d love to collaborate with the open-source community on this effort.
Many thanks to Richard Startin for his feedback while working on this post!