Wednesday, January 25, 2023

Spark Note

Apache Spark notes

PySpark
  • https://spark.apache.org/docs/latest/api/python/user_guide/index.html
  • https://stackoverflow.com/questions/68249294/in-spark-what-is-the-meaning-of-spark-executor-pyspark-memory-configuration-opti
  • https://medium.com/walmartglobaltech/decoding-memory-in-spark-parameters-that-are-often-confused-c11be7488a24

PySpark Internals
  • https://medium.com/@ketanvatsalya/a-scenic-route-through-pyspark-internals-feaf74ed660d
  • https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals
  • https://cwiki.apache.org/confluence/display/SPARK/Spark+Internals
  • https://github.com/apache/spark/blob/master/python/pyspark/context.py
  • https://github.com/apache/spark/blob/master/python/pyspark/worker.py
  • https://github.com/apache/spark/blob/master/python/pyspark/rdd.py
  • https://github.com/apache/spark/blob/master/python/pyspark/java_gateway.py
  • https://github.com/apache/spark/tree/master/python/pyspark
  • https://www.py4j.org/
  • https://www.youtube.com/watch?v=49Hr5xZyTEA

Monitoring
  • https://github.com/LucaCanali/Miscellaneous/tree/master/Spark_Dashboard
  • https://github.com/cerndb/SparkPlugins
  • https://github.com/lucacanali/sparkMeasure
  • https://github.com/LucaCanali/sparkMeasure/blob/master/examples/test_sparkmeasure_python.py
  • https://canali.web.cern.ch/docs/WhatsNew_Spark3_Performance_Monitoring_DataAI_Summit_EU_Nov2020_LC.pdf
  • https://www.slideshare.net/databricks/what-is-new-with-apache-spark-performance-monitoring-in-spark-30
  • https://github.com/cerndb/SparkTraining/blob/master/notebooks/Demo_Spark_on_Hadoop.ipynb
  • apache/spark#31367
  • https://docs.python.org/3.5/library/resource.html
  • https://stackoverflow.com/questions/43294153/spark-pyspark-how-to-monitor-python-worker-processes
  • https://bobcares.com/blog/ansible-unable-to-open-shell/

Monitoring
  • https://db-blog.web.cern.ch/blog/luca-canali/2017-03-measuring-apache-spark-workload-metrics-performance-troubleshooting
  • https://www.databricks.com/session_eu19/performance-troubleshooting-using-apache-spark-metrics
  • Flight Recorder 16:24
  • https://www.databricks.com/session_eu19/performance-troubleshooting-using-apache-spark-metrics
  • InfluxDB/Grafana Integration 16:56
  • https://www.youtube.com/watch?v=JlPu6FgHb0o

Monitoring
  • https://github.com/LucaCanali/Miscellaneous
  • https://github.com/LucaCanali/Miscellaneous/blob/master/Spark_Notes/Spark_TaskMetrics.md

Making Sense of Spark Performance - Kay Ousterhout (UC Berkeley)
  • https://www.youtube.com/watch?v=mBk4tt7AEHU
  • http://kayousterhout.org/

Troubleshooting
  • https://externaltable.blogspot.com/2016/09/spark-20-performance-improvements.html
  • https://canali.web.cern.ch/
  • https://gist.github.com/kayousterhout/7008a8ebf2babeedc7ce6f8723fd1bf4

Flame graphs

collectd integration with InfluxDB
  • https://elatov.github.io/2013/02/monitor-different-systems-with-collectd/

How to install Xen in Ubuntu Linux Distro
  • https://help.ubuntu.com/community/Xen

Apache Hadoop + Spark quick install
  • https://low-level.wiki/diverse/hadoop-setup.html

Apache Hadoop Ansible scripts
  • https://github.com/pippozq/hadoop-ansible
  • https://www.digitalocean.com/community/tutorials/how-to-install-and-configure-ansible-on-ubuntu-20-04
  • https://www.digitalocean.com/community/cheatsheets/how-to-use-ansible-cheat-sheet-guide
  • https://www.softwaretestinghelp.com/ansible-roles-jenkins-integration-ec2-modules/
  • https://docs.ansible.com/ansible/latest/network/user_guide/network_debug_troubleshooting.html
  • https://docs.ansible.com/ansible/latest/installation_guide/index.html

YARN HA configuration
  • https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/ResourceManagerHA.html

YARN resources partition
  • https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/NodeManagerCgroups.html
  • https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/RuncContainers.html
  • https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/UsingGpus.html

YARN security
  • https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YarnApplicationSecurity.html

YARN fair scheduler
  • https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/FairScheduler.html
  • https://clouderatemp.wpengine.com/blog/2016/06/untangling-apache-hadoop-yarn-part-4-fair-scheduler-queue-basics/
  • https://clouderatemp.wpengine.com/blog/2017/02/untangling-apache-hadoop-yarn-part-5-using-fairscheduler-queue-properties/
  • https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html
  • https://blog.cloudera.com/yarn-capacity-scheduler/
  • https://blog.cloudera.com/fine-tune-fair-to-capacity-scheduler-in-weight-mode/
  • https://blog.cloudera.com/better-slas-via-resource-preemption-in-yarns-capacityscheduler/

Compiler
  • https://www.databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html

Apache Spark Installation

  • Installing only Spark, on Linux or Windows, on one node or several, is called Standalone mode: it runs without Hadoop or any other resource manager such as Mesos or YARN.

  • A Cluster can be single or multiple Nodes.

  • A Java environment is mandatory for Spark, so install the Java Development Kit before installing Spark.

  • .bashrc is a hidden file in the home directory that holds environment variables. Set JAVA_HOME and PATH there, then run source ~/.bashrc to apply the changes.

  • Java home has to be specified in two places: in .bashrc and in conf/spark-env.sh (created by copying spark-env.sh.template).

  • Spark supports two shells, also called REPLs: one for Scala (spark-shell) and one for Python (pyspark).

  • Spark supports 4 programming languages: Java, Python, Scala, and R. These notes use the Scala and Python shells. Java and Scala code has to be packaged as a jar file before it is executed, whereas a Python script can be passed to spark-submit directly.

  • Spark also has two daemon processes that run in the background: Master and Worker.

  • Spark can be used in two modes, Client mode and Cluster mode. Working from a shell is Client mode. For larger programs, or when working from an IDE, the background processes must be running, so start both daemons and package the application (a jar file for Java or Scala) before execution.

  • The start and stop commands are in sbin; sbin/start-all.sh starts both daemons.

  • jps -> JVM Process Status tool; it lists the running Java processes, which is a quick way to check that the Master and Worker daemons are up.

  • Each daemon exposes two ports: an RPC port and a web UI port. Code connects to a daemon through the RPC port; the web port is for viewing the UI in a browser.

  • To start the master daemon: start-master.sh. Web UI: localhost:8080, RPC port: 7077.

  • To start the worker (slave) daemon: start-slave.sh spark://MILE-BL-4824-LAP.localdomain:7077 (named start-worker.sh in newer Spark releases). Web UI: localhost:8081. When connecting a worker node to the master from code, use the RPC address.

PySpark WordCount Program

Spark RDD:

  • Resilient Distributed Dataset

  • The Spark driver coordinates all the executors.

  • Lineage is maintained as a DAG (directed acyclic graph).

  • The DAG is maintained by the Spark Context.

  • An action produces an output, whereas a transformation creates a new RDD from the previous RDD.

Overview of Spark Working

  • A program or job can be executed in two ways: with spark-submit or from the interactive Spark shell.

  • The Spark Context contains both the DAG scheduler and the task scheduler.

  • The DAG scheduler splits a job into stages and submits each stage to the task scheduler.

  • The task scheduler splits each stage into multiple tasks and sends them to the executors on the worker nodes.

  • Once the tasks finish, each partition produces an output; these outputs are aggregated and sent back to the driver program.

  • The action can vary, for example count or collect. collect() sends all the data from every partition to the driver program, so if the driver has less memory than the combined output it may throw an error. Always send a reduced output to the driver.
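
A plain-Python sketch of the advice above. No Spark is involved, and the partition contents and helper names are made up for illustration: reducing inside each partition means the driver receives one small value per partition, whereas a collect-style call ships every element.

```python
# Toy model: each inner list stands for one partition held by an executor.
partitions = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

def collect_all(parts):
    """Like collect(): every element travels to the driver."""
    return [x for part in parts for x in part]

def reduce_then_send(parts):
    """Like reduce(): each partition sends a single partial sum."""
    partials = [sum(part) for part in parts]   # would run on the executors
    return partials, sum(partials)             # merged on the driver

shipped = collect_all(partitions)
partials, total = reduce_then_send(partitions)
print(len(shipped), "values shipped by the collect-style call")
print(len(partials), "values shipped by the reduce-style call, total =", total)
```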

Creating a Spark Environment

  • start-master.sh

  • start-slave.sh spark://MILE-BL-4824-LAP.localdomain:7077

  • Launching a PySpark shell:

    • pyspark --driver-memory 2G --driver-cores 2 --master spark://MILE-BL-4824-LAP.localdomain:7077
  • Syntax: spark-submit --executor-memory 2g wordCount.py. Other configuration options can be used in place of, or together with, --executor-memory:

    • --deploy-mode -> whether the driver runs outside the cluster (client mode, the default) or inside the cluster (cluster mode).
    • --driver-memory -> driver memory, 1 GB by default; can be customized.
    • --executor-memory -> memory per executor, 1 GB by default; can be customized.
    • --executor-cores -> number of cores assigned to each executor.
    • --num-executors -> number of executors; this option is available only in Kubernetes mode and YARN mode.
    • --driver-cores -> number of cores assigned to the driver.
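
As a small illustration of how these options combine, the sketch below assembles a spark-submit command line in Python without running it. The helper name build_submit_command and its defaults are hypothetical; the flags, script name, and master URL are the ones used in these notes.

```python
import shlex

def build_submit_command(script, master, executor_memory="1g",
                         driver_memory="1g", executor_cores=None,
                         deploy_mode="client"):
    """Return the spark-submit argument list (hypothetical helper)."""
    cmd = ["spark-submit",
           "--master", master,
           "--deploy-mode", deploy_mode,
           "--driver-memory", driver_memory,
           "--executor-memory", executor_memory]
    if executor_cores is not None:
        cmd += ["--executor-cores", str(executor_cores)]
    cmd.append(script)  # the application always comes last
    return cmd

cmd = build_submit_command(
    "wordCount.py",
    "spark://MILE-BL-4824-LAP.localdomain:7077",
    executor_memory="2g",
    executor_cores=2,
)
print(shlex.join(cmd))  # printed only; nothing is executed here
```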

Storage levels

  • An RDD has 3 basic storage levels:
  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • DISK_ONLY
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, DISK_ONLY_2 -> the suffix is the number of replicas (each partition is kept on 2 nodes).
  • rdd.getStorageLevel() -> returns the current storage level

Persistent levels and Replications

  • rdd.unpersist() -> removes the storage level

  • rdd.persist() -> sets the storage level, optionally with replication.

  • Persisted (and replicated) data is preferred over the DAG here because Spark can skip recomputing the lineage, which saves time.

  • In that case, data kept at a memory storage level is the fastest to read back.

  • Sometimes recomputation takes less time than reading the data back from a disk storage level.
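
The trade-off can be pictured with a toy class. This is not Spark code; ToyRDD and its methods are invented for illustration. Without persist every action recomputes from the source, and with persist the first result is reused.

```python
class ToyRDD:
    """Toy stand-in for an RDD: recomputes from its source unless persisted."""

    def __init__(self, source, func):
        self.source = source
        self.func = func
        self.persisted = False
        self.cache = None
        self.compute_count = 0   # how many times the lineage was replayed

    def persist(self):
        self.persisted = True
        return self

    def unpersist(self):
        self.persisted = False
        self.cache = None
        return self

    def collect(self):
        if self.persisted and self.cache is not None:
            return self.cache            # served from the stored copy
        self.compute_count += 1          # lineage is recomputed
        result = [self.func(x) for x in self.source]
        if self.persisted:
            self.cache = result
        return result

plain = ToyRDD([1, 2, 3], lambda x: x * x)
plain.collect()
plain.collect()

cached = ToyRDD([1, 2, 3], lambda x: x * x).persist()
cached.collect()
cached.collect()

print("recomputations without persist:", plain.compute_count)
print("recomputations with persist:", cached.compute_count)
```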


StandAlone Mode of Pyspark

  • Spark has several deployment modes: Spark on Mesos, Spark on YARN, and Spark Standalone.

  • If Spark is installed as a cluster of one or more nodes without any other technology such as YARN, another cluster manager, or Hadoop, it is the Standalone architecture.

  • Terminologies:

    • Daemons - background Process
    • Master and Worker are the two daemons in Spark.
    • Driver program: the program we write, in any supported language, in which we create the Spark Context as the entry point for execution. It is responsible for all activities, such as creating jobs and breaking jobs into tasks.
  • The flow of job submission and execution:

    • When the client node sends a jar file with a spark-submit request, the request goes to the Spark master machine.

    • A driver program then starts: on the client machine in client mode, or on one of the worker nodes in cluster mode. The number of driver programs running in the cluster equals the number of jobs submitted by the clients.

    • The driver program communicates with the cluster manager, a service that in standalone mode runs inside Spark itself. It is responsible for resource allocation, such as the number of cores and the amount of memory. The programmer provides this information when setting up the application.

    • The cluster manager allocates the resources on each worker node as requested by the driver program.

    • The cluster manager is also responsible for having executors created on the worker nodes.

    • Note: the data must be distributed before parallel processing starts. (The data is distributed as an RDD.)

    • Tasks are scheduled as close as possible to the node where their data resides. This is referred to as data locality.

    • Once an executor is created, it sends heartbeats to the driver program at regular intervals.

    • The driver program assigns the work written in the jar file to the executors on the different nodes at the same time, and each executor starts processing as soon as its work is assigned.

    • The same job runs on different worker nodes, each on its own part of the data, and at the end all the results are grouped together.

    • The output of each executor (referred to as intermediate data) can be stored with persist.

    • Persist has 3 basic types: memory only, disk only, and memory and disk.

    • Grouping is done at the end with groupByKey, reduceByKey, or another transformation that shuffles the data, depending on what the programmer needs.

    • For the final grouping, the result is computed on the same node or on a different node, and it can also be stored with persist.

    • If an executor fails on any node, or a whole node goes down during processing, the other executors of the same job are not affected, because the lost data can be rebuilt from the DAG or read from persisted replicas.

    • The driver program recreates the executor with the help of the cluster manager and, if no replica is available, recomputes the lost work.

    • If the driver program fails, every executor has to be recreated and the whole process has to be rerun.

    • What if the Master itself goes down, since it is a single point of contact?

    -Ans : With the help of the ZooKeeper daemon, a passive (standby) master takes the place of the active master.

    • The standby master recovers the cluster state when it takes over, so after a failure of the active master it is still in sync with the worker nodes.

RDD Functionality:

  • Resilient Distributed Dataset (reliability, distribution, and fault tolerance)

  • DataFrame and Dataset are newer, higher-level APIs built on top of RDDs.

  • Actions and transformations are the two kinds of operations on RDDs.

  • The distribution of data among the Worker Nodes takes place before parallel processing.

  • Spark reads the input file, creates logical partitions, and stores them on the worker nodes.

  • The number of logical partitions may or may not equal the number of worker nodes. Eg : 4 partitions of data can be stored on 3 worker nodes.

  • By default the data on a worker node is kept in memory (RAM), not on disk.

  • In Spark we can chain any number of transformations. Spark records every transformation that led to an RDD; this record is called the data lineage. Intermediate results are kept only if they are persisted.

  • The DAG maintains the data lineage. The DAG is held by the Spark Context in the driver program.

  • If any one task fails, Spark does not fail the whole job; with the help of the lineage it recomputes the work for that task only.

  • Replication is an alternative to recomputation for the same problem.

  • Replicas can be stored at any persist storage level. Sometimes reading a replica is faster, and sometimes recomputing from the lineage is faster.

RDD Transformation and Actions:

  • Difference between transformation and action ?

  • Ans : For every transformation Spark creates another RDD, whereas an action sends the output to the user.

  • Lazy evaluation: Spark starts a job only when it encounters an action in the code.

  • Spark does not run transformations until an action is found in the job, because running transformations whose result is never used would waste resources.

  • There are two types of transformation: wide and narrow.

  • If a transformation requires shuffling, it is a wide transformation; otherwise it is a narrow transformation.

  • For every job, Spark creates several intermediate stages based on the transformations used in the code.

  • If we use 5 narrow transformations, all of them run in a single stage, say stage 1. Any wide transformation starts a new stage, stage 2.

  • The stages can be viewed as a DAG visualization in the application web UI (port 4040 by default), which is linked from the master UI on port 8080.
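
The stage rule above can be sketched in a few lines of plain Python. This is a simplification for illustration, not Spark's scheduler: the function name and the WIDE set are assumptions, and the set lists only the operations these notes treat as wide.

```python
# Operations these notes describe as needing a shuffle (an assumed, partial list).
WIDE = {"repartition", "distinct", "groupByKey", "reduceByKey",
        "aggregateByKey", "sortByKey"}

def split_into_stages(transformations):
    """Group a chain of transformation names into stages.

    Narrow transformations stay in the current stage; a wide one
    starts a new stage because its input must be shuffled first.
    """
    stages = [[]]
    for name in transformations:
        if name in WIDE and stages[-1]:
            stages.append([])
        stages[-1].append(name)
    return stages

pipeline = ["flatMap", "map", "filter", "reduceByKey", "map"]
for number, stage in enumerate(split_into_stages(pipeline), start=1):
    print("stage", number, stage)
```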

Distribution of Data to Worker Nodes.

  • Parallelize: an RDD is a simple data structure that is split into partitions, which are sent to the worker nodes at the same time.

  • Spark offers the method sc.parallelize(data, n), which converts a local collection into an RDD by splitting it into partitions. n is the number of partitions and can be set explicitly.

  • RDDs can be created from a list, set, or dictionary, or from files in various formats. They can also be created from another RDD.
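
To picture what splitting a list into n partitions means, here is a minimal plain-Python sketch. The function name is made up and the even-split formula is an assumption, not a copy of PySpark's implementation, although it reproduces the per-partition sums shown in the mapPartitions examples later in these notes.

```python
def slice_into_partitions(data, num_partitions):
    """Split data into num_partitions contiguous slices of near-equal size."""
    data = list(data)
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

for k in (2, 3, 4):
    parts = slice_into_partitions([1, 2, 3, 4], k)
    print(k, "partitions:", parts, "sums:", [sum(p) for p in parts])
```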



Transformations and Actions:

  • sc

  • rdd1 = sc.parallelize([1,2,3,4,5])

  • rdd1.getNumPartitions() Result : 8

  • rdd1 = sc.parallelize([1,2,3,4,5],4)

  • rdd1.getNumPartitions() Result : 4

  • out = rdd1.count()

  • rdd1.collect() Result : [1, 2, 3, 4, 5]

  • out Result : 5

  • Multiple actions can be performed on a same RDD.

  • Multiple transformations can be performed on same RDD

  • type(rdd1) <class 'pyspark.rdd.RDD'>

  • type(out) <class 'int'>

  • rdd2 = rdd1.repartition(5)

  • rdd1.getNumPartitions() Output : 4

  • rdd2.getNumPartitions() Output : 5

Spark Documentation

PySpark Documentation

Squaring the elements in a list:

  • num = [1,2,4,8,16]

  • out = num.map(lambda x : x**2) -> fails with AttributeError, because a Python list has no map method; parallelize the list first:

  • out = sc.parallelize(num).map(lambda x : x**2)

  • out.collect()

  • Output : [1, 4, 16, 64, 256]

FlatMap:

  • data = ["Project's Guttenberg's", "Alice's Adventures in Wonderland", "Project's Guttenberg's"]

  • rdd2 = sc.parallelize(data)

  • rdd2.collect()

  • Output : ["Project's Guttenberg's", "Alice's Adventures in Wonderland", "Project's Guttenberg's"]
  • rdd3 = rdd2.map(lambda x : x.split())

  • rdd3.collect()

  • Output : [["Project's", "Guttenberg's"], ["Alice's", 'Adventures', 'in', 'Wonderland'], ["Project's", "Guttenberg's"]]
  • rdd4 = rdd2.flatMap(lambda x : x.split())

  • rdd4.collect()

  • Output: ["Project's", "Guttenberg's", "Alice's", 'Adventures', 'in', 'Wonderland', "Project's", "Guttenberg's"]

Example of flatMap (Converting 2D to 1D):

  • data = [[1,2,3],[4,5,6],[7,8,9]]
  • rdd1 = sc.parallelize(data)

  • rdd1.collect()

  • Output : [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
  • rdd2 = rdd1.flatMap(lambda x : x)

  • rdd2.collect()

  • Output : [1, 2, 3, 4, 5, 6, 7, 8, 9]
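
The difference between map and flatMap can be reproduced on ordinary Python lists. The helper names py_map and py_flat_map are invented for this sketch and only mimic the behaviour shown above.

```python
from itertools import chain

def py_map(func, data):
    """One output element per input element (like rdd.map)."""
    return [func(x) for x in data]

def py_flat_map(func, data):
    """Each input yields a sequence; the sequences are flattened (like rdd.flatMap)."""
    return list(chain.from_iterable(func(x) for x in data))

lines = ["Project's Guttenberg's", "Alice's Adventures in Wonderland"]
print(py_map(str.split, lines))        # a list of lists
print(py_flat_map(str.split, lines))   # one flat list of words

grid = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
print(py_flat_map(lambda row: row, grid))  # 2D to 1D
```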

Filter Transformation:

  • rdd2.collect()

  • Output : [1, 2, 3, 4, 5, 6, 7, 8, 9]
  • rdd3 = rdd2.filter(lambda x : x in [7,8,9])

  • rdd3.collect()

  • Output : [7, 8, 9]
  • rdd4 = rdd2.filter(lambda x : x > 3 and x < 7)

  • rdd4.collect()

  • Output : [4, 5, 6]

Filtering Odd Numbers and Squaring the odd numbers

  • rdd5 = rdd2.filter(lambda x : x % 2 != 0)

  • rdd5 = rdd5.map(lambda x : x ** 2)

  • rdd5.collect()

  • Output : [1, 9, 25, 49, 81]

Filtering Odd Numbers and Squaring the odd numbers from 2D list

  • data

  • [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
  • sc.parallelize(data).flatMap(lambda x : x).filter(lambda x : x % 2 != 0).map(lambda x : x ** 2)

  • Output : PythonRDD[28] at RDD at PythonRDD.scala:53
  • sc.parallelize(data).flatMap(lambda x : x).filter(lambda x : x % 2 != 0).map(lambda x : x ** 2).collect()

  • Output : [1, 9, 25, 49, 81]

Transactional file with Debit above 3000

  • transaction_file = [[100,'Debit',2000.0],[101,'Credit',3000.0],[102,'Debit',4000.0],[103,'Credit',5000.0]]

  • rdd1 = sc.parallelize(transaction_file)

  • rdd2 = rdd1.filter(lambda x : 'Debit' in x)

  • rdd3 = rdd2.filter(lambda x : x[2] > 3000)

  • rdd3.collect()

  • Output : [[102, 'Debit', 4000.0]]

Transactional file with Debit above 3000 from a file

  • tran_file = sc.textFile("/mnt/c/Users/miles/Documents/pyspark/Pyspark/transaction.txt")

  • rdd1 = tran_file.map(lambda x : x.split(','))

  • rdd2 = rdd1.filter(lambda x : 'DEBIT' in x[1] and float(x[2]) > 3000)

  • rdd2.collect()

  • Output : [['102', 'DEBIT', '4000.0'], ['104', 'DEBIT', '6000.0']]

Selecting Transaction ID and Amount from the previous problem's output

  • rdd3 = rdd2.map(lambda x : (x[0] , x[2]))

  • rdd3.collect()

  • Output : [('102', '4000.0'), ('104', '6000.0')]
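
The same split, filter, and project steps can be checked on plain Python strings. The sample lines below are hypothetical, since the notes do not show the contents of transaction.txt; they are chosen so that the result matches the output above.

```python
# Hypothetical file contents: id,type,amount
lines = [
    "100,DEBIT,2000.0",
    "101,CREDIT,3000.0",
    "102,DEBIT,4000.0",
    "103,CREDIT,5000.0",
    "104,DEBIT,6000.0",
]

# map: split each line into fields
rows = [line.split(",") for line in lines]

# filter: keep debits above 3000 (the amount is text, so convert it first)
debits = [r for r in rows if "DEBIT" in r[1] and float(r[2]) > 3000]

# map: keep only the transaction id and the amount
id_and_amount = [(r[0], r[2]) for r in debits]
print(id_and_amount)
```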

Export the output to a file:

  • rdd3.saveAsTextFile("/mnt/c/Users/miles/Documents/pyspark/Pyspark/output")

  • Creates 2 part files, since the output RDD has 2 partitions.
  • Output Directory:
  • miles@MILE-BL-4824-LAP:/mnt/c/Users/miles/Documents/pyspark/Pyspark/output$ ls
  • Output : _SUCCESS part-00000 part-00001

Importing Multiple files and save the content to a RDD:

  • input = sc.textFile("/mnt/c/Users/miles/Documents/pyspark/Pyspark/output/part*")

  • input.collect()

  • Output : ["('102', '4000.0')", "('104', '6000.0')"]

Exporting the output by changing the partitions :

  • Here the input RDD contains 2 partitions.

  • input.repartition(1).saveAsTextFile("/mnt/c/Users/miles/Documents/pyspark/Pyspark/new_output/")

mapPartitions : Similar to map, but runs once per partition instead of once per element

  • rdd = sc.parallelize([1,2,3,4],2)

  • rdd.getNumPartitions()

  • Output : 2

  • rdd.collect()

  • Output : [1, 2, 3, 4]

  • def f(iterator) : yield sum(iterator)

  • ...

  • rdd.mapPartitions(f).collect()

  • Output : [3, 7]

  • rdd = sc.parallelize([1,2,3,4],3)

  • def f(iterator) : yield sum(iterator)

  • ...

  • rdd.mapPartitions(f).collect()

  • Output : [1, 2, 7]

  • rdd = sc.parallelize([1,2,3,4],4)

  • def f(iterator) : yield sum(iterator)

  • ...

  • rdd.mapPartitions(f).collect()

  • Output : [1, 2, 3, 4]

  • Narrow vs wide transformations: if changing the partitioning requires a shuffle, the transformation is wide; otherwise it is narrow.

  • repartition shuffles the data, so it is a wide transformation.

  • When going from a higher to a lower number of partitions, repartition is not advisable; use coalesce.

  • The difference between coalesce and repartition is performance when reducing the number of partitions: by default coalesce merges existing partitions without a shuffle, which is also why it cannot increase the partition count.

  • rdd = sc.parallelize(range(100),4)

  • rdd.getNumPartitions()

  • Output : 4

  • rdd.repartition(2).getNumPartitions()

  • Output : 2

  • rdd.repartition(5).getNumPartitions()

  • Output : 5

  • rdd.coalesce(5).getNumPartitions()

  • Output : 4
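
A toy model of the two operations, in plain Python. The functions toy_coalesce and toy_repartition are invented for illustration and do not reflect how Spark actually places records: coalesce only glues neighbouring partitions together, while repartition moves every record.

```python
def toy_coalesce(partitions, n):
    """Merge neighbouring partitions; no record is redistributed individually."""
    k = len(partitions)
    if n >= k:
        return partitions   # without a shuffle the count cannot grow
    merged = []
    for i in range(n):
        group = partitions[i * k // n:(i + 1) * k // n]
        merged.append([x for part in group for x in part])
    return merged

def toy_repartition(partitions, n):
    """Redistribute every record round-robin (stands in for a full shuffle)."""
    out = [[] for _ in range(n)]
    records = [x for part in partitions for x in part]
    for i, item in enumerate(records):
        out[i % n].append(item)
    return out

parts = [[0, 1], [2, 3], [4, 5], [6, 7]]
print(toy_coalesce(parts, 2))            # fewer partitions, contents kept together
print(len(toy_coalesce(parts, 5)))       # still 4
print(len(toy_repartition(parts, 5)))    # 5
```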

Union:

  • Merging 2 RDDS, and the duplicates won't be removed.
  • rdd1 = sc.parallelize([1,2,3,4,5])

  • rdd2 = sc.parallelize([5,6,7,8,9])

  • rdd3 = rdd1.union(rdd2)

  • rdd3.collect()

  • Output : [1, 2, 3, 4, 5, 5, 6, 7, 8, 9]

Distinct (a wide transformation, since the data is shuffled)

  • Merging 2 RDDS, and the duplicates will be removed.
  • rdd1 = sc.parallelize([1,2,3,4,5])

  • rdd2 = sc.parallelize([5,6,7,8,9])

  • rdd3 = rdd1.union(rdd2)

  • rdd3.distinct().collect()

  • Output : [1, 2, 3, 4, 5, 6, 7, 8, 9]

Using union between two different partitioned RDDS:

  • rdd1 = sc.parallelize([1,2,3,4,5],4)

  • rdd2 = sc.parallelize([5,6,7,8,9],2)

  • rdd3 = rdd1.union(rdd2)

  • rdd3.getNumPartitions()

  • Output : 6
  • rdd3.distinct().getNumPartitions()

  • Output : 6
  • rdd3.distinct(4).getNumPartitions()

  • Output : 4

Intersection :

  • rdd3 = rdd1.intersection(rdd2)

  • rdd3.collect()

  • Output : [5]
  • rdd3.getNumPartitions()

  • Output : 6

Joins: (default : Inner join based on keys)

  • x = sc.parallelize([("a",1),("b",4)])

  • y = sc.parallelize([("a",2),("a",3)])

  • z = x.join(y)

  • z.collect()

  • Output : [('a', (1, 2)), ('a', (1, 3))]
  • z = x.join(y,4)

  • z.getNumPartitions()

  • output : 4
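
What an inner join on keys computes can be written out in plain Python. The function inner_join is a hypothetical helper that mirrors the result above; it is not how Spark executes a join.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

x = [("a", 1), ("b", 4)]
y = [("a", 2), ("a", 3)]
print(inner_join(x, y))   # "b" has no match in y, so it is dropped
```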

cogroup and join require RDDs of key-value pairs; they cannot be applied to an RDD of plain values.

Cogroup:

  • Grouping locations based on Keys

  • x = sc.parallelize([("a",1),("b",4)],4)

  • y = sc.parallelize([("a",2),("a",3)],2)

  • z = x.cogroup(y)

  • z.collect()

  • Output : [('b', (<pyspark.resultiterable.ResultIterable object at 0x7f77b4bf6bc0>, <pyspark.resultiterable.ResultIterable object at 0x7f77b4a3cdf0>)), ('a', (<pyspark.resultiterable.ResultIterable object at 0x7f77b4a84b20>, <pyspark.resultiterable.ResultIterable object at 0x7f77b4a84b80>))]

    • type(z)

  • Output : <class 'pyspark.rdd.PipelinedRDD'>

  • x = sc.parallelize([("a",1),("b",4)])

  • y = sc.parallelize([("a",2),("b",3)])

  • [(x, tuple(map(list,y))) for x,y in sorted(list(x.cogroup(y).collect()))]

  • Output : [('a', ([1], [2])), ('b', ([4], [3]))]

  • y = sc.parallelize([("a",2),("a",1),("b",3)])

  • [(x, tuple(map(list,y))) for x,y in sorted(list(x.cogroup(y).collect()))]

  • Output : [('a', ([1], [2, 1])), ('b', ([4], [3]))]
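
A plain-Python sketch of the grouping that cogroup performs. The helper name toy_cogroup is made up; unlike a join, keys that appear on only one side are kept, with an empty list for the other side.

```python
def toy_cogroup(left, right):
    """For every key on either side, collect the left values and the right values."""
    keys = sorted({k for k, _ in left} | {k for k, _ in right})
    return [
        (key,
         ([v for k, v in left if k == key],
          [v for k, v in right if k == key]))
        for key in keys
    ]

x = [("a", 1), ("b", 4)]
print(toy_cogroup(x, [("a", 2), ("a", 1), ("b", 3)]))
print(toy_cogroup(x, [("a", 2), ("a", 3)]))   # "b" keeps an empty right side
```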

Cartesian Product :

Wide transformations:

groupByKey (Doesn't perform well compared to reduceByKey and aggregateByKey)

  • rdd = sc.parallelize([("a",1),("b",1),("a",1)])

  • rdd.groupByKey().collect()

  • Output : [('a', <pyspark.resultiterable.ResultIterable object at 0x7f77b4a3e530>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7f77b4a84910>)]
  • rdd.groupByKey().mapValues(len).collect()

  • Output : [('a', 2), ('b', 1)]
  • rdd.groupByKey().mapValues(list).collect()

  • Output : [('a', [1, 1]), ('b', [1])]

reduceByKey

  • rdd.collect()

  • Output : [('a', 1), ('b', 1), ('a', 1)]
  • rdd.reduceByKey(lambda a,b : a + b ).collect()

  • Output : [('a', 2), ('b', 1)]
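
Combined with flatMap and map, reduceByKey is enough for a word count. The sketch below shows the computation in plain Python on the sample lines used earlier in these notes; reduce_by_key is a hypothetical helper, and the result is sorted only to make the output stable.

```python
def reduce_by_key(pairs, func):
    """Fold the values of each key with func (like rdd.reduceByKey)."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return sorted(result.items())

lines = ["Project's Guttenberg's",
         "Alice's Adventures in Wonderland",
         "Project's Guttenberg's"]

words = [w for line in lines for w in line.split()]   # flatMap
pairs = [(w, 1) for w in words]                       # map
counts = reduce_by_key(pairs, lambda a, b: a + b)     # reduceByKey
print(counts)
```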

aggregateByKey

  • (Difference between reduceByKey and aggregateByKey: in aggregateByKey, the aggregation logic within a partition can differ from the logic used across partitions to build the final result.)

  • rdd.aggregateByKey(0, lambda a,b : a + b, lambda a,b : a + b + 1 ).collect()

  • first argument -> zero (initial) value for each key

  • second argument -> function applied within each partition

  • third argument -> function applied across partitions
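
A toy model of aggregateByKey in plain Python, showing why the partition layout matters when the within-partition and across-partition functions differ. The helper toy_aggregate_by_key and the two partition layouts are assumptions made for illustration.

```python
def toy_aggregate_by_key(partitions, zero, seq_func, comb_func):
    """Aggregate per key inside each partition, then merge the partial results."""
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_func(acc.get(key, zero), value)   # within a partition
        partials.append(acc)
    merged = {}
    for acc in partials:
        for key, value in acc.items():
            # comb_func runs only when a key shows up in more than one partition
            merged[key] = comb_func(merged[key], value) if key in merged else value
    return sorted(merged.items())

add = lambda a, b: a + b
add_plus_one = lambda a, b: a + b + 1

one_partition = [[("a", 1), ("b", 1), ("a", 1)]]
two_partitions = [[("a", 1), ("b", 1)], [("a", 1)]]
print(toy_aggregate_by_key(one_partition, 0, add, add_plus_one))
print(toy_aggregate_by_key(two_partitions, 0, add, add_plus_one))
```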

sortByKey

Actions:

  • count()
  • collect()
  • reduce(function) => 1 arg => reduce(lambda a,b : a+b)
  • sum()
  • foreach() -> def f(x) : print(x) => sc.parallelize([1,2,3,4,5]).foreach(f) => passes each element to the function.
  • saveAsTextFile('path')
  • first()
  • min()
  • max()
  • mean()
  • aggregate() => 3 args => aggregate(0, lambda a,b : a+b, lambda a,b : a+b+1) -> zero (initial) value, aggregate function within each partition, aggregate function across partitions
    • eg : rdd.aggregate(0, lambda a,b : a+b, lambda a,b : a+b+1) -> 47 (the third function runs once per partition when the partial results are merged, so the result depends on the number of partitions)
    • eg : rdd.repartition(5).aggregate(0, lambda a,b : a+b, lambda a,b : a+b+1) -> 50
  • take(n) -> first n elements
  • sample() => 2 required args (withReplacement, fraction); it returns a new RDD rather than a value
  • foreachPartition -> similar to mapPartitions, but works like foreach
  • takeSample
  • countByKey => returns a dictionary => rdd.countByKey().items()
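
The aggregate() results above can be reproduced with a plain-Python model. The notes do not show the RDD, so the data here (the numbers 0 to 9) and the partition counts are assumptions picked because they give 47 and 50; toy_aggregate and split_evenly are hypothetical helpers.

```python
from functools import reduce

def split_evenly(data, n):
    """Cut data into n contiguous slices (a stand-in for partitions)."""
    data = list(data)
    size = len(data)
    return [data[i * size // n:(i + 1) * size // n] for i in range(n)]

def toy_aggregate(partitions, zero, seq_op, comb_op):
    """Fold each partition with seq_op, then merge the partials with comb_op."""
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # Merging starts from the zero value, so comb_op runs once per partition.
    return reduce(comb_op, partials, zero)

add = lambda a, b: a + b
add_plus_one = lambda a, b: a + b + 1

print(toy_aggregate(split_evenly(range(10), 2), 0, add, add_plus_one))
print(toy_aggregate(split_evenly(range(10), 5), 0, add, add_plus_one))
```

In this model the result is the plain sum plus one for every partition, which is one reading consistent with the two numbers in the notes.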
