Wednesday, May 18, 2022

Spark Cheetsheet

 RDD, is merely a Resilient Distributed Dataset that is more of a blackbox of data that cannot be optimized as the operations that can be performed against it, are not as constrained.However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame 

A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case. 

Dataset API is an extension to DataFrames that provides a type-safe, object-oriented programming interface. It is a strongly-typed, immutable collection of objects that are mapped to a relational schema.The Datasets API brings in several advantages over the existing RDD and Dataframe API with better type safety and functional programming.With the challenge of type casting requirements in the API, you would still not the required type safety and will make your code brittle.

Spark Cheatsheet

  • Conexión a Spark Shell: spark-shell
  • Datos de contexto: sc

Importante: Existe autocompletado (tab).

  • Uso de operaciones: val rddresult = rdd.operacion(argumentos)

Transformaciones

Iniciales

  • sc.parallelize : transforma una Scala collection en un RDD.
  • sc.textFile : lee un fichero de texto desde HDFS o Local y lo transforma en un RDD[String].

Sobre RDD

  • map() : genera un nuevo RDD aplicando una función sobre cada línea del RDD.
  • filter() : genera un nuevo RDD a partir de las líneas del RDD que cumplen un filtro (esencialmente un SQL WHERE)
  • flatMap() : igual que map(), pero en vez de devolver un resultado compuesto (array de arrays) "aplana" las dimensiones y devuelve un solo array con todo.
    • Ejemplo: ("hola mundo","adios mundo"). Donde map devolvería [["hola","mundo"]["adios","mundo"]]; flatMap devolvería ["hola","mundo","adios","mundo"].
  • distinct() : devuelve los valores distintos.
  • union() : devuelve la unión de dos rdd.
  • intersection() : devuelve la intersección de dos rdd.
  • subtract() : devuelve la resta de dos rdd (operaciones de conjuntos).
  • cartesian() : devuelve el producto cartesiano de dos rdd.
  • sample(withReplacement, fraction) : devuelve una muestra porcentual (fraction) de un rdd con o sin reemplazo.

Acciones

  • reduce() : opera sobre dos elementos de un RDD y devuelve un resultado de igual tipo. Por ejemplo, una suma: val sum = rdd.reduce((x,y) => x + y).
  • foldLeft() : entrada y salida pueden ser de tipos distintos.
  • count() : cuenta el número de elementos(filas) de un RDD.
  • take(n) : toma un array de n elementos.
  • countByValue() : el número de veces que ocurre cada valor en el rdd. Devuelve pares.
  • top(n) : devuelve los mayores n elementos del rdd.
  • takeOrdered(n)(orden) : devuelve n elementos ordenados por orden.
  • takeSample(withReplacement, n) : devuelve n elementos aleatorios.
  • fold(zero)(func) : igual que reduce pero con el valor cero.

Guardado

  • collect() : devuelve el dataset completo, un array de todos los elementos. Es esencialmente un dump. Solo recomendable para datasets pequeños.
  • saveAsSequenceFile() : guarda el resultado en un archivo en un sistema de almacenamiento. Recomendable para datasets medianos, grandes, muy grandes, etc.
  • saveAsTextFile(file) : guarda el resultado en un archivo de texto.

Otros

  • RDD.persist() : guarda en memoria un conjunto de datos para evitar ejecutar la sucesión de transformaciones que ha llevado hasta él repetidas veces. Recomendable tenerla en mente para persistir datos intermedios de vez en cuando (si la secuencia de transformaciones acaba siendo muy grande).

Trabajo con pares RDD

Ejemplo de función func rdd.operacion(func) => rdd.operacion(a => a + a)

