Kafka Consumer 0.10.1 introduced a background thread for sending heartbeats, instead of relying on the user application thread to keep polling regularly as in earlier versions.
The typical Kafka consumer pattern looks like the code below:
while (!shuttingDown()) {
    ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);
    doSomethingWithMessages(records);
}
Before 0.10.1, the heartbeat was sent as part of polling for new messages. This is a by-design mechanism to prevent a livelock situation where the process is alive but the application is actually stuck. If the client thread can come back from message processing and poll for more, the application is surely still able to make progress. If the client thread is stuck somewhere and stops polling, the client stops sending heartbeats to the server. The consumer gets kicked out of the consumer group when it goes longer than “session.timeout.ms” without sending a heartbeat.
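To make that coupling concrete, here is the same polling pattern from above with comments added; shuttingDown() and doSomethingWithMessages() are the same placeholders as before.

while (!shuttingDown()) {
    // Before 0.10.1, the heartbeat was only sent from inside poll(),
    // so this call is what kept the session alive.
    ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(pollTimeout);

    // If this step blocks for longer than session.timeout.ms, no heartbeat
    // reaches the broker and the consumer is kicked out of the group.
    doSomethingWithMessages(records);
}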
This design proved to be quite tricky to tune. Increasing the session timeout allows more time for message processing, but the consumer group also takes longer to detect failures such as a process crash. Kafka consumer 0.10.1 introduced “max.poll.interval.ms” to decouple the processing timeout from the session timeout.
The max.poll.interval.ms is the upper bound on the time the client is allowed to spend in message processing. If it is set to 2 minutes, the client can go on with its message processing for up to 2 minutes without needing to go back and call the polling method. The client is still considered alive because there is a background thread (introduced in 0.10.1) that keeps sending heartbeats to the server regularly.
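As a minimal sketch, the two settings can now be tuned independently. The values below are only illustrative (the 2-minute processing budget from this paragraph plus a short session timeout), and the rest of the consumer configuration is assumed.

Properties config = new Properties();
// ... bootstrap servers, group id, deserializers ...
config.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");     // how quickly a crashed process is detected
config.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "120000");  // processing budget between poll() calls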
To prevent the livelock issue, the heartbeat thread also keeps track of the last time the application thread actually called the polling method for more messages. If the time since the last poll is longer than 2 minutes (max.poll.interval.ms), the consumer has reached the upper bound of its processing time. Something might be wrong with this consumer, e.g. the thread might be stuck somewhere. The heartbeat thread then stops sending heartbeats and sends an explicit LeaveGroup request.
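The check the heartbeat thread performs on each tick can be sketched roughly as below. This is not the actual Kafka client code; lastPollMs, maxPollIntervalMs, sendHeartbeat() and leaveGroup() are placeholder names for illustration.

// Rough sketch of the heartbeat thread's per-tick decision (illustrative names only)
long sinceLastPoll = System.currentTimeMillis() - lastPollMs;  // lastPollMs is updated by every poll() call
if (sinceLastPoll > maxPollIntervalMs) {
    // The foreground thread has stalled between calls to poll():
    // stop heartbeating and leave the group explicitly.
    leaveGroup();
} else {
    sendHeartbeat();  // keeps the session alive while processing continues
}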
I have added some custom logging to the Kafka client code to demonstrate the mechanism. The log below shows a max.poll.interval.ms of 1 minute and a session.timeout.ms of 10 seconds, with a client that sleeps for 3 minutes after polling the first batch of records.
Application log (with custom Kafka client logging)
client2-main-thread 11:16:20.076 – Partition Revoked
client2-main-thread 11:16:21.887 – New assignment : 7 partitions
heartbeat-thread 11:16:24.887 – Send heartbeat, last-poll : 11:16:24.887
heartbeat-thread 11:16:27.889 – Send heartbeat, last-poll : 11:16:27.888
heartbeat-thread 11:16:30.889 – Send heartbeat, last-poll : 11:16:30.889
heartbeat-thread 11:16:33.893 – Send heartbeat, last-poll : 11:16:33.893
client2-main-thread 11:16:36.355 – Consumed 500 records
The client is stuck in a thread sleep at this point. Notice that the background thread is still sending heartbeats.
heartbeat-thread 11:16:36.902 – Send heartbeat, last-poll : 11:16:36.294
heartbeat-thread 11:16:39.909 – Send heartbeat, last-poll : 11:16:36.294
heartbeat-thread 11:16:43.010 – Send heartbeat, last-poll : 11:16:36.294
heartbeat-thread 11:16:46.108 – Send heartbeat, last-poll : 11:16:36.294
…
…
heartbeat-thread 11:17:26.262 – Send heartbeat, last-poll : 11:16:36.294
heartbeat-thread 11:17:29.356 – Send heartbeat, last-poll : 11:16:36.294
heartbeat-thread 11:17:32.433 – Send heartbeat, last-poll : 11:16:36.294
heartbeat-thread 11:17:35.517 – Send heartbeat, last-poll : 11:16:36.294
heartbeat-thread 11:17:36.347 – the poll timeout has expired, which means that the foreground thread has stalled in between calls to poll(), so we explicitly leave the group.
The consumer has now left the group, and the consumer group will start rebalancing. If the consumer merely had a delay in its processing, the application thread will eventually poll again and trigger a group join request.
This new mechanism enables consumers to have long processing times while still reacting to a process crash in a timely manner. The example above allows 1 minute of processing time yet can still detect a process crash within 10 seconds of the lost heartbeat.
The client for this example is shown below.
...
config.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
config.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000");

KafkaConsumer<byte[], byte[]> con = new KafkaConsumer<byte[], byte[]>(config);
con.subscribe(Collections.singleton("SomeMessageTopic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log("Partition Revoked");
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log("New assignment : " + partitions.size() + " partitions");
    }
});

long pollTimeout = 100;
long sleep = TimeUnit.MILLISECONDS.convert(3, TimeUnit.MINUTES);
while (true) {
    ConsumerRecords<byte[], byte[]> records = con.poll(pollTimeout);
    if (records.count() == 0) {
        continue;
    }
    log("Consumed " + records.count() + " records");
    // Simulate 3 minutes of processing, which exceeds max.poll.interval.ms (1 minute)
    Thread.sleep(sleep);
}