Apache Kafka

Publicado por

El desarrollo  de Kafka comenzó en el año 2009 en la empresa LinkedIn. En el 2011 fue donado a la fundación Apache, y estuvo en la incubadora hasta Octubre de 2012 donde se graduó y paso a formar parte de los proyectos Top Level.

En Noviembre de 2012 varios miembros del equipo de desarrollo de Kafka en LinkedIn crearon una empresa llamada Confluent basando gran parte de su proyecto en Kafka.

Índice

  1. Introducción
  2. Modelo de datos
  3. Características
  4. Arquitectura
  5. Casos de uso
  6. Balanceo
  7. API Java

1. Introducción

La mensajería tradicional tiene principalmente dos modelos: colas y publicación-suscripción. En una cola multitud de consumidores leen registros de un servidor, y cada registro puede ser leído por un solo consumidor (pull). En el modelo de suscripción el registro es enviado a todos los consumidores (push). Cada uno de estos modelos tiene una fortaleza y una debilidad.

La fortaleza de las colas es que permiten dividir el procesamiento de datos sobre múltiples consumidores, lo que permite escalar en procesamiento. Desafortunadamente las colas no aceptan multi-suscripción, una vez que el dato es leído se borra de la cola. El modelo de suscripción permite emitir datos a múltiples procesos, pero no tiene forma de escalar en procesamiento ya que todos los mensajes se envían a cada suscriptor.

Kafka generaliza estos dos modelos haciendo uso del concepto de grupo de consumidores. Como una cola, el grupo de consumidores permite dividir el procesamiento sobre una colección de procesos (los miembros del grupo de consumidores). Y como suscripción, Kafka permite emitir mensajes a múltiples grupos de consumidores.

2. Modelo de datos

La unidad básica de almacenamiento es el registro que consiste en una clave, un valor y un timestamp.

Un tema (topic) es una categoría donde los registros se publican. Los temas en Kafka siempre son multisuscripción, esto significa que puede tener cero, uno o más consumidores suscritos a los datos escritos en él.

Para cada tema, el clúster de Kafka mantiene un log particionado:

log_anatomyImg: kafka.apache.org                                 

Cada partición es una secuencia ordenada e inmutables de registros que se añaden continuamente a un log estructurado. Los registros de una partición tienen asignado un identificador secuencial llamado offset, único para cada registro dentro de una partición.

El clúster mantiene todos los registros publicados hayan sido consumidos o no, pudiendo configurar el tiempo que se mantienen en disco. El rendimiento de Kafka es constante con respecto al volumen que maneja, por eso no es un problema almacenar datos durante un largo periodo de tiempo.

El único metadato que guarda el consumidor es el offset o posición en la que se encuentre leyendo el log. Esta posición es controlada por el consumidor: normalmente un consumidor avanzará la posición linealmente a medida que vaya leyendo registros, pero como la posición es controlada por el consumidor, éste podrá consumir registros en el orden que quiera.

3. Características

Kafka se ejecuta en un clúster con uno o más servidores que guardan streams de registros en categorías denominadas topics.

Como plataforma de streaming Kafka tiene tres funciones clave:

  • Permitir la publicación y suscripción de streams de registros. En este aspecto es parecido a un sistema de mensajes basado en colas
  • Permitir guardar streams de registros con tolerancia a fallos
  • Permitir procesar los streams de registros en el momento en que se producen

 

producing-to-partitions

Fuente: balamaci.ro/

Kafka puede ser utilizado para:

  • Construir un pipeline de datos en tiempo real basado en streams que intercambie información entre sistemas y aplicaciones de manera confiable
  • Construir una aplicación en tiempo real que transforme o reaccione a los streams de datos

Kafka tiene cuatro APIs principales

  • Producer API: permite a las aplicaciones publicar un stream de registros a uno o varios topics
  • Consumer API: permite a las aplicaciones suscribirse a uno o más topics y procesar el stream de registros producido para él
  • Streams API: permite a las aplicaciones actuar como un procesador de streams, consumir un stream de entrada, procesarlo y producir un stream de salida para uno o varios topics
  • Connector API: permite la construcción y el funcionamiento de productores o consumidores reutilizables que conectan topics con aplicaciones existentes o sistemas

