Declarative Style in Reactive Programming – Take 3.2

This is the second part of my story about a bug I created due to a lack of understanding of Spring Reactor operators. I described the basics of the operators in part 1.

This post shows a simplified version of the real code. I have preserved all the important components to show you how this issue can manifest in real-world usage.

What the code is trying to achieve

There is a Kafka topic, let's say COMMAND_TOPIC, storing messages that represent commands to create users and to add subscriptions (newsletters) to them. The system continuously drains messages from the topic and makes a REST API call to another system according to the type of each message.

The order of messages in the topic

kafka-partition-1:
create(user:1)
subscribe(user:1,sub:1)
create(user:2)
create(user:3)
subscribe(user:1,sub:2)
subscribe(user:2,sub:1)
subscribe(user:2,sub:2)
subscribe(user:3,sub:1)

You can see that messages from different users can be interleaved, but messages from the same user are always in order.

  • create(user:2) will appear in the Kafka partition before subscribe(user:2,sub:1).
  • create(user:2) must be processed before subscribe(user:2,sub:1). Calling subscribe for a user that hasn’t been created yet on the remote system results in an error.
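To make the invariant concrete, here is a minimal sketch of the command model and an ordering check. The names (Message, CreateUser, Subscribe, perUserOrderIsValid) are illustrative, not the real model from the project:

```kotlin
// Hypothetical sketch of the commands drained from COMMAND_TOPIC.
sealed class Message {
    abstract val userId: Int
}

data class CreateUser(override val userId: Int) : Message()
data class Subscribe(override val userId: Int, val subId: Int) : Message()

// The partition shown above, as a list. Per-user order must hold:
// every CreateUser(u) precedes any Subscribe(u, _).
val partition = listOf(
    CreateUser(1),
    Subscribe(1, 1),
    CreateUser(2),
    CreateUser(3),
    Subscribe(1, 2),
    Subscribe(2, 1),
    Subscribe(2, 2),
    Subscribe(3, 1),
)

// Returns true when every user's create command appears before
// any of that user's subscribe commands.
fun perUserOrderIsValid(messages: List<Message>): Boolean {
    val created = mutableSetOf<Int>()
    for (m in messages) {
        when (m) {
            is CreateUser -> created.add(m.userId)
            is Subscribe -> if (m.userId !in created) return false
        }
    }
    return true
}
```

This is exactly the property the buggy consumer below ends up violating on the remote system's side.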

The problematic message consumer

This Kafka consumer drains messages and translates them into remote REST API calls to another system.

private fun pollLoop() {
    while (!isStopped.get()) {
        val records = consumer.poll(Duration.ofSeconds(2))
        logger.debug { "Poll ${records.count()} " +
            "messages from topic: $COMMAND_TOPIC" }

        if (records.count() != 0) {
            buggyMessageProcessing(records)
        }
    }
}

Let’s look at the message-processing method that causes the issue.

private fun buggyMessageProcessing(
    records: ConsumerRecords<String, String>
) {
    records
        .toFlux()
        // flatMap will eagerly trigger the inner operator
        .flatMap { record ->
            record
                .toMono()
                .map { mapper.readValue<Message>(record.value()) }
                .flatMap(::callRemoteWebEndpoint)
        }
        .blockLast()
}

As I described in the first part of this story, flatMap eagerly subscribes to its inner publishers. It doesn’t wait for the current message to be processed before triggering processing of the next one. If callRemoteWebEndpoint() happens to process messages in parallel (e.g., using a thread pool), message ordering will not be preserved.

It just so happens that callRemoteWebEndpoint() uses Spring WebClient, which internally uses a number of worker threads to send HTTP requests.

private fun createUser(userId: UUID, userDetail: UserDetail): Mono<String> {
    return webClient
        .post()
        .uri("/create-user/$userId")
        .body(BodyInserters.fromValue(userDetail))
        .exchange()
        .transform(::processResponse)
}

This causes errors, as we can see below.

ERROR Thread [ctor-http-nio-4] Could not add subscription: 1. User eadcd067-bd22-4667-aec4-50dab3817459 not exist
ERROR Thread [ctor-http-nio-1] Could not add subscription: 1. User e559bfff-6b8d-4aa7-b834-9e2a93f8fdd3 not exist
ERROR Thread [ctor-http-nio-1] Could not add subscription: 3. User a82934e3-a89f-4c5a-ba7e-9b3630366250 not exist
ERROR Thread [ctor-http-nio-3] Could not add subscription: 1. User a82934e3-a89f-4c5a-ba7e-9b3630366250 not exist
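The race doesn’t need Reactor to show itself. The following Reactor-free sketch mimics flatMap’s eager dispatch with a plain thread pool: both "calls" are submitted immediately, and the simulated latencies (chosen here just to make the effect deterministic) let the dependent call finish first. The names fakeRemoteCall and the latencies are illustrative assumptions:

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Analogue of flatMap's eager subscription: every message is handed to the
// pool immediately, so completion order depends on task duration, not on
// submission order.
val completionOrder = ConcurrentLinkedQueue<String>()
val pool = Executors.newFixedThreadPool(2)

fun fakeRemoteCall(name: String, latencyMs: Long) {
    Thread.sleep(latencyMs)          // stand-in for the HTTP round trip
    completionOrder.add(name)
}

// create(user:1) is submitted first but takes longer, just as a slow
// create call can lose the race against a fast subscribe call running
// on another worker thread.
pool.submit { fakeRemoteCall("create(user:1)", 200) }
pool.submit { fakeRemoteCall("subscribe(user:1,sub:1)", 10) }
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)

// The subscribe completes before the create it depends on.
println(completionOrder.toList())
```

On the remote system this is exactly the "User ... not exist" error from the log above.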

Fixing the issue

Process all messages sequentially

Using concatMap() makes the system process all messages one by one. This fixes the problem, but it is slow, and it doesn’t quite make sense, since messages for one user can be processed independently of messages for other users.

private fun sequentialProcessing(records: ConsumerRecords<String, String>) {
    records
        .toFlux()
        .concatMap { record ->
            record
                .toMono()
                .map { mapper.readValue<Message>(record.value()) }
                .flatMap(::callRemoteWebEndpoint)
        }
        .blockLast()
}

Group messages by user

The better solution is to group messages by userId, use flatMap() to process the groups in parallel, and process the messages within each group sequentially using concatMap().

private fun groupMessagesPerUser(records: ConsumerRecords<String, String>) {
    records
        .toFlux()
        .map { record -> mapper.readValue<Message>(record.value()) }
        .groupBy { message -> message.userId }
        // Use flatMap() to process the groups of messages in parallel.
        .flatMap { messagesPerUser: GroupedFlux<UUID, Message> ->
            messagesPerUser
                // Use concatMap() to sequentially process messages within the same group.
                .concatMap(::callRemoteWebEndpoint)
        }
        .blockLast()
}
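The same group-then-serialize idea can be sketched without Reactor: partition the batch by userId, run each user’s list as its own pool task (groups in parallel), and walk each list sequentially (per-user order preserved). The Msg type and the batch contents are illustrative stand-ins for the real messages:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Illustrative stand-in for a decoded command message.
data class Msg(val userId: Int, val action: String)

// The same interleaved batch as in the partition above.
val batch = listOf(
    Msg(1, "create"), Msg(1, "subscribe:1"),
    Msg(2, "create"), Msg(3, "create"),
    Msg(1, "subscribe:2"), Msg(2, "subscribe:1"),
    Msg(2, "subscribe:2"), Msg(3, "subscribe:1"),
)

val processedPerUser = ConcurrentHashMap<Int, MutableList<String>>()
val pool = Executors.newFixedThreadPool(3)

batch.groupBy { it.userId }.forEach { (userId, messages) ->
    pool.submit {
        // Sequential within the group, mirroring concatMap: the next call
        // starts only after the previous one finishes.
        for (m in messages) {
            Thread.sleep(5) // stand-in for the remote call
            processedPerUser.getOrPut(userId) { mutableListOf() }.add(m.action)
        }
    }
}
pool.shutdown()
pool.awaitTermination(5, TimeUnit.SECONDS)

// Every user's create still precedes its subscriptions.
println(processedPerUser[2]) // [create, subscribe:1, subscribe:2]
```

Groups race against each other, which is fine, because the ordering requirement only holds within a single user’s messages.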

You may find all the code above here.