Kafka slow consumers and message duplication

Kafka is super fast. One design tradeoff to achieve very high throughput is to put more responsibility on the client side. The underlying concept of message fetching in the Kafka client API is simple enough, but there are many behaviors of the API that can only be well understood by spending some time studying how it is designed and how it is supposed to be used.

One critical thing in the Kafka consumer API is to recognize the importance of the consumer.poll() method. The method is not just for fetching new messages. The API does a lot of network and internal state-change operations as part of the method. We don't really know whether a client is in a valid state until we call poll() again and let the method trigger the internal state checks.

This post will demonstrate a scenario of the API behavior that can cause message duplication. The scenario is quite common and should be well known to developers.

  1. The consumer fetches a batch of 500 records.
  2. 100 messages are processed and the consumer commits the offsets.
  3. The next 100 messages are processed and the consumer commits the offsets.
  4. The next 100 messages are processed.
  5. The consumer somehow gets stuck in a long operation (simulated with a long sleep in the testing program).
  6. The API mechanism (in this case, the heartbeat thread) will eventually kick the consumer out of the consumer group. Now the consumer client is in a bad state, but nothing in the polling thread is affected yet. The main actor in the API is the polling thread: the client will learn about the new state and take action only when that thread goes back and calls poll() again.
  7. The consumer then wakes up from the long pause and tries to commit the offsets of the 100 messages it just processed. The commit results in an exception, since the consumer is out of the group and no longer owns the partitions.
  8. In the example code, the consumer catches the exception and tries to continue by calling poll() again.
  9. At this point the internal state check is triggered and the client learns that it needs to re-join the consumer group. The underlying mechanism eventually brings the consumer back to a valid state.
  10. The client starts consuming messages again. The last successfully committed offset is from step 3, so the consumer ends up re-processing the same 100 messages.

Here is the testing code. I used RocksDB to detect duplicates.
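The consumer setup itself is not shown in the loop below. Here is a minimal sketch of a configuration that matches the behavior described in this post; the group id, the topic name, the 500-record batches, and the 10-second max.poll.interval.ms are taken from the logs further down, while the bootstrap server and the String deserializers are assumptions.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer
import java.util.Properties

// Topic name inferred from the partition names in the logs below.
val MESSAGES_TOPIC = "test-message-topic"

val props = Properties().apply {
    // Broker address is an assumption; the coordinator in the logs is kafka105z.rsc.local:9092.
    put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka105z.rsc.local:9092")
    put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-4")
    put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java.name)
    // Offsets are committed manually in the processing loop.
    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    // Shortened from the 5-minute default so a long pause triggers the
    // "poll timeout has expired" path quickly.
    put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "10000")
    put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500")
}
val consumer = KafkaConsumer<String, String>(props)

With auto-commit disabled, the processing loop is fully responsible for committing offsets, which is what makes the failed commit visible later. The loop itself: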

import org.apache.kafka.clients.consumer.CommitFailedException
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

consumer.subscribe(listOf(MESSAGES_TOPIC))
val offsetMap = mutableMapOf<TopicPartition, OffsetAndMetadata>()

while (true) {
    logger.debug { "Poll next batch of records." }
    val records = consumer.poll(Duration.ofSeconds(2))
    logger.debug { "Polled ${records.count()} records." }

    if (records.isEmpty) continue

    val count = AtomicInteger(0)
    try {
        // Process and commit every 100 messages.
        records.forEach { rec ->
            // Record every key in RocksDB so re-delivered messages can be detected.
            val keyBytes = rec.key().toByteArray()
            when (db.get(keyBytes)) {
                null -> db.put(keyBytes, keyBytes)
                else -> db.incrementDuplicateCount()
            }

            // Track the offset of the next message to be fetched for this partition.
            offsetMap[TopicPartition(rec.topic(), rec.partition())] =
                    OffsetAndMetadata(rec.offset() + 1)
            if (count.incrementAndGet() % 100 == 0) {
                logger.debug {
                    "Processed 100 messages. " +
                    "Current duplicate count: ${db.getDuplicateCount()}. "
                }

                // Every 300 messages, block the polling thread for longer than
                // max.poll.interval.ms.
                if (count.get() % 300 == 0) simulateTooLongProcessing()

                logger.debug { "Manually commit offsets." }
                // Commit sync is slow. I use it here just to see if it throws an error
                // when the consumer is already out of the consumer group.
                consumer.commitSync(offsetMap)
            }
        }

    } catch (ex: CommitFailedException) {
        logger.error { ex.message }
    }
}
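Two helpers in the loop above are not part of the Kafka or RocksDB APIs: simulateTooLongProcessing() just blocks the polling thread for longer than max.poll.interval.ms, and db is a thin wrapper around RocksDB that also keeps a duplicate counter. A possible sketch of both follows; the names mirror the calls above, and the 30-second sleep is an assumption based on the log timestamps.

import org.rocksdb.RocksDB
import java.util.concurrent.atomic.AtomicLong

// Block the polling thread for longer than max.poll.interval.ms (10 seconds here),
// so the heartbeat thread makes the consumer leave the group.
fun simulateTooLongProcessing() {
    logger.debug { "Simulate too long processing time." }
    Thread.sleep(30_000)
}

// Thin wrapper around a RocksDB instance that records seen keys and counts duplicates.
class DuplicateDb(private val rocksDb: RocksDB) {
    private val duplicateCount = AtomicLong(0)

    fun get(key: ByteArray): ByteArray? = rocksDb.get(key)

    fun put(key: ByteArray, value: ByteArray) {
        rocksDb.put(key, value)
    }

    fun incrementDuplicateCount() {
        duplicateCount.incrementAndGet()
    }

    fun getDuplicateCount(): Long = duplicateCount.get()
}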

Let's look at the logs from the testing program.

2020-02-15 17:34:55.650  Poll next batch of records.
2020-02-15 17:34:56.014  Polled 500 records.
2020-02-15 17:34:56.023  Processed 100 messages. Current duplicate count: 0. 
2020-02-15 17:34:56.023  Manually commit offsets.
2020-02-15 17:34:56.041  Processed 100 messages. Current duplicate count: 0. 
2020-02-15 17:34:56.041  Manually commit offsets.
2020-02-15 17:34:56.061  Processed 100 messages. Current duplicate count: 0. 
2020-02-15 17:34:56.062  Simulate too long processing time.

After the max.poll.interval.ms of 10 seconds had passed, the heartbeat thread considered this client no longer alive and made it leave the consumer group.

2020-02-15 17:35:06.040  [Consumer clientId=consumer-1, groupId=test-consumer-4] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured 
 max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages.

2020-02-15 17:35:06.041  [Consumer clientId=consumer-1, groupId=test-consumer-4] Member consumer-1-0e3b4495-229e-4886-9155-96f0dd457ec3 sending LeaveGroup request to coordinator kafka105z.rsc.local:9092 (id: 2147483642 rack: null)

The polling thread woke up from the long pause and tried to commit the offsets. The thread didn't know that it was no longer in the consumer group. The consumer then caught the commit-failed exception and tried to move on.

2020-02-15 17:35:26.062 Manually commit offsets.
2020-02-15 17:35:26.063 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 

The consumer called poll() again. At this point the internal state was updated and the underlying mechanism brought the client back to a valid state.

2020-02-15 17:35:26.064 Poll next batch of records.

2020-02-15 17:35:26.064 Revoking previously assigned partitions  [test-message-topic-2, test-message-topic-3, test-message-topic-0, test-message-topic-1]
2020-02-15 17:35:26.064 (Re-)joining group
2020-02-15 17:35:26.084 Successfully joined group with generation 3
2020-02-15 17:35:26.085 Setting newly assigned partitions: test-message-topic-2, test-message-topic-3, test-message-topic-0, test-message-topic-1
2020-02-15 17:35:26.097 Resetting offset for partition test-message-topic-1 to offset 0.
2020-02-15 17:35:26.102 Resetting offset for partition test-message-topic-3 to offset 0.
2020-02-15 17:35:26.102 Resetting offset for partition test-message-topic-0 to offset 0.

The consumer started fetching messages again after the rebalance. It fetched the already-processed 100 messages again, since it had failed to commit the offsets of the latest 100 messages.

2020-02-15 17:35:26.114 Polled 315 records.
2020-02-15 17:35:26.123 Processed 100 messages. Current duplicate count: 100. 
2020-02-15 17:35:26.123 Manually commit offsets.
2020-02-15 17:35:26.138 Processed 100 messages. Current duplicate count: 100. 
2020-02-15 17:35:26.138 Manually commit offsets.

You can find the full code for the example above here.