Monday, March 14, 2022

Apache Spark with Scala: read files from S3 using Python and Boto3

How to read data from S3 using Boto3 and Python, and transform it using Spark with Scala.


Designing and developing data pipelines is at the core of big data engineering. ETL (extract, transform, load) plays a key role in moving data from source to destination.

Extracting data from sources can be daunting at times because of access restrictions and policy constraints. Although many businesses aim to be cloud-agnostic, AWS is one of the most widely used cloud providers and S3 is a performant, cost-efficient object store, so most ETL jobs will read data from S3 at one point or another.

It is important to know how to read data from S3 dynamically, discovering at run time which files to process, so that the data can be transformed and used to derive meaningful insights. With Boto3 and Python to locate the data and Apache Spark to read and transform it, this is a piece of cake. We are going to use Amazon's popular Python library, Boto3, to find the data in S3 and build the path that Spark will read.

Enough talk. Let's explore our data in S3 buckets using Boto3 and iterate over the bucket prefixes to find the files we want to operate on.

Boto3 is the AWS SDK for Python. It is used to create, update, and delete AWS resources from Python scripts, and it is very efficient at running operations on AWS resources directly.

Before proceeding, set up your AWS credentials (an access key ID and a secret access key) and make a note of them; Boto3 uses these credentials to interact with your AWS account.
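A minimal sketch of keeping those keys out of the script, assuming they are exported in the standard AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and (optionally) AWS_SESSION_TOKEN environment variables. The helper name session_kwargs_from_env is hypothetical; it only collects keyword arguments that boto3.Session accepts. Boto3 also finds these variables, the shared ~/.aws/credentials file, or an instance role on its own when boto3.Session() is called with no arguments.

```python
import os


def session_kwargs_from_env(env=None):
    """Build boto3.Session keyword arguments from the standard AWS variables.

    Hypothetical helper: raises KeyError if the access key or the secret key
    is missing, so the problem surfaces before any S3 call is made.
    """
    env = os.environ if env is None else env
    kwargs = {
        "aws_access_key_id": env["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
    }
    # Temporary (STS) credentials also need their session token.
    token = env.get("AWS_SESSION_TOKEN")
    if token:
        kwargs["aws_session_token"] = token
    return kwargs


# Usage (requires boto3 to be installed):
#   session = boto3.Session(**session_kwargs_from_env())
```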

Boto3 offers two distinct ways to access S3 resources:

1: Client: low-level service access

2: Resource: higher-level object-oriented service access

You can use either to interact with S3. Here we are going to leverage 'resource' for high-level access to S3.


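import os

import boto3

# Credentials come from the standard AWS environment variables;
# avoid hardcoding keys in source files.
session = boto3.Session(
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
)

S3 = session.resource('s3')
# Paginators are a client feature, so use the client behind the resource.
client = S3.meta.client

# Placeholders: replace with your own bucket, folders and file extension.
# The path must look like s3://<bucket_name>/<folder>/<subfolder>
# (use s3a:// instead of s3:// with open-source Spark/Hadoop outside EMR).
source_path = "s3://<bucket_name>/<folder>/<subfolder>"
file_format_type = ".parquet"

# "s3://bucket/a/b".split('/') -> ['s3:', '', 'bucket', 'a', 'b']
tokens = source_path.split('/')
src_bucket_name = tokens[2]
prefix = tokens[3] + '/' + tokens[4] + '/'

paginator = client.get_paginator('list_objects_v2')
pages = paginator.paginate(
    Bucket=src_bucket_name,
    Delimiter='/',  # group keys by the next "folder" level under the prefix
    Prefix=prefix,
    PaginationConfig={
        'MaxItems': 50000,
        'PageSize': 200
    }
)

# Collect the sub-folder names, e.g. Hive-style partitions like "date=2022-03-14".
get_sz_folder_name = []
for page in pages:
    # Pages without sub-folders have no 'CommonPrefixes' key.
    for obj in page.get('CommonPrefixes', []):
        # obj['Prefix'] looks like "<folder>/<subfolder>/date=2022-03-14/"
        get_sz_folder_name.append(obj['Prefix'][len(prefix):].rstrip('/'))

# Hadoop glob syntax: {a,b,c} matches any one of the listed folders.
source_folders_list = ','.join(get_sz_folder_name)
S3_path = source_path + '/{' + source_folders_list + '}/*' + file_format_type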
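The listing logic above depends on fixed token positions in the path, so it only works for exactly two folder levels below the bucket. A more defensive variant is sketched below, assuming paths of the form s3://bucket/any/prefix: it parses the URI with urllib.parse, pulls folder names out of the list_objects_v2 pages, and wraps them in braces to form a single Hadoop glob for Spark. The function names parse_s3_uri, subfolder_names and build_glob_path are hypothetical helpers, not part of Boto3 or Spark.

```python
from urllib.parse import urlparse


def parse_s3_uri(uri):
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix/')."""
    parsed = urlparse(uri)
    if parsed.scheme not in ("s3", "s3a"):
        raise ValueError(f"not an S3 URI: {uri}")
    prefix = parsed.path.lstrip("/")
    # A trailing slash makes a Delimiter='/' listing return the folders under the prefix.
    if prefix and not prefix.endswith("/"):
        prefix += "/"
    return parsed.netloc, prefix


def subfolder_names(pages, prefix):
    """Extract the folder names directly under prefix from list_objects_v2 pages."""
    names = []
    for page in pages:
        # Pages with no sub-folders simply have no "CommonPrefixes" key.
        for common in page.get("CommonPrefixes", []):
            names.append(common["Prefix"][len(prefix):].rstrip("/"))
    return names


def build_glob_path(base_uri, folders, extension):
    """Build one Hadoop glob such as s3://b/p/{f1,f2}/*.parquet for Spark to read."""
    if not folders:
        raise ValueError("no folders to read")
    return f"{base_uri.rstrip('/')}/{{{','.join(folders)}}}/*{extension}"


# Usage with Boto3 (not executed here):
#   bucket, prefix = parse_s3_uri(source_path)
#   pages = S3.meta.client.get_paginator("list_objects_v2").paginate(
#       Bucket=bucket, Prefix=prefix, Delimiter="/")
#   S3_path = build_glob_path(source_path, subfolder_names(pages, prefix), ".parquet")
```

Keeping the page parsing separate from the Boto3 call makes the folder-name logic easy to check against hand-written page dictionaries, without touching AWS.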

Read:

We have our S3 bucket and prefix details at hand, so let's query the files in S3 and load them into Spark for transformations.

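import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory

val sparkSession = SparkSession.builder().appName("S3 Transform").master("yarn").getOrCreate()
// Loads application.conf from the classpath; the keys used below are placeholders.
val config: Config = ConfigFactory.load()

// s3Path is the glob built in the Python step (S3_path).
def readFromS3(spark: SparkSession, s3Path: String, appConfig: Config): DataFrame = {
  val logger = LoggerFactory.getLogger("S3Transform")
  val dataFormat = appConfig.getString("input.format") // hypothetical key: "xml", "json" or "parquet"

  logger.info(s"Reading $dataFormat files from $s3Path")
  dataFormat match {
    case "xml" =>
      // Needs the spark-xml package (com.databricks:spark-xml) on the classpath.
      val rowTag = appConfig.getString("input.rowTag") // hypothetical key
      spark.read.format("com.databricks.spark.xml")
        .option("rowTag", rowTag)
        .option("nullValue", "")
        .load(s3Path)
    case "json" =>
      // Add .option("multiLine", "true") if a JSON record spans several lines.
      spark.read.json(s3Path)
    case "parquet" =>
      spark.read.parquet(s3Path)
    case other =>
      throw new IllegalArgumentException(s"Unsupported input format: $other")
  }
}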
Transformation:

The transformation part is left to readers, who can implement their own logic and transform the data as they wish.

Write:

Writing to S3 is easy after transforming the data: all we need is the output location and the file format in which we want the data saved, and Apache Spark does the rest of the job.

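// Writes df to the configured S3 location in the configured format.
// "output.location" and "output.format" are hypothetical config keys.
def writeToS3(df: org.apache.spark.sql.DataFrame, appConfig: Config): org.apache.spark.sql.DataFrame = {
  val logger = org.slf4j.LoggerFactory.getLogger("S3Transform")
  val outputLocation = appConfig.getString("output.location")
  val outputFormat = appConfig.getString("output.format")

  logger.info(s"Writing $outputFormat files to $outputLocation")
  // "overwrite" replaces whatever already exists at outputLocation.
  val writer = df.write.mode("overwrite")
  outputFormat match {
    case "parquet" => writer.parquet(outputLocation)
    case "json"    => writer.json(outputLocation)
    case "csv"     => writer.csv(outputLocation)
    case other     => throw new IllegalArgumentException(s"Unsupported output format: $other")
  }
  df
}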

Conclusion:

ETL is part of every step of the data journey, and choosing the best tools and frameworks for it is a key skill for developers and engineers. Apache Spark needs little introduction in the big data field: it is one of the most popular and efficient frameworks for processing big data.

Boto3 is one of the most popular Python libraries for reading and querying S3. This article showed how to use it to dynamically find the files to read in S3, and how to read, transform, and write those files with Apache Spark.
