PySpark: Filters, Data Cleaning and UDFs
I will try to explain my learnings and observations from working with PySpark.
Contents
Dealing with messy data
Replacing values
Reshaping
UDFs and Pipelines
Common impurities that often need refining
- Incorrect data types
- Invalid rows
- Incomplete rows
- Poorly chosen placeholders
- Poorly named columns
- Impermissible shape
- Grouping and aggregation
- Joining datasets
- Ordering results
Permissible data conventions
Before we clean, it is important to know what our clean end product should look like (a small validation sketch follows this list):
- Whether 95% completeness is acceptable, or the data must be fully clean
Implicit standards in the company
- Regional datetimes vs UTC
- Column naming conventions
Low-level system details
- Representation of unknown or incomplete data
- Ranges for numerical values
- Field meanings
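A quick way to check a dataset against such conventions is to profile it before cleaning. The sketch below is illustrative only (the DataFrame df, the age column and the [0, 120] range are assumptions, not from any specific dataset); it reports the completeness of each column and counts out-of-range values:
from pyspark.sql import functions as F

total = df.count()
# Fraction of non-null values per column (count() on a column ignores nulls)
completeness = df.select([(F.count(c) / total).alias(c) for c in df.columns])
completeness.show()

# Rows where a numeric field falls outside its agreed range
out_of_range = df.filter((F.col('age') < 0) | (F.col('age') > 120)).count()
print('Rows with age outside [0, 120]:', out_of_range)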
Dealing with messy data
Defining datatypes whilst reading
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField('name', StringType(), False),
    StructField('age', StringType(), False)
])
# The schema must be set before load()
df = spark.read.format('csv').schema(schema).load('datafile')
Setting an Index Column
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn('id', monotonically_increasing_id())
Changing data types after reading
from pyspark.sql.types import IntegerType
voter_df = voter_df.withColumn('year', voter_df['_c4'].cast(IntegerType()))
Replacing Values
from datetime import date
from pyspark.sql.functions import when, col
one_year_from_now = date.today().replace(year=date.today().year + 1)
better_frame = employees.withColumn("end_date", when(col("end_date") > one_year_from_now, None).otherwise(col("end_date")))
Null Data
df_dropped = df.na.drop()
df_dropped = df.na.drop(subset=['colName'])
df = df.withColumn("CleanColName", when(df.ColName.isNull(), "unknown").otherwise(df.ColName))
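If the column should simply be back-filled with a default, na.fill does the same thing and only touches null values in the listed columns:
df = df.na.fill('unknown', subset=['ColName'])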
Filtering Rows
prices_in_belgium = prices.filter(col('countrycode') == 'BE').orderBy(col('date'))
# Detecting when a column has a class with very few values
from pyspark.sql.functions import asc
df.groupby('native_country').agg({'native_country': 'count'}).sort(asc('count(native_country)')).show()
Out:
+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
The feature native_country has only one household coming from Holand-Netherlands. To exclude it:
df_remove = df.filter(df.native_country != 'Holand-Netherlands')
Duplicate Values
Detecting and flagging Duplicate Values
import pyspark.sql.functions as f
df_basket1.join(
    df_basket1.groupBy(df_basket1.columns).agg((f.count("*") > 1).cast("int").alias("Duplicate_indicator")),
    on=df_basket1.columns, how="inner"
).show()
Removing Duplicate Values with groupBy and filter
df1 = df_basket1.groupBy("Item_group", "Item_name", "price").count().filter("count > 1")
df1.drop('count').show()
Removing Duplicate Values with distinct
prices.select(col("store"), col("brand").alias("brandname")).distinct()
Filtering Columns
voter_df.drop('unused_column')
Buckets
With ml.feature
values = [("a", 23), ("b", 45), ("c", 10), ("d", 60), ("e", 56), ("f", 2), ("g", 25), ("h", 40), ("j", 33)]
df = spark.createDataFrame(values, ["name", "ages"])
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[0, 6, 18, 60, float('Inf')], inputCol="ages", outputCol="buckets")
df_buck = bucketizer.setHandleInvalid("keep").transform(df)
df_buck.show()
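The splits define the bucket boundaries: with [0, 6, 18, 60, Inf] the buckets are [0, 6), [6, 18), [18, 60) and [60, Inf), so in the frame above age 2 falls in bucket 0.0, age 10 in bucket 1.0, age 23 in bucket 2.0 and age 60 in bucket 3.0.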
With df.select
from pyspark.sql.functions import when
df.select(df.Name, df.Age, when(df.Age >= 18, "Adult").otherwise("Minor"))
Text Data
Lower/Upper Case
With select
from pyspark.sql.functions import lower, col
Syntax: df.select(lower(df.colName)).show()
Example: df.select(lower(df.first_name)).show()
With sql.functions
import pyspark.sql.functions as F
voter_df = voter_df.withColumn('upper', F.upper('name'))
Filtering rows where a column contains part of a string
voter_df.where(voter_df['_c0'].contains('VOTE'))
Splitting Columns
import pyspark.sql.functions as F
voter_df = voter_df.withColumn('splits', F.split('name', ' '))
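Since F.split produces an array column, the individual parts can then be pulled out with getItem; a minimal sketch, assuming the name column holds "first last" strings:
voter_df = voter_df.withColumn('first_name', F.col('splits').getItem(0))
voter_df = voter_df.withColumn('last_name', F.col('splits').getItem(1))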
Reshaping
Grouping and aggregating
from pyspark.sql.functions import avg, count, col
(prices.groupBy(col('brand'))
    .agg(avg('price').alias('average_price'), count('brand').alias('number_of_items'))
).show()
Joining datasets
ratings_with_prices = ratings.join(prices, ["brand", "model"])
UDFs
To implement UDFs
1. Define a Python function
def reverseString(mystr):
    return mystr[::-1]
2. Wrap the function and store it as a variable
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
udfReverseString = udf(reverseString, StringType())
3. Call it with Spark
user_df = user_df.withColumn('ReverseName', udfReverseString(user_df.Name))
Pipelines
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer  # OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0
CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = []  # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
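The stages list does nothing on its own until it is wrapped in the Pipeline imported above and fitted to the data. A minimal sketch, assuming the cleaned DataFrame is called df:
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df)
df_transformed = pipeline_model.transform(df)
df_transformed.show(5)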
Thanks for Reading :)