PySpark: Filters, Data Cleaning, and UDFs

In this post I will share my learnings and observations from working with PySpark.

Dealing with messy data

Drop every row that contains at least one null value:

df_dropped = df.na.drop()

Drop rows with a null in specific columns only:

df_dropped = df.na.drop(subset=['colName'])

Instead of dropping rows, replace nulls with a default value:

from pyspark.sql.functions import when
df = df.withColumn("CleanColName", when(df.ColName.isNull(), "unknown").otherwise(df.ColName))
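A minimal runnable sketch of all three techniques on a toy DataFrame (the data and column names here are invented for illustration):

from pyspark.sql import SparkSession
from pyspark.sql.functions import when

spark = SparkSession.builder.getOrCreate()
people = spark.createDataFrame(
    [("Alice", "US"), ("Bob", None), (None, "IN")],
    ["first_name", "country"])

people.na.drop().show()                    # keeps only the Alice row
people.na.drop(subset=["country"]).show()  # drops only the Bob row
people.withColumn(
    "country_clean",
    when(people.country.isNull(), "unknown").otherwise(people.country)
).show()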

Count how many rows fall into each category and sort ascending to spot rare values:

from pyspark.sql.functions import asc

df.groupby('native_country').agg({'native_country': 'count'}).sort(asc("count(native_country)")).show()

+--------------------+---------------------+
|      native_country|count(native_country)|
+--------------------+---------------------+
|  Holand-Netherlands|                    1|
|            Scotland|                   12|
|             Hungary|                   13|
+--------------------+---------------------+
Rows that match an unwanted category can then be filtered out:

df_remove = df.filter(df.native_country != 'INDIA')
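To drop every rare category at once rather than one value at a time, one option is an anti-join against the list of rare values; a sketch, where the threshold of 20 and the variable names are my own:

from pyspark.sql.functions import count

# categories that occur fewer than 20 times (hypothetical threshold)
rare = (df.groupBy('native_country')
          .agg(count('*').alias('n'))
          .filter('n < 20')
          .select('native_country'))

# left_anti keeps only rows whose native_country is NOT in the rare list
df_common = df.join(rare, on='native_country', how='left_anti')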
values = [("a", 23), ("b", 45), ("c", 10), ("d", 60), ("e", 56), ("f", 2), ("g", 25), ("h", 40), ("j", 33)]
df = spark.createDataFrame(values, ["name", "ages"])
from pyspark.ml.feature import Bucketizer
bucketizer = Bucketizer(splits=[ 0, 6, 18, 60, float('Inf') ],inputCol="ages", outputCol="buckets")
df_buck = bucketizer.setHandleInvalid("keep").transform(df)
df_buck.show()
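With these splits, ages 0-5 land in bucket 0.0, 6-17 in 1.0, 18-59 in 2.0, and 60 and above in 3.0, so the output should look like:

+----+----+-------+
|name|ages|buckets|
+----+----+-------+
|   a|  23|    2.0|
|   b|  45|    2.0|
|   c|  10|    1.0|
|   d|  60|    3.0|
|   e|  56|    2.0|
|   f|   2|    0.0|
|   g|  25|    2.0|
|   h|  40|    2.0|
|   j|  33|    2.0|
+----+----+-------+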

Lowercase a string column with lower():

from pyspark.sql.functions import lower, col

Syntax: df.select(lower(df.colName)).show()

Example: df.select(lower(df.first_name)).show()

Uppercase a column with upper():

import pyspark.sql.functions as F

voter_df = voter_df.withColumn('upper', F.upper('name'))

Split a string column into an array of tokens with split() (the second argument is a regex pattern):

voter_df = voter_df.withColumn('splits', F.split('name', ' '))
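A runnable sketch of these string helpers together; voter_df and its values are invented for illustration:

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
voter_df = spark.createDataFrame([("Ada Lovelace",), ("Alan Turing",)], ["name"])

(voter_df
    .withColumn("lower", F.lower("name"))        # e.g. 'ada lovelace'
    .withColumn("upper", F.upper("name"))        # e.g. 'ADA LOVELACE'
    .withColumn("splits", F.split("name", " "))  # e.g. ['Ada', 'Lovelace']
).show(truncate=False)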

For machine learning, categorical string columns are typically indexed and one-hot encoded, with both steps collected into a Pipeline:

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

CATE_FEATURES = ['workclass', 'education', 'marital', 'occupation', 'relationship', 'race', 'sex', 'native_country']
stages = []  # stages in our Pipeline
for categoricalCol in CATE_FEATURES:
    # map each string category to a numeric index
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # one-hot encode that index into a sparse vector
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                     outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
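The stages list does nothing by itself; a minimal sketch of fitting and applying the Pipeline, assuming df is the DataFrame that holds these columns (note that OneHotEncoderEstimator was renamed OneHotEncoder in Spark 3.0):

pipeline = Pipeline(stages=stages)
# fit() learns the category indices, transform() appends the new columns
pipeline_model = pipeline.fit(df)
df_encoded = pipeline_model.transform(df)
df_encoded.select('workclassIndex', 'workclassclassVec').show(5)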

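User-defined functions (UDFs)

When the built-in functions are not enough, custom Python logic can be wrapped in a UDF; a minimal sketch, where the function and column names are invented for illustration:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# a plain Python function registered as a Spark UDF
def title_case(s):
    return s.title() if s is not None else None

title_case_udf = udf(title_case, StringType())

voter_df = voter_df.withColumn('name_title', title_case_udf('name'))

Prefer built-ins like lower() and upper() when they exist: Python UDFs ship every row to a Python worker and are considerably slower.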