Monday, December 25, 2023

Python used for Data Engineering

Python plays a crucial role in data engineering thanks to its versatile and powerful libraries. It is used across many domains, including data science, machine learning, AI, data visualization, and data engineering. In data engineering specifically, Python is widely used for big data processing through frameworks like Apache Spark, for workflow orchestration, for web scraping, and more. In this post, I’ll present the elements of the language that are most useful for data engineering, along with their use cases.

Illustrated by author

Built-in data structures

Python provides several built-in data structures that are widely used for various purposes. Lists, tuples, dictionaries, and sets are four of them, each with its own characteristics and use cases.

List

Mutable: Lists are mutable, meaning you can modify their elements after the list is created. You can add, remove, or modify elements.

Syntax: Defined using square brackets [].

my_list = [1, 2, 3, 'a', 'b', 'c']
my_list.append(4) # Adds 4 to the end of the list
my_list.remove('a') # Removes the element 'a'

Tuple

Immutable: Tuples are immutable, meaning once they are created, their elements cannot be changed or modified. Unlike lists, you can’t add or remove elements from a tuple.

Syntax: Defined using parentheses ().

my_tuple = (1, 2, 3, 'a', 'b', 'c')
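A quick way to see immutability in action is to try assigning to an element. The sketch below catches the resulting TypeError and also shows tuple unpacking, which is handy for small fixed-size records; the (column name, type) pair is made up for illustration.

```python
my_tuple = (1, 2, 3, 'a', 'b', 'c')

# Item assignment on a tuple raises TypeError because tuples are immutable
try:
    my_tuple[0] = 100
except TypeError as exc:
    print(f"Cannot modify a tuple: {exc}")

# Tuples often serve as small fixed-size records that can be unpacked
column = ("transaction_id", "int")  # hypothetical (name, type) pair
name, dtype = column
print(name, dtype)  # transaction_id int
```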

Dictionary

Dictionaries store data values as key-value pairs. A dictionary is a mutable (changeable) collection that does not allow duplicate keys and, since Python 3.7, preserves insertion order.

thisdict = {
    "brand": "Ford",
    "model": "Mustang",
    "year": 1964
}
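Because values are looked up by key, dictionaries fit configuration values or a single record well. The sketch below reuses the same thisdict to show reading a value, a safe lookup with a default via get(), updating and adding keys, and iterating over the pairs.

```python
thisdict = {
    "brand": "Ford",
    "model": "Mustang",
    "year": 1964,
}

print(thisdict["model"])             # Mustang
print(thisdict.get("color", "n/a"))  # n/a: get() returns a default instead of raising KeyError

thisdict["year"] = 2020              # keys are unique, so this overwrites the old value
thisdict["color"] = "red"            # assigning to a new key adds it

for key, value in thisdict.items():
    print(f"{key}: {value}")
```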

Set

Unordered and Unique Elements: Sets are unordered collections of unique elements. They do not allow duplicate values, and the order of elements is not guaranteed.

Syntax: Defined using curly braces {} or the set() constructor. Note that empty braces {} create an empty dictionary, so use set() for an empty set.

my_set = {1, 2, 3}
my_set.add(1)
my_set.add(1)
print(my_set)  # Output: {1, 2, 3}

As you can see, even after adding two more ‘1’s, the set still contains only one ‘1’.
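Uniqueness makes sets useful for deduplicating values and for comparing two collections, for example finding which IDs appear in one extract but not in another. The ID lists below are made up for illustration.

```python
# Hypothetical customer IDs from two daily extracts
ids_monday = [101, 102, 102, 103, 104]
ids_tuesday = [103, 104, 105]

unique_monday = set(ids_monday)      # duplicates removed: {101, 102, 103, 104}
unique_tuesday = set(ids_tuesday)

new_ids = unique_tuesday - unique_monday     # difference: only in Tuesday's extract
common_ids = unique_monday & unique_tuesday  # intersection: in both extracts
all_ids = unique_monday | unique_tuesday     # union: in either extract

print(sorted(new_ids))     # [105]
print(sorted(common_ids))  # [103, 104]
print(sorted(all_ids))     # [101, 102, 103, 104, 105]
```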

Operations on Lists

List Comprehension

List comprehension offers a shorter syntax for creating a new list from the values of an existing iterable, replacing the loop-and-append pattern with a single expression.

# Without list comprehension
squares = []
for i in range(5):
    squares.append(i**2)
print(squares)

# With list comprehension
squares = [i**2 for i in range(5)]
print(squares)

# Output (both versions):
# [0, 1, 4, 9, 16]
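Comprehensions can also include a condition, and the same syntax works for dictionaries and sets. A small sketch with made-up values:

```python
# Keep only the squares of even numbers
even_squares = [i**2 for i in range(10) if i % 2 == 0]
print(even_squares)  # [0, 4, 16, 36, 64]

# Dictionary comprehension: map each column name to its length
columns = ["id", "name", "email"]
name_lengths = {col: len(col) for col in columns}
print(name_lengths)  # {'id': 2, 'name': 4, 'email': 5}

# Set comprehension: distinct, normalized values
raw = [" PL", "pl", "De ", "de"]
countries = {c.strip().lower() for c in raw}
print(sorted(countries))  # ['de', 'pl']
```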

Filter, Map, and Reduce

map(), filter(), and reduce() let you write shorter code that transforms, selects, or combines elements without writing explicit loops.

Filter

As the name suggests, filter() keeps only the elements for which a condition (a function returning True or False) holds.

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# Filter even numbers
even_numbers = list(filter(lambda x: x % 2 == 0, numbers))
# Output: [2, 4, 6, 8]

Map

map() applies a function to every element of an iterable and returns an iterator over the results.

def power2(val: int) -> int:
    return val * val

numbers = [1, 2, 3, 4, 5]

power_numbers = list(map(power2, numbers))
# OR, with an anonymous (lambda) function
squared_numbers = list(map(lambda x: x**2, numbers))

# Output (both): [1, 4, 9, 16, 25]

Reduce

The reduce() function from the functools module applies a two-argument function cumulatively to the elements of a list (or any iterable), "reducing" them to a single output value.

from functools import reduce

words = ["apple", "banana", "orange", "apple", "grape", "banana"]

# Count the occurrences of each word
word_counts = reduce(lambda counts, word: {**counts, word: counts.get(word, 0) + 1}, words, {})

print(word_counts)
# Output: {'apple': 2, 'banana': 2, 'orange': 1, 'grape': 1}


numbers = [1, 2, 3, 4, 5]
product = reduce(lambda x, y: x * y, numbers)  # ((((1*2)*3)*4)*5)
print(product)

# Output: 120
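The word-count reduce() above builds a new dictionary on every step, copying the accumulator each time, so it slows down on long lists. For counting specifically, the standard library's collections.Counter is a simpler alternative; note that this swaps in a different tool rather than using reduce().

```python
from collections import Counter

words = ["apple", "banana", "orange", "apple", "grape", "banana"]

# Counter tallies hashable items in a single pass
word_counts = Counter(words)
print(dict(word_counts))           # {'apple': 2, 'banana': 2, 'orange': 1, 'grape': 1}
print(word_counts.most_common(1))  # [('apple', 2)]
```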

Enumerate

The enumerate() function adds a counter to an iterable and returns (index, value) pairs. If your background is in C#, avoid maintaining a counter manually like this:

lista = ['a', 'b', 'c', 'd', 'e']
count = 0
for l in lista:
    print('Index:', count, ' Value:', l)
    count += 1

Use enumerate() to achieve the same result:

lista = ['a', 'b', 'c', 'd', 'e']
for count, l in enumerate(lista):
    print('Index:', count, ' Value:', l)
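enumerate() also accepts an optional start argument, which is handy for human-readable line numbers, for example when reporting which row of a file failed a simple validation. The rows and the "exactly one comma" rule below are made up for illustration.

```python
rows = ["1,aaa", "2,xx", "broken-row", "3,ccc"]  # hypothetical CSV lines

# start=1 makes the counter match line numbers as people count them
bad_lines = []
for line_no, row in enumerate(rows, start=1):
    if row.count(",") != 1:  # expect exactly two fields per row
        bad_lines.append(line_no)
        print(f"Line {line_no} is malformed: {row!r}")

print(bad_lines)  # [3]
```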

Generators

Generators let you write your own iterator functions. A generator is a special type of function that does not return a single value; instead, it returns an iterator that produces a sequence of values. Inside a generator function, the yield keyword is used instead of return, and each value is produced only when it is requested. This lets you iterate over data without loading the entire dataset into memory, which makes generators suitable for processing huge files.

text = """
transaction_id,user\r
1,aaa\r
\r
2,xx\r
3,ccc\r
\r
"""


def process_large_file(text):
    # splitlines() treats "\r\n", "\r", and "\n" as line breaks
    for line in text.splitlines():
        # Process the line
        processed_line = line.strip().upper()

        if processed_line != "":
            # Yield the processed line (blank lines are skipped)
            yield processed_line

# Example usage
for processed_line in process_large_file(text):
    print(processed_line)

# Output:
# TRANSACTION_ID,USER
# 1,AAA
# 2,XX
# 3,CCC

In practice, a generator like this can fix a file's format on the fly, for example by dropping blank lines that don't meet CSV format requirements, without loading the whole file into memory.
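With a real file, the same idea works without ever holding the whole file in memory: iterating over a file object yields one line at a time, and generators can be chained into a small pipeline. The sketch below writes a temporary file only so that it is self-contained; the function names are illustrative, not from a library.

```python
import os
import tempfile

def read_lines(path):
    """Yield lines from a file one at a time (the file is never fully loaded)."""
    with open(path, newline="") as f:
        for line in f:
            yield line.rstrip("\r\n")

def drop_blank(lines):
    """Skip empty or whitespace-only lines."""
    for line in lines:
        if line.strip():
            yield line

def to_upper(lines):
    for line in lines:
        yield line.upper()

# Create a small sample file so the example runs on its own
with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False, newline="") as tmp:
    tmp.write("transaction_id,user\r\n1,aaa\r\n\r\n2,xx\r\n3,ccc\r\n\r\n")
    path = tmp.name

try:
    # Nothing is read until list() pulls values through the pipeline
    cleaned = list(to_upper(drop_blank(read_lines(path))))
finally:
    os.remove(path)

print(cleaned)  # ['TRANSACTION_ID,USER', '1,AAA', '2,XX', '3,CCC']
```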

Decorators

Decorators wrap a function (or class) to add behavior without changing the code inside it. For example, the decorator below calls the original function, prints the value it returned, and returns an upper-cased copy.

def make_upper(function):
    def upper():
        f = function()
        print(f"this is the original value: {f}")
        return f.upper()
    return upper

@make_upper  # decorator: equivalent to helloworld = make_upper(helloworld)
def helloworld():
    return "hello world"

print(helloworld())

# Output:
# this is the original value: hello world
# HELLO WORLD
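One detail worth knowing: a plain wrapper like upper() hides the decorated function's name and docstring (helloworld.__name__ becomes 'upper'), and it cannot accept arguments. A common refinement, sketched below with a hypothetical greet() function, uses functools.wraps to copy that metadata and *args/**kwargs to forward arguments.

```python
import functools

def make_upper(function):
    @functools.wraps(function)       # keep the original __name__ and __doc__
    def upper(*args, **kwargs):      # forward any arguments unchanged
        value = function(*args, **kwargs)
        return value.upper()
    return upper

@make_upper
def greet(name):
    """Return a greeting for name."""
    return f"hello {name}"

print(greet("world"))  # HELLO WORLD
print(greet.__name__)  # greet
```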

In practice, you can create a retry decorator that re-runs a function a few times when it raises an error, which is useful for flaky network calls.

import functools
import time

def retry(times, wait):
    """Retry the decorated function up to `times` attempts, waiting `wait` seconds between them."""

    def decorator(func):
        @functools.wraps(func)
        def newfn(*args, **kwargs):
            for attempt in range(1, times + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    print(
                        f"Exception thrown when attempting to run {func.__name__}, "
                        f"attempt {attempt} of {times}: {e}"
                    )
                    if attempt == times:
                        raise  # out of attempts: propagate the last error
                    time.sleep(wait)
        return newfn
    return decorator

@retry(times=3, wait=2)
def get_from_rest():
    print('Trying to read data from the REST API')
    raise ConnectionError('Lack of connection')

# Prints three failed attempts, then raises ConnectionError
get_from_rest()

Data Class

A data class is a regular Python class, marked with the @dataclass decorator, whose main purpose is storing data. Based on the class's annotated fields, the decorator generates the boilerplate methods for you.

A data class takes care of things like displaying values and comparing objects. You don't need to write a constructor (__init__) to assign values, nor implement __repr__ for debugging or __eq__ for comparison. A usable __hash__ is not generated by default, though: with the default eq=True, instances are unhashable unless you use @dataclass(frozen=True) or unsafe_hash=True.

from dataclasses import dataclass


@dataclass  # dataclass decorator
class CustomerD:
    name: str  # type hints define the fields
    id: int
    surname: str

class Customer:
    def __init__(self, name, aid, surname):
        self.name = name
        self.id = aid
        self.surname = surname

Obj1 = Customer("Erick", 1254, "Nowak")
Obj2 = Customer("Erick", 1254, "Nowak")

Obj3 = CustomerD("Erick", 1254, "Nowak")
Obj4 = CustomerD("Erick", 1254, "Nowak")


print("Difference for debugging")
print(Obj1)
print(Obj3)

print("\nDifference in Equality Check")
print(Obj1 == Obj2)  # plain class: compares identity, two different objects
print(Obj3 == Obj4)  # data class: compares field values

# Output:
# Difference for debugging
# <__main__.Customer object at 0x0000021E1108AC80>  (the address varies)
# CustomerD(name='Erick', id=1254, surname='Nowak')
#
# Difference in Equality Check
# False
# True

Data classes are also easy to convert to a tuple or a dictionary with astuple() and asdict(). This simplifies code that needs to work with those formats, for example when saving data as JSON.

import json
from dataclasses import dataclass, astuple, asdict


@dataclass  # dataclass decorator
class Customer:
    name: str  # type hints
    id: int
    surname: str


Obj1 = Customer("Erick", 1254, "Nowak")

print(astuple(Obj1))
print(asdict(Obj1))

# Output:
# ('Erick', 1254, 'Nowak')
# {'name': 'Erick', 'id': 1254, 'surname': 'Nowak'}

# Serialize to JSON via the dictionary form
print(json.dumps(asdict(Obj1)))
# Output: {"name": "Erick", "id": 1254, "surname": "Nowak"}
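By default, a data class with eq=True sets __hash__ to None, so its instances cannot be placed in a set. Passing frozen=True makes instances immutable and hashable, which is useful for deduplicating records; fields can also have default values. The Event class below is hypothetical.

```python
from dataclasses import dataclass, FrozenInstanceError

@dataclass(frozen=True)
class Event:                 # hypothetical record type
    event_id: int
    source: str = "api"      # default used when no value is passed
    tags: tuple = ()         # immutable default; a list default would raise ValueError

events = [Event(1), Event(2), Event(1)]  # Event(1) appears twice
unique_events = set(events)              # frozen + eq makes instances hashable
print(len(unique_events))                # 2

try:
    events[0].source = "db"              # assignment is blocked on frozen instances
except FrozenInstanceError:
    print("Event is immutable")
```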

Concurrency vs. parallelism

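Concurrency and parallelism are two different ways of executing tasks. Concurrency means several tasks make progress during overlapping time periods; for example, one task runs while another waits for a network response. Parallelism means tasks literally run at the same time on multiple CPU cores. For a data engineer, both techniques can speed up retrieving data from APIs or transforming data.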

Multithreading Pools

Multithreading runs multiple threads of execution inside a single process, and all threads share the process's memory. In CPython, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads give real speedups mainly for I/O-bound work, such as calling APIs, where threads spend most of their time waiting. To use a thread pool in Python, use the ThreadPoolExecutor class from the concurrent.futures module. With threads, we can send several API requests at once to retrieve data:

import calendar
from concurrent.futures import ThreadPoolExecutor
import requests

def generate_dates(year, month):
    # monthrange() returns (weekday of the first day, number of days in the month)
    _, last_day = calendar.monthrange(year, month)
    return [f"{year}-{month:02d}-{day:02d}" for day in range(1, last_day + 1)]

year = 2023
month = 10
urls = [
    f"http://api.nbp.pl/api/exchangerates/rates/a/gbp/{date}/"
    for date in generate_dates(year, month)
]

def download_page(url):
    response = requests.get(url, timeout=30)
    return response.content

# Up to 5 requests run at the same time; map() keeps results in URL order
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(download_page, urls))

for result in results:
    print(result)
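To see why threads help with I/O-bound work without depending on a live API, the sketch below replaces the HTTP call with time.sleep() as a stand-in for network latency; fake_download and the example.com URLs are hypothetical. Ten 0.2-second "downloads" should take roughly 2 seconds one after another, but only about 0.4 seconds with five worker threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(url):
    """Hypothetical stand-in for an HTTP request: sleeps like a slow network call."""
    time.sleep(0.2)
    return f"content of {url}"

urls = [f"https://example.com/page/{i}" for i in range(10)]

start = time.perf_counter()
sequential = [fake_download(u) for u in urls]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    # map() returns results in the same order as the input URLs
    threaded = list(executor.map(fake_download, urls))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```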

Multiprocessing Pools

Multiprocessing achieves true parallelism by running multiple processes simultaneously. A process is an independent instance of a running program with its own memory space and, in CPython, its own interpreter and GIL, which makes multiprocessing a good fit for CPU-bound work. Python's built-in multiprocessing module handles creating and managing the processes:

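from multiprocessing import Pool
import time

def square(x):
    time.sleep(1)  # simulate a slow computation
    print(x)
    return x * x

numbers = [1, 2, 3, 4, 5]

# The __main__ guard is required on platforms that start workers by spawning
# a new interpreter (Windows, macOS), which re-imports this module
if __name__ == '__main__':
    with Pool(processes=2) as pool:
        results = pool.map(square, numbers)
    print(results)  # [1, 4, 9, 16, 25]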

Concurrency

Concurrency in Python can also be achieved with coroutines and asynchronous programming via the asyncio module. Unlike threads, which the operating system switches between preemptively, coroutines run in a single thread and hand control back to an event loop at each await, so switching between them has much lower overhead.

For example, consider the usage of asyncio:

import asyncio
import time

async def sql_command1():
    await asyncio.sleep(2)  # stand-in for a 2-second query
    print("Query executed 1")
    return {"col1": 1, "col2": 2}


async def sql_command2():
    await asyncio.sleep(3)
    print("Query executed 2")
    return {"col1": 1, "col2": 2}

async def sql_command3():
    await asyncio.sleep(3)
    print("Query executed 3")
    return {"col1": 1, "col2": 2}


async def main():

    start_time = time.time()

    # create_task() schedules all three coroutines to start right away
    sql1 = asyncio.create_task(sql_command1())
    sql2 = asyncio.create_task(sql_command2())
    sql3 = asyncio.create_task(sql_command3())

    res1 = await sql1
    res2 = await sql2
    res3 = await sql3

    end_time = time.time()
    exec_time = end_time - start_time
    print(f"res1 {res1}")
    print(f"res2 {res2}")
    print(f"res3 {res3}")
    # roughly 3 s (the longest query), not 2 + 3 + 3 = 8 s
    print(f"total time {exec_time:.2f}")

if __name__ == "__main__":
    asyncio.run(main())
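The same pattern is often written with asyncio.gather(), which schedules several coroutines at once and returns their results in argument order. In the sketch below, fake_query is a hypothetical stand-in for an async database call and the delays are shortened; the total time should be close to the longest delay rather than the sum.

```python
import asyncio
import time

async def fake_query(name, delay):
    """Hypothetical stand-in for an async database query."""
    await asyncio.sleep(delay)
    return {"query": name, "rows": 2}

async def main():
    start = time.perf_counter()
    # gather() runs the coroutines concurrently and preserves argument order
    results = await asyncio.gather(
        fake_query("q1", 0.2),
        fake_query("q2", 0.3),
        fake_query("q3", 0.3),
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)
print(f"total time {elapsed:.2f}s")  # about 0.3 s rather than 0.8 s
```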

Instead of executing code synchronously and waiting for each step to finish, this approach lets us download data from several URLs concurrently, reducing the total import time.

For example, here is how to import data from multiple URLs with aiohttp:

import aiohttp
import asyncio
import calendar

def generate_dates(year, month):
    _, last_day = calendar.monthrange(year, month)
    return [f"{year}-{month:02d}-{day:02d}" for day in range(1, last_day + 1)]

year = 2023
month = 10
urls = [
    f"http://api.nbp.pl/api/exchangerates/rates/a/gbp/{date}/"
    for date in generate_dates(year, month)
]

async def download_page(session, url):
    async with session.get(url) as r:
        return await r.text()


async def main():
    # One session is reused for all requests (it pools connections)
    async with aiohttp.ClientSession() as session:
        datas = await asyncio.gather(*[download_page(session, u) for u in urls])
    for x in datas:
        print(x)

if __name__ == "__main__":
    # asyncio.run() creates the event loop, runs main(), and closes the loop
    asyncio.run(main())
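Firing every request at once can overload an API or trip its rate limits. A common refinement, sketched below, caps the number of in-flight requests with asyncio.Semaphore. To stay self-contained, the hypothetical fetch() sleeps instead of calling aiohttp, and the limit of 3 is arbitrary.

```python
import asyncio

MAX_CONCURRENT = 3  # arbitrary cap on simultaneous requests

async def fetch(url, semaphore, stats):
    """Hypothetical download that sleeps instead of making an HTTP call."""
    async with semaphore:  # waits here while 3 fetches are already running
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
        await asyncio.sleep(0.05)  # stand-in for network I/O
        stats["running"] -= 1
        return f"content of {url}"

async def main(urls):
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)
    stats = {"running": 0, "peak": 0}
    pages = await asyncio.gather(*(fetch(u, semaphore, stats) for u in urls))
    return pages, stats["peak"]

urls = [f"https://example.com/rates/{day}" for day in range(1, 11)]
pages, peak = asyncio.run(main(urls))
print(len(pages), "pages downloaded, at most", peak, "at a time")
```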

Integration with Cloud Storage

When integrating our application with cloud storage, the typical approach involves using SDKs and writing methods to handle tasks such as reading, writing, and listing files. However, this often requires different methods for each cloud provider.

Alternatively, Python libraries like gcsfs, adlfs, and s3fs offer a streamlined way to integrate with the cloud storage services of GCP, Azure, and AWS respectively. All three implement the common fsspec filesystem interface.

gcsfs — GCP Cloud storage integration

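import gcsfs

fs = gcsfs.GCSFileSystem()  # credentials are picked up from the environment by default

dest = "gs://dxxx/clients/clients.csv"

# "wb" opens the file in binary mode, so write bytes
with fs.open(dest, "wb") as file:
    file.write(b"hello;csv;file")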

adlfs - Azure Storage Account integration

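import os

import adlfs

# Add credentials as needed, e.g. account_key=... or credential=...
fs = adlfs.AzureBlobFileSystem(account_name=os.environ["AZURE_STORAGE_ACCOUNT_NAME"])
remote_path = "landing/clients.csv"  # "<container>/<path inside the container>"

# "wb" opens the file in binary mode, so write bytes
with fs.open(remote_path, "wb") as f:
    f.write(b"hello;csv;file")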

s3fs - AWS S3 storage integration

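import s3fs

# "mykey" / "mysecretkey" are placeholders for your AWS access key ID and secret key
fs = s3fs.S3FileSystem(key="mykey", secret="mysecretkey")
bucket = "my-bucket"

# List the objects in the bucket
files = fs.ls(bucket)
print(files)

with fs.open("my-bucket/my-file.txt", "rb") as f:
    print(f.read())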

As you can see, these libraries offer a straightforward and unified way to communicate with cloud storage. Because reading, writing, and listing files look the same in gcsfs, adlfs, and s3fs, they are especially helpful when working with multiple cloud providers. This uniformity simplifies the code and helps make it cloud-agnostic, so switching between storage services requires few changes.

Useful libraries

As a data engineer, a set of essential libraries is crucial to efficiently handle various aspects of data processing. These include libraries for data manipulation, filtering, adjusting values, aggregations, as well as tools for seamlessly working with diverse file formats such as CSV, Parquet, JSON, Avro, and more. Additionally, having robust libraries for interfacing with databases further streamlines data management tasks, contributing to the overall effectiveness of data engineering workflows.

Pandas

Pandas is a widely used Python library for data manipulation and analysis, and its versatility makes it useful for datasets of many sizes and shapes. The example below reads a large CSV file in chunks, so that only `chunksize` rows are held in memory at a time:

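import pandas as pd

filename = "large_file.csv"  # path to your CSV file
chunksize = 10 ** 6          # rows per chunk

for chunk in pd.read_csv(filename, chunksize=chunksize):
    # Each chunk is an ordinary DataFrame with up to `chunksize` rows.
    # iterrows() is slow; prefer vectorized operations on the whole chunk where possible.
    for index, row in chunk.iterrows():
        print(row)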

PySpark

PySpark is the Python library for Apache Spark, an open-source, distributed computing system. Apache Spark offers a fast and versatile cluster computing framework designed for extensive big data processing and analytics. One of its key strengths is the ability to efficiently perform processing tasks on massive datasets while also enabling the distribution of these tasks across multiple computers for enhanced scalability.

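from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("example").getOrCreate()

# createDataFrame() takes rows (e.g. tuples) plus column names
data = [("Alice", 25, "New York"),
        ("Bob", 30, "San Francisco"),
        ("Charlie", 35, "Los Angeles"),
        ("David", 40, "Chicago")]

df = spark.createDataFrame(data, ["Name", "Age", "City"])
df.printSchema()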

Polars

Polars is a high-performance DataFrame library implemented in Rust and designed for seamless data manipulation in Python. While similar to Pandas in its DataFrame functionality, Polars distinguishes itself by prioritizing performance. Below is a brief example showcasing the efficiency and ease of use offered by Polars:

import polars as pl

# Creating a DataFrame
data = {'Name': ['Alice', 'Bob', 'Charlie', 'David'],
        'Age': [25, 30, 35, 40],
        'City': ['New York', 'San Francisco', 'Los Angeles', 'Chicago']}

df = pl.DataFrame(data)

# Filtering data
filtered_df = df.filter(pl.col('Age') > 30)

# Adding a new column with a constant value
df = df.with_columns(salary=pl.lit(100))

# Grouping data and calculating statistics
# (group_by is the current name; older Polars versions used groupby)
grouped_city = df.group_by('City').agg(pl.col('Age').mean().alias('Avg_Age'))

DuckDB

DuckDB is an in-process analytical database management system tailored for OLAP (Online Analytical Processing) workloads, known for fast analytical queries and efficient resource use. Because it runs inside your Python process on the local machine, there is no server to install, and a database can live in memory or in a single file. DuckDB integrates closely with Python: it can read formats such as CSV and Parquet directly and can query Pandas and Polars DataFrames, giving flexible options for data manipulation and analysis.

import duckdb
import pandas

# Create a Pandas dataframe
my_df = pandas.DataFrame.from_dict({'a': [42]})

# query the Pandas DataFrame "my_df"
# Note: duckdb.sql connects to the default in-memory database connection
results = duckdb.sql("SELECT * FROM my_df").df()

Faker - Synthetic data generator

Faker is a powerful Python package designed for generating synthetic data. It proves invaluable for testing pipelines, databases, and creating stress tests by providing a diverse set of fake data, including names, addresses, card numbers, and more.

Noteworthy is Faker’s versatility, allowing users to customize generated data using locales. This feature enhances the tool’s adaptability to various scenarios, making it a go-to choice for those in need of realistic yet synthetic data for testing and development.

from faker import Faker

fake = Faker()  # default locale is en_US; pass a locale string for localized data

# Generate synthetic client data
def generate_client_data(num_clients=100000):
    clients = []
    for client_num in range(1, num_clients + 1):
        client = {
            "client_number": client_num,
            "name": fake.name(),
            "email": fake.email(),
            "phone_number": fake.phone_number(),
            "building_number": fake.building_number(),
            "street_name": fake.street_name(),
            "postcode": fake.postcode(),
            "city": fake.city(),
            "state": fake.state(),
            "birth_date": fake.date_of_birth(minimum_age=18, maximum_age=90).strftime('%Y-%m-%d'),
            "credit_card_number": fake.credit_card_number(card_type='mastercard'),
        }
        clients.append(client)
    return clients

SQLAlchemy — Connection to Database

Data engineering often involves working with databases: querying tables and inserting, updating, and deleting rows. SQLAlchemy is a versatile Python library that supports popular SQL databases such as PostgreSQL, Oracle, and MS SQL, and, through third-party dialect packages, Snowflake, BigQuery, and more. Its integration with Pandas streamlines and automates many data-related tasks. To establish a connection to a database, call the `create_engine()` function with a database URL of the general form:

dialect+driver://username:password@host:port/database

For example, a PostgreSQL URL using the psycopg2 driver starts with postgresql+psycopg2://.

import pandas as pd
from sqlalchemy import create_engine

# The psycopg2 driver must be installed; SQLAlchemy imports it itself
engine = create_engine('postgresql+psycopg2://test:@127.0.0.1', pool_recycle=3600)


with engine.connect() as conn:
    dataFrame = pd.read_sql("select * from tab1", conn)

To load a DataFrame into an SQL table, use to_sql():

df = pd.read_excel('sample.xlsx')
df.to_sql('table_name', con=engine, if_exists='append', index=False)

Unit Tests

Testing is a crucial component of data engineering development, particularly when dealing with large and intricate systems for data processing. To ensure the robustness of methods and prevent bugs, Python offers two popular testing frameworks: unittest and pytest. While a detailed discussion of their differences is beyond the scope here, ample resources are available online for those interested.

In the context of data engineering, let’s explore a few examples of how unittest and pytest can be employed to fortify our testing strategies:

Unittest

Below are a few simple tests that check whether expressions return the expected result.

import unittest

class ExampleTestSuite(unittest.TestCase):

    def test_import(self):
        self.assertTrue(True)

    def test_addition(self):
        self.assertEqual(1 + 2, 3)

    def test_subtraction(self):
        self.assertNotEqual(1 - 2, 0)

if __name__ == '__main__':
    unittest.main()
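The tests above only check arithmetic. In a pipeline you would typically test your own transformation functions instead; below, normalize_email is a hypothetical cleaning helper, and the test case checks both the normal path and an edge case.

```python
import unittest

def normalize_email(raw):
    """Hypothetical cleaning step: trim whitespace and lower-case an email address."""
    if raw is None:
        return None
    cleaned = raw.strip().lower()
    return cleaned or None  # treat empty strings as missing

class NormalizeEmailTest(unittest.TestCase):
    def test_trims_and_lowercases(self):
        self.assertEqual(normalize_email("  John.Doe@Example.COM "), "john.doe@example.com")

    def test_empty_values_become_none(self):
        self.assertIsNone(normalize_email("   "))
        self.assertIsNone(normalize_email(None))
```

Saved in a file whose name starts with test, this can be run with python -m unittest, which discovers TestCase classes automatically.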

Pytest

Below are the same tests written for pytest, which uses plain assert statements:


def test_import():
    assert True


def test_addition():
    assert 1 + 2 == 3


def test_subtraction():
    assert 1 - 2 != 0

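Run these tests with the pytest command; by default it collects files named test_*.py or *_test.py and runs the functions whose names start with test.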

Mocking

Mocking is a powerful technique, especially when dealing with external APIs in data engineering. It enables us to simulate responses from APIs in dependent functions, allowing for thorough testing without making actual API calls. Below, you’ll find examples showcasing how to use mocking effectively in a data engineering context:

# app.py

import requests
import json


def get_currency(url):
    response = requests.get(url)
    return response.text

def get_currency_rate():
    url = "http://api.nbp.pl/api/exchangerates/rates/a/gbp/2023-11-02/"
    currency = json.loads(get_currency(url))
    return currency["rates"][0]["mid"]


if __name__ == "__main__":
    print(get_currency_rate())

Mocking Unit Test:

# test_import.py

import unittest
from unittest.mock import patch, MagicMock

from app import get_currency_rate, get_currency


class TestImports(unittest.TestCase):
    text = '{"table": "A", "currency": "funt szterling", "code": "GBP", "rates": [{"no": "212/A/NBP/2023", "effectiveDate": "2023-11-02", "mid": 5.1135}]}'

    # Replace app.get_currency so no HTTP request is made
    @patch("app.get_currency")
    def test_import_currency(self, mock_get_currency):
        mock_get_currency.return_value = self.text
        self.assertEqual(get_currency_rate(), 5.1135)

    # Replace the requests module used inside app.py
    @patch("app.requests")
    def test_get_currency(self, mock_requests):
        mock_response = MagicMock()
        mock_response.text = '{"rates": [{"mid": ""}]}'
        mock_requests.get.return_value = mock_response

        self.assertEqual(get_currency("test"), '{"rates": [{"mid": ""}]}')
        mock_requests.get.assert_called_once_with("test")


if __name__ == "__main__":
    unittest.main()

When you run the test script `test_import.py`, both tests should pass:

The Unit test result
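patch() replaces a name inside a module for the duration of a test. When a function instead receives its HTTP client as a parameter, you can pass a MagicMock directly and also verify how it was called. The sketch below is a hypothetical variant of get_currency_rate that takes the client as an argument; it is not the app.py code above.

```python
import json
from unittest.mock import MagicMock

def get_currency_rate(http_get, url):
    """Hypothetical variant: http_get is any callable returning an object with .text."""
    response = http_get(url)
    return json.loads(response.text)["rates"][0]["mid"]

# Build a fake client whose response carries a canned JSON payload
fake_get = MagicMock()
fake_get.return_value.text = '{"rates": [{"mid": 5.1135}]}'

url = "http://api.nbp.pl/api/exchangerates/rates/a/gbp/2023-11-02/"
rate = get_currency_rate(fake_get, url)

print(rate)  # 5.1135
fake_get.assert_called_once_with(url)  # the client was called exactly once with the URL
```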

Module Fixture — Data Frame testing

When working with data frames in libraries like Spark, Pandas, Polars, and others, you need input data for testing. In pytest, fixtures provide a convenient way to prepare such data. A fixture is a function marked with the @pytest.fixture decorator; pytest runs it and passes its return value to any test function that declares a parameter with the fixture's name. Here's an example of using a fixture to test data frame transformations:

# app.py

import polars as pl


def filter_dataframe(df: pl.DataFrame) -> pl.DataFrame:
    return df.filter(pl.col("col2") == "a")


def add_col_to_df(df: pl.DataFrame) -> pl.DataFrame:
    return df.with_columns((pl.col("col1") ** 2).alias("col1_power"))


if __name__ == "__main__":
    df = pl.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})
    print(add_col_to_df(df))

Test file:

# test_app.py

import pytest
import polars as pl


from app import filter_dataframe, add_col_to_df


@pytest.fixture
def test_dataframe() -> pl.DataFrame:
    # pytest calls this fixture and passes the result to tests that request it by name
    return pl.DataFrame({"col1": [1, 2, 3], "col2": ["a", "b", "c"]})


def test_filter_dataframe(test_dataframe):
    df = filter_dataframe(test_dataframe)
    # .height is the number of rows in a Polars DataFrame
    rows_count_b = df.filter(pl.col("col2") == "b").height
    rows_count_c = df.filter(pl.col("col2") == "c").height
    assert rows_count_b == 0
    assert rows_count_c == 0


def test_add_col_to_df(test_dataframe):

    actual_df = add_col_to_df(test_dataframe)

    assert actual_df.columns == ["col1", "col2", "col1_power"]
    assert actual_df.rows() == [(1, "a", 1), (2, "b", 4), (3, "c", 9)]

To execute the tests, run the pytest command in the terminal. Pytest automatically discovers and runs all test files (named test_*.py or *_test.py) in the current directory and its subdirectories, or in a directory you pass as an argument.

The test result

Summary

In this overview, I’ve delved into the diverse applications of Python in data engineering, drawing from my own experiences. Python emerges as a versatile powerhouse, offering practical tools for data processing, importing, storing, transforming, and orchestrating data. From microservices to ETL processes, Python facilitates solutions for both Big Data and smaller datasets, enabling seamless stream and batch processing tailored to specific needs and use cases.

