Spark Streaming

Publicado por

Spark Streaming es una extensión de la API core de Spark que ofrece procesamiento de datos en streaming de manera escalable, alto rendimiento y tolerancia a fallos. Los datos pueden ser ingestados de diferentes fuentes como Kafka, Flume, Kinesis o sockets TCP, etc.

Los datos ingestados pueden ser procesados utilizando algoritmos complejos expresados como funciones de alto nivel como map, reduce o join. Finalmente los datos procesados se envían al sistema de ficheros, base de datos o dashboards. También se pueden aplicar algoritmos de machine learning y de grafos sobre los streams de datos.

Internamente Spark Streaming trabaja recibiendo streams de datos en vivo y los divide en batches o lotes, que son procesados por el motor de Spark para generar un stream de salida.

streaming-flowImg: spark.apache.org

Spark Streaming proporciona una abstracción de alto nivel llamada DStream, que representa un flujo continuo de streams de datos. Un DStream se puede crear ya sea a partir de flujos de datos de entrada o fuentes como Kafka, Flume y Kinesis. Internamente la representación de un DStream es una secuencia de RDDs.

Ejemplo práctico con Scala.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate+
$ nc -lk 9999
$ ./bin/run-example streaming.NetworkWordCount localhost 9999

StreamingContext: hay que inicializar este objeto como punto de entrada a toda la funcionalidad de Spark Streaming. Se puede crear pasándole una nueva configuración o utilizando un objeto SparkContext previamente creado.

  • Cuando se ha iniciado un contexto no se puede actualizar.
  • Cuando se para un contexto no se puede reiniciar.
  • Solo un StreamingContext puede estar activo en una JVM al mismo tiempo.
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

Dstreams (Discretized Streams): es la abstracción básica proporcionada por Spark Streaming. Representa un stream continuo de datos, ya sea el flujo de entrada recibido desde una fuente, o el flujo de datos procesados de salida. La representación de un DStream es una secuencia de RDDs ordenados en el tiempo, cada uno de ellos guardando datos para un intervalo concreto.

Cualquier operación realizada sobre un DStream se traduce en una operación sobre cada uno de los RDDs que lo forman.

streaming-dstream-ops
Img: spark.apache.org

Las transformaciones que se pueden aplicar sobre un DStream son prácticamente las mismas que se pueden aplicar sobre un RDD, aunque existen tres funciones a las que vamos a prestar más atención.

UpdateStateByKey: esta operación permite mantener un estado arbitrario mientras se actualiza continuamente con nueva información. Para poder utilizarlo hay que:

  1. Definir el estado. Puede ser un tipo de dato arbitrario.
  2. Definir la función de actualización de estado. Especificar en una función como actualizar el estado utilizando el estado anterior y los nuevos valores de un stream de entrada.

En cada batch Spark aplicará la función de actualización de estado para todas las claves existentes, independientemente de si se tienen nuevos datos o no. Si la función devuelve none entonces se eliminará el par de clave-valor.

Ejemplo de una función de actualización de estado.

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = ...  // add the new values with the previous running count to get the new count
    Some(newCount)
}

Y se aplica sobre un DStream que contiene el número de palabras.

val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

La función de actualización se llamará para cada palabra, con los nuevos valores y los anteriores. Requiere un directorio de checkpoint.

Transform: son operaciones que permiten aplicar funciones de RDD en RDD sobre un DStream. También puede utilizarse para aplicar funciones definidas por el usuario y que no están definidas en la API DStream.

Window: las operaciones de ventana actúan sobre los datos de una duración concreta. Si necesitamos disponer de los datos de los último diez intervalos de tiempo para realizar algún cálculo podemos utilizar alguna de las operaciones de ventana.

streaming-dstream-window

Checkpoint

Una aplicación de streaming tiene que estar operativa 24/7 y por eso debe ser tolerante a fallos que no sean propios de la aplicación. Por eso Spark Streaming necesita verificar la suficiente información a un sistema de almacenamiento con tolerancia a fallos de tal manera que pueda recuperar los datos perdidos.

Fuentes

Apache Spark Streaming

Responder

Introduce tus datos o haz clic en un icono para iniciar sesión:

Logo de WordPress.com

Estás comentando usando tu cuenta de WordPress.com. Cerrar sesión /  Cambiar )

Google photo

Estás comentando usando tu cuenta de Google. Cerrar sesión /  Cambiar )

Imagen de Twitter

Estás comentando usando tu cuenta de Twitter. Cerrar sesión /  Cambiar )

Foto de Facebook

Estás comentando usando tu cuenta de Facebook. Cerrar sesión /  Cambiar )

Conectando a %s