Apache Spark

Publicado por

Spark es un sistema de computación distribuida open-source que opera sobre conjunto de máquinas. Fue creado por M.Zaharias y compañeros de trabajo en AMPLab, después cedieron el proyecto a la fundación Apache y en el 2014 fundaron la empresa Databricks ofreciendo una plataforma de analítica en la nube teniendo como núcleo central de su producto a Spark.

Índice

  1. Introducción
  2. Componentes
  3. Arquitectura
  4. RDD
  5. Implementación
  6. Optimización

1. Introducción

Es más fácil de entender si lo comparamos con su predecesor, MapReduce, que revolucionó la manera de procesar grandes conjuntos de datos ofreciendo un modelo de desarrollo de programas que se pueden ejecutar en multitud de máquinas al mismo tiempo y que gracias a su arquitectura permite una escalabilidad horizontal.

Spark en esencia mantiene la escalabilidad horizontal y la tolerancia a fallos de MapReduce, pero añade un conjunto de bondades extra ya que su funcionalidad se basa en optimizar DAG (Directed Acyclic Graph) y en estructuras de datos RDD (Resilient Data Distributed).

Los RDD son un conjunto de elementos, en forma de datos de solo lectura, que están distribuidos a lo largo del clúster, mantenidos de manera tolerante a fallos. La disponibilidad de RDDs facilita la implementación de algoritmos iterativos que accedan varias veces a los mismos datos y para el análisis exploratorio de datos.

Para que funcione Spark necesita de un planificador de recursos y de un sistema distribuido de ficheros. Para la gestión de recursos Spark soporta: Standalone, YARN o Apache Mesos. Como sistema de ficheros distribuido soporta HDFS, Cassandra o Kudu.

2. Componentes

spark-stack

Img: spark.apache.org

2.1 Spark Core

Es la base de Apache Spark y en él se apoyan todos los elementos de la arquitectura. Proporciona distribución de tareas, programación y operaciones de entrada/salida expuesta a través de una interfaz (Java, Python, Scala y R). La interfaz está centrada en la abstracción de RDDs, establece un modelo funcional o de orden superior en el que se invocan operaciones en paralelo como map, filterreduce sobre un RDD pasando una función a Spark, que planifica la ejecución en paralelo sobre el clúster.

Los RDD son inmutables, no cambian con el tiempo, y además sus operaciones son “perezosas”, esto quiere decir, que hasta que no se aplique una función específica sobre el RDD, no se realizarán las operaciones sobre los datos (save(), write(), show(), count(), etc).

Los RDDs son tolerantes a fallos porque guardan la secuencia de operaciones que se ha producido hasta generarse, esto se conoce como linaje, así que puede reconstruirse en caso de perdida de datos. Esto da la gran ventaja a Spark de ser tolerante a fallos a la vez que rápido porque solo guarda el linaje de un RDD y no todos sus datos replicados en otros nodos o escribir en algún log local.

2.2 Spark SQL

Es un componente sobre el core de Spark que introduce una abstracción de datos llamada DataFrame, que proporciona soporte para datos estructurados y semi-estructurados que pueden ser manipulados con un lenguaje específico en Scala, Java y Python. También proporciona soporte para lenguaje SQL, una línea de comandos y conectores ODBC/JDBC.

2.3 Spark Streaming

Se aprovecha de la capacidad de computación de Spark Core para realizar análisis de datos en streaming. Ingesta datos en mini-batches y realiza transformaciones sobre los RDDs. Este diseño habilita que el mismo conjunto de aplicaciones escritas para análisis batch puedan ser utilizadas para análisis en Streaming.  El inconveniente de utilizar mini-batches es que la latencia será igual a su duración y algo más. Otros motores de streaming procesan evento por evento en lugar de mini lotes como Storm y Flink.

2.4 Spark MLib

Es un framework Machine Learning distribuido que funciona sobre el core de Spark. Al tener una arquitectura en memoria, en sus benchmarks dicen que es 9 veces más rápido que la implementación de Apache Mahout que trabaja en disco. Muchos algoritmos de machine learning y de estadística se han implementado para usarlos.

