El pipeline de datos de Netflix

Publicado por

Índice

  1. Introducción
  2. Arquitectura
  3. Keystone Pipeline
  4. Lecciones aprendidas

1. Introducción

En Netflix cualquier decisión de negocio o de producto está tomada a partir de ideas obtenidas del análisis de datos. El objetivo del pipeline de datos es obtener, agregar, procesar y mover datos a escala Cloud. Casi todas las aplicaciones en Netflix utilizan el pipeline de datos para la consulta o la escritura de datos.

El volumen de datos que se maneja en Netflix es el siguiente:

  • 500 billones de eventos y 1.3 Pb al día
  • 8 millones y 24Gb por sg en momentos pico

Los datos proviene en su mayoría de los siguientes orígenes:

  • Actividades de visualización
  • Actividades de la interfaz de usuario
  • Registros de errores
  • Eventos de rendimiento
  • Eventos de diagnóstico y resolución de problemas

Hay que decir que las métricas operacionales no viajan a través de éste pipeline de datos, para ello han creado un sistema de telemetría aparte llamado Atlas.

2. Arquitectura del pipeline

A través de la experiencia y de los nuevos casos de uso que han ido surgiendo la evolución del pipeline ha sido la siguiente:

2.1 Chuwka pipeline v1.0

El objetivo del pipeline original de Netflix surge de la necesidad de agregar y subir eventos a un clúster Hadoop para su procesamiento en batch. EL funcionamiento es simple:

Una pieza desarrollada por Netflix llamada Chuwka recoge eventos y los escribe en el sistema de ficheros HDFS de S3 (Amazon) en formato de archivo de secuencia. Posteriormente se procesan esos ficheros en S3 y se escriben en Hive en formato Parquet. La latencia de principio a fin es de 10 minutos.

2.2 Chuwka pipeline con tiempo real v1.5

Con tecnologías en auge como Kafka o Elasticsearch la demanda de análisis de datos en tiempo real ha aumentado gradualmente en Netflix, por eso se creo una nueva rama en el pipeline para el tratamiento de datos en tiempo real.

Además de subir eventos al S3/EMR, Chuwka también dirige el tráfico a Kafka (puerta de entrada de la rama RT). Aproximadamente el 30% de los eventos son dirigidos a la rama RT del pipeline. La pieza más importante de la rama de tiempo real es el “router“. Esta pieza es responsable de enviar datos desde Kafka a varios destinos entre ellos Elasticsearch o a un Kafka secundario.

En Netflix se ha notado el crecimiento exponencial del uso de Elasticsearch por los diferentes equipos de desarrollo en los últimos dos años. Existen mas o menos 150 clústeres con unas 3500 instancias almacenando alrededor de 1.3 Pb. La mayoría de los datos son inyectados desde el pipeline.

Cuando Chuwka dirige el tráfico a Kafka, puede enviar el stream completo o puede aplicar ciertos filtros. Por eso el router consume de un topic Kafka y escribe/produce en otro topic diferente.

Una vez que entregamos los eventos a Kafka, faculta a los usuarios para el procesamiento en tiempo real de los datos: Mantis, Spark o una aplicación propia. “Libertad y responsabilidad” es el ADN de la cultura de Netflix. Depende de los usuarios elegir la herramienta adecuada para cada tarea en cuestión.

Debido a que mover datos a gran escala es donde radica la experiencia de Netflix, el equipo mantiene el “router” como un servicio administrado. Pero hay algunas lecciones que han aprendido al operar el servicio de enrutamiento:

  • El consumidor Kafka a alto nivel puede perder la propiedad de la partición (lider) y dejar de consumir algunas particiones durante algún tiempo después de re-estabilizarse.
  • Cuando se necesita realizar una subida de versión del código, a veces el consumidor puede quedarse atascado por quedarse en mal estado después de un rebalanceo.
  • Se agrupan cientos de trabajo de “routing” en una docena de clústeres. El coste operacional para administrar este tipo de trabajos y clústeres es una carga creciente, hacen falta mejoras en la plataforma para administrar estos trabajos de enrutamiento.

2.3 Keystone pipeline: Kafka Frontend v2.0

Dados los problemas descritos en el punto anterior, y con la motivación de modernizar el pipeline de datos, Netflix escogió Kafka como pieza clave por:

  • Simplificar la arquitectura
  • Kafka implementa replicación lo que mejora la durabilidad de los mensajes, mientras que Chuwka no soportaba replicación
  • Kafka tiene una gran comunidad y se encuentra en un gran momento

