Apache Kafka - III - Productores
Por simplicidad, todos los ejemplos de código del post serán en Java. Sin embargo, es importante considerar que Kafka ofrece bibliotecas cliente en varios otros lenguajes. Volviendo a Java, en un proyecto gestionado por Maven, la dependencia con Kafka puede estar dada por:
- groupId: org.apache.kafka
- artifactId: kafka-clients
- version: 0.10.0.1 (o más reciente)
Con la dependencia agregada, para crear un productor, será necesario agregar algunas propiedades básicas al proyecto. Hay muchas, muchas más; se verán las básicas por ahora:
Properties props = new Properties();
props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
props.put("key.serializer", "org.apache.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.common.serialization.StringSerializer");
La propiedad bootstrap.servers define una lista de brokers del cluster al que el productor puede conectarse. No es necesario que sea una lista de todos los brokers del cluster, ya que cualquier broker puede redireccionar al productor a cualquier otro. Sin embargo, es buena práctica poner más de un broker en este parámetro, en caso de que alguno falle. El productor se conectará al primero que encuentre disponible. Se asume que productores y brokers están en la misma red, y que nombres como "BROKER-1:9092" identifican unívocamente a un broker en la red, incluyendo el puerto en que escucha conexiones, a través de algún esquema de resolución de nombres.
Los serializadores definen cómo convertir los mensajes a formato binario. La serialización es necesaria para comprimir los mensajes, a fin de reducir la carga sobre la red. Existen diversas opciones además de las utilizadas aquí (StringSerializer), cada una con sus pros y contras. Una alternative muy popular es Avro. Al definir el formato de serialización, se establece un contrato entre productor y consumidor: este último deberá conocer el formato y esquema utilizados por el productor a fin de poder decodificar los mensajes. En cuanto a key and value, serán explicados más adelante.
El próximo paso es crear y enviar un mensaje. La clase que los representa es ProducerRecord. Algunos de sus atributos son:
- Topic (obligatorio): El nombre del topic al cual serán enviados el mensaje.
- Partition (opcional): Un productor puede, opcionalmente, definir la partición a la cual escribir. Más sobre esto luego.
- Timestamp (opcional): Si los brokers están configurados para fijar el timestamp de los mensajes con la opción CreateTime, los mensajes serán persistidos en el topic utilizando el momento de creación del mensaje como timestamp, y dicho valor será provisto por el productor. Éste es el comportamiento por defecto. Otra opción es LogAppendTime, para la cual el timestamp del mensaje corresponderá al momento en que el mensaje fue exitosamente persistido en una partición dentro de un broker.
- Key (opcional): Si se especifica, y el topic tiene más de una partición, este valor será vital a la hora de determinar a qué partición irá el mensaje. Esto estará dado por la estrategia de particionado. Pese a ser un valor opcional, se considera buena práctica siempre especificarlo. El único argumento para no definir la key sería el costo de agregarla al mensaje, pero en el caso general, las ventajas de tener key compensan esto con creces.
- Value (obligatorio): Contenido del mensaje. Debe estar en el formato del serializador elegido. Si no, Kafka arrojará una SerializationException al intentar enviar el mensaje.
Volviendo al código, la implementación más trivial de un productor sería:
public class KafkaProducerApp{
public static void main(String[] args){
/* props settings from previous block code */
KafkaProducer myProducer = new KafkaProducer(props);
ProducerRecors myRecord = new ProducerRecord(
"my_topic", "Course-001", "My Message 1"
);
try{
myProducer.send(myRecord);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
myProducer.close(); // Always close gracefully
}
}
}
¿Qué ocurre al enviar el mensaje? En primer lugar, el productor intenta conectarse con el primer broker disponible definido en la propiedad bootstrap.servers. Una vez que lo logra, obtiene metadatos detallando qué brokers tienen cada partición del topic. A lo largo de la vida del productor, en tanto esté conectado al cluster, recibirá actualizaciones periódicas de esos metadatos.
A continuación, el mensaje es serializado, y entonces el Partitioner, un objeto interno utilizado por KafkaProducer, decidirá a qué partición enviar el mensaje.
Estrategias de particionado
- Directa: Si se especifica una partición en el ProducerRecord, y es un valor válido para el topic, será enviado a la próxima etapa del pipeline de envío. El productor realiza esta validación automáticamente utilizando los metadatos que recibió del broker al conectarse. Más sobre el pipeline de envío en la próxima sección.
- Round robin: Si no se especifica una partición, y el mensaje no tiene key, los mensajes serán enviados a cada partición de manera alternada.
- Key mod-hash: Si no se especifica una partición, y el mensaje tiene una key, y en la configuración del productor no se definió una clase de particionado personalizado, el algoritmo por defecto será utilizado. Básicamente, el mismo consiste en que para cada mensaje, se calcula un hash de la clave, y se usa el operador mod para convertir el hash a un valor entre 1 y la cantidad de particiones. Esto efectivamente define la partición destino del mensaje.
- Custom: Para utilizar un algoritmo personalizado, es posible definir una instancia de la clase Partitioner, y pasarla a la configuración del productor.
Buffering de mensajes y micro-batching
Una vez que se ha determinado la partición destino, el productor pasa el mensaje a una instancia de RecordAccumulator. Este objeto interno hace micro batching: acumula mensajes, separándolos por partición destino, a fin de enviar varios juntos, en lugar de uno a la vez. Esto ahorra mucho uso de la red, y se hace en varias otras partes de Kafka.
Este batching a nivel del productor puede ser controlado mediante ciertas propiedades de ProducerConfig:
- batch.size: Máximo tamaño en bytes para cada lote de mensajes. Si se excede este tamaño, enviar el lote a su destino.
- buffer.memory: Este límite se aplica a la suma de todos los lotes. Al exceder este valor, que también se especifica en bytes, el próximo valor de configuración entra en juego.
- max.block.ms: La cantidad de milisegundos que debe bloquearse la llamada send del productor cuando se excede buffer.memory. Esto permite bajar la velocidad del productor cuando el mismo produce demasiado rápido. Este valor debe ajustarse de tal manera que durante el tiempo que el productor se bloquee, algunos mensajes sean enviados y se libere espacio en el buffer.
- linger.ms: Este valor representa el tiempo que un buffer no lleno debería esperar antes de enviar un lote incompleto. En un sistema suficientemente ocupado, este valor jamás debería entrar en efecto.
Por último, cuando un lote de mensajes es enviado a un broker, el mismo responderá con un objeto RecordMetadata. Dicho objeto especifica el éxito o fracaso del envío. Téngase en cuenta que la operación de envío es sincrónica: la ejecución del productor se bloqueará hasta que el envío se haya realizado y se haya recibido una respuesta del broker.
Garantías de entrega
Éstas también pueden ser definidas en el productor.
- Nivel de acuse de recibo (acknowledgment): A través de la propiedad acks (acknowledgements)
- 0 (fire and forget): sin ACK; rápido, pero menos confiable. El productor no tiene forma de saber si el mensaje fue recibido por el broker. Esto puede ser suficiente para datos de tipo "lossy", donde es aceptable perder cierta cantidad de datos.
- 1 (leader acknowledged): Se envía un ACK cuando el broker controlador haya recibido y persistido el mensaje. Es la opción intermedia.
- 2 (replication quorum acknowledged): El ACK se envía sólo luego de que el controlador y todas sus réplicas hayan visto y persistido el mensaje. Esta es la opción más segura, pero también la más lenta.
- Manejo de errores de broker: Esto es configurable a través de las siguientes propiedades:
- retries: Cantidad máxima de veces que el productor intentará enviar el mensaje antes de desistir.
- retry.backoff.ms: Tiempo a esperar entre reintentos.
Temas avanzados sobre productores no cubiertos en este post
Tal vez se verán en posts futuros, pero de momento sólo serán listados:
- Serializadores personalizados
- Particionadores personalizados
- Envío asincrónico
- Compresión
- Configuración avanzada de productor
Comments
Post a Comment