Apache Kafka, commonly referred to simply as Kafka, is an enterprise-level messaging and streaming broker system. It is a great technology for architecting and building real-time data pipelines and streaming applications.
I highly recommend that architects familiarise themselves with the Kafka ecosystem, in particular with the concepts of a Kafka cluster, brokers, topics, partitions, consumers, producers and offsets.
Article Aim
This article highlights the most important lessons I have learned whilst using Kafka.
To enable parallel processing of messages, create multiple partitions in a topic. Each partition can be consumed by only one consumer within a consumer group, so if there are multiple consumers in a group, they consume messages from different partitions in parallel. The number of partitions therefore sets the upper limit on how many consumers in a single group can process messages at the same time.
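As a minimal sketch, here is how a topic with several partitions could be created with the kafka-python library; the topic name, partition count and broker address are illustrative assumptions, and the replication factor is kept at 1 for a local, single-broker setup.
# Create a topic with 6 partitions so up to 6 consumers in one group can read in parallel
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([NewTopic(name="orders", num_partitions=6, replication_factor=1)])
admin.close()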
Each message goes to every consumer group that has subscribed to the topic, but within a group it is delivered to only one consumer. In other words, all subscribed consumer groups see every message, while inside a single group only one consumer receives a given message from a partition. So, if you want to broadcast a message to multiple consumers, assign them to different consumer groups.
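A short sketch of the broadcast pattern with kafka-python; the topic and group names are made up. Because the two consumers below belong to different groups, each of them receives every message published to the topic.
from kafka import KafkaConsumer

# Two consumers in *different* groups: both receive every message on the topic
analytics_consumer = KafkaConsumer("orders", group_id="analytics", bootstrap_servers="localhost:9092")
billing_consumer = KafkaConsumer("orders", group_id="billing", bootstrap_servers="localhost:9092")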
The default maximum message size in Kafka is 1 MB. Messages can be compressed before they are delivered to Kafka. To store more data in a single topic, we can create multiple partitions spread across multiple servers.
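As a sketch, a producer could enable compression and a larger request size like this with kafka-python; the values are illustrative, and the broker-side limit (message.max.bytes) would also need to be raised for larger messages.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    compression_type="gzip",           # compress message batches before sending
    max_request_size=5 * 1024 * 1024,  # allow producer requests of up to ~5 MB
)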
Ensure the messages that need to be published or consumed are serializable. Take special care with date-time values and nested structures.
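One way to handle this (a sketch, not the only option) is a JSON value serializer that falls back to str() for types such as datetime:
import json
from datetime import datetime
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    # default=str turns datetimes (and other non-JSON types) into strings
    value_serializer=lambda v: json.dumps(v, default=str).encode("utf-8"),
)
producer.send("orders", {"order_id": 1, "created_at": datetime.utcnow()})
producer.flush()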
To re-read or skip messages, use the consumer function seek(TopicPartition, long) to specify the new offset position from which to consume.
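A minimal sketch with kafka-python; the topic, partition and offset values are illustrative:
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092", group_id="replay")
tp = TopicPartition("orders", 0)
consumer.assign([tp])
consumer.seek(tp, 42)  # the next poll starts reading from offset 42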
If we are designing an application where the order of messages is important and must be guaranteed, send all related messages to the same partition. The ordering guarantee applies at the partition level, so if a topic has more than one partition, make sure the messages that must appear in order share the same partition ID (or, equivalently, the same message key, which Kafka hashes to pick the partition). All messages pushed into the same partition of a topic are stored and delivered in the order they were produced.
If we want global ordering across all messages in a topic, use a topic with a single partition.
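A sketch of key-based ordering with kafka-python; the topic name and key are made up. Both messages share the key, so they land on the same partition and are consumed in production order.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
# Same key => same partition => guaranteed ordering for this order's events
producer.send("orders", key=b"order-123", value={"step": "created"})
producer.send("orders", key=b"order-123", value={"step": "paid"})
producer.flush()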
Keep your logs manageable and monitor disk space on a regular basis.
To design a durable system, set a high replication factor in the Kafka settings. Kafka replicates the log for each topic’s partitions across multiple servers; when a server fails, this allows automatic failover to the replicas, so messages remain available in the presence of failures. The replication factor can be set on a topic-by-topic basis. We can also set the producer batch size to 1 so that each message is sent individually rather than flushed in batches, although this will impact performance. For durable and highly available systems, high topic replication is important, and a minimum of 3 brokers is usually recommended for reliable failover.
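As a sketch, assuming a cluster of at least 3 brokers, a replicated topic and a producer that waits for all in-sync replicas to acknowledge each write could look like this:
from kafka import KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
# Replicate each partition across 3 brokers
admin.create_topics([NewTopic(name="payments", num_partitions=3, replication_factor=3)])

# acks="all" makes the producer wait until all in-sync replicas have the record
producer = KafkaProducer(bootstrap_servers="localhost:9092", acks="all")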
If we only need the latest event for each key and want older messages to be deleted, use compacted topics, where older events for a key are removed as newer events are published to the topic.
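A sketch of creating a compacted topic with kafka-python; the topic name is illustrative and the replication factor is kept at 1 for a local setup:
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
admin.create_topics([
    NewTopic(
        name="customer-profiles",
        num_partitions=3,
        replication_factor=1,
        topic_configs={"cleanup.policy": "compact"},  # keep only the latest record per key
    )
])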
To secure Kafka, use TLS client certificates, encrypt the messages in transit, and add user permissions (ACLs).
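A sketch of a TLS-authenticated consumer with kafka-python; the broker address and certificate file paths are placeholders for your own:
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="broker.example.com:9093",
    security_protocol="SSL",
    ssl_cafile="ca.pem",             # CA that signed the broker certificate
    ssl_certfile="client-cert.pem",  # client certificate for authentication
    ssl_keyfile="client-key.pem",    # client private key
)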
We could also use the Kafka Streams Java DSL or ksqlDB, Kafka’s SQL-like streaming language, to create and process the streams of data that are stored in Kafka.
Summary
This article highlighted the most important lessons I have learned whilst using Kafka.
A set of useful pandas tools to successfully load and transform a JSON file
Loading and transforming a JSON (JavaScript Object Notation) file is something pretty common in the Data Engineering/Science world.
JSON is a widely used format for storing and exchanging data. For example, NoSQL databases like MongoDB store data in a JSON-like format, and REST API responses are mostly available in JSON.
Although JSON works great for exchanging data over a network, if we intend to process the data, we would need to convert it into a tabular form, meaning something with columns and rows.
Now, in general terms, we can encounter two types of JSON structures in a file:
a JSON object
a list of JSON objects
In this article, we will focus on the second one (a list of JSON objects), as it is the most common scenario and, more importantly, once we know how to deal with a list of objects, handling a single JSON object becomes easy.
To deal with a list of JSON objects we can use pandas, and more specifically, we can use 2 pandas functions: explode() and json_normalize().
Let’s explain each one briefly and then move over to a full example.
This article goes as follows:
Explain briefly the explode() function
Explain briefly the json_normalize() function
Use both of them in a full example to create a curated Data Frame.
Do some transformations
Dump the transformed data as CSV
Please check out the Notebook for the source code.
1. Explode
The explode() function is used to transform each element of a list-like value into its own row, repeating the old index value for the new rows.
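A minimal sketch with made-up data: each product in the list becomes its own row, and the original index value is repeated.
import pandas as pd

df = pd.DataFrame({"cart_id": [1, 2], "products": [["milk", "bread"], ["eggs"]]})
print(df.explode("products"))
#    cart_id products
# 0        1     milk
# 0        1    bread
# 1        2     eggs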
2. Json_normalize
The json_normalize() function does what we call flattening of a semi-structured column, meaning it creates a new column for each property of the structure.
We should also notice that json_normalize takes all the sub-properties found under “name” (first, last, given, family and a plain string) and creates a column for each one, filling missing values with NaN.
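A small sketch with made-up records mirroring that “name” example:
import pandas as pd

data = [
    {"id": 1, "name": {"first": "Coleen", "last": "Volk"}},
    {"name": {"given": "Mark", "family": "Regner"}},
    {"id": 2, "name": "Faye Raker"},
]
# Produces the columns: id, name.first, name.last, name.given, name.family, name
print(pd.json_normalize(data))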
We can choose the separator
pd.json_normalize(data, sep="<our_sep>")
We can also control how many levels of nesting we want to normalize
pd.json_normalize(data, max_level=<my_int_level>)
where <my_int_level> is the maximum depth of nesting to flatten; anything deeper is left as a dict inside a single column.
Let’s see that in action
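Here is a small sketch with made-up data: with max_level=1 only one level of nesting is flattened, so the deeper address dict is kept intact in a single column.
import pandas as pd

data = [
    {"name": "Ana",
     "contact": {"email": "ana@example.com",
                 "address": {"city": "Lima", "zip": "15001"}}}
]
# Columns: name, contact.email, contact.address (the address dict is not flattened)
print(pd.json_normalize(data, max_level=1))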
3. Let’s look at a full example
As an example, we are going to use a list of shopping carts and their items from an e-commerce store, taken from the dummyjson API.
Suppose we have a list of shopping carts we need to process in some way.
Let’s say we have this JSON in a raw format in some blob we need to extract and transform:
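An abridged, illustrative sketch of the payload (the field names follow the dummyjson carts endpoint; the values and the trimming are mine):
{
  "carts": [
    {
      "id": 1,
      "products": [
        {"id": 59, "title": "Running Shoes", "price": 30, "quantity": 3, "total": 90},
        {"id": 88, "title": "Water Bottle", "price": 9, "quantity": 2, "total": 18}
      ],
      "total": 108,
      "userId": 97
    }
  ],
  "total": 20,
  "skip": 0,
  "limit": 20
}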
In this example, we have a main object with the carts property, which inside has the list we need.
Let’s say we need to extract the cart products or items and do the following transformations over it and output the result into AWS S3 for further processing or storage:
Create a new boolean column called big_sale based on the total of a product and a threshold of what constitutes a big sale.
Add a <processed_ts> timestamp to each row.
If we load the cart data into a pandas Data Frame and explode the products column, it would look like this:
We loaded the data using the requests library. Then into pandas:
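A sketch of that step, assuming the public dummyjson carts endpoint:
import requests
import pandas as pd

data = requests.get("https://dummyjson.com/carts").json()
df = pd.DataFrame(data["carts"])      # one row per cart
exploded_df = df.explode("products")  # one row per product, cart-level columns repeated
print(exploded_df.head())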
Looks good: now we have a row for each product in each cart. Notice the cart id repeats; that’s what we want for now…
We now need to deal with the JSON format that the products column is in.
For this, we are going to use the json_normalize from before to normalize the JSON data and join the result to our exploded Data Frame. Let’s take a look at that
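Continuing from the snippet above, a minimal sketch of that step:
# Flatten each product dict into its own set of columns
normalized_df = pd.json_normalize(exploded_df["products"])
print(normalized_df.head())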
Now we have a column for each JSON property of the products column.
Next, we could join both exploded and normalized Data Frames to get a full representation of the JSON in one Data Frame. But there is a small issue we need to deal with first:
The index of the exploded_df is wrong; we need to reset it so we can join by index.
So, we do the following:
remove the old products column from the exploded Data Frame
reset the index on the exploded Data Frame
join both exploded and normalized Data Frames
Let’s go
First we remove the old products column and then we reset the index.
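Continuing the sketch: we drop the old products column, reset the index, join, and then apply the two transformations from earlier. Note that both the cart and the product carry id and total fields, so I add a _cart suffix to the cart-level duplicates when joining; the big-sale threshold value is an illustrative assumption.
exploded_df = exploded_df.drop(columns=["products"]).reset_index(drop=True)

# Cart-level columns that clash with product columns get a "_cart" suffix
joined_df = exploded_df.join(normalized_df, lsuffix="_cart")

BIG_SALE_THRESHOLD = 100  # illustrative threshold
joined_df["big_sale"] = joined_df["total"] > BIG_SALE_THRESHOLD  # product total
joined_df["processed_ts"] = pd.Timestamp.now(tz="UTC")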
Obviously, this is just an example; in a real-world scenario we would output the processed data to the cloud, and perhaps not as CSV but as Parquet or another columnar format.
Still, following the example, let’s output this as a processed.csv file locally
joined_df.to_csv('processed.csv', index=False)
Conclusion
Dealing with a raw JSON file is a pretty common real-life scenario, and pandas is great for this.
By combining both explode and normalize, we can get a JSON file into a Data Frame for processing. This works great for processing data coming from, for example, an API which is usually in JSON format.
Granted, explode() is only needed when dealing with a list of objects, but that is a fairly common thing to have. If your JSON doesn’t need it, the json_normalize() function alone will be enough to process your JSON.