Ingesta de datos: las aplicaciones pueden ingestar datos de dos formas:

  • Utilizando la librería Java desarrollado por ellos para escribir a Kafka directamente
  • Enviar peticiones al proxy para que a través de él se escriba en Kafka

Almacenamiento de datos: con Kafka tenemos una cola de mensajes persistente con replicación. También ayuda a soportar las interrupciones temporales de los destinos de los datos

Enrutado de datos: el servicio de routing es responsable de mover los datos desde el frontal de Kafka hasta: S3, Elasticsearch o el Kafka secundario.

3. Keystone pipeline

En resumen el keystone pipelines es una infraestructura unificada para la publicación, recolección y enrutamiento de eventos en batch y en tiempo real.

Netflix tiene dos clústeres Kafka en el pipeline: El frontal Kafka y el consumidor Kafka. El frontal Kafka es responsable de obtener los mensajes de los productores que son, prácticamente, todas las instancias de aplicaciones en Netflix. Su función es la recopilación de datos y el almacenamiento en memoria intermedia para sistemas posteriores. Los clústeres de consumidores Kafka contienen un conjunto de topics dirigidos por Samza para los consumidores en tiempo real.

Actualmente tienen 36 clústers con más de 4000 brokers, entre el frontal y el consumidor Kafka. Mas de 700 billones de mensajes son recibidos de medía al día

3.1 Diseño

  • Dada la arquitectura actual de Kafka y el alto volumen de datos, lograr una entrega sin pérdidas de datos es un costo prohibitivo en AWS EC2. Teniendo en cuenta esto, han trabajado con equipos que dependen de la infraestructura para llegar a una cantidad aceptable de pérdida de datos mientras se equilibra el costo.
  • El keystone pipeline produce mensajes asíncronamente sin bloquear las aplicaciones. En caso de que un mensaje no pueda ser entregado después de varios intentos, ese mensaje se desecha para que no afecte a la disponibilidad de la aplicación y asegurar una buena experiencia de usuario. Estas son algunas de las  propiedad de configuración de Netflix en sus clústeres Kafka:
    • acks = 1
    • block.on.buffer.full = false
    • unclean.leader.election.enable = true
  • Los productores del frontal Kafka son flexibles en cuanto a configuración, por ejemplo para el nombre del topic donde escribir o los parámetros del sink, se gestionan a través de configuraciones dinámicas que pueden ser modificadas en tiempo de ejecución sin tener que reiniciar los procesos de aplicación. Esto permite hacer cosas como redirigir el tráfico y migrar los topics de los clústeres Kafka.
  • Otra cosa importante es que los productores no utilizan mensajes con clave para el particionamiento, de esta manera mejora la flexibilidad en cuanto al uso de Kafka. El orden de los mensajes se establece en la capa batch (Hive/Elasticsearch) o en la capa de enrutamiento para las aplicaciones en streaming.
  • Es muy importante para Netflix la estabilidad del Front Kafka porque es donde se reciben todos los mensajes, por eso no se permite a ninguna aplicación cliente que consuma directamente de este clúster para estar seguros de que la carga se puede predecir.

3.2 Tolerancia a fallos

El proceso de automatización creado en Netflix puede fallar en lado de los productores o en el lado de los consumidores. Si el problema se encuentra en el frontal Kafka (productores) los nuevos mensajes no llegarán a su destino. Para cada clúster Kafka del frontal, existe un clúster en standby con una configuración mínima de puesta en marcha y con poca capacidad inicial.

Para garantizar un estado limpio con el que iniciar, el clúster de failover no tiene creado ningún topic y no comparte el clúster Zookeeper con el clúster primario. El factor de replicación del clúster de failover es 1, con esto se protege el clúster de problemas originados por la replicación.

Cuando se produce un escenario de failover, los siguientes pasos son los que se llevan a cabo:

  1. Redimensionar el clúster con el tamaño deseado
  2. Crear topics y lanzar los trabajos de enrutamiento
  3. (Opcional) Esperar a los líderes de las particiones para minimizar el descarte de mensajes en el momento de inicio
  4. Dinámicamente modificar las configuraciones de los productores para cambiar el tráfico al clúster de failover

3.3 Desarrollo

Algunos detalles de las herramientas desarrolladas por Netflix para Kafka:

Producer sticky partitioner (Particionador “adhesivo” del productor )

Es un particionador personalizado que han desarrollado para la librería Java del productor. Como el nombre sugiere, se adhiere a una determinada partición para producir durante un período de tiempo configurable antes de elegir aleatoriamente la siguiente partición. Utilizar el particionador “adhesivo” junto con la persistencia ayuda a mejorar el procesamiento batch de mensajes y reduce la carga del broker.

Replica de Rack
Todos los clúster Kafka abarcan tres zonas de disponibilidad de AWS. Una zona de disponibilidad en AWS es conceptualmente un rack. Para asegurar la disponibilidad en caso de que una zona se caiga, Netflix ha desarrollado la replica de Rack, esto significa que las réplicas para un mismo topic se realizan en zonas diferentes.

Visualizador de metadatos Kafka
Los metadatos de Kafka se guardan en Zookeeper. Sin embargo la vista de árbol proporcionada por el Exhibitor (sistema desarrollado por Netflix para la gestión de Zookeeper) es difícil de navegar y se lleva mucho tiempo para encontrar y correlacionar la información

En Netflix han creado su propia UI para visualizar metadatos. Proporciona dos tipos de vistas: de gráficos y tabular, utiliza esquemas de color avanzados para indicar el estado del ISR (In-Sync Replicas). Las características principales son:

  • Pestañas individuales para la visualización de brokers, topics y clusters.
  • Mucha de la información es ordenable y buscable
  • Búsqueda de topics sobre los clústeres
  • Asignación directa desde ID del broker a ID de instancia de AWS
  • Correlación de brokers por la relación líder-seguidor

Despliegues

Las estrategias utilizadas por Netflix para los despliegues de clústeres Kafka son:

  • Es mejor tener pequeños clústeres que uno gigante. Esto reduce la complejidad operacional de cada clúster. El clúster más grande tiene menos de 200 brokers
  • Limitar el número de particiones para cada clúster. Cada clúster tiene menos de 10.000 particiones. Esto mejora la disponibilidad y reduce la latencia para las peticiones/respuestas que están ligadas a particiones
  • Distribución pareja de las replicas de los topics
  • Utilizar un clúster Zookeeper dedicado para cada clúster Kafka y así reducir el impacto por problemas en Zookeeper

4. Recomendaciones

Tomando como contexto el teorema de CAP, Netflix utiliza en algunos casos AP (disponibilidad sobre consistencia) y en otros CP (consistencia frente a disponibilidad).

Diseño jerárquico del Bus de mensajes

Netflix utiliza Kafka como tecnología para implantar un bus de mensajes. Parecido a todos los despliegues de Kafka, tiene mas consumidores que productores, por eso tienen un solo clúster frontal y varios clústeres consumidores. Con el clúster frontal se previenen de errores por parte de los grupos consumidores. Además el objetivo del clúster frontal no está en ofrecer disponibilidad con un periodo de retención de 8 a 24horas.

Consistencia a escala

El Keystone está configurado como “At-least-once”, “out-of-order-delivery” (con un timestamp manejado en la capa de aplicación). El drop de mensajes del Keystone es menor al 0,01, aun así es preferible perder un mensaje que impactar el servicio de una aplicación que está produciendo eventos. La librería del productor de eventos crear un buffer de un tamaño que es ajustado dinámicamente para prevenir el descarte de mensajes.

A través de muchas iteraciones durante los años sobre varios clusters en AWS, la mejor práctica es tener un máximo de 200 (VMs) nodos por cada clúster Kafka. Esto puede llevar a dedicar un clúster para un topic con mucho tráfico.

Particiones

Para decidir el número de particiones que debe de tener un topic, la regla es usar 0.5-1MBps por partición. Por ejemplo si el rendimiento de un topic se prevee que va a ser de 10MBps, entonces habrá que crear 10 particiones.

Zookeeper

Cada clúster Kafka debe de ser desplegado con un clúster Zookeeper diferente.

Durabilidad

Dependiendo de la naturaleza de los datos, los clústers Kafka tienen configurado 2 o 3 réplicas. También para clusters AP (Availability over consistency) se configura que el reinicio se realice con la configuración “Unclean Lider Election”

Metadata

La biblioteca del cliente de Kafka está envuelta en un envoltorio amigable con el ecosistema de Netflix para aplicar las mejores prácticas del productor y agregar atributos de metadatos transparentemente a cada mensaje: GUID, marca de tiempo, nombre de la aplicación y host.

Monitorización

Netflix ha implantado un sistema de auditor de métricas Kafka para conocer el lag que existe entre productores y consumidores.

Enlaces de interés

Evolution of the Netflix Pipeline

View at Medium.com

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión /  Cambiar )

Google photo

Estás comentando usando tu cuenta de Google. Cerrar sesión /  Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión /  Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión /  Cambiar )

Conectando a %s