Transformaciones

  • reduceByKey(func) combina valores con la misma clave.
  • groupByKey() agrupa valores con la misma clave. Da (Clave,[Grupo,De,Valores]).
  • combineByKey() combina valores con la misma clave usando un tipo de resultado diferente. Parece complejo.
  • mapValues(func) aplica una función a cada valor de un pair RDD sin cambiar la clave.
  • flatMapValues(func) aplica un flatMap combinado con un mapValues. Devuelve valores clave-valor por cada valor.
  • keys() devuelve un RDD simple de las claves.
  • subtractByKey(key) elimina los elementos que tengan una clave dada (operación de conjuntos)
  • join(key) inner join entre dos RDD. Devuelve claves con grupos (ver groupByKey)
  • rightOuterJoin(key), la clave ha de estar en el primer RDD.
  • leftOuterJoin(key), la clave ha de estar en el segundo RDD.
  • cogroup(key), agrupa datos que compartan clave. Con datos vacíos.
  • values() devuelve un RDD simple de los valores.
  • sortByKey() ordena un RDD por clave.

Acciones

  • countByKey() cuenta el número de elementos por clave.
  • collectAsMap() hace un collect() como un mapa para que sea más fácil buscarlo.
  • lookup(key) devuelve los valores asociados con una clave.

Dependencias y eficiencia

  • toDebugString muestra el plan de ejecución de una operación. Ahí podremos ver si va a haber mucho shuffle.
  • dependencies muestra la secuencia de dependencias que va a haber.
    • Narrow: OneToOneDependencyPruneDependencyRangeDependency
    • Wide: ShuffleDependency.

Dependencias Narrow

  • map
  • mapValues
  • flatMap
  • filter
  • mapPartitions
  • mapPartitionsWithIndex

Dependencias Wide

  • cogroup
  • groupWith
  • join
  • leftOuterJoin
  • rightOuterJoin
  • groupByKey
  • reduceByKey
  • combineByKey
  • distinct
  • intersection
  • repartition
  • coalesce

Asuntos "avanzados"

Mostrar un elemento por línea

  • rdd.foreach(println) pasa por cada elemento imprimiéndolo en una línea separada.

Imprimir un número de líneas

  • rdd.take(n).foreach(println) ejecuta lo anterior pero solo en un número n de líneas.

Limpieza de espacios o carácteres

  • rdd.map(word => word.replace("to_replace","replace_with")) reemplaza carácteres. Permite el uso de expresiones regulares.

Ordenar un Pair RDD por valor

  • rdd.map(field => field.swap).sortByKey([false]).map(field => field.swap), siendo [false] lo que determina si es orden ascendente.

Troubleshooting

¡Se me ha olvidado pasar el valor a variable!

¡No pasa nada! Si estás trabajando en consola, presta atención a la línea que te devuelve. Normalmente empezará algo así:

resXX: org.apache.spark.rdd.RDD[...]

Como podrás comprobar, te crea una variable automaticamente con la que puedes trabajar. res1res12res48res123... puedes trabajar con esas variables. No son intuitivas y no se recomienda, pero te pueden sacar de un apuro inmediato.

Librerías adicionales

SparkSQL

  • pairRDD.toDF(["columna1","columna2",...]) crea un DataFrame a partir de la estructura implícita de un PairRDD.
    • Si no se incluyen nombres de columna, asume _1, _2, _3...
  • La creación explícita es un poco más complicada.
    1. Creamos un RDD.
    2. Creamos un StructField que refleja la estructura de ese RDD, schema.
    3. Convertimos el RDD a filas de atributos (rowRDD)
    4. Usamos createDataFrame(rowRDD,schema) para aplicar la estructura creada en schema a las filas del RDD rowRDD.
  • La lectura desde origen se realiza con readval abc = spark.read.json("asdf.json") para leer desde json. spark es una SparkSession.

