Apache Kafka - IV - Consumers

A basic Kafka consumer application, using the same library used for producers in part 3, could look like this:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // a group.id is required when using subscribe(); the name here is illustrative
        props.put("group.id", "my-consumer-group");

        KafkaConsumer<String, String> myConsumer = new KafkaConsumer<>(props);
        myConsumer.subscribe(Arrays.asList("my-topic"));

        try {
            while (true) {
                // wait up to 100 ms for messages to accumulate, then return a batch
                ConsumerRecords<String, String> records = myConsumer.poll(100);
                processRecords(records);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            myConsumer.close();
        }
    }

    private static void processRecords(ConsumerRecords<String, String> records) {
        // application-specific processing goes here
    }
}

The only difference here is that deserializers are used instead of serializers, and they will have to match the serializers used by the producer. Just like in the producer's case, bootstrap.servers indicates which brokers to try to connect to.

The next step is subscribing to a topic, and this is done by passing a list of topic names to the subscribe method. When using subscribe, only the topic name is specified: the consumer will automatically read from all the topic's partitions. If a partition is added after subscription, the consumer will automatically start reading from it too.

An alternative to subscribe is assign, which requires a list of (topic, partition) pairs. Providing those, the consumer will always read exclusively from the specified combinations of topics and partitions, even if other partitions are added to the topics later. This can be useful if more control over consumption is desired.
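Here is a minimal sketch of both calls, reusing the consumer from the example above (the partition numbers are made up, and assign needs the org.apache.kafka.common.TopicPartition import):

// subscribe: follow whole topics; their partitions are discovered automatically
myConsumer.subscribe(Arrays.asList("my-topic"));

// assign: follow explicit (topic, partition) pairs only
myConsumer.assign(Arrays.asList(
        new TopicPartition("my-topic", 0),
        new TopicPartition("my-topic", 1)));

Note that a single consumer instance cannot mix the two styles: calling subscribe after assign (or vice versa) raises an IllegalStateException.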

In both cases, topics and partitions cannot be added incrementally: each call to subscribe or assign clears any existing subscriptions and replaces them with the new ones. Another thing to note is that, although there is an unsubscribe method, it takes no arguments, so it is not possible to unsubscribe from a specific topic or partition while staying subscribed to others: it's all or nothing.
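A short sketch of that replace-not-append behavior (the topic names are made up):

myConsumer.subscribe(Arrays.asList("topic-a"));
myConsumer.subscribe(Arrays.asList("topic-b")); // replaces topic-a, does not add to it

myConsumer.unsubscribe(); // no arguments: every subscription is dropped at once

To follow both topics, they have to be passed together in a single subscribe call.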

Now, considering the while loop: Kafka has a pull model, which means that consumers need to actively and periodically check for new messages. This sounds inefficient, especially if it's been hammered into our heads that polling is evil and should be avoided. However, we'll see that Kafka is smart enough about it to remain efficient. More details on the poll() method later; for now, all we need to know is that its argument is the time, in milliseconds, to wait for messages to accumulate before returning a batch of them. Notice that this is another instance of Kafka's micro-batching approach in action: a lot of network overhead is avoided by reading messages in batches.

Consumer polling in detail

It could be expected that after calling subscribe(), the consumer would start consuming messages, but that is not the case. That will only start happening after calling poll() for the first time. There are a lot of interactions between the consumer and brokers beyond simple message retrieval, as will be seen shortly.

First, when the subscribe/assign method is called, the SubscriptionState object is initialized with the given topics/partitions to follow. This object collaborates with the ConsumerCoordinator in managing the offsets (more on this later).

Then, when poll() is called for the first time, the consumer uses its configuration settings, especially bootstrap.servers, to connect to the Kafka cluster and request its metadata, which it saves in its internal Metadata object. This request for metadata, and most interactions with the cluster, are managed via the Fetcher object. However, the Fetcher doesn't communicate directly with the cluster; that is the job of the ConsumerNetworkClient; the Fetcher is just a higher-level coordinator.

When the connection is first established, the consumer starts sending heartbeat packets to the cluster, so that the cluster knows which consumers are still active. Whenever the Fetcher updates the metadata, the ConsumerCoordinator is notified and acts accordingly, telling the SubscriptionState to do the same. This can be triggered, for example, by a new partition being added to a topic the consumer is subscribed to. The ConsumerCoordinator is also responsible for committing offsets to the cluster as records are read.

When the poll timeout expires, the ConsumerNetworkClient stops polling the brokers for messages and hands the batch it has built to the Fetcher, which stores it in an internal buffer. The Fetcher performs deserialization according to the consumer settings and returns the parsed records to the caller of poll().

A very important fact is that Kafka consumers are single-threaded: there should be just a single poll loop per consumer. Even though a slow consumer does not affect the Kafka cluster at all, record processing should be as efficient as possible in order to keep up with the producers. However, a single thread could prove very limiting, especially as topics and partitions grow in number. The way Kafka provides scalability on the consumer side is Consumer Groups (more on those later).
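The one exception worth knowing about: wakeup() is documented as safe to call from another thread, and it makes a blocked poll() throw a WakeupException (from org.apache.kafka.common.errors). A common shutdown sketch built on that, reusing the consumer from the first example (the shutdown-hook placement is illustrative):

final Thread mainThread = Thread.currentThread();

// request a clean shutdown from another thread (e.g. on Ctrl+C)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    myConsumer.wakeup();   // forces the blocked poll() to throw WakeupException
    try {
        mainThread.join(); // wait until the poll loop has closed the consumer
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);
        processRecords(records);
    }
} catch (WakeupException we) {
    // expected during shutdown, nothing to handle
} finally {
    myConsumer.close();    // leave the group cleanly
}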

Offsets

As previously mentioned, a Kafka offset is used by a consumer to track the last message read from a partition (there is one offset per partition). The consumer offset works as a bookmark, being persisted periodically in the cluster's brokers, in a special topic called __consumer_offsets. The ConsumerCoordinator object inside the KafkaConsumer is the one that acts as a producer to this topic, thereby persisting the offsets. There are 3 types of offset:

  1. Last committed offset: Corresponds to the last message the consumer has confirmed to the Kafka cluster as processed. This is saved in the brokers.
  2. Current position: Tracks how far the consumer has read into the partition, i.e. the offset of the next message it will read. This is tracked by the consumer itself.
  3. Log-end offset: Points to the last message in the partition. If the consumer reaches this offset, it is said to be caught up with the partition.

The gap between the current position and the last committed offset corresponds to uncommitted offsets: how big this can be allowed to get will depend on the application.
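The consumer API exposes the first two offsets directly. A small sketch, assuming a partition this consumer is actually assigned to (TopicPartition and OffsetAndMetadata need their imports):

TopicPartition partition = new TopicPartition("my-topic", 0);

long position = myConsumer.position(partition);                // offset of the next message to read
OffsetAndMetadata committed = myConsumer.committed(partition); // last committed offset, or null

// the gap between the two is, roughly, the number of uncommitted records
long gap = committed == null ? position : position - committed.offset();
System.out.println("uncommitted gap for this partition: " + gap);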

There are configuration properties which control how Kafka handles offsets. These are optional because their defaults are sufficient for getting started; a sample configuration follows the list.

  • enable.auto.commit: By default, this is set to true. In that case, Kafka takes care of updating the last committed offset automatically. How frequently that is done is controlled by the next option.
  • auto.commit.interval.ms: By default, set to 5000 (5 seconds).
  • auto.offset.reset: This tells Kafka where to start reading when the consumer has no valid committed offset for a partition, for example when it reads a partition for the first time or the saved offset is no longer valid. It has 3 modes:
    • latest (default): start from the end of the partition, reading only new messages.
    • earliest: start from the beginning of the partition.
    • none: throw an exception to the consumer and let the application decide what to do.
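A sketch of these settings applied to the Properties object from the first example (the values shown are just illustrations):

props.put("enable.auto.commit", "false");      // take over committing manually
props.put("auto.commit.interval.ms", "5000");  // only relevant while auto-commit is enabled
props.put("auto.offset.reset", "earliest");    // with no valid offset, start from the beginning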

Due to the offset gaps, it can be said that a system which uses Kafka is eventually consistent. However, it needs to be stressed that how much eventual consistency a system can tolerate depends on its reliability requirements. This means that extra care should be taken with error handling in the consumer application, especially in the processing stage: that code path needs to be as robust as possible.

If enable.auto.commit is set to false, the consumer becomes responsible for committing the offsets. It can do so by calling the commitSync or commitAsync methods. For the former, the call will block until the cluster confirms that the offsets have been saved; therefore, it is recommended to only call this method after processing a batch of messages, i.e. as the last action inside the poll loop. If there is a non-fatal error, commitSync will automatically retry, using the retry.backoff.ms option to decide how long to wait before retrying.
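A minimal sketch of the synchronous style, reusing the poll loop from the first example (CommitFailedException lives in org.apache.kafka.clients.consumer):

try {
    while (true) {
        ConsumerRecords<String, String> records = myConsumer.poll(100);
        processRecords(records);
        myConsumer.commitSync(); // blocks; retries automatically on non-fatal errors
    }
} catch (CommitFailedException cfe) {
    // retries were exhausted or the error was fatal: records may be redelivered
    cfe.printStackTrace();
} finally {
    myConsumer.close();
}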

Committing synchronously is better for ensuring consistency, but it does so at the cost of latency: while the poll thread is blocked waiting for confirmation, it wastes precious processing time. If latency is more important, commitAsync should be used. In this mode, aside from not blocking, there is no automatic retry if the commit fails; instead, a callback argument is available, so that the consumer gets the commit result and can decide what to do.
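A sketch of the asynchronous style with a callback (the error handling shown is just one option):

myConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // no automatic retry happens here: log, re-commit, or escalate as needed
        System.err.println("Commit failed for " + offsets + ": " + exception.getMessage());
    }
});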

See Part V: Consumer Groups
