Spark Cheetsheet
RDD, is merely a Resilient Distributed Dataset that is more of a blackbox of data that cannot be optimized as the operations that can be performed against it, are not as constrained.However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame
A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case.
Dataset API is an extension to DataFrames that provides a type-safe, object-oriented programming interface. It is a strongly-typed, immutable collection of objects that are mapped to a relational schema.The Datasets API brings in several advantages over the existing RDD and Dataframe API with better type safety and functional programming.With the challenge of type casting requirements in the API, you would still not the required type safety and will make your code brittle.
Spark Cheatsheet
- Conexión a Spark Shell:
spark-shell
- Datos de contexto:
sc
Importante: Existe autocompletado (tab).
- Uso de operaciones:
val rddresult = rdd.operacion(argumentos)
Transformaciones
Iniciales
sc.parallelize
: transforma una Scala collection en un RDD.sc.textFile
: lee un fichero de texto desde HDFS o Local y lo transforma en un RDD[String].
Sobre RDD
map()
: genera un nuevo RDD aplicando una función sobre cada línea del RDD.filter()
: genera un nuevo RDD a partir de las líneas del RDD que cumplen un filtro (esencialmente un SQL WHERE)flatMap()
: igual quemap()
, pero en vez de devolver un resultado compuesto (array de arrays) "aplana" las dimensiones y devuelve un solo array con todo.- Ejemplo:
("hola mundo","adios mundo")
. Donde map devolvería[["hola","mundo"]["adios","mundo"]]
; flatMap devolvería["hola","mundo","adios","mundo"]
.
- Ejemplo:
distinct()
: devuelve los valores distintos.union()
: devuelve la unión de dos rdd.intersection()
: devuelve la intersección de dos rdd.subtract()
: devuelve la resta de dos rdd (operaciones de conjuntos).cartesian()
: devuelve el producto cartesiano de dos rdd.sample(withReplacement, fraction)
: devuelve una muestra porcentual (fraction) de un rdd con o sin reemplazo.
Acciones
reduce()
: opera sobre dos elementos de un RDD y devuelve un resultado de igual tipo. Por ejemplo, una suma:val sum = rdd.reduce((x,y) => x + y)
.foldLeft()
: entrada y salida pueden ser de tipos distintos.count()
: cuenta el número de elementos(filas) de un RDD.take(n)
: toma un array den
elementos.countByValue()
: el número de veces que ocurre cada valor en el rdd. Devuelve pares.top(n)
: devuelve los mayoresn
elementos del rdd.takeOrdered(n)(orden)
: devuelven
elementos ordenados por orden.takeSample(withReplacement, n)
: devuelven
elementos aleatorios.fold(zero)(func)
: igual que reduce pero con el valor cero.
Guardado
collect()
: devuelve el dataset completo, un array de todos los elementos. Es esencialmente un dump. Solo recomendable para datasets pequeños.saveAsSequenceFile()
: guarda el resultado en un archivo en un sistema de almacenamiento. Recomendable para datasets medianos, grandes, muy grandes, etc.saveAsTextFile(file)
: guarda el resultado en un archivo de texto.
Otros
RDD.persist()
: guarda en memoria un conjunto de datos para evitar ejecutar la sucesión de transformaciones que ha llevado hasta él repetidas veces. Recomendable tenerla en mente para persistir datos intermedios de vez en cuando (si la secuencia de transformaciones acaba siendo muy grande).
Trabajo con pares RDD
Ejemplo de función func
rdd.operacion(func) => rdd.operacion(a => a + a)
Transformaciones
reduceByKey(func)
combina valores con la misma clave.groupByKey()
agrupa valores con la misma clave. Da(Clave,[Grupo,De,Valores])
.combineByKey()
combina valores con la misma clave usando un tipo de resultado diferente. Parece complejo.mapValues(func)
aplica una función a cada valor de un pair RDD sin cambiar la clave.flatMapValues(func)
aplica un flatMap combinado con un mapValues. Devuelve valores clave-valor por cada valor.keys()
devuelve un RDD simple de las claves.subtractByKey(key)
elimina los elementos que tengan una clave dada (operación de conjuntos)join(key)
inner join entre dos RDD. Devuelve claves con grupos (ver groupByKey)rightOuterJoin(key)
, la clave ha de estar en el primer RDD.leftOuterJoin(key)
, la clave ha de estar en el segundo RDD.cogroup(key)
, agrupa datos que compartan clave. Con datos vacíos.values()
devuelve un RDD simple de los valores.sortByKey()
ordena un RDD por clave.
Acciones
countByKey()
cuenta el número de elementos por clave.collectAsMap()
hace uncollect()
como un mapa para que sea más fácil buscarlo.lookup(key)
devuelve los valores asociados con una clave.
Dependencias y eficiencia
toDebugString
muestra el plan de ejecución de una operación. Ahí podremos ver si va a haber mucho shuffle.dependencies
muestra la secuencia de dependencias que va a haber.- Narrow:
OneToOneDependency
,PruneDependency
,RangeDependency
- Wide:
ShuffleDependency
.
- Narrow:
Dependencias Narrow
map
mapValues
flatMap
filter
mapPartitions
mapPartitionsWithIndex
Dependencias Wide
cogroup
groupWith
join
leftOuterJoin
rightOuterJoin
groupByKey
reduceByKey
combineByKey
distinct
intersection
repartition
coalesce
Asuntos "avanzados"
Mostrar un elemento por línea
rdd.foreach(println)
pasa por cada elemento imprimiéndolo en una línea separada.
Imprimir un número de líneas
rdd.take(n).foreach(println)
ejecuta lo anterior pero solo en un númeron
de líneas.
Limpieza de espacios o carácteres
rdd.map(word => word.replace("to_replace","replace_with"))
reemplaza carácteres. Permite el uso de expresiones regulares.
Ordenar un Pair RDD por valor
rdd.map(field => field.swap).sortByKey([false]).map(field => field.swap)
, siendo[false]
lo que determina si es orden ascendente.
Troubleshooting
¡Se me ha olvidado pasar el valor a variable!
¡No pasa nada! Si estás trabajando en consola, presta atención a la línea que te devuelve. Normalmente empezará algo así:
resXX: org.apache.spark.rdd.RDD[...]
Como podrás comprobar, te crea una variable automaticamente con la que puedes trabajar. res1
, res12
, res48
, res123
... puedes trabajar con esas variables. No son intuitivas y no se recomienda, pero te pueden sacar de un apuro inmediato.
Librerías adicionales
SparkSQL
pairRDD.toDF(["columna1","columna2",...])
crea un DataFrame a partir de la estructura implícita de un PairRDD.- Si no se incluyen nombres de columna, asume
_1, _2, _3
...
- Si no se incluyen nombres de columna, asume
- La creación explícita es un poco más complicada.
- Creamos un RDD.
- Creamos un StructField que refleja la estructura de ese RDD,
schema
. - Convertimos el RDD a filas de atributos (
rowRDD
) - Usamos
createDataFrame(rowRDD,schema)
para aplicar la estructura creada enschema
a las filas del RDDrowRDD
.
- La lectura desde origen se realiza con
read
:val abc = spark.read.json("asdf.json")
para leer desde json.spark
es unaSparkSession
.
Uso de SQL en SparkSQL
- Lo primero que hay que hacer es registrar el DataFrame como vista temporal SQL:
DF.createOrReplaceTempView("ejemplito")
, o como tabla temporal:DF.registerTempTable("ejemplito")
- A partir de un objeto SparkSession:
spark.sql(SELECT * FROM ejemplito WHERE condicion")
.
Tipos de datos
SQL: x[Type]
- Tipos simples:
Byte
,Short
,Int
,Long
,Decimal
,Float
,Binary
,Boolean
,Timestamp
,Date
,String
- Tipos complejos:
ArrayType(elementType,constainsNull)
,MapType(keyType,valueType,valueContainsNull
,StructType(List[StructFields])
Los tipos complejos son anidables.
API
Es importante destacar que también hay métodos lazy y métodos eager, es decir, que devuelven nada hasta que se realiza una acción y que realizan esa acción respectivamente.
Transformaciones (Lazy)
df.select(columna1[, columna2, ...])
devuelve las columnas nombradas del df. (Similarmente, hay operaciones análogas awhere
,orderBy
, etc.) - Hay que especificar el nombre de la columna.df.join(otroDF)
hace un JOIN de df y otrodF.df.agg(columna1[, columna2, ...])
hace la sumatoria de las columnas especificadas.
Agregación
df.groupBy(columna)
devuelve unRelationalGroupedDataset
y a partir de ahí se definen funciones de agregación estándar: count, max, min, sum y avg. (Hay que especificar el nombre de la columna, sin dólar)
Operaciones de conjuntos
Todas las que se puedan pensar.
df1.join(df2, $"df1.id" === $"df2.id"[,"tipo"])
pasándole los parámetros de unión y siendotipo
el tipo de join (por defecto es Inner, pero puede especificarseinner
,outer
,left_outer
,right_outer
, etc.)
Acciones (Eager)
df.show()
muestra los primeros 20 elementos del DF de forma tabulada.df.printschema()
hace undescribe
con el DF en formato de árbol.
Columnas
Hay varias maneras de referirse a ellas:
df.filter($"nombreCol" = 0)
- notación $df.filter(df("nombreCol") = 0)
- referencia a DataFramedf.filter("nombreCol = 0")
- sentencia SQL tradicional
Tipaje de un Row
Los DataFrames tienen limitaciones en tanto que devuelven Rows no tipadas, por lo que tendremos que tipar nosotros lo que devuelvan.
row(0).asInstanceOf[Type]
tipa un campo de una fila, siendoType
por ejemmploString
oInt
.
DataSets
myDF.toDS
transforma un DataFrame en un DataSet.val myDS = spark.read.json("asdf.json").as[Type]
permite leer datos explicitando el tipo.myRDD.toDS
transforma un RDD en DataSet.List("a","b","c").toDS
transforma una lista en DataSet.
Spark Streaming
- Comienzo:
start()
. - Fin:
awaitTermination()
.
Operaciones stateful
updateStateByKey(function)
mantiene el estado a través de los batches a través de una variable.
Output Operations
print
,saveAsTextFiles
,saveAsObjectFiles
,foreachRDD(function, time)
.
Window
window()
para configurarla. Parámetros:windowDuration
- batches a tener en cuenta,slidingDuration
- frecuencia de ejecución.reduceByWindow()
reduce por ventana.reduceByKeyAndWindow()
reduce por clave y ventana.countByWindow()
cuenta el número de elementos por ventana.countByValueAndWindow()
cuenta por valor el número de elementos por ventana.
Hadoop Cheatsheet
Es importante destacar que HDFS no es el sistema local. Son dos sistemas separados. HDFS puede acceder al sistema local, pero no al revés.
Es interesante también destacar que la carpeta desde la que se empieza tiene como prefijo /user/cloudera, en el caso de la máquina virtual. Esto se puede comprobar con el hecho de que al hacer hadoop fs -ls
y hadoop fs -ls /user/cloudera
salen las mismas carpetas, pero con /user/cloudera
por delante.
HDFS
hadoop fs
- Listado de comandos disponibles para HDFSEn su mayoría, casi todos los comandos son equivalentes a comandos linux:
ls
,rm
,cat
, entre otros funcionan de manera similar. Algunos ejemplos:hadoop fs -ls <directorio linux>
- Lista los archivos en ese directorio de HDFS.hadoop fs -rm <archivo>
- Borra un archivo en HDFS.hadoop fs -cat <archivo>
- Mira los contenidos de un archivo por la salida estándar (la consola).hadoop fs -mkdir <ruta>
- Crea un directorio vacío con la ruta seleccionada (si solo se pone el nombre, se asume directorio actual).
Otros comandos:
hadoop fs -put <orig> <dest>
- Coloca un archivo "orig" en el sistema local en una localización "dest" en el hdfshadoop fs -get <orig> <dest>
- Coloca un archivo "orig" de HDFS en la localización "dest" del sistema local.
hadoop jar <jar-file> <solution-class> <orig> <dest>
- Ejecuta un archivo jar determinado en jar-file y usando la solution-class ejecuta un MapReduce en el archivo "orig" y lo escribe en "dest".
MapReduce
mapred job -list
- Muestra la lista de Jobs en ejecución.mapred job -kill <JobID>
- Detiene un Job en ejecución a partir de su JobID.
Hive
Entrada:
hive
Dentro: Los comandos son como usar SQL desde consola.
Salida:
hive> exit;
Importación de datos a tabla:
LOAD DATA [LOCAL] INPATH '<ruta_archivo_externo>' [OVERWRITE] INTO TABLE <tabla> PARTITION(<campo>='<valor>'[,<otro_campo>=<otro_valor>...])
LOCAL
indica que es un fichero local. Si no, es un fichero en HDFS.OVERWRITE
indica sobrescribir la tabla en destino.
IMPORTANTE: Usar LOAD DATA
borra el archivo de HDFS.
- Exportación de datos de tabla:
INSERT OVERWRITE [LOCAL] DIRECTORY '<ruta/archivo>' SELECT campos, varios FROM tabla [WHERE condicion]
Para guardar el resultado de una consulta en un archivo local. Para condiciones, ver importación de datos a tabla.
Impala
- Entrada:
impala-shell
para entrar a la consola de Impala. El resto es similar a Hive. - Cabe destacar que también se accede a las bases de datos disponibles en HDFS (me aparece cursohivedb, por ejemplo)
- Definitivamente es más rápido.
Pig y Pig Latin
Como si de un lenguaje SQL se tratara, Pig también tiene palabras reservadas.
Comentarios
- Comentario de línea:
--Esto es un comentario
- Comentario de bloque:
/* Esto es un \n comentario */
Operaciones comunes
Similares a SQL.
- Comparaciones:
==
y!=
Carga de datos
La función se llama PigStorage, implícito en la instrucción LOAD.
Carga de datos:
variable = LOAD 'tabla' [AS (columna, otracolumna)]
- Carga la tablatabla
en la variablevariable
con sus atributos llamándosecolumna
yotracolumna
.- Nombrar los atributos es opcional y pueden referirse por número:
$0
,$1
,$2
... tabla
es una ruta al directorio de la tabla.
- Nombrar los atributos es opcional y pueden referirse por número:
Carga de datos por fichero:
variable = LOAD 'tabla.(csv|txt|otro) USING PigStorage(',') [AS (columna, otracolumna)]
-USING
permite determinar el delimitador de campo que se va a usar.
Muestra/Almacenamiento de datos
DUMP variable
- Muestra el contenido de la variable en formato de paréntesis:(Dato,123,xx)
STORE variable INTO 'ruta_guardado'
- Guarda el contenido de la variable en el disco, en HDFS, en la rutaruta_guardado
que se ha determinado (nuevamente, puede ser absoluta o relativa).- Delimitador por defecto es tab. Puede modificarse usando
USING PigStorage('x')
, siendox
el comando delimitador.
- Delimitador por defecto es tab. Puede modificarse usando
IMPORTANTE: Al realizar un STORE
, la ruta de salida (ruta_guardado
) no debe existir. Será un directorio.
Tipos de datos
Pig soporta todos los tipos de datos básicos. Los fields no identificados se portan como arrays de bytes, llamados bytearray
.
Tipos soportados:
int
,long
,float
,double
,boolean
,datetime
,chararray
,bytearray
.Especificación: Es recomendable.
variable = LOAD 'tabla' AS (atributo:int, otroatributo:chararray)
Datos inválidos: Pig los sustituye por
NULL
.Filtrar datos erróneos:
IS NULL
,IS NOT NULL
.
hasprices = FILTER records BY price IS NOT NULL; --Solo mostrará los récords cuyo precio es válido.
Y con esto pasamos al siguiente apartado.
Filtrado de datos
FILTER
es el comando que se usa para extraer tuplas que cumplan una condición. Es una especie deWHERE
compacto.variable = FILTER otravariable BY condicion
, dondecondicion
puede ser por ejemploatributo > 10
oatributo == 'texto'
.- Expresiones regulares:
variable = FILTER otravariable BY atributo MATCES 'regexp
-- donderegexp
es una expresión regular de toda la vida.
Ejemplo de expresión regular:
spammers = FILTER senders BY email_addr MATCHES '.*@example|.com$';
Selección, extracción, generación, eliminación, ordenación
FOREACH
itera por cada tupla.GENERATE
trae campos ya existentes o calculados a la nueva variable.variable = FOREACH otravariable GENERATE campo1, campo1 * 2 AS doble:int;
- Esto genera en lavariable
una tabla que toma elcampo1
deotravariable
y otra columna que es el doble de dicho valor, dándole el nombredoble
y el tipoint
.DISTINCT
selecciona los valores únicos de cada consulta.únicos = DISTINCT variablegrande;
ORDER ... BY
ordena de forma ascendente por defecto. Hay que añadirDESC
para que sea descendente.ordenado = ORDER variable BY campo [DESC];
Funciones
UPPER(campo)
- Pasa a mayúsculas un campo de texto.TRIM(campo)
- Elimina espacios en blanco al principio y al final de un campo de texto.RANDOM()
- Genera un número aleatorio entre 0 y 1.ROUND(price)
- Redondea un número flotante a entero.SUBSTRING(campo, x, y)
- Coge una subcadena empezando en el carácter nºx
y con una longitudy
a partir de la cadena encampo
.
Salir
quit;
para salir.
Sqoop
sqoop help [<comando>]
- Por defecto mostrará una lista de comandos. Si pones un comando te mostrará la ayuda, opciones y explicación de ese comando.
Importación
sqoop import --username user --password pass --connect jdbc:mysql://database.example.com/personal [--table empleados]/[--where "condicion"]/[--query "SELECT * FROM tabla"] -m x --target-dir XXX
--username
para poner el usuario de acceso a la base de datos.--password
para poner la contraseña de acceso a la base de datos en texto plano.- Puede hacerse
--username user -p
, que te pedirá la contraseña por consola en lugar de escribirla en texto plano (es más seguro). - También puede usarse
--password-alias
, señalando un archivo donde está guardada la contraseña.
- Puede hacerse
--connect
señala la cadena de conexión, el motor y la base de datos donde conectarse.--table
señala la tabla de la base de datos conectada de donde sacar los datos a importar.--where
es una condición SQL a cumplir por los datos que se importarán. Por ejemplo,where "edad>35"
o cosas así.--query
permite hacer una consulta SQL más compleja (usando JOIN y cosas así) no limitada por los confines de un WHERE.- O se usa
table
y opcionalmentewhere
o se usaquery
. --target-dir
señala el directorio objetivo donde se guardará el archivopart-*.0*
donde estarán guardados los datos.-m
configura un númerox
de Mappers para esteimport.
El resto de opciones tratan con temas de fuente, configuración de NULL, particiones, etcétera.
Exportación
sqoop export
- Permite exportar datos de HDFS e insertarlos en una tabla existente de una RDBMS
Compatibilidad con Hive
Sqoop facilita importar directamente a Hive sin pasar por HDFS.
sqoop import <argumentos> --hive-import
importa directamente a hive.- Si la tabla ya existe se puede añadir la opción
-hive-overwrite
para sobreescribirla. - Sqoop a continuación genera un script HQL para crear la tabla si no exite.
- Por último se genera uan instrucción de carga para mover los datos al warehouse de hive.
- Si la tabla ya existe se puede añadir la opción
Las opciones son similares y tratan con diversas cuestiones de configuración, particiones, reemplazos de carácteres especiales, etcétera.
Flume
Sobre todo temas de configuración más usuales.
Sources por defecto
- Avro: Escucha de un puerto Avro y recibe eventos desde streams de clientes externos Avro.
- Thrift: Igual pero con Thrift, puede autenticarse con Kerberos
- Exec: Ejecuta comandos Unix al inicializar la fuente. Si es comando continuo (cat, tail) se irán recogiendo eventos según un límite (tiempo, nlíneas). Si es concreto (date, ls) solo se recoge un evento.
- JMS: Se leen mensajes de una cola.
- Spooling directory: Se lee desde ficheros movidos a un directorio concreto. Se va leyendo el fichero y enviando el contenido al channel.
- Twitter: Conecta a la API de Twitter con las credenciales de tu usuario.
- Kafka: Mensajes almacenados en Kafka.
- Netcat: Se lee desde un puerto. Cada línea de texto es un evento.
- Sequence Generator: Generador secuencial de eventos.
- Syslog: El syslog de la máquina.
- Http: Eventos desde petición HTTP GET o POST.
- Stress: Simula un test de carga.
- Legacy: Eventos de agentes Flume más antiguos.
- Custom: Tiene que configurarse mediante una clase Java propia implementando las interfaces base.
- Scribe: Ingesta propia, utilizable junto a Flume.
Sinks por defecto
- HDFS: Almacena eventos el sistema de archivos de Hadoop, en formato text y sequenceFile. Permite compresión.
- Hive: Almacena en texto o JSON en tablas o particiones de Hive. Transaccionalidad de Hive.
- Logger: Log INFO para guardar los eventos.
- Avro: Host/port de Avro.
- Thrift: Lo mismo.
- IRC: Usa un chat IRC.
- FileRoll: Sistema de ficheros local.
- Null: Se tiran.
- Hbase: Se almacenan en una base de datos Hbase, usando un serializer específico. Autenticable mediante Kerberos.
- MorphlineSolr: Transforma los eventos y los almacena en un motor de búsqueda Solr.
- ElasticSearch: Se almacenan en ElasticSearch
- Kite Dataset: Se almacenan en Kite (una capa de Hadoop)
- Kafka: En un topic de Kafka.
- Custom: Lo mismo que los sources, tienen que configurarse específicamente.
Channels por defecto
- Memoria: Los eventos se almacenan en memoria de tamaño predefinido.
- JDBC: Persistidos en una base de datos, hay que definir driver, url, etc.
- Kafka: Clúster de Kafka. Alta disponibiilidad y replicación.
- File: En un fichero en el sistema local.
- Spillable Memory: En una cola en memoria. Si se sobrecarga la misma se pueden guardar en disco.
- Pseudo Transaction: Testing.
- Custom Channel: Pues eso mismo.
Interceptores por defecto
- Timestamp: Agrega una timestamp en la cabecera.
- Host: Añade Host o IP al evento.
- Static: Cabecera fija.
- UUID: Identificador único.
- Morphline: Transformación predefinida en un fichero de configuración de la transformación.
- Search&Replace: Busca y reemplaza una cadena en el evento.
- Regex: Lo mismo pero con expresiones regulares.
Para más información mirar los ejercicios resueltos de flume.
Comments