Apache Kafka - III - Producers

All code examples in this post are in Java, for simplicity; keep in mind, however, that Kafka offers client libraries in many languages. In a Maven project, the Kafka client dependency can be declared with:

  • groupId: org.apache.kafka
  • artifactId: kafka-clients
  • version: 0.10.0.1 (or newer if available)

So, to create a producer, some basic properties are required (although there are many, many more optional properties to tweak):

Properties props = new Properties();
props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

The bootstrap.servers property lists brokers in the cluster that the producer can use for its initial connection. It doesn't need to be the full list of the cluster's brokers, since any broker can provide the metadata describing the rest of the cluster. However, it is good practice to list more than one, in case one of them is down; the producer will connect to the first available one. It is also assumed that producers and brokers are on the same network, and that names such as "BROKER-1:9092" uniquely identify a broker (host name plus the port it listens on) via some name resolution setup.
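To make the "host:port" format concrete, here is a minimal sketch of how such a list could be split into broker addresses. It is not the Kafka client's own parsing code; the class BootstrapServers and the record BrokerAddress are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class BootstrapServers {
    // One entry of bootstrap.servers: a resolvable host name plus its listening port
    record BrokerAddress(String host, int port) {}

    // Splits a comma-separated "host:port" list, ignoring whitespace around each entry
    static List<BrokerAddress> parse(String bootstrapServers) {
        List<BrokerAddress> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port, got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            // Throws NumberFormatException (an IllegalArgumentException) for a non-numeric port
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new BrokerAddress(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (BrokerAddress broker : parse("BROKER-1:9092, BROKER-2:9093")) {
            System.out.println(broker.host() + " listens on port " + broker.port());
        }
    }
}
```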

The serializers define how to convert keys and values to bytes, which is the only format Kafka brokers store and transmit. Choosing a compact encoding keeps messages small and reduces network overhead. There are several options besides the one used here (StringSerializer), each with its pros and cons; a very popular choice is Avro. Choosing an encoding establishes a contract: consumers must know it in order to decode the messages. Keys and values are explained below.
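As an illustration of that contract, the sketch below mimics what a string serializer does on the producer side (text to UTF-8 bytes) and the matching decoding a consumer must apply. It uses only the JDK; SerializationContract is a hypothetical class, not part of Kafka, and decoding with a different charset shows what happens when both sides disagree.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SerializationContract {
    // Producer side: turn the text into UTF-8 bytes (null stays null)
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side: must apply the matching UTF-8 decoding to recover the text
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("My Message 1");
        System.out.println(Arrays.toString(wire));                       // the bytes sent to the broker
        System.out.println(deserialize(wire));                           // prints: My Message 1
        System.out.println(new String(wire, StandardCharsets.UTF_16));   // broken contract: garbled text
    }
}
```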

The next step is to send a message. The class that represents a message is ProducerRecord. Its fields are:

  • Topic (mandatory): The name of the topic where the message will be sent.
  • Partition (optional): A producer can optionally define which topic partition to write on. More on this later.
  • Timestamp (optional): How this value is used depends on the topic's timestamp type (message.timestamp.type, which defaults to the broker's log.message.timestamp.type). With CreateTime, the default, the message is stored with the timestamp set by the producer; if the record doesn't set one, the producer uses its current time. The other option is LogAppendTime, in which case the broker sets the timestamp to the time at which the message was actually appended to the topic partition.
  • Key (optional): If present, and the topic has multiple partitions, this value is crucial in deciding which partition the message goes to. This is called the partitioning strategy. Even though this field is optional, it is considered best practice to always set it. The only argument against setting a key is the overhead of carrying it in every message, but in the general case the advantages of having a key far outweigh this.
  • Value (mandatory): Message payload. Its type must match the configured value serializer; if it doesn't, a SerializationException is thrown when trying to send the message.
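The timestamp rules above can be summarized in a small model. This is a simplified sketch, not Kafka code: TimestampRule and storedTimestamp are hypothetical names, and both clocks are passed in explicitly so the outcome is easy to follow.

```java
public class TimestampRule {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /**
     * Which timestamp ends up stored with a record, in this simplified model.
     * recordTimestamp: the value set on the record by the producer, or null if unset.
     * producerClockMs / brokerClockMs: wall-clock time on each side.
     */
    static long storedTimestamp(TimestampType type, Long recordTimestamp,
                                long producerClockMs, long brokerClockMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            return brokerClockMs; // the broker stamps the record with its append time
        }
        // CreateTime: the producer's value, or the producer's clock when none was set
        return recordTimestamp != null ? recordTimestamp : producerClockMs;
    }

    public static void main(String[] args) {
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, 1000L, 2000L, 3000L));     // prints 1000
        System.out.println(storedTimestamp(TimestampType.CREATE_TIME, null, 2000L, 3000L));      // prints 2000
        System.out.println(storedTimestamp(TimestampType.LOG_APPEND_TIME, 1000L, 2000L, 3000L)); // prints 3000
    }
}
```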

So, going back to the code, a very basic producer implementation could be:

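import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerApp {
    public static void main(String[] args) {
        // Same settings as in the previous block
        Properties props = new Properties();
        props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> myProducer = new KafkaProducer<>(props);
        // Topic, key and value; partition and timestamp are left unset
        ProducerRecord<String, String> myRecord =
            new ProducerRecord<>("my_topic", "Course-001", "My Message 1");

        try {
            myProducer.send(myRecord);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            myProducer.close(); // Always close gracefully; this also waits for pending sends
        }
    }
}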

So, what happens when the message is sent? First, the producer connects to the first available broker listed in the bootstrap.servers property and fetches metadata detailing which broker leads each partition of the topic of interest. Throughout the producer's lifetime, this metadata is periodically refreshed.

Next, the message will be serialized, and then the Partitioner, an internal class used by KafkaProducer, will decide which partition to send the message to.

Partition strategies

  1. Direct: If a partition was specified in the ProducerRecord, and it is a valid partition number for the given topic (the producer checks this against its metadata), the record is passed straight to the next stage of the sending pipeline (see next section).
  2. Round robin: If no partition was specified, and the record has no key, messages are spread across the partitions in turn, round-robin style.
  3. Key mod-hash: If no partition was specified, the record has a key, and the producer config does not define a custom partitioner class, this default algorithm is used. For each message, it computes a hash of the serialized key (murmur2 in the default partitioner) and takes it modulo the number of partitions to pick the destination partition. As a result, messages with the same key always land in the same partition, as long as the partition count doesn't change.
  4. Custom: To use a custom strategy, implement the Partitioner interface and set the class name in the producer config via the partitioner.class property.
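The decision order above can be sketched as a single method. This is a simplified model rather than Kafka's DefaultPartitioner: PartitionerSketch is a hypothetical class, the round-robin counter is a plain AtomicInteger, and Arrays.hashCode stands in for the murmur2 hash that Kafka's default partitioner uses, so the concrete partition numbers will differ from Kafka's.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class PartitionerSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    /**
     * partition: explicitly requested partition, or null.
     * keyBytes: serialized key, or null if the record has no key.
     */
    int choosePartition(Integer partition, byte[] keyBytes, int numPartitions) {
        if (partition != null) {
            // Direct: only accept partitions that exist according to the metadata
            if (partition < 0 || partition >= numPartitions) {
                throw new IllegalArgumentException("Invalid partition " + partition);
            }
            return partition;
        }
        if (keyBytes == null) {
            // Round robin: cycle through the partitions in turn
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        // Key mod-hash: clear the sign bit so the hash is non-negative, then take the modulo
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        PartitionerSketch p = new PartitionerSketch();
        byte[] key = "Course-001".getBytes(StandardCharsets.UTF_8);
        System.out.println(p.choosePartition(1, null, 3));    // direct: prints 1
        System.out.println(p.choosePartition(null, null, 3)); // round robin: prints 0
        System.out.println(p.choosePartition(null, null, 3)); // round robin: prints 1
        // Same key, same partition: prints true
        System.out.println(p.choosePartition(null, key, 3) == p.choosePartition(null, key, 3));
    }
}
```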

Message buffering and micro-batching

Once the destination partition has been decided, the producer hands the message to the RecordAccumulator object, which performs micro-batching: it accumulates records, grouped by destination partition, in order to send many at once instead of one at a time. This saves a lot of network overhead, and the same technique is used in many other places in Kafka.

This batching at the producer level can be controlled through certain properties in the ProducerConfig:

  • batch.size: Maximum size, in bytes, of each individual record batch. Once a batch is full, it is ready to be sent to its destination.
  • buffer.memory: Total memory, in bytes, that the producer can use to buffer records across all batches. If the sum of their sizes exceeds this value, the next setting comes into play.
  • max.block.ms: How many milliseconds the producer's send call may block when the buffer memory is exhausted. This throttles a producer that is producing too much too fast: the value should be chosen so that, while the producer is blocked, some batches get sent and free up buffer space. If that doesn't happen in time, the send call fails with an exception.
  • linger.ms: How long a non-full batch may wait for more records before it is sent anyway. In a busy enough system, batches fill up before this time elapses, so it would almost never come into effect.
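To show how batch.size and linger.ms interact, here is a toy accumulator. It is a deliberately simplified sketch and not Kafka's RecordAccumulator: AccumulatorSketch is hypothetical, it ignores buffer.memory and max.block.ms, time is passed in as a parameter instead of read from a clock, and a batch is closed only when the next record would not fit or when it has lingered long enough.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class AccumulatorSketch {
    // One open batch for a single partition
    private static final class Batch {
        final List<byte[]> records = new ArrayList<>();
        final long createdMs;
        int bytes = 0;

        Batch(long createdMs) {
            this.createdMs = createdMs;
        }
    }

    private final int batchSize;  // plays the role of batch.size (bytes)
    private final long lingerMs;  // plays the role of linger.ms
    private final Map<Integer, Batch> open = new HashMap<>();   // partition -> open batch
    private final List<List<byte[]>> ready = new ArrayList<>(); // closed batches awaiting send

    AccumulatorSketch(int batchSize, long lingerMs) {
        this.batchSize = batchSize;
        this.lingerMs = lingerMs;
    }

    // Adds a record to its partition's open batch, closing that batch first if the record would not fit
    void append(int partition, byte[] record, long nowMs) {
        Batch batch = open.get(partition);
        if (batch != null && batch.bytes + record.length > batchSize) {
            ready.add(batch.records);
            batch = null;
        }
        if (batch == null) {
            batch = new Batch(nowMs);
            open.put(partition, batch);
        }
        batch.records.add(record);
        batch.bytes += record.length;
    }

    // Closes every open batch that has waited at least lingerMs, full or not
    void expireLingering(long nowMs) {
        Iterator<Batch> it = open.values().iterator();
        while (it.hasNext()) {
            Batch batch = it.next();
            if (nowMs - batch.createdMs >= lingerMs) {
                ready.add(batch.records);
                it.remove();
            }
        }
    }

    // Hands over the closed batches, as a sending thread would before sending them
    List<List<byte[]>> drainReady() {
        List<List<byte[]>> out = new ArrayList<>(ready);
        ready.clear();
        return out;
    }

    public static void main(String[] args) {
        AccumulatorSketch acc = new AccumulatorSketch(10, 5);
        acc.append(0, new byte[6], 0);
        acc.append(0, new byte[6], 1);               // 6 + 6 > 10: the first batch is closed
        System.out.println(acc.drainReady().size()); // prints 1
        acc.expireLingering(6);                      // the second batch has waited 5 ms
        System.out.println(acc.drainReady().size()); // prints 1
    }
}
```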

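Finally, after a message batch is sent to a broker, the broker responds, and each send completes either with a RecordMetadata object (describing where the record was stored) on success, or with an exception on failure. Keep in mind that the send call itself is asynchronous: it returns a Future<RecordMetadata> as soon as the record has been handed to the accumulator. Calling get() on that Future blocks until the broker's response has been received, which makes the send effectively synchronous. In the example above, the result is never checked, and close() waits for pending sends to complete.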

Delivery guarantees

These can also be set at the producer level.

  1. Level of acknowledgment: Via the acks property (acknowledgements)
    • 0 (fire and forget): no ACK; fastest, but least reliable. The producer has no way of knowing if the message got to the broker. This might be OK for lossy data.
    • 1 (leader acknowledged): Only the partition's leader broker needs to have received and persisted the message. It's the balanced option.
    • all, or -1 (replication quorum acknowledged): ACK is sent only after the leader and all in-sync replicas of the partition have received the message. This is the safest but slowest choice.
  2. Broker error handling: This is done using the following properties:
    • retries: How many times the producer will retry sending a message before giving up.
    • retry.backoff.ms: Time to wait between retries.
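The interplay of retries and retry.backoff.ms can be sketched as a retry loop. This is a simplified, hypothetical model (RetrySketch is not a Kafka class): an IntPredicate stands in for one send attempt, and every failure is treated as retriable, whereas the real producer only retries errors it considers transient.

```java
import java.util.function.IntPredicate;

public class RetrySketch {
    /**
     * Tries once, then retries up to `retries` more times, sleeping backoffMs between attempts.
     * attempt.test(n) simulates attempt n (0 = first try) and returns true if it was acknowledged.
     * Returns the number of attempts used, or -1 if all attempts failed or the thread was interrupted.
     */
    static int sendWithRetries(IntPredicate attempt, int retries, long backoffMs) {
        for (int n = 0; n <= retries; n++) {
            if (attempt.test(n)) {
                return n + 1;
            }
            if (n < retries) {
                try {
                    Thread.sleep(backoffMs); // plays the role of retry.backoff.ms
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and give up
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Hypothetical flaky broker: rejects the first two attempts, accepts the third
        System.out.println(sendWithRetries(n -> n >= 2, 3, 100)); // prints 3
        // With only one retry allowed, the producer gives up
        System.out.println(sendWithRetries(n -> n >= 2, 1, 100)); // prints -1
    }
}
```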

Advanced producer stuff not covered in this post

(These might be covered in future posts)

  • Custom serializers
  • Custom partitioners
  • Asynchronous send
  • Compression
  • Advanced producer settings

See Part IV: Consumers
