When you work with Spark, especially on data engineering tasks, you have to deal with partitioning to get the best out of Spark.
A good partitioning strategy takes into account the data and its structure, as well as the cluster configuration.
Bad partitioning can lead to bad performance, mostly in three ways:
- Too many partitions relative to your cluster size, and you won’t use your cluster efficiently. For example, it will produce heavy task-scheduling overhead.
- Not enough partitions relative to your cluster size, and you will have to deal with memory and CPU issues: memory because your executors will have to hold a large volume of data in memory (possibly causing OOM exceptions), and CPU because the work will be spread unevenly across the cluster: a subset of your CPUs will do the work and the others will watch their neighbors work.
- Skewed data in your partitions. When a Spark job runs on these partitions, its tasks are distributed across executor slots and CPUs. If your partitions are unbalanced in terms of data volume, some tasks will run longer than others and slow down the overall execution time (and one node will probably burn more CPU than the others).
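The three cases above can be illustrated with a toy model in plain Python (no Spark involved). This is only a sketch under simplifying assumptions: one task per partition, a task's duration proportional to its row count, a fixed per-task scheduling overhead, and each task handed to the first free slot. The function name and its parameters are made up for the illustration.

```python
import heapq

def stage_duration(partition_sizes, slots, cost_per_row=1.0, task_overhead=0.0):
    """Toy model: time for a stage to finish when each partition is one task."""
    free_at = [0.0] * slots          # time at which each slot becomes free
    heapq.heapify(free_at)
    finish = 0.0
    for size in partition_sizes:
        start = heapq.heappop(free_at)                 # earliest free slot
        end = start + size * cost_per_row + task_overhead
        heapq.heappush(free_at, end)
        finish = max(finish, end)
    return finish

# 1,000 rows on 4 slots, split in different ways
print(stage_duration([250] * 4, slots=4))                        # balanced: 250.0
print(stage_duration([700, 100, 100, 100], slots=4))             # skewed: 700.0
print(stage_duration([1000], slots=4))                           # too few partitions: 1000.0
print(stage_duration([1] * 1000, slots=4, task_overhead=5.0))    # too many partitions: 1500.0
```

In this model the stage is only as fast as its slowest slot, which is why a single oversized partition or a flood of tiny tasks both hurt.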
To avoid these problems, you have to understand what partitioning is and how it works under the hood.
What is Spark partitioning?
Partitioning is nothing but dividing a data structure into parts. In a distributed system like Apache Spark, it can be defined as the division of a dataset into multiple parts stored across the cluster.
Spark uses three main data structures: RDDs (Resilient Distributed Datasets), DataFrames and Datasets. Each of these structures is an in-memory structure that can be split into chunks of data, each chunk sitting on a physical node (executor).
As an example, consider an RDD or a DataFrame of 10 million rows. It can be divided into 60 partitions across 4 executors (15 partitions per executor). Each task processes one partition, so with 16 CPU cores per executor (64 cores in total) all 60 partitions can be processed at the same time.
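The arithmetic behind this example can be written down as a small helper. It is a sketch, not a Spark API: the function name is invented, and it assumes each task uses `spark.task.cpus` cores (1 by default) and that all executors are identical.

```python
import math

def task_waves(num_partitions, num_executors, cores_per_executor, cpus_per_task=1):
    """Return (task slots in the cluster, number of waves needed for one stage)."""
    slots = num_executors * (cores_per_executor // cpus_per_task)
    waves = math.ceil(num_partitions / slots)
    return slots, waves

print(task_waves(60, 4, 16))    # (64, 1): every partition is processed in a single wave
print(task_waves(200, 4, 16))   # (64, 4): 200 partitions need four waves on the same cluster
```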
As we’ve seen, good partitioning depends on the number of partitions and on how data is distributed across them.
Number of partitions
The number of partitions is calculated from various parameters:
- spark.default.parallelism (the value depends on the cluster manager used, see https://spark.apache.org/docs/latest/configuration.html#execution-behavior for more information)
- the number of files you’re reading (if located in the same directory)
- when reading a text file with SparkContext.textFile, the minimum number of partitions defaults to min(spark.default.parallelism, 2); the actual number also depends on how the input is split
How is data distributed across partitions?
The way data is distributed across partitions depends on an object called a Partitioner.
In Apache Spark, there are two main Partitioners:
- HashPartitioner aims to distribute data evenly across all the partitions. If you don’t provide a specific partition key (a column in the case of a DataFrame), data will be associated with a key. That produces a (K,V) pair, and the destination partition is assigned by the following algorithm:
partitionId = hash(Key) % NumberOfPartition
HashPartitioner is the default partitioner used by Spark.
Note: the hash function varies depending on the API language you use:
for Python, see the portable_hash() function here: https://github.com/apache/spark/blob/master/python/pyspark/rdd.py
for Scala, see here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Partitioner.scala
- RangePartitioner distributes data across partitions based on ranges of key values. It uses a column (for a DataFrame) as the partition key. This key is sampled (for performance reasons), and based on the sampled values and the target number of partitions, data is distributed according to this key.
If those two partitioners don’t fit your needs, you’re free to write your own.
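To make the two strategies concrete, here is a plain-Python sketch of both ideas. It is not Spark's implementation: CRC32 stands in for Spark's hash function (chosen only because it is deterministic across runs), the range bounds are taken from an already available sample, and all function names are invented for the illustration.

```python
import bisect
import zlib
from collections import Counter

def hash_partition(key, num_partitions):
    """partitionId = hash(key) % numberOfPartitions, with CRC32 as a stand-in hash."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions

def range_bounds(sample, num_partitions):
    """Pick num_partitions - 1 upper bounds from a sample of the keys."""
    ordered = sorted(sample)
    step = len(ordered) / num_partitions
    return [ordered[int(step * i)] for i in range(1, num_partitions)]

def range_partition(key, bounds):
    """Keys up to and including bounds[i] go to partition i; larger keys go further right."""
    return bisect.bisect_left(bounds, key)

names = ["aaaaaaaaaa", "bbbbbbbbbb", "cccccccccc", "dddddddddd"]
print({name: hash_partition(name, 4) for name in names})

bounds = range_bounds(range(100), 4)
print(bounds)                                                   # [25, 50, 75]
print(Counter(range_partition(k, bounds) for k in range(100)))  # about 25 keys per partition
```

The hash version places a key without looking at any other key, while the range version needs a pass over (a sample of) the data first, which is the price paid for ordered partitions.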
The dataframe case
When you are dealing with DataFrames, they can be repartitioned explicitly (by a call to “DataFrame.repartition()”) or implicitly (during a shuffle on the DataFrame).
If you call DataFrame.repartition() without specifying a number of partitions, or when a shuffle occurs, Spark produces a new DataFrame with X partitions, where X is the value of the “spark.sql.shuffle.partitions” parameter (200 by default).
This can result in a DataFrame with lots of empty partitions, especially if you are dealing with small data (or not big enough!), and in scheduling issues.
Need examples?
To support the previous paragraphs, I wrote some examples in Python based on:
- a function that creates a DataFrame and repartitions it on the first column into x partitions
- two functions that produce a detailed output of the DataFrame passed as a parameter:
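For readers who do not have the repository at hand, here is a hedged sketch of what helpers like these might look like. It is not the original code: only the names `newDf`, `df_details` and `df_details_2` and the keyword arguments come from the calls shown in this post; the bodies, the `spark` and `seed` parameters, the `pick_values` helper and the column name `name` are assumptions.

```python
import random

def pick_values(data, values_cnt, seed=None):
    """Draw values_cnt random items from data, as one-column rows."""
    rng = random.Random(seed)
    return [(rng.choice(data),) for _ in range(values_cnt)]

def newDf(data, values_cnt, partition_cnt, spark=None):
    """Build a one-column DataFrame and repartition it on that column."""
    if spark is None:
        from pyspark.sql import SparkSession   # imported lazily, only when needed
        spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(pick_values(data, values_cnt), ["name"])
    return df.repartition(partition_cnt, "name")

def df_details(df):
    """Print the physical plan, the partition count and the content of each partition."""
    df.explain()
    partitions = df.rdd.glom().collect()       # one list of rows per partition
    print(f"number of partitions: {len(partitions)}")
    for idx, rows in enumerate(partitions):
        print(f"partition {idx}: {[row[0] for row in rows]}")

def df_details_2(df):
    """Print the physical plan, then row count and distinct values per partition."""
    df.explain()
    for idx, rows in enumerate(df.rdd.glom().collect()):
        values = [row[0] for row in rows]
        print(f"partition {idx}: {len(values)} rows, distinct values: {sorted(set(values))}")
```

Note that `glom().collect()` brings every row back to the driver, so helpers of this kind are only suitable for small demonstration datasets.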
Function #1
Called as follows, this first function produces this kind of output:
names=('aaaaaaaaaa','bbbbbbbbbb',
       'cccccccccc','dddddddddd',
       'eeeeeeeeee','ffffffffff',
       'ggggggggggg','hhhhhhhhhh')
nb_of_values=10
nb_part=4
df=newDf(data=names,
         values_cnt=nb_of_values,
         partition_cnt=nb_part)
df_details(df)
In the output, we can see:
- the physical execution plan generated, which mentions that the “repartition” call uses hash partitioning
- the number of partitions in the DataFrame and the content of each partition.
We can already notice that, even with hash partitioning, data is not necessarily evenly distributed (especially when the number of partitions is not very high).
Function #2
Like the previous function, this one prints the physical plan of the repartition call and then the details, partition by partition, with:
- number of rows per partition
- distinct values per partition
Called as follows, it produces this kind of result:
names=('aaaaaaaaaa','bbbbbbbbbb',
       'cccccccccc','dddddddddd',
       'eeeeeeeeee','ffffffffff',
       'ggggggggggg','hhhhhhhhhh')
nb_of_values=10
nb_part=4
df=newDf(data=names,
         values_cnt=nb_of_values,
         partition_cnt=nb_part)
df_details_2(df)
Why is my data not evenly distributed?
Let’s take a basic example. I run my previous code with 1,000,000 values picked from the “names” list and repartition them across 8 partitions (my default parallelism is set to 8):
names=('aaaaaaaaaa','bbbbbbbbbb',
       'cccccccccc','dddddddddd',
       'eeeeeeeeee','ffffffffff',
       'ggggggggggg','hhhhhhhhhh')
nb_of_values=1000000
nb_part=8
print(f"default parallelism = {sc.defaultParallelism}")
df=newDf(data=names,
         values_cnt=nb_of_values,
         partition_cnt=nb_part)
df_details_2(df)
It’s funny to see that I don’t get a single value per partition, and that the distribution is uneven (partly due to the data distribution too!).
The reason lies in the hash formula (partitionId = hash(key) % partitionCount) and in the number of partitions, which is too low.
If I want a single value in each partition, I usually have to increase the number of partitions. But in my example, I still had one partition that stored two values (I tested up to 200 partitions).
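A quick back-of-the-envelope calculation shows why raising the partition count helps but never guarantees one key per partition. The sketch below assumes the hash spreads keys uniformly and independently over the partitions, which is an idealization of any real hash function; the function name is invented for the illustration.

```python
def prob_one_key_per_partition(num_keys, num_partitions):
    """Probability that num_keys distinct keys all land in different partitions."""
    if num_keys > num_partitions:
        return 0.0
    p = 1.0
    for i in range(num_keys):
        p *= (num_partitions - i) / num_partitions
    return p

print(round(prob_one_key_per_partition(8, 8), 4))     # 0.0024
print(round(prob_one_key_per_partition(8, 200), 4))   # 0.8678
```

Under this assumption, 8 keys in 8 partitions almost never get a partition each, and even with 200 partitions there is still roughly a 13% chance that at least two keys collide.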
NB: In order to produce cute plots, I’ve extended the source code of this function to put the data inside a pandas DataFrame:
We can then produce plots with matplotlib:
So, if you are looking for an even distribution, you’d better use your own hash function, and if you want only one key per partition, you can write your own partitioning function. But if you are working in Python, you have to know that you will need to use the RDD interface and map your function over a modified DataFrame.
Specifying your own partitioning function
If you want to specify your own partitioning function, you will have to:
- write a function that takes a key as a parameter and returns a unique identifier for this key
- deal with the RDD API, if you are using Python as your Spark API language
Here is sample code for associating a key with a specific partition (this produces an odd data distribution, but it can be interesting if we want to filter partitions based on this specific key). Please note that we have to transform the initial data structure into (K,V) pairs:
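As a hedged sketch of this approach (not necessarily the code from the repository), the idea can be expressed with the RDD method `partitionBy(numPartitions, partitionFunc)`. The helper names `make_key_partitioner` and `partition_by_key` are invented here, and the sketch assumes the full list of possible keys is known in advance.

```python
def make_key_partitioner(keys):
    """Give each distinct key its own partition id, from 0 to len(keys) - 1."""
    index = {key: i for i, key in enumerate(sorted(set(keys)))}
    # Unknown keys raise KeyError: the key list is assumed to be complete.
    return len(index), (lambda key: index[key])

def partition_by_key(df, keys, key_col=0):
    """Turn the DataFrame rows into (K, V) pairs and place each key in its own partition."""
    num_partitions, part_func = make_key_partitioner(keys)
    pair_rdd = df.rdd.map(lambda row: (row[key_col], row))   # (key, whole row)
    return pair_rdd.partitionBy(num_partitions, part_func)

# Usage with an existing DataFrame `df` whose first column holds the names:
# partitioned = partition_by_key(df, names)
# for idx, rows in enumerate(partitioned.glom().collect()):
#     print(idx, len(rows), {key for key, _ in rows})
```

With one partition per key, partition sizes simply mirror the frequency of each key, so the result is only as balanced as the data itself.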
This produces the kind of plot shown here, which proves:
The dataframe case
In a previous section, I explained that explicitly repartitioning a DataFrame without specifying a number of partitions, or shuffling it, produces a DataFrame whose number of partitions is the value of the “spark.sql.shuffle.partitions” parameter, which is 200 by default.
Here is an example:
This will produce the following output:
One of the best solutions to avoid a static number of partitions (200 by default) is to enable a new Spark 3.0 feature: Adaptive Query Execution (AQE).
This feature enables Spark to dynamically coalesce shuffle partitions even when the static parameter that defines the default number of shuffle partitions (spark.sql.shuffle.partitions, 200 by default) is set to an inappropriate number.
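A minimal sketch of how AQE could be switched on when building a session is shown below. The two configuration keys are Spark SQL settings available since Spark 3.0; the helper functions `aqe_settings` and `apply_settings` are hypothetical names introduced only for this example, and the PySpark usage is left as a comment so the snippet can be read without a cluster.

```python
def aqe_settings():
    """Spark SQL settings that turn on AQE and its shuffle-partition coalescing."""
    return {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
    }

def apply_settings(builder, settings):
    """Apply each setting to a builder exposing .config(key, value), e.g. SparkSession.builder."""
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder

# Usage with PySpark:
# from pyspark.sql import SparkSession
# spark = apply_settings(SparkSession.builder.appName("aqe-demo"), aqe_settings()).getOrCreate()
# spark.conf.get("spark.sql.adaptive.enabled")
```

Recent Spark releases enable AQE by default, so on those versions the explicit setting mostly serves as documentation of the intent.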
That’s it for today! 😁
Laurent
All the code samples used in this post are available on github: