Apache Kafka - II - Topics and partitions

Topics

In Kafka, a topic is a named feed, or a category of messages. Producers send messages to a specific topic, addressing it by its name, and consumers read messages from that topic, also addressing it by name (all of this within a particular Kafka cluster). This is the topic as a logical entity: how it is stored and handled inside the cluster doesn't matter to producers and consumers, which just want to send and read data.

Kafka topics have the following essential characteristics:

  • Order: Kafka topics are time-ordered sequences of messages. Messages are saved in the order in which they are received (strictly speaking, this order is guaranteed within each partition, as explained below).
  • Immutability: Once a message has been saved to a topic, it cannot be modified or individually deleted: topics are append-only structures, which is what preserves their time order. If a producer sends invalid data in a message, it is up to that producer to notify consumers and send the corrected data in a new message, and consumers have to reconcile both messages properly. Of course, Kafka topics don't grow forever: expiration (retention) policies, both by time and by size, can be defined for each topic. With a time-based policy, Kafka automatically deletes messages once they have been in the topic longer than the configured retention time. With a size-based policy, whenever the topic grows beyond the configured maximum size, the oldest messages are deleted until it is back within the limit (in practice, Kafka enforces these limits per partition and deletes whole log segment files rather than individual messages). This approach of storing time-ordered, immutable messages is the foundation of the architectural pattern known as event sourcing. And, drawing an analogy with relational databases, Kafka can be described as publish-subscribe messaging implemented as a distributed commit (transaction) log.
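
To make the two retention policies concrete, here is a minimal Python sketch of a topic as an append-only log. Everything in it (`Message`, `AppendOnlyLog`, `enforce_retention`) is hypothetical and simplified: it deletes individual messages, as described above, whereas Kafka works per partition and removes whole log segments. The only write operation is `append`; nothing ever modifies a stored message.

```python
from collections import deque
from typing import Deque, List, NamedTuple, Optional


class Message(NamedTuple):
    timestamp: float  # seconds; when the message was appended
    value: bytes


class AppendOnlyLog:
    """Toy topic: messages can only be appended, and leave only through retention."""

    def __init__(self, retention_seconds: Optional[float] = None,
                 retention_bytes: Optional[int] = None) -> None:
        self.retention_seconds = retention_seconds  # time-based policy (None = off)
        self.retention_bytes = retention_bytes      # size-based policy (None = off)
        self._messages: Deque[Message] = deque()
        self._size = 0  # total payload bytes currently retained

    def append(self, msg: Message) -> None:
        # The only write operation: stored messages are never modified.
        self._messages.append(msg)
        self._size += len(msg.value)

    def _drop_oldest(self) -> None:
        self._size -= len(self._messages.popleft().value)

    def enforce_retention(self, now: float) -> None:
        # Time-based: expire messages that have been stored for too long.
        if self.retention_seconds is not None:
            while self._messages and now - self._messages[0].timestamp > self.retention_seconds:
                self._drop_oldest()
        # Size-based: drop the oldest messages until back within the limit.
        if self.retention_bytes is not None:
            while self._messages and self._size > self.retention_bytes:
                self._drop_oldest()

    def values(self) -> List[bytes]:
        return [m.value for m in self._messages]
```

For instance, with a 60-second time limit, calling `enforce_retention(now=100)` on messages appended at t=0, 30 and 90 keeps only the last one; with a 10-byte size limit and three 4-byte messages, only the oldest one is dropped.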

So, how does a Kafka cluster manage to serve a theoretically unlimited number of heterogeneous consumers, each with its own consumption rate, potential bugs and crashes? Kafka's solution to this problem is the offset: each consumer periodically saves a reference to its position in the topic, as if it were a bookmark, and that bookmark is stored in the cluster itself. (Strictly, offsets are tracked per consumer group, and the committed value is the offset of the next message to read, i.e. the last one read plus one.) This way, every consumer establishes and maintains its own offset, so they can all read the topic at their own pace without disrupting each other or the producers. This is what is meant when it is said that, in Kafka, producers and consumers are loosely coupled: delays or crashes in a single consumer or producer won't grind the others to a halt. When a consumer connects to a topic for the first time, or wants to start over, it can ask to begin from the first message still available in the topic.
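
The bookmark idea can be sketched in a few lines of Python. This is a hypothetical in-memory model, not Kafka's API: `Topic`, `poll` and `reset` are made-up names, there is a single partition, and offsets are keyed by a consumer name rather than by consumer group. It follows Kafka's convention that the committed offset is the position of the next message to read. In real Kafka, where a consumer without a saved offset starts is configurable (the consumer setting `auto.offset.reset`); the sketch simply assumes the beginning.

```python
from typing import Dict, List


class Topic:
    """Toy single-partition topic whose messages are addressed by offset."""

    def __init__(self) -> None:
        self.log: List[str] = []
        # Bookmarks kept "in the cluster", keyed by (hypothetical) consumer name.
        # As in Kafka, a committed offset is the offset of the NEXT message to read.
        self.committed: Dict[str, int] = {}

    def append(self, value: str) -> int:
        """Producer side: append a message and return its offset."""
        self.log.append(value)
        return len(self.log) - 1

    def poll(self, consumer: str, max_messages: int) -> List[str]:
        """Read up to max_messages from the consumer's bookmark, then move it."""
        start = self.committed.get(consumer, 0)  # no bookmark yet: start at offset 0
        batch = self.log[start:start + max_messages]
        self.committed[consumer] = start + len(batch)
        return batch

    def reset(self, consumer: str) -> None:
        """Start over: forget the bookmark, so the next poll begins at offset 0."""
        self.committed.pop(consumer, None)
```

With five messages in the topic, a fast consumer can read all of them in one call while a slow one is still at offset 2; neither affects the other, nor the producer appending new messages.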

Partitions

A Kafka topic is implemented as one or more logs called partitions (each stored on disk as a set of log segment files). How many partitions make up a topic is entirely configurable, and depends on the use case. Each partition is kept, in its entirety, on one or more brokers (if it is on more than one, it is replicated: each of those brokers holds a copy of the whole partition). So, if a single partition were used, since it has to fit completely inside one broker node, the topic would be limited by that broker's resources (especially disk space). To scale beyond that, multiple partitions are needed. Conclusion: in Kafka, the scalability of a topic is directly tied to the number of partitions and of broker nodes that house them.
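
The capacity argument boils down to a ceiling division. The helper below is a hypothetical back-of-the-envelope sketch: it considers disk space only and assumes data is spread evenly over partitions (real sizing also weighs throughput, consumer parallelism and replication).

```python
def min_partitions(retained_bytes: int, max_bytes_per_partition: int) -> int:
    """Smallest partition count such that, with data spread evenly, no single
    partition has to hold more than max_bytes_per_partition bytes."""
    if max_bytes_per_partition <= 0:
        raise ValueError("max_bytes_per_partition must be positive")
    # Integer ceiling division: ceil(retained / per_partition), and at least 1.
    return max(1, -(-retained_bytes // max_bytes_per_partition))


TB = 10**12
# A topic retaining 5 TB, on brokers that can each spare 2 TB for one partition:
print(min_partitions(5 * TB, 2 * TB))  # 3
```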

The next question is: when there is more than one partition, how are messages distributed among them? First of all, each partition holds its own messages, in its own order. Producers define a partitioning scheme, which is basically an algorithm that decides which partition each message goes to. This can be as simple as round-robin, or as complex as the application needs. Usually, one of the goals is to keep partitions balanced, i.e. of similar size. More details on this in part III: Producers.
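
Two common partitioning schemes can be sketched as follows. Both functions are hypothetical illustrations, not Kafka's producer API. The round-robin version ignores the message and spreads load evenly; the key-hash version sends every message with the same key to the same partition, which keeps those messages in order relative to each other. The hash used here, CRC32, is only a stand-in: Kafka's default partitioner hashes keys with murmur2.

```python
import itertools
import zlib
from typing import Callable


def round_robin_partitioner(num_partitions: int) -> Callable[[bytes], int]:
    """Ignore the key and cycle through partitions 0, 1, ..., n-1, 0, ..."""
    counter = itertools.count()
    return lambda key: next(counter) % num_partitions


def key_hash_partitioner(num_partitions: int) -> Callable[[bytes], int]:
    """Map each key deterministically to a partition: same key, same partition."""
    return lambda key: zlib.crc32(key) % num_partitions


rr = round_robin_partitioner(3)
print([rr(b"any") for _ in range(6)])  # [0, 1, 2, 0, 1, 2]

by_key = key_hash_partitioner(3)
print(by_key(b"user-42") == by_key(b"user-42"))  # True
```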

And how do consumers work when there are multiple partitions? Thanks to Zookeeper, the consumer can find out which brokers hold which partitions of the topic it is interested in (newer Kafka clients get this metadata from the brokers instead). It can then fetch messages from each broker at its own pace. Since each partition has its own individual ordering, if a global message order must be established across the whole topic, it is up to the consumer to do so. More details on this in part IV: Consumers.
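
If the consumer must rebuild a global order, one option is a k-way merge of the partitions. This Python sketch assumes (hypothetically) that every message carries a timestamp usable as a global ordering key and that each partition is already sorted by it, both of which are up to the application to guarantee.

```python
import heapq
from typing import List, Tuple

# (timestamp, payload): the timestamp is assumed to be a global ordering key.
Record = Tuple[int, str]


def merge_partitions(*partitions: List[Record]) -> List[Record]:
    """Merge per-partition sorted lists into one globally ordered list."""
    # heapq.merge repeatedly takes the smallest head among all the inputs.
    return list(heapq.merge(*partitions, key=lambda rec: rec[0]))


p0 = [(1, "a"), (4, "d")]
p1 = [(2, "b"), (5, "e")]
p2 = [(3, "c")]
print([payload for _, payload in merge_partitions(p0, p1, p2)])  # ['a', 'b', 'c', 'd', 'e']
```

On a live stream, the catch is that a message can only be emitted safely once every partition has delivered something later (or a timeout expires), which is part of why this approach is harder to implement than a single partition.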

For example, suppose topic-a has 3 partitions (topic-a-0, topic-a-1 and topic-a-2), assigned to brokers 0, 1 and 2:

(Figure: Example of a multi-partition topic)

When the topic is created, one of the parameters is the number of partitions it will have. Kafka then assigns the partitions to the available brokers, and the assignment is recorded in Zookeeper. Once that assignment has been made, each broker creates a new log where it will save the messages for its partition. In the example, Broker 0 was assigned partition 1, Broker 1 got partition 2, and Broker 2 got partition 0. This mapping is then propagated to all brokers, so every broker knows which brokers have which partitions. This allows any broker to tell a producer that wants to write to a partition which broker physically holds it. Meanwhile, the brokers keep sessions open with Zookeeper and report their status periodically, so that the cluster always agrees on which brokers are alive.
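
The resulting mapping is just a dictionary from partition to broker. The sketch below is a hypothetical, simplified round-robin assignment with a single copy per partition (Kafka's actual algorithm also places replicas); starting at broker 2, it reproduces the example above. `broker_for` is the lookup any broker can perform, once it knows the mapping, to point a producer to the right broker.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, brokers: List[int], start: int = 0) -> Dict[int, int]:
    """Assign partition p to brokers[(start + p) % len(brokers)]."""
    if not brokers:
        raise ValueError("at least one broker is required")
    return {p: brokers[(start + p) % len(brokers)] for p in range(num_partitions)}


def broker_for(assignment: Dict[int, int], partition: int) -> int:
    """Which broker physically holds the given partition."""
    return assignment[partition]


assignment = assign_partitions(3, brokers=[0, 1, 2], start=2)
print(assignment)                 # {0: 2, 1: 0, 2: 1}
print(broker_for(assignment, 1))  # 0
```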

The consumer asks Zookeeper which brokers own which partitions, and gets additional metadata that affects its consumption behavior (more on this in the upcoming Consumers post). Once the consumer knows where the topic partitions are, it starts actively reading messages from each of them, keeping a separate offset for each partition.
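
A consumer's per-partition bookkeeping can be sketched as a dictionary of offsets, one entry per partition. In this hypothetical Python model, `partitions` stands in for the logs held by the different brokers and `poll_all` for one round of fetches; each partition advances independently, so a busy partition never holds back a quiet one.

```python
from typing import Dict, List, Tuple


def poll_all(partitions: Dict[int, List[str]], offsets: Dict[int, int],
             max_per_partition: int) -> List[Tuple[int, int, str]]:
    """Fetch up to max_per_partition new messages from every partition.

    Each partition is read from its own offset, which is then advanced.
    Returns (partition, offset, value) triples.
    """
    batch: List[Tuple[int, int, str]] = []
    for p in sorted(partitions):
        start = offsets.get(p, 0)  # no offset yet: start at the beginning
        chunk = partitions[p][start:start + max_per_partition]
        batch.extend((p, start + i, value) for i, value in enumerate(chunk))
        offsets[p] = start + len(chunk)
    return batch


logs = {0: ["a0", "a1", "a2"], 1: ["b0"], 2: []}
offsets: Dict[int, int] = {}
print(poll_all(logs, offsets, 2))  # [(0, 0, 'a0'), (0, 1, 'a1'), (1, 0, 'b0')]
print(offsets)                     # {0: 2, 1: 1, 2: 0}
print(poll_all(logs, offsets, 2))  # [(0, 2, 'a2')]
```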

Partition trade-offs

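  • The more partitions, the greater the Zookeeper overhead. This can be partly mitigated by scaling up the Zookeeper ensemble (i.e., adding ZK nodes to it), which spreads the read load; writes, however, still have to be acknowledged by a quorum, so adding nodes does not make them faster.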
  • Message ordering can become complex: inside a partition, messages are perfectly ordered, but no order is guaranteed across the partitions that make up a topic. If a global order in the topic is paramount for the application, using a single partition can be a viable solution. The alternative is to have consumers handle ordering, which scales better but can be more complex to implement.
  • The more partitions, the longer the failover time. When a broker crashes, new leaders must be elected for every partition it was leading, and if the cluster controller node itself crashes, the new controller has to load the state of every partition before it can take over; in both cases, more partitions mean a longer recovery. One solution is to have redundant clusters, but this can get expensive fast, so it needs to be justified.

Go to Part III: Producers
