Thursday, January 26, 2023

How to send tabular time series data to Apache Kafka with Python and Pandas

Setting up Apache Kafka

bin/ config/
bin\windows\zookeeper-server-start.bat .\config\
bin/ config/
bin\windows\kafka-server-start.bat .\config\
bin/ --create --topic transactions --bootstrap-server localhost:9092
bin\windows\kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Analyzing the data

>>> import pandas as pd
>>> df = pd.read_csv("online_retail_II.csv", encoding="unicode_escape")
>>> df.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1067371 entries, 0 to 1067370
Data columns (total 8 columns):
 #   Column       Non-Null Count    Dtype
---  ------       --------------    -----
 0   Invoice      1067371 non-null  object
 1   StockCode    1067371 non-null  object
 2   Description  1062989 non-null  object
 3   Quantity     1067371 non-null  int64
 4   InvoiceDate  1067371 non-null  object
 5   Price        1067371 non-null  float64
 6   Customer ID  824364 non-null   float64
 7   Country      1067371 non-null  object
dtypes: float64(2), int64(1), object(5)
memory usage: 65.1+ MB
>>> df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])
>>> df.set_index('InvoiceDate', inplace=True)
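Converting InvoiceDate and making it the index is what turns the table into a time series that pandas can slice and resample by time. As a minimal sketch, the three rows below are hypothetical (not taken from online_retail_II.csv) but use the same columns, and the example shows what the DatetimeIndex makes possible:

```python
import io

import pandas as pd

# Hypothetical sample rows with the same column layout as online_retail_II.csv
SAMPLE_CSV = """Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
INV1,A1,Item A,2,2010-12-01 08:26,2.50,1001,United Kingdom
INV1,B2,Item B,1,2010-12-01 08:26,4.00,1001,United Kingdom
INV2,A1,Item A,3,2010-12-02 09:15,2.50,,France
"""

df = pd.read_csv(io.StringIO(SAMPLE_CSV))

# Same two steps as above: parse the dates, then use them as the index
df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"])
df.set_index("InvoiceDate", inplace=True)

# With a DatetimeIndex, time-based operations work directly on the index:
# total revenue per calendar day ...
daily_revenue = (df["Quantity"] * df["Price"]).resample("D").sum()
print(daily_revenue)

# ... and selecting all rows from a single day with a partial date string
dec_2 = df.loc["2010-12-02"]
print(dec_2)
```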

What Kafka can do with time series data

The advantages of using DataFrames with Kafka

Creating a Kafka Producer to send the data

# Import packages
import json
import datetime as dt
from time import sleep

import pandas as pd
from kafka import KafkaProducer

# Initialize the Kafka producer client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')

# Set a basic message counter and define the file path
counter = 0
file = "online_retail_II.csv"

# Read the CSV in chunks of 10 rows; each chunk becomes one Kafka message
for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=10):

    # For each chunk, convert the invoice date into the correct time format
    chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])

    # Use the counter as the message key (Kafka keys are bytes)
    key = str(counter).encode()

    # Convert the DataFrame chunk into a dictionary ({column: {row index: value}})
    chunkd = chunk.to_dict()

    # Encode the dictionary as a UTF-8 JSON byte string; default=str turns
    # Timestamps (and any other non-JSON types) into strings
    data = json.dumps(chunkd, default=str).encode('utf-8')

    # Send the data to Kafka (send() is asynchronous and returns immediately)
    producer.send(topic="transactions", key=key, value=data)

    # Sleep to simulate a real-world interval (0.5 s is an arbitrary choice)
    sleep(0.5)

    # Increment the message counter for the message key
    counter = counter + 1

    print(f'Sent record to topic at time {dt.datetime.utcnow()}')

# Block until all buffered messages have actually been delivered
producer.flush()
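Because send() needs a running broker, it can help to check the message format on its own first. The sketch below is a broker-free version of the loop above: build_messages is a hypothetical helper, and the CSV rows are made up. It reads a small in-memory CSV in chunks and builds the same (key, value) byte pairs that the producer would pass to send(), without ever contacting Kafka.

```python
import io
import json

import pandas as pd

# Hypothetical sample rows with the same columns as online_retail_II.csv
SAMPLE_CSV = """Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
INV1,A1,Item A,2,2010-12-01 08:26,2.50,1001,United Kingdom
INV1,B2,Item B,1,2010-12-01 08:26,4.00,1001,United Kingdom
INV2,A1,Item A,3,2010-12-02 09:15,2.50,,France
"""


def build_messages(csv_text, chunksize):
    """Hypothetical helper: return the (key, value) pairs the producer loop would send."""
    messages = []
    chunks = pd.read_csv(io.StringIO(csv_text), chunksize=chunksize)
    for counter, chunk in enumerate(chunks):
        chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])
        key = str(counter).encode()
        # default=str serializes the Timestamp values as plain strings
        value = json.dumps(chunk.to_dict(), default=str).encode("utf-8")
        messages.append((key, value))
    return messages


messages = build_messages(SAMPLE_CSV, chunksize=2)
for key, value in messages:
    print(key, json.loads(value.decode("utf-8"))["InvoiceDate"])
```

Note that the row indexes keep counting across chunks (the second message contains row "2"), and JSON turns those integer indexes into string keys.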

Creating a Kafka Consumer to read the data

import json

import pandas as pd
from kafka import KafkaConsumer

# Consume all the messages from the topic, starting at the earliest offset, but do not
# mark them as 'read' (enable_auto_commit=False) so that we can re-read them as often as we like.
consumer = KafkaConsumer('transactions',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for message in consumer:
    # Rebuild a DataFrame from the {column: {row index: value}} dictionary
    mframe = pd.DataFrame(message.value)

    # Multiply the quantity by the price and store in a new "revenue" column
    mframe['revenue'] = mframe['Quantity'] * mframe['Price']

    # Aggregate the StockCodes in the individual batch by revenue
    summary = mframe.groupby('StockCode')['revenue'].sum()
    print(summary)

StockCode
16161P     10.50
16169N     10.50
21491      11.70
22065      17.40
22138      44.55
22139      44.55
22352      30.60
85014A     17.85
85014B     17.85
85183B    144.00
Name: revenue, dtype: float64
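The per-message processing can likewise be checked without a broker. This minimal sketch feeds a hand-written JSON payload (hypothetical values, shaped like the producer's to_dict() output but with only three columns) through the same steps as the consumer loop: decode the bytes, rebuild the DataFrame, multiply Quantity by Price, and sum revenue per StockCode.

```python
import json

import pandas as pd

# Hypothetical message value in the {column: {row index: value}} layout the producer sends
payload = json.dumps({
    "StockCode": {"0": "A1", "1": "B2", "2": "A1"},
    "Quantity": {"0": 2, "1": 1, "2": 3},
    "Price": {"0": 2.5, "1": 4.0, "2": 2.5},
}).encode("utf-8")

# Same decoding as the consumer's value_deserializer
value = json.loads(payload.decode("utf-8"))

# Same per-batch aggregation as the consumer loop
mframe = pd.DataFrame(value)
mframe["revenue"] = mframe["Quantity"] * mframe["Price"]
summary = mframe.groupby("StockCode")["revenue"].sum()
print(summary)
```

Because each message is aggregated on its own, a StockCode that appears in several messages gets one subtotal per message, not a running total.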

Wrapping up

1. You produced a high-frequency stream of messages from a CSV file and sent them into a Kafka topic.

2. You consumed that stream of messages from the Kafka topic and performed an aggregation on the data.

