Saturday, January 28, 2023

Kafka: An Overview

 Explain Kafka like I'm 5: Kafka is like a big post office where people can send messages to different rooms (called “topics”) and other people can come and read the messages.

  1. Producers: Producers are the systems or applications that generate and send data to Kafka topics.
  2. Brokers: Brokers are the servers that make up the Kafka cluster. They are responsible for receiving data from producers, storing it in topics, and forwarding it to consumers. Brokers also handle replication and partitioning of data across the cluster, to ensure high availability and fault-tolerance.
  3. Topics: Topics are the logical containers for data in Kafka: the categories or feeds to which producers write records and from which consumers read them. Each topic is split into a number of partitions, which are spread across the brokers in the cluster.
  4. Consumers: Consumers are the systems or applications that read data from Kafka topics. Consumers read data from topics and can process or forward it to other systems.
  5. Partitions: A partition is a portion of a topic, stored as an ordered, immutable sequence of records. Each partition has a single leader broker and is replicated to a configurable number of other brokers for fault tolerance.
  6. Consumer Groups: A consumer group is a set of consumers that work together to read data from a topic. Each consumer in the group is responsible for reading a unique subset of the partitions.
  7. Offsets: An offset is the position of a consumer in a partition. Each consumer in a consumer group maintains its own offsets, which it uses to keep track of which records it has already read.
  8. Messages: The core abstraction in Kafka is a message, a simple byte array that can hold any type of data, such as text, binary, or Avro-encoded payloads. Each message is assigned a unique offset that identifies its position within a partition. Producers write messages to a specific topic and partition, and consumers read messages from a specific topic and partition.
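To make these concepts concrete, here is a minimal Java producer that sends a single key/value message to a topic named "test" on a broker running at localhost:9092: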
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KafkaProducerExample {

    public static void main(String[] args) {

        // Connection and serialization settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        String topic = "test";
        String key = "key1";
        String value = "value1";

        // Send a single record, then flush and close the producer
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
        producer.send(record);
        producer.close();
    }
}
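And here is a matching Java consumer that joins the consumer group "test-group", subscribes to the same topic, and prints each message it receives along with its offset: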
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerExample {

    public static void main(String[] args) {

        // Connection, deserialization, and consumer-group settings
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "test-group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));

        // Poll for new records and print each one with its offset
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records)
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
        }
    }
}

Definitions:

Log-based storage system: A log-based storage system is a type of data storage system that uses an append-only log to record all changes made to the data, rather than storing the data only in a traditional file or block format. This allows for an efficient and reliable way of storing and recovering data, and enables features such as point-in-time recovery and data replication. Examples include the write-ahead logs used by databases like MySQL and PostgreSQL, as well as distributed systems like Apache Kafka and Apache Cassandra.
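To make the idea concrete, here is a toy, hypothetical sketch of an append-only log in Java (not Kafka's actual implementation): every write is appended to the end of the log and identified by its offset, and readers keep track of the last offset they have consumed.

import java.util.ArrayList;
import java.util.List;

// A toy in-memory append-only log: records are only ever appended,
// and each record is identified by its position (offset) in the log.
public class AppendOnlyLog {

    private final List<byte[]> records = new ArrayList<>();

    // Append a record and return the offset it was written at
    public synchronized long append(byte[] record) {
        records.add(record);
        return records.size() - 1;
    }

    // Read the record stored at a given offset
    public synchronized byte[] read(long offset) {
        return records.get((int) offset);
    }

    // The next offset that will be assigned (a reader can use this as its end position)
    public synchronized long endOffset() {
        return records.size();
    }
}

A real log-based system persists these records to disk in segment files and replicates them across machines, but the access pattern, sequential appends addressed by offsets, is the same idea Kafka builds on.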

Kafka: Capabilities and Real-World Applications

Can process millions of events per second.

  • High throughput and low latency: Kafka can handle a large volume of data at high speed. This is achieved through its distributed architecture, in which multiple brokers share the load. Producers write messages to a specific topic, and a partitioner determines which partition each message should be written to. The partitioner can use a variety of strategies, such as round-robin, random, or key-based, to distribute messages evenly across all partitions (see the sketch after this list).
  • High scalability: As the number of messages increases, more brokers can be added to the cluster to handle the load. Additionally, the number of partitions in a topic can be increased as needed, so that data stays evenly distributed across all brokers.
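As a small illustration of key-based partitioning, the sketch below uses the same Java producer API as the earlier example; the "events" topic name and the user IDs are hypothetical. Kafka's default partitioner hashes the message key, so all records with the same key land in the same partition and are read back in order.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class KeyedProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 10; i++) {
                // Records with the same key ("user-1" or "user-2") are hashed
                // to the same partition, preserving per-key ordering.
                // "events" is a hypothetical topic name.
                String key = (i % 2 == 0) ? "user-1" : "user-2";
                producer.send(new ProducerRecord<>("events", key, "event-" + i));
            }
        }
    }
}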

Data is not lost, even in case of failures

  • Durability: Kafka provides strong durability: every message is written to disk and replicated to multiple brokers, so messages are not lost if a broker fails. You can also set a retention period for messages, after which they are automatically deleted.
  • Replay data: Because data is kept on disk for the retention period, the system can handle high loads and consumers can replay the data whenever needed, for example to rebuild downstream state (see the sketch after this list).
  • High data availability and fault tolerance: Kafka replicates data across multiple machines in the cluster, which ensures that data remains available even if one or more machines fail.
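As a sketch of the replay capability, the following Java fragment reuses the connection settings from the earlier consumer example (the "replay-group" group id is hypothetical). It rewinds the consumer to the beginning of its assigned partitions with the standard seekToBeginning call and re-reads everything still within the retention period.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ReplayConsumerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("group.id", "replay-group");          // hypothetical group id for the replay
        props.put("enable.auto.commit", "false");       // don't advance committed offsets while replaying

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("test"));

            // First poll joins the group and gets partitions assigned (its records are discarded here)
            consumer.poll(Duration.ofSeconds(1));

            // Rewind to the earliest offset still retained on the brokers
            consumer.seekToBeginning(consumer.assignment());

            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Replayed offset " + record.offset() + ": " + record.value());
            }
        }
    }
}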

Powerful set of APIs and Integrations

  • Powerful set of APIs: Kafka provides a powerful set of APIs for integrating with other systems, such as Kafka Connect for ingesting data from and delivering data to external systems, and Kafka Streams for building real-time streaming applications (see the sketch after this list).
  • Supports a wide range of data formats: Kafka messages are plain byte arrays, so payloads can be text, JSON, Avro, or Protobuf, which makes it easy to integrate with a wide range of systems.
  • Security features: Authentication, authorization, and TLS encryption can be used to secure data in transit and control access to topics.
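As a brief illustration of the Streams API, here is a minimal Kafka Streams sketch that reads from a hypothetical "input-topic", upper-cases each value, and writes the result to a hypothetical "output-topic":

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;

public class KafkaStreamsExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // Build a simple topology: input-topic -> uppercase each value -> output-topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("input-topic");   // hypothetical topic names
        source.mapValues(value -> value.toUpperCase()).to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();

        // Close the streams application cleanly on shutdown
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}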
Kafka Streaming Pipeline
  1. Event-Driven Architecture: Kafka can be used to build event-driven architectures, where data is processed as soon as it is generated, rather than in batches. This enables real-time data processing and analytics.
  2. Adtech: Kafka can be used to handle real-time data generated by adtech platforms, such as ad impressions, clicks, and conversions.
  3. Log Aggregation: One of the most common use cases for Kafka is log aggregation. Kafka can collect log data and metrics from multiple sources, such as servers, applications, and devices, and store them in a central location for real-time analysis and reporting.
  4. Online Gaming: Kafka can be used to handle real-time data generated by online games, such as player actions, game states, and leaderboards.
  5. IoT: Kafka can be used to handle real-time data generated by IoT devices, such as sensor data, device status, and control commands.
  6. Financial Services: Kafka can be used to handle real-time financial data, such as stock prices, trades, and financial transactions.
Limitations of Kafka

  1. Resource intensive: Kafka requires a significant amount of resources, including memory, CPU, and disk space. This can make it challenging to run Kafka in resource-constrained environments, such as small or low-powered machines.
  2. Lacks complex data querying capabilities: Kafka is designed primarily for data ingestion and stream processing, not ad-hoc querying, so it may not be the best choice for use cases that require complex data querying and analysis.
  3. Limited security features: While Kafka supports authentication, authorization, and encryption of data in transit, it does not provide a comprehensive security solution on its own.
  4. Complex setup: Setting up and configuring a Kafka cluster can be complex and time-consuming.
  5. Basic monitoring: Kafka provides only basic monitoring capabilities out of the box, which can make it difficult to diagnose and troubleshoot issues in the cluster.

Thursday, January 26, 2023

How to send tabular time series data to Apache Kafka with Python and Pandas

What you’ll learn

What you should know already

Why use Apache Kafka for time series data?

Why use Python with Apache Kafka?

Prerequisites for this tutorial

SOFTWARE

Major steps

Setting up Apache Kafka

# Start ZooKeeper (Linux/macOS command first, then the Windows equivalent)
bin/zookeeper-server-start.sh config/zookeeper.properties
bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

# Start the Kafka broker
bin/kafka-server-start.sh config/server.properties
bin\windows\kafka-server-start.bat .\config\server.properties

# Create the "transactions" topic
bin/kafka-topics.sh --create --topic transactions --bootstrap-server localhost:9092
bin\windows\kafka-topics.bat --create --topic transactions --bootstrap-server localhost:9092

Analyzing the data

>>> import pandas as pd
>>> df = pd.read_csv("online_retail_II.csv", encoding="unicode_escape")
>>> print(df.info())
RangeIndex: 1067371 entries, 0 to 1067370
Data columns (total 8 columns):
 #   Column       Non-Null Count    Dtype
---  -----------  ----------------  -------
 0   Invoice      1067371 non-null  object
 1   StockCode    1067371 non-null  object
 2   Description  1062989 non-null  object
 3   Quantity     1067371 non-null  int64
 4   InvoiceDate  1067371 non-null  object
 5   Price        1067371 non-null  float64
 6   Customer ID  824364 non-null   float64
 7   Country      1067371 non-null  object
dtypes: float64(2), int64(1), object(5)
memory usage: 65.1+ MB
>>> df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])
>>> df.set_index('InvoiceDate', inplace=True)

What Kafka can do with time series data

The advantages of using DataFrames with Kafka

Creating a Kafka Producer to send the data

# Import packages
import pandas as pd
import json
import datetime as dt
from time import sleep
from kafka import KafkaProducer

# Initialize Kafka Producer Client
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
# Set a basic message counter and define the file path
counter = 0
file = "online_retail_II.csv"

for chunk in pd.read_csv(file, encoding='unicode_escape', chunksize=10):

    # For each chunk, convert the invoice date into the correct time format
    chunk["InvoiceDate"] = pd.to_datetime(chunk["InvoiceDate"])

    # Set a counter as the message key
    key = str(counter).encode()

    # Convert the data frame chunk into a dictionary
    chunkd = chunk.to_dict()

    # Encode the dictionary into a JSON Byte Array
    data = json.dumps(chunkd, default=str).encode('utf-8')

    # Send the data to Kafka
    producer.send(topic="transactions", key=key, value=data)

    # Sleep to simulate a real-world interval
    sleep(0.5)

    # Increment the message counter for the message key
    counter = counter + 1

    print(f'Sent record to topic at time {dt.datetime.utcnow()}')

Creating a Kafka Consumer to read the data

from kafka import KafkaConsumer
import json
import pandas as pd

# Consume all the messages from the topic but do not mark them as 'read' (enable_auto_commit=False)
# so that we can re-read them as often as we like.
consumer = KafkaConsumer('transactions',
                         group_id='test-consumer-group',
                         bootstrap_servers=['localhost:9092'],
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                         auto_offset_reset='earliest',
                         enable_auto_commit=False)

for message in consumer:
    mframe = pd.DataFrame(message.value)

    # Multiply the quantity by the price and store in a new "revenue" column
    mframe['revenue'] = mframe.apply(lambda x: x['Quantity'] * x['Price'], axis=1)

    # Aggregate the StockCodes in the individual batch by revenue
    summary = mframe.groupby('StockCode')['revenue'].sum()

    print(summary)
StockCode
16161P     10.50
16169N     10.50
21491      11.70
22065      17.40
22138      44.55
22139      44.55
22352      30.60
85014A     17.85
85014B     17.85
85183B    144.00
Name: revenue, dtype: float64

Wrapping up

1. You produced a high-frequency stream of messages and streamed them into a Kafka topic.

2. You consumed a high-frequency stream of messages from a Kafka topic and performed an aggregation on the data.

About the author