Kafka replica sus particiones sobre múltiples servidores para soportar una tolerancia a fallos mayor, el parámetro de replicación es configurable.

La combinación de mensajería, almacenamiento y procesamiento de streams puede parecer inusual pero es en esencia el rol de Kafka como plataforma de streaming.

4. Arquitectura

Las particiones de un log se distribuyen sobre los servidores del clúster de Kafka, llamados brokers, cada servidor maneja datos y peticiones para algunas de las particiones del topic. Cada partición es replicada en un número configurable de servidores para soportar una tolerancia a fallos mayor.

Cada partición tiene un servidor que actúa como líder de esa partición y cero o más servidores que actúan como seguidores. El líder gestiona todas las peticiones de lectura y escritura para la partición, mientras que los seguidores replican al líder de forma pasiva. Si el líder falla, uno de los seguidores automáticamente se vuelve el líder de la partición. Cada servidor actúa como líder de alguna partición y como seguidor de otras, así la carga del clúster se reparte.

El sistema de ficheros distribuido HDFS permite el almacenamiento estático para el procesamiento en batch y así poder procesar datos históricos.

a. Productor

Los productores publican datos a temas (topic) de su elección. El productor es responsable de escoger la partición adecuada para el registro que va a escribir. Esto puede hacerse a través de Round-Robin para repartir la carga o puede hacerse aplicando una función semántica sobre la clave.

producer_consumer

b.Consumidores

Los consumidores son agrupados bajo el concepto de grupos de consumidores, así cada registro publicado en un tema será entregado a un consumidor dentro del grupo de consumidores suscrito al tema. Los consumidores pueden ser procesos separados o máquinas distintas.

Si todos los consumidores pertenecen al mismo grupo, el reparto de la carga (particiones) será perfecto sobre estos.  Si todos los consumidores pertenecen a grupos distintos, entonces cada registro será emitido a todos los consumidores.

consumer-groups

Img: kafka.apache.org

Posición del consumidor (offset)

El seguimiento de lo que se ha consumido es uno de los principales puntos de rendimiento en un sistema de mensajería. Por ejemplo si un broker marca como consumido un mensaje que acaba de enviar a un consumidor, y éste falla al procesarlo, ese mensaje se perderá. Para solventar este problema los sistemas de mensajería implementan confirmaciones ack, así cuando un mensaje es enviado y se devuelve la respuesta de recibido, el mensaje se marcará como enviado no como consumido. Solo pasará a este estado cuando el consumidor haya enviado un ack de procesado.

Los ack resuelven un problema pero crean otros, si el consumidor falla al enviar el mensaje de consumido, entonces el mensaje se consumirá dos veces. También se incrementa el procesamiento de los brokers porque tienen que llevar el rastreo del estado para cada mensaje. ¿Qué hacemos con los paquetes que hemos enviado y no hemos recibido respuesta?

Kafka maneja esto diferente. El topic es dividido en un conjunto de particiones ordenadas en el tiempo. Cada una de ellas es consumida sólo por un consumidor dentro un grupo suscrito al tema. Esto significa que la posición de un consumidor en cada partición es solo un número, en concreto, el offset del siguiente mensaje a consumir. Esto hace que el estado de lo que se ha consumido no ocupe casi espacio, tan solo un número por partición y grupo.

Esto proporciona la ventaja de que un consumidor pueda acceder a un offset antiguo y reprocesar los datos. Esto es lo que en una cola no se debe hacer, pero es una característica esencial para muchos consumidores en Kafka. Por ejemplo si hay un bug en el código y es descubierto después de haber consumido algunos mensajes,  el consumidor podrá volver a consumir los datos mal procesados cuando se arregle el bug.

pub-subFuente: oracle.com

5. Casos de uso

  1. Funciona bien como reemplazo de los agentes de mensajería tradicionales como RabbitMQ o ActiveMQ.
  2. Rastrear actividad de la web. Originalmente Kafka fue creado con este propósito, generar tuberías de datos en tiempo real con un modelo publicador-suscriptor.
  3. Métricas: Kafka suele utilizarse para monitorizar datos operacionales. Esto incluye agregación de estadísticas de aplicaciones distribuidas para centralizar los datos del operacional.
  4. Hay gente que utiliza Kafka como reemplazo a su solución de agregación de logs. Normalmente el proceso es recoger ficheros físicos de logs y ponerlos en un repositorio central para procesarlos.
  5. Se puede utilizar Kafka para procesamiento de streams de datos. Los datos en crudo son consumidos por un topic y después se agregan, se enriquecen o se realiza otro tipo de transformación sobre ellos para publicarlos en un topic distinto.

