Build your own customized ETL Jobs on AWS Glue using Python

 

Introduction

What is AWS glue?

1. AWS Glue is a fully managed Extract-Transform-Load pipeline (ETL) service. This service makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it swiftly and reliably between various data stores.

2. It comprises of components such as a central metadata repository known as the AWS Glue Data Catalog, an ETL engine that automatically generates Python or Scala code, and a flexible scheduler that handles dependency resolution, job monitoring, and retries.

3. AWS Glue is serverless, which means that there’s no infrastructure to set up or manage.

When to use AWS Glue?

1. When you want to automate your ETL processes.

2. When you want to easily integrate with other data sources and targets such as Amazon Kinesis, Amazon Redshift, Amazon S3, etc.

3. When you want your pipeline to be cost-effective; it can be cheaper because users only pay for the consumed resources. If your ETL jobs require more computing power from time to time but generally consume fewer resources, you don’t need to pay for usage hours without any actual usage.

S3 Bucket

1. Create an S3 bucket:

AWS S3 Bucket

Note:

Please change the bucket policy to grant read and write access to the user.

{"Version": "2012-10-17","Statement": [{"Sid": "ListObjectsInBucket","Effect": "Allow","Principal": "*","Action": "s3:*","Resource": "arn:aws:s3:::glue-job-customscript/*"   //please replace with you own bucket arn}]}

Crawlers

A Crawler reads data at the source location and creates tables in the Data Catalog. A table is the metadata definition that represents your data.

The Crawler creates the metadata that allows GLUE and services such as ATHENA to view the information stored in the S3 bucket as a database with tables.

2. Create a Crawlers

You will see the database that you created while executing the crawler. Click the database that you created, you will be able to see the table that you created.

Job

An AWS Glue job encapsulates a script that connects to your source data, processes it, and then writes it out to your data target.

3. Create a Job

  • Go to the AWS Glue page and on the left side, click on Jobs (legacy).
Glue Job
  • Give a name to the job.
  • Select IAM role
  • Type: spark
  • version: spark 2.4, Python 3(Glue Version 2.0)
  • This job runs:
  • Select a radio button
  • Choose A new script to be authored by you
GLue Job
  • A blank script will be generated.
  • In the script, write the code for your ETL process.
# Importing all the required Packagesimport sysfrom awsglue.transforms import *from awsglue.utils import getResolvedOptionsfrom pyspark.context import SparkContextfrom awsglue.context import GlueContextfrom awsglue.job import Jobfrom pyspark.ml import Pipelinefrom pyspark.ml.regression import GBTRegressorfrom pyspark.ml.feature import VectorIndexerfrom pyspark.ml.evaluation import RegressionEvaluatorfrom pyspark.ml.feature import VectorAssembler, VectorIndexerfrom pyspark.sql.functions import dayofweek, year, monthfrom awsglue.dynamicframe import DynamicFrame## @params: [JOB_NAME]args = getResolvedOptions(sys.argv, ['JOB_NAME'])sc = SparkContext()glueContext = GlueContext(sc)spark = glueContext.spark_sessionjob = Job(glueContext)job.init(args['JOB_NAME'], args)## Read the datadatasource0 = glueContext.create_dynamic_frame.from_catalog(database = "test_ v", table_name = "input", transformation_ctx = "datasource0")# Convert the DynamicFrame to Spark DataFramedf_data = datasource0.toDF()# from date field we are extracting "day_of_week", "month" and yeardf_data_1=df_data.withColumn('day_of_week',dayofweek(df_data.date)).withColumn('month', month(df_data.date)).withColumn("year", year(df_data.date))# Droping date coloumdf_data_2=df_data_1.drop('date')# Remove the target column from the input feature set.featuresCols = df_data_2.columnsfeaturesCols.remove('sales')# vectorAssembler combines all feature columns into a single feature vector column, "rawFeatures".vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")data_vector = vectorAssembler.transform(df_data_2)df_data_2=df_data_2.withColumnRenamed("sales", "label")data =df_data_2.select("label", "rawFeatures")# Automatically identify categorical features, and index them.# Set maxCategories so features with > 4 distinct values are treated as continuous.featureIndexer =\VectorIndexer(inputCol="rawFeatures", outputCol="features", maxCategories=4).fit(data)# Split the data into training and test sets (30% held out for testing)(trainingData, testData) = data.randomSplit([0.7, 0.3])# Train a GBT model.gbt = GBTRegressor(featuresCol="features", maxIter=15)# Chain indexer and GBT in a Pipelinepipeline = Pipeline(stages=[featureIndexer, gbt])# Train model.  This also runs the indexer.model = pipeline.fit(trainingData)# Make predictions.predictions = model.transform(testData)# Select example rows to display.predictions.select("prediction", "label", "features").show(5)# Select (prediction, true label) and compute test errorevaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")rmse = evaluator.evaluate(predictions)gbtModel = model.stages[1]# Convert backed to a Dynamic DataFrametest_nest=DynamicFrame.fromDF(predictions, glueContext, "test_nest")# Result are store back to the S3 bucketdatasink2 = glueContext.write_dynamic_frame.from_options(frame = test_nest, connection_type = "s3", connection_options = {"path": "s3://glue-job-customscript/input"}, format = "csv", transformation_ctx = "datasink2")job.commit()

Thank You!

Comments

Popular posts from this blog

Flutter for Single-Page Scrollable Websites with Navigator 2.0

A Data Science Portfolio is More Valuable than a Resume

Better File Storage in Oracle Cloud