How to send tabular time series data to Apache Kafka with Python and Pandas

What you’ll learn

What you should know already

Why use Apache Kafka for time series data?

Why use Python with Apache Kafka?

Prerequisites for this tutorial

Software

Major steps

1 — Set up Apache Kafka and create a topic
2 — Analyze the time series data with Pandas
3 — Create a Kafka producer to send the data
4 — Create a Kafka consumer to read the data

Setting up Apache Kafka

Start ZooKeeper (Linux/macOS command first, Windows equivalent below it):

bin/zookeeper-server-start.sh config/zookeeper.properties
bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

In a second terminal, start the Kafka broker:

bin/kafka-server-start.sh config/server.properties
bin\windows\kafka-server-start.bat .\config\server.properties

Then create a topic called "transactions" for the retail data:

bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
bin\windows\kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092
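
To confirm that the topic exists, you can ask the broker to describe it (this assumes the broker is still running on localhost:9092, as above):

bin/kafka-topics.sh --describe --topic transactions --bootstrap-server localhost:9092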

Analyzing the data

>>> import pandas as pd
>>> df = pd.read_csv("online_retail_II.csv", encoding="unicode_escape")
>>> df.info()
RangeIndex: 1067371 entries, 0 to 1067370
Data columns (total 8 columns):
 #   Column       Non-Null Count    Dtype
---  ------       --------------    -----
 0   Invoice      1067371 non-null  object
 1   StockCode    1067371 non-null  object
 2   Description  1062989 non-null  object
 3   Quantity     1067371 non-null  int64
 4   InvoiceDate  1067371 non-null  object
 5   Price        1067371 non-null  float64
 6   Customer ID  824364 non-null   float64
 7   Country      1067371 non-null  object
dtypes: float64(2), int64(1), object(5)
memory usage: 65.1+ MB
>>> df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])
>>> df.set_index('InvoiceDate', inplace=True)
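
With InvoiceDate as a DatetimeIndex, Pandas can aggregate by time directly. As a quick sanity check (a minimal sketch; the revenue column is our own addition, computed from the Quantity and Price columns above), you could total daily revenue in the same session:

>>> df['revenue'] = df['Quantity'] * df['Price']
>>> daily = df['revenue'].resample('D').sum()  # one total per calendar day
>>> daily.head()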

What Kafka can do with time series data

The advantages of using DataFrames with Kafka

Creating a Kafka Producer to send the data

# Import packages
import pandas as pd
import json
import datetime as dt
from time import sleep
from kafka import KafkaProducer

# Initialize Kafka Producer Client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
# Set a basic message counter and define the file path
counter = 0
file = "online_retail_II.csv"

for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=10):

    # For each chunk, convert the invoice date into the correct time format
    chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])

    # Set a counter as the message key
    key = str(counter).encode()

    # Convert the DataFrame chunk into a dictionary
    chunkd = chunk.to_dict()

    # Encode the dictionary into a JSON byte array
    data = json.dumps(chunkd, default=str).encode('utf-8')

    # Send the data to Kafka
    producer.send(topic="transactions", key=key, value=data)

    # Sleep to simulate a real-world interval
    sleep(0.5)

    # Increment the message counter for the message key
    counter = counter + 1

    print(f'Sent record to topic at time {dt.datetime.utcnow()}')
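
Note that kafka-python can also do the encoding for you: pass key_serializer and value_serializer callables when constructing the producer and then send plain Python objects. A minimal sketch of that variation, using the same topic and broker address as above:

from kafka import KafkaProducer
import json

# The producer applies these callables to every key and value before sending
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=str.encode,
    value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
)

producer.send(topic="transactions", key="0", value={"example": "payload"})
producer.flush()  # block until buffered messages have actually been sent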

Creating a Kafka Consumer to read the data

from kafka import KafkaConsumer
import json
import pandas as pd

# Consume all the messages from the topic but do not mark them as 'read' (enable_auto_commit=False)
# so that we can re-read them as often as we like.
consumer = KafkaConsumer('transactions',
                         group_id='test-consumer-group',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

for message in consumer:
    mframe = pd.DataFrame(message.value)

    # Multiply the quantity by the price and store it in a new "revenue" column
    mframe['revenue'] = mframe.apply(lambda x: x['Quantity'] * x['Price'], axis=1)

    # Aggregate the StockCodes in the individual batch by revenue
    summary = mframe.groupby('StockCode')['revenue'].sum()

    print(summary)
StockCode
16161P     10.50
16169N     10.50
21491      11.70
22065      17.40
22138      44.55
22139      44.55
22352      30.60
85014A     17.85
85014B     17.85
85183B    144.00
Name: revenue, dtype: float64
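
Each message produces a summary for that batch only. If you would rather keep a running total across every batch received so far, you can accumulate the per-batch Series as messages arrive. A minimal sketch of that variation (running_totals is our own hypothetical name; the consumer is the one constructed above):

import pandas as pd

running_totals = pd.Series(dtype='float64')

for message in consumer:
    mframe = pd.DataFrame(message.value)
    mframe['revenue'] = mframe['Quantity'] * mframe['Price']
    batch = mframe.groupby('StockCode')['revenue'].sum()

    # Merge this batch into the running totals, treating absent codes as zero
    running_totals = running_totals.add(batch, fill_value=0)
    print(running_totals.sort_values(ascending=False).head(10))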

Wrapping up

1 — You produced a high-frequency stream of messages and sent them to a Kafka topic.

2 — You consumed a high-frequency stream of messages from a Kafka topic and performed an aggregation on the data.