2.5 Graph X

Es un framework de procesamiento de gráficos distribuido sobre el core de Spark. Como se basa en RDDs, que son inmutables, los gráficos son inmutables y por eso no esta diseñado para gráficos que necesitan actualizarse. GraphX proporciona dos APIs por separado para la implementación de algoritmos de procesamiento en paralelo. Una abstracción de Pregel y un estilo MapReduce mas general. A diferencia de su predecesor Bagel, que esta obsoleto desde Spark 1.6, GraphX tiene soporte completo para gráficos de propiedades.

3. Arquitectura

Spark Context

Es el punto de entrada a la funcionalidad de Spark. La instancia de Spark Context nos conecta al clúster de Spark  y sirve para crear todos los elementos que queramos sobre Spark. Si todo va bien  debería estar disponible el objeto sc en la aplicación.

Para leer un fichero se utiliza el Spark Context que se encarga de cargarlo en un RDD. Como Spark es perezoso, y no ejecuta ninguna instrucción hasta que se realiza alguna acción, a la hora de cargarlo sólo se mostrará por pantalla el RDD que se ha creado y su tipo.

val data = sc.textFile("train.tsv")

Spark

Las aplicaciones de Spark se ejecutan como conjuntos de procesos independientes en un clúster, coordinados por el objeto SparkContext del main de un programa, llamado driver. Para ejecutarse en un clúster el SparkContext necesita conectarse a un gestor de clusters (puede ser en Standalone, Mesos o YARN) que asigna recursos a las aplicaciones. Cuando se conecta:

  1. Spark adquiere executors en los nodos del clúster. Son procesos que realizan cálculos y almacenan datos para la aplicación
  2. Después se envía el código de aplicación a los executors en forma de ficheros JAR o Python pasados al SparkContext
  3. Por último el sparkcontext envía tareas al los executors para que las realice

cluster-overview
Img: spark.apache.org

Notas útiles sobre esta arquitectura:

  • Cada aplicación obtiene sus propios procesos executor, que se mantienen activos durante toda la existencia de la aplicación y ejecuta tareas en múltiples hilos. Esto da la ventaja de ejecutar cada aplicación por separado. Por esto mismo, las aplicaciones de Spark, para intercambiarse datos, tendrán que realizar una escritura en un sistema de almacenamiento externo.
  • Spark es agnóstico del gestor de clúster que se utilice. Siempre que pueda adquirir procesos executors, y estos se comuniquen entre si, es relativamente fácil ejecutarlo.
  • Dado que el driver planifica tareas en el clúster, debería ejecutarse cerca de los nodos workers, preferiblemente en la misma red.

4. RDD (Resilient Distributed Dataset)

Un RDD es una abstracción en memoria y distribuido que permite a los programadores realizar cálculos en grandes clústers con tolerancia a fallos. La motivación para crear los RDDs es por los dos tipos de aplicaciones en las que actualmente los frameworks de computación muestran ineficiencias: los algoritmos iterativos y las herramientas de minería de datos interactivas. En los dos casos guardar los datos en memoria puede mejorar el rendimiento en un orden de magnitud.

La reutilización de datos intermedios es fundamental para este tipo de aplicaciones, y actualmente los frameworks que necesitan compartir información para computar un resultado entre los nodos, escriben en disco con lo que ello conlleva, mas utilización de I/O, serialización, deseralización, etc. Los RDD permiten la reutilización de datos eficiente en muchos tipos de aplicaciones. Son tolerantes a fallos, son estructuras de datos paralelas que permiten persistir resultados intermedios en memoria. También controlan sus particiones para optimizar la localidad de los datos.

4.1 Cargar datos

Los datos son cargados como RDDs, que no son más que una representación del dataset distribuido a través del clúster. Una característica muy importante de los RDDs es que son inmutables. Esto significa que no cambian con el tiempo; si el RDD de entrada es el mismo, los resultados de salida siempre serán iguales. Los RDD también permiten a Spark realizar alguna optimización, por ejemplo si un trabajo falla, solo tiene que volver a realizar la misma secuencia de operaciones que generó el RDD perdido. rdd-base

Img: Sparktutorials.net

4.2 Transformaciones

Las transformaciones son manipulaciones de los datos que se encuentran guardados en los RDDs. Por cada transformación que se realiza, se crea un nuevo RDD.

rdd-transformation-diagram

Img: Sparktutorials.net

Por ejemplo: Dividir un conjunto de datos de entrada por sus columnas que se encuentran separadas por una tabulación. Esto se hace con la función map().

val reviews = data.map(_.split("\t"))
reviews = data.map(lambda x: x.split("\t"))

Tras ejecutar estas transformaciones por consola, no recibimos ninguna respuesta, tan solo sabemos que se ha creado un RDD, esto se debe a que todavía no hemos realizado ninguna “pregunta directa” sobre los datos. Esto se conoce como acciones, y podríamos pedir que nos devolviera el número de elementos del conjunto.

4.3 Acciones

Las acciones devuelven datos a las aplicaciones o usuarios. Las acciones pueden ser obtener el primer valor, obtener varios valores, contar el número total de valores. Por ejemplo, queremos obtener los 10 primeros resultados.

reviews.take(10)

rdd-action-diagram

4.4 Representación

La representación de cada RDD se realiza a través de un interfaz que expone cinco piezas de información: un conjunto de particiones, que son piezas atómicas del dataset, un conjunto de dependencias con los RDDs padres, una función para calcular el conjunto de datos basado en sus padres, y metadatos sobre el esquemas de sus particiones y la localización de los datos. Por ejemplo, un RDD que represente un fichero HDFS tiene una partición por cada bloque del fichero y conoce en que máquinas esta cada bloque.

Operación Significado
partitions() Devuelve una lista de objetos Partition
preferredLocations(p) Lista de los nodos donde la partición p puede ser accedida más rápido por la localidad del dato
dependencies() Devuelve la lista de dependencias en un Iterator(p, parenIters)
partitioner() Devuelve los metadatos que especifican si un RDD esta particionado por hash o por range.

Tabla: Interfaces utilizadas para representar un RDD en Spark

La pregunta mas interesante diseñando esta interfaz es como representar dependencias entre RDDs. Existen dos tipos: dependencias estrechas (narrow), donde cada partición del RDD padre es utilizada por al menos una partición del RDD hijo, y dependencias amplias (wide) , donde múltiples particiones hijas dependerán de una partición padre. Por ejemplo, un map() genera dependencias estrechas mientras que un join genera dependencias amplias (a menos que los padres estén particionados por hash).

Esta distinción es útil por dos razones:

  1. Las dependencias estrechas permiten la ejecución en cascada en un nodo del cluster, que puede calcular todas las particiones del padre. Por otro lado las dependencias amplias necesitan los datos de todas las particiones padres disponibles y necesitan ser enviadas por la red a través de los nodos utilizando una operación parecida a MapReduce.
  2. Para recuperarse de un fallo en un nodo, es más eficiente recuperar con una dependencia estrecha, porque solo pierde las particiones del padre, sin embargo con dependencias amplias, causará perdida en particiones de todos los ancestros y habrá que recalcular todos los elementos otra vez.

5. Implementación

Cuando un usuario ejecuta una acción (count o save) sobre un RDD, el planificador examina los grafos de linaje de los RDDs para construir un DAG de stages a ejecutar. Cada stage contiene tantas transformaciones con dependencias estrechas como sea posible. Los beneficios de los stages son reducir el número de operaciones de shuffle en dependencias amplias.

