Declarative Style in Reactive Programming – Take 3.1

Just experienced a bug in my code that related to how I composed reactive publisher. The underlying issue is not that complicated but the way it manifest itself in reactive style is quite interesting.

I will share the details in 2 blog posts. This post is the first part explaining basic of the building block related to the issue. The next post will show the how the issue appear in real use case.

Asynchronous Execution

To be frank, I am not really sure what reactive really means. For me , it just practically means the system/ framework that compose every action to be executed asynchronously.

You have a logic that you want to execute but you don’t want to run in immediately and wait for result since it might block your thread. Instead , you wrap the logic in some kind of execution container (Mono, Flux or Publisher in Spring Reactor in general term). Multiple containers could be wired together to form execution chain. There will be underlying threading mechanism to actually execute what you have defined later.

It looks as if the reactive framework allow you to compose your logic asynchronously without the need to know much about underlying threading model. And it is quite true in a lot of case. You should not worry which thread execute which Mono in your logic chain as long as it get executed in correct execution path you defined.

It is trickier that you think

Composing your logic in reactive way to be executed in the correct path is trickier than you think. Let’s look at the code below.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
fun main() {
val messages = Flux.just(
Message(guid = 1),
Message(guid = 2),
Message(guid = 3)
)
messages
.flatMap { msg -> processInParallel(msg) }
.map { println("Thread(${Thread.currentThread().name}) - Do next step $it") }
.blockLast()
}
fun processInParallel(msg: Message): Mono<Message> {
return Mono
.fromRunnable<Void> {
println("Thread(${Thread.currentThread().name}) - Start processing $msg")
Thread.sleep((Math.random() * 5000).toLong())
println("Thread(${Thread.currentThread().name}) - End processing $msg")
}
.subscribeOn(Schedulers.parallel()) // Process task in thread pool
.thenReturn(msg)
}
fun main() { val messages = Flux.just( Message(guid = 1), Message(guid = 2), Message(guid = 3) ) messages .flatMap { msg -> processInParallel(msg) } .map { println("Thread(${Thread.currentThread().name}) - Do next step $it") } .blockLast() } fun processInParallel(msg: Message): Mono<Message> { return Mono .fromRunnable<Void> { println("Thread(${Thread.currentThread().name}) - Start processing $msg") Thread.sleep((Math.random() * 5000).toLong()) println("Thread(${Thread.currentThread().name}) - End processing $msg") } .subscribeOn(Schedulers.parallel()) // Process task in thread pool .thenReturn(msg) }
fun main() {
    val messages = Flux.just(
        Message(guid = 1),
        Message(guid = 2),
        Message(guid = 3)
    )

    messages
        .flatMap { msg -> processInParallel(msg) }
        .map { println("Thread(${Thread.currentThread().name}) - Do next step $it") }
        .blockLast()
}

fun processInParallel(msg: Message): Mono<Message> {
    return Mono
        .fromRunnable<Void> {
            println("Thread(${Thread.currentThread().name}) - Start processing $msg")
            Thread.sleep((Math.random() * 5000).toLong())
            println("Thread(${Thread.currentThread().name}) - End processing $msg")
        }
        .subscribeOn(Schedulers.parallel()) // Process task in thread pool
        .thenReturn(msg)
}

Don’t worry if you are not familiar with Spring reactor. The above code is practically means below:

  • Flux is just a list of item. But it only allow you to apply logic to each item in reactive( asynchronous) way.
  • message.flatMap().map().blockLast() means for each item in flux, wire it to next reactive chain in method processInParallel() using flatMap. Then call next step defined in map() method for each output. And also wait for all item in the flux in finish execute.
  • You can think of processInParallel() as a method to process each message in thread pool.

Will this work correctly if you expect your message to be processed in order 1->2->3?

The chain message.flatMap().map().blockLast() doesn’t say anything about parallel processing. If you don’t look the the method called in flatMap() then you might think that it is just like java stream and each item should be executed in the same order with source list.

Here is the output

Notice that each message is processed parallely in different thread.
Thread(parallel-1) - Start processing Message(guid=1)
Thread(parallel-2) - Start processing Message(guid=2)
Thread(parallel-3) - Start processing Message(guid=3)

The result from different thread is passed on to next step in different order.
Thread(parallel-1) - End processing Message(guid=1)
Thread(parallel-1) - Do next step Message(guid=1)
Thread(parallel-3) - End processing Message(guid=3)
Thread(parallel-3) - Do next step Message(guid=3)
Thread(parallel-2) - End processing Message(guid=2)
Thread(parallel-2) - Do next step Message(guid=2)

FlatMapSequential

We could use flatMapSequential() if we want each message to appear to the next step in correct order.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
messages
.flatMapSequential { msg -> processInParallel(msg) }
.map { println("Thread(${Thread.currentThread().name}) - Do next step $it") }
.blockLast()
messages .flatMapSequential { msg -> processInParallel(msg) } .map { println("Thread(${Thread.currentThread().name}) - Do next step $it") } .blockLast()
messages
        .flatMapSequential { msg -> processInParallel(msg) }
        .map { println("Thread(${Thread.currentThread().name}) - Do next step $it") }
        .blockLast()

Notice that each message is still processed parallely. It just that the flatMapSequential() will arrange the output to be in the same order of the original source messages. Event though each message produce output in different order.

//each message is still processed parallely
Thread(parallel-1) - Start processing Message(guid=1)
Thread(parallel-2) - Start processing Message(guid=2)
Thread(parallel-3) - Start processing Message(guid=3)
Thread(parallel-2) - End processing Message(guid=2)
Thread(parallel-1) - End processing Message(guid=1)

//But output to next step is arranged in correct order
Thread(parallel-1) - Do next step Message(guid=1)
Thread(parallel-1) - Do next step Message(guid=2)
Thread(parallel-3) - End processing Message(guid=3)
Thread(parallel-3) - Do next step Message(guid=3)

ConcatMap

What we actually want is to actually execute messages in sequential order. Message 2 should not be executed if message 1 hasn’t produce output yet. The concatMap is the one we want.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
messages
.concatMap { msg -> processInParallel(msg) }
.map { println("Thread(${Thread.currentThread().name}) - Do next step $it") }
.blockLast()
messages .concatMap { msg -> processInParallel(msg) } .map { println("Thread(${Thread.currentThread().name}) - Do next step $it") } .blockLast()
messages
        .concatMap { msg -> processInParallel(msg) }
        .map { println("Thread(${Thread.currentThread().name}) - Do next step $it") }
        .blockLast()
//Now each message is processed in sequential
Thread(parallel-1) - Start processing Message(guid=1)
Thread(parallel-1) - End processing Message(guid=1)
Thread(parallel-1) - Do next step Message(guid=1)

Thread(parallel-2) - Start processing Message(guid=2)
Thread(parallel-2) - End processing Message(guid=2)
Thread(parallel-2) - Do next step Message(guid=2)

Thread(parallel-3) - Start processing Message(guid=3)
Thread(parallel-3) - End processing Message(guid=3)
Thread(parallel-3) - Do next step Message(guid=3)

The bug I have experienced is that I call flatMap on each message which asynchronously make remote call. The messages has ordering implication and I ended up calling remote endpoint in wrong order.

Reactive is practically means asynchronous. Each node of logic in the chain may get executed in different thread. We still need to be aware and be careful on threading model to correctly compose the logic chain.

You may find the example code here.