Build your own customized ETL Jobs on AWS Glue using Python
Introduction
This article aims to show readers how to write their own scripts for AWS Glue jobs using Python. Glue provides default automation scripts for many common tasks, but today we will write our own script that uses gradient-boosted tree regression to train a model that predicts the demand for items in a shop.
What is AWS Glue?
1. AWS Glue is a fully managed extract, transform, and load (ETL) service. This service makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it swiftly and reliably between various data stores.
2. It comprises a central metadata repository known as the AWS Glue Data Catalog, an ETL engine that automatically generates Python or Scala code, and a flexible scheduler that handles dependency resolution, job monitoring, and retries.
3. AWS Glue is serverless, which means that there’s no infrastructure to set up or manage.
When to use AWS Glue?
1. When you want to automate your ETL processes.
2. When you want to easily integrate with other data sources and targets such as Amazon Kinesis, Amazon Redshift, Amazon S3, etc.
3. When you want your pipeline to be cost-effective: you pay only for the resources your jobs actually consume. If your ETL jobs occasionally need more computing power but are generally light, you do not pay for idle hours.
Let us start the practical work of writing a custom script for a Glue job.
S3 Bucket
S3 is one of the most user-friendly services in the AWS arsenal. We have multiple options available to upload data to S3, including:
1. Manually uploading data using the Management Console.
2. Uploading programmatically via the S3 APIs, SDKs, or the AWS CLI (a minimal boto3 sketch follows this list), among others.
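For the programmatic route, here is a minimal boto3 sketch (the local file name is a placeholder, and the bucket is the one we create in the next step):

import boto3

# Create an S3 client (credentials are picked up from your AWS profile or environment)
s3 = boto3.client("s3")

# Upload a local CSV file into the bucket's input directory
# NOTE: "train.csv" is a placeholder file name; the bucket is created in the next step
s3.upload_file("train.csv", "glue-job-customscript", "input/train.csv")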
1. Create an S3 bucket:
Go to the AWS console and search for S3.
Create a new S3 bucket; the bucket name must be globally unique. (We created a bucket named glue-job-customscript.)
After creating the bucket, create two directories inside it:
1. input
2. output
Note:
Please update the bucket policy so that the crawler and the job can read and write objects in the bucket. The example policy below is deliberately permissive (it allows all S3 actions to any principal); restrict the Principal and Action in a real environment.
{"Version": "2012-10-17","Statement": [{"Sid": "ListObjectsInBucket","Effect": "Allow","Principal": "*","Action": "s3:*","Resource": "arn:aws:s3:::glue-job-customscript/*" //please replace with you own bucket arn}]}
Crawlers
A Crawler reads data at the source location and creates tables in the Data Catalog. A table is the metadata definition that represents your data.
The Crawler creates the metadata that allows Glue and services such as Athena to view the information stored in the S3 bucket as a database with tables.
2. Create a Crawler
Now we are going to create a Crawler.
Go to the AWS console and search for AWS Glue.
You will be able to see Crawlers on the right side; click on Crawlers.
Click on Add Crawler.
1. Give a name to the crawler then click Next.
2. In Crawler source type, we do not need to change anything. Just click Next.
3. Data Store
Choose S3 as the data store (we stored the file in the S3 bucket).
In Include path, provide the path to the data (s3://glue-job-customscript/input/) and click Next.
4. In Add another data source, select No and click Next.
5. IAM Role: choose Create an IAM role and give the role a name (we created a role named Glue-Script).
6. Schedule: choose the appropriate option and click Next (we chose Run on demand).
7. In Output, select an existing database if there is one; otherwise create a new database by clicking Add database, then click Next.
8. Review all the steps and click Finish.
9. Select the created crawler and click Run Crawler.
10. After the crawler run finishes, the Tables added column changes from 0 to 1, indicating that a table was created (initially there were none).
You can verify the database that you created by clicking Databases on the left side of the AWS Glue page.
You will see the database you created while setting up the crawler; click it and you will be able to see the table the crawler added.
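The same check can also be done programmatically. Below is a minimal boto3 sketch, assuming a database name of my_database (a placeholder; use the database you actually created) and that your credentials and region are configured:

import boto3

# Glue client; use the region where the crawler ran
glue = boto3.client("glue", region_name="us-east-1")

# List the tables the crawler added to the catalog database
# NOTE: "my_database" is a placeholder; use your own database name
response = glue.get_tables(DatabaseName="my_database")
for table in response["TableList"]:
    print(table["Name"], table["StorageDescriptor"]["Location"])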
Job
An AWS Glue job encapsulates a script that connects to your source data, processes it, and then writes it out to your data target.
3. Create a Job
- Go to the AWS Glue page and on the left side, click on Jobs (legacy).
- Click Add Job.
- Give a name to the job.
- Select the IAM role.
- Type: Spark
- Glue version: Spark 2.4, Python 3 (Glue Version 2.0)
- This job runs: select the radio button A new script to be authored by you.
- A blank script will be generated.
- In the script, write the code for your ETL process.
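If you prefer, the job definition itself can also be created programmatically rather than through the console. A minimal boto3 sketch, assuming the script has already been uploaded to S3 (the job name and ScriptLocation are placeholders; the role is the one created earlier):

import boto3

glue = boto3.client("glue", region_name="us-east-1")

# Define a Spark ("glueetl") job on Glue 2.0 that runs a script stored in S3
# NOTE: the job name and ScriptLocation are placeholders; Role is the IAM role created earlier
glue.create_job(
    Name="demand-forecast-job",
    Role="Glue-Script",
    GlueVersion="2.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://glue-job-customscript/scripts/demand_forecast.py",
        "PythonVersion": "3",
    },
)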
We wrote a Python script for demand forecasting to use as the Glue job.
We took the dataset from Kaggle and used it to create the job.
We built a model for generating demand forecasts.
The challenge is to predict the sales in each store based on the features available in the dataset, such as day of the week, month, year, store, and item.
Demand prediction is a common problem across businesses; good predictions allow a business or service to optimize inventory and to match supply and demand to make customers happy and maximize profitability.
To generate the forecast, we used the gradient-boosted tree (GBT) regression algorithm from Spark ML.
# Importing all the required packages
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler, VectorIndexer
from pyspark.sql.functions import dayofweek, year, month
from awsglue.dynamicframe import DynamicFrame

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the data from the Data Catalog (replace database/table names with your own)
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="test_v", table_name="input", transformation_ctx="datasource0")

# Convert the DynamicFrame to a Spark DataFrame
df_data = datasource0.toDF()

# From the date field, extract "day_of_week", "month" and "year"
df_data_1 = df_data.withColumn('day_of_week', dayofweek(df_data.date)) \
                   .withColumn('month', month(df_data.date)) \
                   .withColumn('year', year(df_data.date))

# Drop the date column
df_data_2 = df_data_1.drop('date')

# Remove the target column from the input feature set
featuresCols = df_data_2.columns
featuresCols.remove('sales')

# VectorAssembler combines all feature columns into a single feature vector column, "rawFeatures"
vectorAssembler = VectorAssembler(inputCols=featuresCols, outputCol="rawFeatures")
data_vector = vectorAssembler.transform(df_data_2)

# Rename the target column to "label" and keep only the label and the feature vector
data_vector = data_vector.withColumnRenamed("sales", "label")
data = data_vector.select("label", "rawFeatures")

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer = VectorIndexer(
    inputCol="rawFeatures", outputCol="features", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train a GBT model
gbt = GBTRegressor(featuresCol="features", maxIter=15)

# Chain indexer and GBT in a Pipeline
pipeline = Pipeline(stages=[featureIndexer, gbt])

# Train model. This also runs the indexer.
model = pipeline.fit(trainingData)

# Make predictions
predictions = model.transform(testData)

# Select example rows to display
predictions.select("prediction", "label", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
gbtModel = model.stages[1]

# Keep only plain columns; vector columns cannot be written to CSV
results = predictions.select("label", "prediction")

# Convert back to a DynamicFrame
test_nest = DynamicFrame.fromDF(results, glueContext, "test_nest")

# Write the results back to the output directory of the S3 bucket
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame=test_nest, connection_type="s3",
    connection_options={"path": "s3://glue-job-customscript/output"},
    format="csv", transformation_ctx="datasink2")

job.commit()
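After saving the job, you can run it from the console with Run job, or trigger it from the AWS CLI (the job name below is a placeholder; use the name you gave your job):

aws glue start-job-run --job-name demand-forecast-job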
GitHub:
https://github.com/rajansahu713/Customized-ETL-Jobs-on-AWS-Glue-using-Python
References
https://docs.aws.amazon.com/glue/latest/dg/custom-scripts.html
https://spark.apache.org/docs/latest/api/python/
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html
#Python #PySpark #AWS
Thank You!
The next article will be on building CRUD REST APIs in Python's top three web frameworks: Django, Flask, and FastAPI.