El planificador asigna las tareas a las máquinas basándose en la localidad de los datos. Si una tarea necesita procesar una partición que está disponible en memoria en un nodo, la tarea se manda a ese nodo. Por otra parte, si una tarea procesa una partición para la cual el RDD contiene ubicaciones preferida (Preferred Locations, ej. un fichero HDFS), la tarea se envía a ese nodo. Para las dependencias amplias, los registros intermedios se materializan en los nodos que contienen particiones de los padres, para simplificar la recuperación de errores. Si una tarea falla, se vuelve a ejecutar en otro nodo siempre y cuando los padres de su stage estén disponibles. Si algunos stages no están disponibles, volvemos a enviar las tareas para a calcular las particiones perdidas en paralelo.

Spark proporciona tres mecanismos para guardar RDDs de manera persistente:

  1. Almacenamiento en memoria con objetos Java des-serializados
  2. Almacenamiento en memoria con objeto Java serializados
  3. Almacenamiento en disco.

La primera opción proporciona el rendimiento más rápido, porque la máquina virtual puede acceder a cada elemento del RDD de forma nativa. La segunda opción proporciona mayor eficiencia del uso de memoria para cuando sea limitada,  y la última opción es útil para RDDs que son muy grandes para guardarlos en memoria, aunque el coste de computación es muy alto.

Cuando se procesa una partición y no hay espacio suficiente para guardarla, se busca la partición que más tiempo lleva inactiva y la eliminamos de memoria, a menos que sea del mismo RDD. En este caso, mantenemos la partición más antigua para prevenir los ciclos de particiones de un RDD y evitar que estén entrando y saliendo constantemente.

Aunque el linaje siempre se puede utilizar para recuperar un RDD después de un fallo, esta recuperación tiene un costo en tiempo y computación para RDD complejos. Por este motivo se hace necesario disponer de un mecanismo que permita almacenar RDDs intermedios en un almacenamiento estable, Checkpoints.

6. Optimización

Broadcast Hash Join

Cuando se hace un join entre una tabla pequeña y una grande lo que hay que hacer es enviar la tabla pequeña a memoria de cada worker como broadcast variable, así evitamos que la tabla grande se envíe a través de la red.

Si el join es entre una tabla media y una grande irremediablemente se va a tener que hacer un suffle y un spilling al disco, nuestro objetivo será evitar estas operaciones, aunque no siempre será posible. Una estrategia muy común es tratar de filtrar el conjunto de datos grande para reducir los datos que se transmiten en la etapa de shuffle.

Shuffle

El shuffle se produce cuando hay que transferir todos los datos con la misma clave al mismo nodo worker.

ReduceByKey vs. GroupByKey: las dos operaciones nos van a dar el mismo resultado, pero reduceByKey es más eficiente que groupByKey, además esta última puede causar problemas de espacio en disco.

Un ejemplo de reduceByKey. Los datos son combinados en el nodo y cada partición devuelve como mucho un valor por cada clave que se envía por la red.

captura-de-pantalla-2017-02-13-a-las-10-02-16

Un ejemplo de groupByKey. Todos los datos son enviados por la red, se recogen en los workers reducer y se agregan.

captura-de-pantalla-2017-02-13-a-las-10-02-29
Img: databricks.com

Siempre que sea posible es mejor utilizar reduceByKey, también utilizar aggregateByKey, foldByKeycombineByKey antes que groupByKey.

Acumuladores

Los acumuladores son variables que sólo se agregan a través de una operación asociativa y por lo tanto, pueden ser realizadas de forma paralela. Pueden utilizarse para implementar contadores o sumas.

Los acumuladores se crean con un valor iniciar llamando a SparkContext.accumulator(v). Las tareas que se ejecutan en el clúster pueden modificar el resultado llamando a la operación add por ejemplo. Sin embargo no pueden leer su valor, solo el driver puede leer el valor del acumulador utilizando el método value.

Fuentes

Wikipedia
Apache Spark
What is a RDD?

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión /  Cambiar )

Google photo

Estás comentando usando tu cuenta de Google. Cerrar sesión /  Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión /  Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión /  Cambiar )

Conectando a %s