Many years ago (2010), I blogged about a deadlock issue I ran into in one of my projects. I revisited that post while moving my content to new hosting and felt it was too long and unfocused, so I decided to rewrite it here to make it more concise and easier to read. The main characteristics of the concurrency issue are still the same in this new post, but I have changed the code to Kotlin (just because I like it). You can find the whole project code here.
The classes below are similar to the real code in my project, with a lot of detail simplified.
- Update Feed: an information feed that provides new updates to clients.
- AsyncTransport: lets a client subscribe to a particular subject of the feed.
The diagram below shows the outline of the execution flow. Please don't mind the SyncRequester for now; we will talk about it later.
AsyncTransport maintains a list of subscriptions. A background thread in this class keeps polling the feed for new updates and executes a subscription's callback if the update is meant for that subscription.
The interesting part is that the code uses a Java Vector to keep the subscriptions. That means adding and removing items is thread-safe (each individual Vector method is synchronized on the Vector instance). But the code still needs an additional synchronized block while iterating over the subscriptions to prevent a ConcurrentModificationException, as you can see in the pollUpdate() method.
class AsyncTransport(private val feed: Feed) {
    private val stopped = AtomicBoolean()
    private val dispatcherThread = Thread(::pollUpdateForSubscriptions, "UpdateDispatcherThread")

    // The code in real case is using Java Vector.
    // I preserve it here since it is part of concurrency characteristic in the code.
    private val subscriptions = Vector<Subscription>()
    ...
    private fun pollUpdate() {
        feed.getUpdate()?.let { update ->
            // Block any subscriptions modification while iterating
            synchronized(subscriptions) {
                subscriptions.forEach { sub ->
                    if (update.forSubscription(sub)) {
                        sub.callback(update.data)
                    }
                }
            }
        }
    }

    fun subscribe(subjectExpression: String, callback: (data: String) -> Unit): Subscription {
        return Subscription(UUID.randomUUID().toString(), callback)
            .also { sub ->
                subscriptions.add(sub)
                feed.subscribe(sub.subId, subjectExpression)
            }
    }

    fun unsubscribe(subscription: Subscription) {
        subscriptions.remove(subscription)
        feed.unSubscribe(subscription.subId)
    }
    ...
}
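To see why a thread-safe Vector still needs the extra synchronized block during iteration, here is a minimal sketch of my own (it is not part of the original project, and the function name removeWhileIterating is made up for illustration). It shows that a Vector iterator fails fast when the Vector is modified mid-iteration, even on a single thread. With multiple threads the same failure can be triggered by another thread's add() or remove() unless the whole iteration holds the Vector's lock.

```kotlin
import java.util.ConcurrentModificationException
import java.util.Vector

// Hypothetical helper: returns true if modifying the Vector while
// iterating over it raised a ConcurrentModificationException.
fun removeWhileIterating(): Boolean {
    val items = Vector(listOf("a", "b", "c"))
    return try {
        for (item in items) {
            // Structural modification while an iterator is still active.
            if (item == "a") items.remove(item)
        }
        false
    } catch (e: ConcurrentModificationException) {
        true
    }
}

fun main() {
    println("CME thrown: ${removeWhileIterating()}")
}
```

Each single call such as remove() is atomic, but an iteration is many calls, so it needs its own lock around the whole loop.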
The use case for this transport is simple. A client subscribes to some pre-defined topics/subjects with a callback. The transport keeps polling the feed for new updates and passes each update to the client's callback.
...
val subscription = transport.subscribe("subject:news") { updateData ->
    println("New Update: $updateData")
}
Thread.sleep(10 * 1000) // wait to get update for 10 seconds
transport.unsubscribe(subscription)
transport.stop()
SyncRequester
Another feature of this API is a synchronous call. The client makes a call with a special subject type that lets the feed server know this is a synchronous request and that the client expects only one update, which is the response to the call. Note that we are testing it with a mock feed, so the format of this subject does not matter.
The tricky thing is that this feature is implemented on top of AsyncTransport. This of course requires some thread coordination, and that is where I found the deadlock issue. The implementation looks like the code below.
class SyncRequester(private val transport: AsyncTransport) {
    private val responseData = AtomicReference<String?>()
    private val lock = Object()

    fun request(subjectExpression: String, timeout: Long): String? {
        val subscription = transport.subscribe(subjectExpression, ::onUpdate)
        // wait for notification of response
        val until = nowMillis() + timeout
        synchronized(lock) {
            while (responseData.get() == null && nowMillis() < until) {
                // spurious wake up could happen before timeout
                val remainingTimeout = until - nowMillis()
                lock.wait(remainingTimeout)
            }
            // Should unsubscribe regardless of getting data or not.
            transport.unsubscribe(subscription)
        }
        return responseData.get()
    }

    private fun onUpdate(data: String) {
        if (responseData.get() == null) {
            responseData.set(data)
            synchronized(lock) {
                lock.notify() // wake up the request method
            }
        }
    }
}
A deadlock can happen when multiple threads try to acquire more than one lock in different orders. The problematic part of SyncRequester.request() is its synchronized block. The requester thread locks on the lock object. Once the thread is inside the synchronized block and lock.wait() reaches its timeout, it calls transport.unsubscribe(subscription), which tries to acquire the lock on the subscription Vector instance in order to remove the subscription.
// SyncRequester
synchronized(lock) {
    ...
    lock.wait(remainingTimeout)
    ...
    // Should unsubscribe regardless of getting data or not.
    transport.unsubscribe(subscription)
}
On the other end, we have the update dispatcher thread, which periodically locks on the subscription Vector instance to send updates to each callback.
// AsyncTransport update dispatching
synchronized(subscriptions) {
    subscriptions.forEach { sub ->
        if (update.forSubscription(sub)) {
            sub.callback(update.data)
        }
    }
}
Deadlock
The deadlock sequence could look like the following.
- SyncRequester successfully acquires the lock on its internal lock object.
- AsyncTransport successfully acquires the lock on its subscription Vector instance.
- SyncRequester waits in lock.wait(), which releases the lock object while waiting. When the timeout is reached, the thread reacquires the lock object before wait() returns.
- SyncRequester then calls transport.unsubscribe(subscription), which calls subscriptions.remove(subscription). The call cannot proceed because AsyncTransport is still holding the lock on the subscription Vector instance.
- At this same unlucky moment, AsyncTransport is executing the callback of this exact SyncRequester. The callback needs to acquire the lock on the SyncRequester lock object.
private fun onUpdate(data: String) {
    ...
    synchronized(lock) {
        lock.notify() // wake up the request method
    }
}
- Now we have a deadlock, since neither SyncRequester nor AsyncTransport can make progress.
If we use the jstack command to print out the stack traces of the application, the tool is also able to detect the deadlock. An example of the output is here.
You can see the scenario I have described above in the stack trace. Notice the different order in which the two threads try to acquire <0x000000076b2b07a0>, which is the SyncRequester lock object, and <0x000000076acf8a30>, which is the AsyncTransport subscription Vector.
Java stack information for the threads listed above:
===================================================
"UpdateDispatcherThread":
    at cc.home.SyncRequester.onUpdate()
    - waiting to lock <0x000000076b2b07a0> (a java.lang.Object)
    at cc.home.SyncRequester.access$onUpdate( )
    ...
    at cc.home.AsyncTransport.pollUpdate()
    - locked <0x000000076acf8a30> (a java.util.Vector)
    at cc.home.AsyncTransport.pollUpdateForSubscriptions()
    at cc.home.AsyncTransport.access$pollUpdateForSubscriptions()
    ...
"pool-1-thread-3":
    at java.util.Vector.removeElement(Vector.java:643)
    - waiting to lock <0x000000076acf8a30> (a java.util.Vector)
    at java.util.Vector.remove(Vector.java:802)
    at cc.home.AsyncTransport.unsubscribe(AsyncTransport.kt:45)
    at cc.home.SyncRequester.request(SyncRequester.kt:26)
    - locked <0x000000076b2b07a0> (a java.lang.Object)
    at cc.home.MainKt$deadlockWithSyncSyncRequester$1.run(Main.kt:48)
    at java.util.concurrent.Executors$RunnableAdapter.call()
    ....

Found 1 deadlock.
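The same kind of monitor deadlock can also be detected from inside the JVM. The sketch below is my own illustration, not code from the project: the function name createAndDetectDeadlock and the two lock objects are made up. It deliberately makes two daemon threads take two locks in opposite order, then asks the standard ThreadMXBean whether any threads are deadlocked on object monitors.

```kotlin
import java.lang.management.ManagementFactory
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread

// Hypothetical demo: returns how many threads the JVM reports as
// deadlocked on object monitors (0 if none is found within ~5 seconds).
fun createAndDetectDeadlock(): Int {
    val lockA = Any()
    val lockB = Any()
    val bothHoldFirst = CountDownLatch(2)

    // Each worker takes its first lock, waits until the other worker also
    // holds its own first lock, then tries to take the second one.
    fun worker(first: Any, second: Any) = thread(isDaemon = true) {
        synchronized(first) {
            bothHoldFirst.countDown()
            bothHoldFirst.await()
            synchronized(second) { }
        }
    }
    worker(lockA, lockB) // order: A then B
    worker(lockB, lockA) // order: B then A

    val threadBean = ManagementFactory.getThreadMXBean()
    repeat(50) {
        // Returns null when no monitor deadlock exists.
        val ids = threadBean.findMonitorDeadlockedThreads()
        if (ids != null) return ids.size
        Thread.sleep(100)
    }
    return 0
}

fun main() {
    println("Deadlocked threads: ${createAndDetectDeadlock()}")
}
```

The workers are daemon threads so that the stuck pair does not keep the JVM alive after main() returns.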
The easiest way to fix this is to avoid holding too many locks at the same time. The line transport.unsubscribe(subscription) does not actually need to be executed inside the synchronized block. Moving it out of the block makes SyncRequester release its internal lock before it touches the subscription Vector, which lets AsyncTransport execute the callback. Then both threads can progress nicely.
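A minimal sketch of that fix could look like the code below. To keep it self-contained I made several assumptions that are not in the original project: the Transport interface is a hypothetical stand-in for the two AsyncTransport methods the requester uses, EchoTransport is a tiny fake that replies once from another thread, and FixedSyncRequester is just a name for this illustration. I also added a guard for a non-positive remaining timeout, because lock.wait(0) means "wait forever" and a negative value throws an exception.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical stand-in for the parts of AsyncTransport used here.
interface Transport<S> {
    fun subscribe(subjectExpression: String, callback: (String) -> Unit): S
    fun unsubscribe(subscription: S)
}

class FixedSyncRequester<S>(private val transport: Transport<S>) {
    private val responseData = AtomicReference<String?>()
    private val lock = Object()

    fun request(subjectExpression: String, timeout: Long): String? {
        val subscription = transport.subscribe(subjectExpression, ::onUpdate)
        try {
            val until = System.currentTimeMillis() + timeout
            synchronized(lock) {
                while (responseData.get() == null) {
                    val remaining = until - System.currentTimeMillis()
                    if (remaining <= 0) break // never call wait(0) or wait(negative)
                    lock.wait(remaining)      // may wake up spuriously, so loop
                }
            }
        } finally {
            // The requester's lock is released at this point, so the
            // transport's own lock is never requested while holding it.
            transport.unsubscribe(subscription)
        }
        return responseData.get()
    }

    private fun onUpdate(data: String) {
        // Keep only the first response, then wake up the waiting thread.
        if (responseData.compareAndSet(null, data)) {
            synchronized(lock) { lock.notifyAll() }
        }
    }
}

// Tiny fake used only to exercise the sketch.
class EchoTransport : Transport<String> {
    override fun subscribe(subjectExpression: String, callback: (String) -> Unit): String {
        thread { callback("reply to $subjectExpression") }
        return subjectExpression
    }

    override fun unsubscribe(subscription: String) {}
}

fun main() {
    println(FixedSyncRequester(EchoTransport()).request("subject:ping", 2000))
}
```

With this shape, the requester thread holds at most one of the two locks at any time, so the opposite lock ordering seen in the stack trace can no longer occur between these two threads.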