Apache Kafka - IV - Consumidores
Una aplicación consumidora básica, usando la misma biblioteca que en la parte 3, podría ser:
public class KafkaConsumerApp{
public static void main([]String args) {
Properties props = new Properties();
props.put("bootstrap.servers", "BROKER-1:9092, BROKER-2:9093");
props.put("key.deserializer", "org.apache.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.common.serialization.StringDeserializer");
KafkaConsumer myConsumer = new KafkaConsumer(props);
myConsumer.subscribe(Arrays.asList("my-topic"));
try{
while (true) {
ConsumerRecords<String, String> records = myConsumer.poll(100);
processRecords(records);
}
} catch (Exception ex) {
ex.printStackTrace();
} finally {
myConsumer.close();
}
}
}
La única diferencia es que se usan deserializadores en lugar de serializadores, y tendrán que coincidir con los serializadores usados en el productor. Tal como en el caso del productor, la propiedad bootstrap.servers indica a qué brokers conectarse.
El próximo paso es suscribirse a un topic, y esto se hace pasando una lista de nombres de topics al método subscribe. Al usar subscribe, sólo se especifica el nombre del topic: el consumidor leerá automáticamente de todas las particiones. Si se agrega una partición luego de la suscripción, el consumidor también empezará a leer de ella automáticamente.
La alternativa a subscribe es assign, que requiere una lista de pares (topic, partición). En base a estos pares, el consumidores leerá sólo de los mismos, incluso si se agregan particiones más adelante. Esto puede ser útil si se desea más control sobre el consumo.
En ambas modalidades, no es posible agregar incrementalmente topics o particiones. Cada llamada a subscribe o assign borra todas las suscripciones anteriores y las pisa con las nuevas. Por otro lado, aunque existe un método unsubscribe, no recibe argumentos, por lo cual no es posible anular una suscripción a un par (topic, partición) específico manteniendo todas las otras: es todo o nada.
Ahora bien, considerando el bucle while, Kafka tiene un modelo de tipo pull. Esto implica que los consumidores tienen que revisar periódicamente si hay nuevos mensajes. Esto suena ineficiente, especialmente por el concepto prevalente en Sistemas de que hacer polling es ineficiente y es algo a evitar. Sin embargo, Kafka lo hace de manera inteligente. Se verán más detalles luego, pero como adelante, considérese que el método poll recibe como argumento el tiempo en milisegundos a esperar que se acumulen mensajes antes de leer un lote de ellos y volver. Esto es otra instancia de micro batching en Kafka; mucha carga sobre la red se reduce con esta optimización.
Polling del consumidor en detalle
Cabría esperar que luego de llamar a subscribe, el consumidor empezaría a leer mensajes, pero no es el caso. Eso sólo comenzará luego de llamar al método poll por primera vez. Existen muchas interacciones entre el consumidor y los brokers más allá de leer mensajes, como se verá en breve.
En primer lugar, al llamar a subscribe o assign, una instancia de SubscriptionState es inicializada con los topics y particiones a seguir. Este objeto colabora con una instancia de ConsumerCoordinator para administrar los offsets. Más sobre esto luego.
De esta manera, cuando poll es invocado por primera vez, el consumidor usa sus opciones de configuración, particularmente bootstrap.servers, para conectarse al cluster y obtener sus metadatos. Los mismos son guardados en un objeto interno de tipo Metadata. Este pedido de metadatos, y la mayoría de las interacciones con el cluster, son controlador por una instancia de Fetcher. Sin embargo, el Fetcher no se comunica directamente con el cluster; eso corresponde a Consumer Network Client; el Fetcher es un coordinador de más alto nivel.
Una vez establecida la conexión con el cluster, el consumidor comenzará a enviar periódicamente paquetes de heartbeat al cluster, de modo que éste sepa qué consumidores están activos. Cada vez que el Fetcher actualiza los metadatos, el ConsumerCoordinator es notificado y actúa en consecuencia. Eso involucra notificar al SubscriptionState. Esto puede ser disparado, por ejemplo, al agregar una partición nueva a un topic al que el consumidor esté suscrito. El ConsumerCoordinator también es responsable de enviar offsets al cluster a medidas que se leen mensajes.
Cuando expira el timeout de poll, el Consumer Network Client deja de pedir mensajes a los brokers y envía el lote de mensajes que leyó al Fetcher, que a su vez lo guarda en un buffer interno. El fetcher realiza la deserialización, según la configuración del consumidor, y devuelve los mensajes decodificados al contexto que invocó originalmente a poll.
Un hecho muy importante es que los consumidores de Kafka deben ser single-threaded: debe haber un solo bucle de polling por consumidor. Puede haber otros threads en el proceso consumidor, pero sólo uno puede llamar a poll. Aunque un consumidor lento no afecta al cluster, el procesamiento de mensajes debe ser lo más eficiente posible, para seguirle el ritmo a los productores. Sin embargo, tener un solo thread puede ser un limitante, especialmente al crecer en cantidad los topics y particiones. El mecanismo que ofrece Kafka para escalar a los consumidores son los grupos de consumidores, que serán analizados en el próximo post de esta serie.
Offsets
Como se dijo anteriormente, los offsets de Kafka son usados por los consumidores para llevar cuenta del último mensaje leído de cada partición de cada topic. El offset funciona así como un señalador de libro, y es persistido periódicamente en los brokers del cluster en un topic especial llamado __consumer_offsets. El objeto ConsumerCoordinator dentro del consumidor es el que actúa como productor de este topic, efectivamente persistiendo los offsets. Existen 3 tipos de offset, a saber:
- Last committed offset: Último offset enviado. Corresponde al último mensaje que el consumidor confirmó como leído al cluster. Este offset es almacenado en los brokers.
- Current position: Posición actual. Apunta al último mensaje leído por el consumidor. Este valor es mantenido por el consumidor.
- Log-end offset: Offset de fin de log. Apunta al último mensaje de la partición. Si el consumidor alcanza este offset, se dice que está "al día" con la partición. Naturalmente, dejará de estarlo apenas llegue un mensaje nuevo.
La brecha entre la posición actual y el último offset enviado corresponde a offsets sin enviar. Cuánto puede dejarse crecer esta brecha dependerá de la aplicación. Por ejemplo, si se desea que los consumidores estén lo más al día posible en todo momento, se deseará que esta brecha se mantenga siempre lo más baja posible.
There are configuration properties which control how Kafka handles offsets. These are optional because their defaults are sufficient for getting started.
- enable.auto.commit: Habilitar auto commit. Por defecto, esta opción está activada. En dicho caso, Kafka se encarga de actualizar el último offset enviado. La frecuencia con que eso se hace depende de la próxima opción.
- auto.commit.interval: Intervalo de auto commit. Por defecto, 5 segundos.
- auto.offset.reset: Reinicio automático de offset. Define qué debe hacer Kafka con el offset cuando un consumidor pierde su conexión con el cluster y más tarde la recupera. Hay tres modalides:
- latest (modo por defecto): empezar con el último offset enviado.
- earliest: volver al primer offset enviado, o al inicio de la partición si no hay.
- none: lanzar una excepción y dejar que el consumidor decida dinámicamente qué hacer.
Debido a las brechas entre offsets, puede decirse que un sistema que usa Kafka es eventualmente consistente. De todas formas, es importante destacar que el grado en que un sistema puede tolerar consistencia eventual depende de su robustez. Esto implica que se debe tener mucho cuidado con el manejo de erores en el consumidor, especialmente al procesar los mensajes. Ese camino del código debe ser lo más robusto posible.
Si enable.auto.commit se pone en false, el consumidor pasa a ser responsable de enviar los offsets. Puede hacerlo llamando al método commitSync, o commitAsync. En el caso del primero, la llamada se bloqueará hasta que el cluster confirme que almacenó el offset. Por lo tanto, se recomienda sólo llamarlo luego de haber procesado un lote de mensajes, como la última acción dentro del bucle de polling. Si hay un error, y no es fatal, commitSync volverá a intentar automáticamente, usando el valor de la opción retry.backoff.ms para decidir cuánto esperar entre reintentos.
Enviar offsets de manera sincrónica es mejor para asegurar la consistencia, pero lo hace sacrificando latencia. Mientras el thread de polling está bloqueado esperando confirmación, se desperdicia valioso tiempo de procesamiento. Si la latencia es más importante que la consistencia, se debería usar commitAsync. Con ese método, además de no bloquearse, no hay reintento automático. Si un envío de offset falla, se invoca una callback para que el consumidor decida qué hacer. Dicha callback es el segundo argumento requerido por commitAsync.
Comments
Post a Comment