6. Balanceo

El productor, cuando quiere escribir, envía datos directamente al broker líder de la partición, sin que exista una capa de enrutamiento entre medias. Para ayudar al productor a encontrar al líder puede preguntar a cualquier broker por los metadatos sobre que nodos se encuentran vivos y quien es el líder de las particiones de un topic dado.

El cliente controla en que partición publica los mensajes. Esto puede realizarse de manera aleatoria, implementando algún tipo de reparto de carga, o podemos particionar la clave con alguna regla semántica. Por ejemplo, si se elige como clave el id del usuario, todos los datos para un usuario dado serán enviados a la misma partición. El  estilo de partición en Kafka está explícitamente diseñado para permitir un procesamiento sensible a la localidad de los consumidores.

Envío asíncrono. Para el procesamiento por lotes el productor de Kafka intentará acumular datos en memoria para enviar lotes mas grandes en una sola petición. Esto se puede configurar para que no acumule un número máximo de mensajes o no esperar mas de cierto tiempo en enviar una petición (versiones nuevas).

El algoritmo de rebalanceo del consumidor, permite a todos los consumidores de un grupo establecer consenso entre que consumidores consumen que particiones. El rebalanceo es activado cuando se añade o se elimina un broker o algún consumidores de un mismo grupo. Para un topic y un grupo de consumidores, las particiones son divididas entre todos los consumidores del grupo. Una partición es siempre consumida sólo por un consumidor. Si se hubiera permitido que una partición fuera consumida concurrentemente por múltiples consumidores, habría pelea por la partición y hubiese sido necesario establecer algún tipo de bloqueo.

Durante el rebalanceo, se trata de asignar particiones a los consumidores de tal manera que se reduzca el numero de brokers a los que cada consumidor debe conectarse.

6. API Java

Productor

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Productor {
 public static void main(String[] args) {
 
   Properties props = new Properties();
   props.put("bootstrap.servers", "localhost:9092");
   props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 
   props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
   props.put("partitioner.class","kafka.CustomPartitioner");
 
   KafkaProducer<String, String> productor = new KafkaProducer<String, String>(props);
 
   for(int i=1; i<1000000;i++){
     ProducerRecord<String, String> data = new ProducerRecord<String, String>("topicParticionado2", String.valueOf(i), "Hola Mundo_" + i);
     productor.send(data);
    }
   productor.close(); 
 }
}

Particionador personalizado

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class CustomPartitioner implements Partitioner{

public void configure(Map<String, ?> arg0) {
 // TODO Auto-generated method stub
}

public void close() {
 // TODO Auto-generated method stub
}

 public int partition(String topic, Object key, byte[] keyBytes, Object value,byte[] valueBytes, Cluster cluster) {
 
   List partitionInfo = cluster.partitionsForTopic(topic);
   int numPartitions = partitionInfo.size();
   return value.hashCode() % numPartitions + 1;
 }
}

Consumidor

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Consumer {
   public static void main(String[] args) {
 
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
     props.put("group.id", "DYC");
 
     List topics = new ArrayList();
     topics.add("topicParticionado2");
 
     KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(props);
     consumer.subscribe(topics);
 
     while(true){
       ConsumerRecords<String, String> message = consumer.poll(0);
       if (!message.isEmpty()){
         java.util.Iterator<ConsumerRecord<String, String>> it = message.iterator();
         while(it.hasNext()){
           ConsumerRecord<String, String> record = it.next();
           System.out.println("\nTopic: " + record.topic() + ". Partition: " + record.partition() +
           ". Offset: " + record.offset() + ". Key: " + record.key() + ". Value: " + record.value());
         }
       }
     }
  }
}

Fuentes

Apache Kafka

Linkedin

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión /  Cambiar )

Google photo

Estás comentando usando tu cuenta de Google. Cerrar sesión /  Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión /  Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión /  Cambiar )

Conectando a %s