Uso de SQL en SparkSQL

  • Lo primero que hay que hacer es registrar el DataFrame como vista temporal SQL: DF.createOrReplaceTempView("ejemplito"), o como tabla temporal: DF.registerTempTable("ejemplito")
  • A partir de un objeto SparkSession: spark.sql(SELECT * FROM ejemplito WHERE condicion").

Tipos de datos

SQL: x[Type]

  • Tipos simples: Byte,Short,Int,Long,Decimal,Float,Binary,Boolean,Timestamp,Date,String
  • Tipos complejos: ArrayType(elementType,constainsNull),MapType(keyType,valueType,valueContainsNull,StructType(List[StructFields])

Los tipos complejos son anidables.

API

Es importante destacar que también hay métodos lazy y métodos eager, es decir, que devuelven nada hasta que se realiza una acción y que realizan esa acción respectivamente.

Transformaciones (Lazy)
  • df.select(columna1[, columna2, ...]) devuelve las columnas nombradas del df. (Similarmente, hay operaciones análogas a whereorderBy, etc.) - Hay que especificar el nombre de la columna.
  • df.join(otroDF) hace un JOIN de df y otrodF.
  • df.agg(columna1[, columna2, ...]) hace la sumatoria de las columnas especificadas.
Agregación
  • df.groupBy(columna) devuelve un RelationalGroupedDataset y a partir de ahí se definen funciones de agregación estándar: count, max, min, sum y avg. (Hay que especificar el nombre de la columna, sin dólar)
Operaciones de conjuntos

Todas las que se puedan pensar.

  • df1.join(df2, $"df1.id" === $"df2.id"[,"tipo"]) pasándole los parámetros de unión y siendo tipo el tipo de join (por defecto es Inner, pero puede especificarse innerouterleft_outerright_outer, etc.)
Acciones (Eager)
  • df.show() muestra los primeros 20 elementos del DF de forma tabulada.
  • df.printschema() hace un describe con el DF en formato de árbol.
Columnas

Hay varias maneras de referirse a ellas:

  • df.filter($"nombreCol" = 0) - notación $
  • df.filter(df("nombreCol") = 0) - referencia a DataFrame
  • df.filter("nombreCol = 0") - sentencia SQL tradicional
Tipaje de un Row

Los DataFrames tienen limitaciones en tanto que devuelven Rows no tipadas, por lo que tendremos que tipar nosotros lo que devuelvan.

  • row(0).asInstanceOf[Type] tipa un campo de una fila, siendo Type por ejemmplo String o Int.

DataSets

  • myDF.toDS transforma un DataFrame en un DataSet.
  • val myDS = spark.read.json("asdf.json").as[Type] permite leer datos explicitando el tipo.
  • myRDD.toDS transforma un RDD en DataSet.
  • List("a","b","c").toDS transforma una lista en DataSet.

Spark Streaming

  • Comienzo: start().
  • Fin: awaitTermination().

Operaciones stateful

  • updateStateByKey(function) mantiene el estado a través de los batches a través de una variable.

Output Operations

  • printsaveAsTextFilessaveAsObjectFilesforeachRDD(function, time).

Window

  • window() para configurarla. Parámetros: windowDuration - batches a tener en cuenta, slidingDuration - frecuencia de ejecución.
  • reduceByWindow() reduce por ventana.
  • reduceByKeyAndWindow() reduce por clave y ventana.
  • countByWindow() cuenta el número de elementos por ventana.
  • countByValueAndWindow() cuenta por valor el número de elementos por ventana.

Hadoop Cheatsheet

Es importante destacar que HDFS no es el sistema local. Son dos sistemas separados. HDFS puede acceder al sistema local, pero no al revés.

Es interesante también destacar que la carpeta desde la que se empieza tiene como prefijo /user/cloudera, en el caso de la máquina virtual. Esto se puede comprobar con el hecho de que al hacer hadoop fs -ls y hadoop fs -ls /user/cloudera salen las mismas carpetas, pero con /user/cloudera por delante.

HDFS

  • hadoop fs - Listado de comandos disponibles para HDFS

    En su mayoría, casi todos los comandos son equivalentes a comandos linux: lsrmcat, entre otros funcionan de manera similar. Algunos ejemplos:

    • hadoop fs -ls <directorio linux> - Lista los archivos en ese directorio de HDFS.
    • hadoop fs -rm <archivo> - Borra un archivo en HDFS.
    • hadoop fs -cat <archivo> - Mira los contenidos de un archivo por la salida estándar (la consola).
    • hadoop fs -mkdir <ruta> - Crea un directorio vacío con la ruta seleccionada (si solo se pone el nombre, se asume directorio actual).

    Otros comandos:

    • hadoop fs -put <orig> <dest> - Coloca un archivo "orig" en el sistema local en una localización "dest" en el hdfs
    • hadoop fs -get <orig> <dest> - Coloca un archivo "orig" de HDFS en la localización "dest" del sistema local.
  • hadoop jar <jar-file> <solution-class> <orig> <dest> - Ejecuta un archivo jar determinado en jar-file y usando la solution-class ejecuta un MapReduce en el archivo "orig" y lo escribe en "dest".

MapReduce

  • mapred job -list - Muestra la lista de Jobs en ejecución.
  • mapred job -kill <JobID> - Detiene un Job en ejecución a partir de su JobID.

Hive

  • Entrada: hive

  • Dentro: Los comandos son como usar SQL desde consola.

  • Salida: hive> exit;

  • Importación de datos a tabla: LOAD DATA [LOCAL] INPATH '<ruta_archivo_externo>' [OVERWRITE] INTO TABLE <tabla> PARTITION(<campo>='<valor>'[,<otro_campo>=<otro_valor>...])

    • LOCAL indica que es un fichero local. Si no, es un fichero en HDFS.
    • OVERWRITE indica sobrescribir la tabla en destino.

IMPORTANTE: Usar LOAD DATA borra el archivo de HDFS.

  • Exportación de datos de tabla: INSERT OVERWRITE [LOCAL] DIRECTORY '<ruta/archivo>' SELECT campos, varios FROM tabla [WHERE condicion] Para guardar el resultado de una consulta en un archivo local. Para condiciones, ver importación de datos a tabla.

Impala

  • Entrada: impala-shell para entrar a la consola de Impala. El resto es similar a Hive.
  • Cabe destacar que también se accede a las bases de datos disponibles en HDFS (me aparece cursohivedb, por ejemplo)
  • Definitivamente es más rápido.

Pig y Pig Latin

Como si de un lenguaje SQL se tratara, Pig también tiene palabras reservadas.

Comentarios

  • Comentario de línea: --Esto es un comentario
  • Comentario de bloque: /* Esto es un \n comentario */

Operaciones comunes

Similares a SQL.

  • Comparaciones: == y !=

Carga de datos

La función se llama PigStorage, implícito en la instrucción LOAD.

  • Carga de datos: variable = LOAD 'tabla' [AS (columna, otracolumna)] - Carga la tabla tabla en la variable variable con sus atributos llamándose columna y otracolumna.

    • Nombrar los atributos es opcional y pueden referirse por número: $0$1$2...
    • tabla es una ruta al directorio de la tabla.
  • Carga de datos por fichero: variable = LOAD 'tabla.(csv|txt|otro) USING PigStorage(',') [AS (columna, otracolumna)] - USING permite determinar el delimitador de campo que se va a usar.

Muestra/Almacenamiento de datos

  • DUMP variable - Muestra el contenido de la variable en formato de paréntesis: (Dato,123,xx)
  • STORE variable INTO 'ruta_guardado' - Guarda el contenido de la variable en el disco, en HDFS, en la ruta ruta_guardado que se ha determinado (nuevamente, puede ser absoluta o relativa).
    • Delimitador por defecto es tab. Puede modificarse usando USING PigStorage('x'), siendo x el comando delimitador.

IMPORTANTE: Al realizar un STORE, la ruta de salida (ruta_guardado) no debe existir. Será un directorio.

Tipos de datos

Pig soporta todos los tipos de datos básicos. Los fields no identificados se portan como arrays de bytes, llamados bytearray.

  • Tipos soportados: intlongfloatdoublebooleandatetimechararraybytearray.

  • Especificación: Es recomendable. variable = LOAD 'tabla' AS (atributo:int, otroatributo:chararray)

  • Datos inválidos: Pig los sustituye por NULL.

  • Filtrar datos erróneos: IS NULLIS NOT NULL.

hasprices = FILTER records BY price IS NOT NULL; --Solo mostrará los récords cuyo precio es válido.

Y con esto pasamos al siguiente apartado.

Filtrado de datos

  • FILTER es el comando que se usa para extraer tuplas que cumplan una condición. Es una especie de WHERE compacto.
  • variable = FILTER otravariable BY condicion, donde condicion puede ser por ejemplo atributo > 10 o atributo == 'texto'.
  • Expresiones regulares: variable = FILTER otravariable BY atributo MATCES 'regexp -- donde regexp es una expresión regular de toda la vida.

Ejemplo de expresión regular:

spammers = FILTER senders BY email_addr MATCHES '.*@example|.com$';

Selección, extracción, generación, eliminación, ordenación

  • FOREACH itera por cada tupla.

  • GENERATE trae campos ya existentes o calculados a la nueva variable.

  • variable = FOREACH otravariable GENERATE campo1, campo1 * 2 AS doble:int; - Esto genera en la variable una tabla que toma el campo1 de otravariable y otra columna que es el doble de dicho valor, dándole el nombre doble y el tipo int.

  • DISTINCT selecciona los valores únicos de cada consulta. únicos = DISTINCT variablegrande;

  • ORDER ... BY ordena de forma ascendente por defecto. Hay que añadir DESC para que sea descendente. ordenado = ORDER variable BY campo [DESC];

Funciones

  • UPPER(campo) - Pasa a mayúsculas un campo de texto.
  • TRIM(campo) - Elimina espacios en blanco al principio y al final de un campo de texto.
  • RANDOM() - Genera un número aleatorio entre 0 y 1.
  • ROUND(price) - Redondea un número flotante a entero.
  • SUBSTRING(campo, x, y) - Coge una subcadena empezando en el carácter nº x y con una longitud y a partir de la cadena en campo.

Salir

  • quit; para salir.

Sqoop

  • sqoop help [<comando>] - Por defecto mostrará una lista de comandos. Si pones un comando te mostrará la ayuda, opciones y explicación de ese comando.

Importación

  • sqoop import --username user --password pass --connect jdbc:mysql://database.example.com/personal [--table empleados]/[--where "condicion"]/[--query "SELECT * FROM tabla"] -m x --target-dir XXX
    • --username para poner el usuario de acceso a la base de datos.
    • --password para poner la contraseña de acceso a la base de datos en texto plano.
      • Puede hacerse --username user -p, que te pedirá la contraseña por consola en lugar de escribirla en texto plano (es más seguro).
      • También puede usarse --password-alias, señalando un archivo donde está guardada la contraseña.
    • --connect señala la cadena de conexión, el motor y la base de datos donde conectarse.
    • --table señala la tabla de la base de datos conectada de donde sacar los datos a importar.
    • --where es una condición SQL a cumplir por los datos que se importarán. Por ejemplo, where "edad>35" o cosas así.
    • --query permite hacer una consulta SQL más compleja (usando JOIN y cosas así) no limitada por los confines de un WHERE.
    • O se usa table y opcionalmente where o se usa query.
    • --target-dir señala el directorio objetivo donde se guardará el archivo part-*.0* donde estarán guardados los datos.
    • -m configura un número x de Mappers para este import. El resto de opciones tratan con temas de fuente, configuración de NULL, particiones, etcétera.

Exportación

  • sqoop export - Permite exportar datos de HDFS e insertarlos en una tabla existente de una RDBMS

Compatibilidad con Hive

Sqoop facilita importar directamente a Hive sin pasar por HDFS.

  • sqoop import <argumentos> --hive-import importa directamente a hive.
    • Si la tabla ya existe se puede añadir la opción -hive-overwrite para sobreescribirla.
    • Sqoop a continuación genera un script HQL para crear la tabla si no exite.
    • Por último se genera uan instrucción de carga para mover los datos al warehouse de hive.

Las opciones son similares y tratan con diversas cuestiones de configuración, particiones, reemplazos de carácteres especiales, etcétera.

Flume

Sobre todo temas de configuración más usuales.

Sources por defecto

  • Avro: Escucha de un puerto Avro y recibe eventos desde streams de clientes externos Avro.
  • Thrift: Igual pero con Thrift, puede autenticarse con Kerberos
  • Exec: Ejecuta comandos Unix al inicializar la fuente. Si es comando continuo (cat, tail) se irán recogiendo eventos según un límite (tiempo, nlíneas). Si es concreto (date, ls) solo se recoge un evento.
  • JMS: Se leen mensajes de una cola.
  • Spooling directory: Se lee desde ficheros movidos a un directorio concreto. Se va leyendo el fichero y enviando el contenido al channel.
  • Twitter: Conecta a la API de Twitter con las credenciales de tu usuario.
  • Kafka: Mensajes almacenados en Kafka.
  • Netcat: Se lee desde un puerto. Cada línea de texto es un evento.
  • Sequence Generator: Generador secuencial de eventos.
  • Syslog: El syslog de la máquina.
  • Http: Eventos desde petición HTTP GET o POST.
  • Stress: Simula un test de carga.
  • Legacy: Eventos de agentes Flume más antiguos.
  • Custom: Tiene que configurarse mediante una clase Java propia implementando las interfaces base.
  • Scribe: Ingesta propia, utilizable junto a Flume.

Sinks por defecto

  • HDFS: Almacena eventos el sistema de archivos de Hadoop, en formato text y sequenceFile. Permite compresión.
  • Hive: Almacena en texto o JSON en tablas o particiones de Hive. Transaccionalidad de Hive.
  • Logger: Log INFO para guardar los eventos.
  • Avro: Host/port de Avro.
  • Thrift: Lo mismo.
  • IRC: Usa un chat IRC.
  • FileRoll: Sistema de ficheros local.
  • Null: Se tiran.
  • Hbase: Se almacenan en una base de datos Hbase, usando un serializer específico. Autenticable mediante Kerberos.
  • MorphlineSolr: Transforma los eventos y los almacena en un motor de búsqueda Solr.
  • ElasticSearch: Se almacenan en ElasticSearch
  • Kite Dataset: Se almacenan en Kite (una capa de Hadoop)
  • Kafka: En un topic de Kafka.
  • Custom: Lo mismo que los sources, tienen que configurarse específicamente.

Channels por defecto

  • Memoria: Los eventos se almacenan en memoria de tamaño predefinido.
  • JDBC: Persistidos en una base de datos, hay que definir driver, url, etc.
  • Kafka: Clúster de Kafka. Alta disponibiilidad y replicación.
  • File: En un fichero en el sistema local.
  • Spillable Memory: En una cola en memoria. Si se sobrecarga la misma se pueden guardar en disco.
  • Pseudo Transaction: Testing.
  • Custom Channel: Pues eso mismo.

Interceptores por defecto

  • Timestamp: Agrega una timestamp en la cabecera.
  • Host: Añade Host o IP al evento.
  • Static: Cabecera fija.
  • UUID: Identificador único.
  • Morphline: Transformación predefinida en un fichero de configuración de la transformación.
  • Search&Replace: Busca y reemplaza una cadena en el evento.
  • Regex: Lo mismo pero con expresiones regulares.

Para más información mirar los ejercicios resueltos de flume.



Must Watch YouTube Videos for Databricks Platform Administrators

  While written word is clearly the medium of choice for this platform, sometimes a picture or a video can be worth 1,000 words. Below are  ...