Saturday, November 26, 2022

10 Best Practices For Using Kafka In Your Architecture

 

Key lessons I have learned While Using Kafka

Apache Kafka, also known as Kafka, is an enterprise-level messaging and streaming broker system. Kafka is a great technology to use to architect and build real-time data pipelines and streaming applications.


I highly recommend the architects to familiarise themselves with the Kafka ecosystem, in particular on the concepts of Kafka cluster, broker, topics, partitions, consumer, producer and offsets.

Article Aim

This article will highlight 12 of the important lessons I have learned whilst using Kafka.

  1. To enable parallel processing of messages, create multiple partitions in a topic. This enables multiple consumers to process the messages in parallel. Each partition can be consumed by only one consumer within a consumer group. So, if there are multiple consumers in a consumer group, they can consume messages from different partitions. Therefore, if we want to parallelize the consumption of the messages, create multiple partitions in a topic.
  2. Each message goes to every consumer group that has subscribed to a topic/partition, but within a group, it goes to only one consumer. Therefore, all  that have subscribed to the topic get the messages but only one consumer within a consumer group gets a message from a partition. So if you want to broadcast the message to multiple consumers, assign them different .
  3. The default setting of a message size in Kafka is 1MB. Messages can be compressed before they are delivered to Kafka. To store more data in a single topic, we can create multiple partitions across multiple servers
  4. Ensure the messages that are required to be published or consumed are serializable. Take special care of date time and nested structures.
  5. Use the function seek(TopicPartition, long) to specify the new position.
  6. If we are designing an application where the order of messages is important and we want the order of messages to be guaranteed, then use the same Partition Id in all of the messages. The reason is that the ordering guarantee applies at the partition level. So, if you have more than one partition in a topic, you’ll need to ensure the messages you are required to appear in order to have the same partition Id. All messages that are pushed into a partition of a topic will be ordered correctly if they have the same partition ID.
  7. If we want global ordering across all topics, use a single partition topic.
  8. Keep your logs manageable and monitor disk space on regular basis.
  9. To design a durable system, ensure a high replication factor is set in the Kafka setting. Kafka replicates the log for each topic’s partitions across multiple servers. When a server fails, this allows automatic failover to these replicas as the messages remain available in the presence of failures. We can set the replication factor on a topic-by-topic basis. Plus, we can set the producer batch size to 1. It will ensure each message is saved to the disk and the messages are not flushed in batches. This will impact the performance. For durable and highly available systems, it’s important to have high topic replication. Usually, a minimum of 3 brokers is recommended for reliable failover.
  10. If we want to delete older messages, use compacted topics where older events for a key are removed as and when newer events are published to the topic.
  11. To secure Kafka, use TLS client certificates, encrypt the messages, and add user permissions.
  12. We could also Java DSL or Kafka’s SQL-like streaming language to create and process the streams of data that are stored in Kafka.

Summary

This article highlighted 10 important lessons I have learned whilst using Kafka.

Thursday, November 17, 2022

Transformations on a JSON file using Pandas

 

A set of useful pandas tools to successfully load and transform a JSON file



Loading and doing Transformations over a JSON (JavaScript Object Notation) file is something pretty common in the Data Engineering/Science world.

JSON is a widely used format for storing and exchanging data. For example, NoSQL database like MongoDB store the data in JSON format, and REST API’s responses are mostly available in JSON.

Although JSON works great for exchanging data over a network, if we intend to process the data, we would need to convert it into a tabular form, meaning something with columns and rows.

Now, in general terms, we can encounter two types of JSON structures in a file:

  1. a JSON object
  2. a list of JSON objects
JSON object vs list of JSON objects (image by author)

In this article, we will focus on the second one (a list of JSON objects), as I understand it’s the most common scenario and more importantly, by learning to deal with the list scenario, we can then easily deal with the single JSON object one.

To deal with a list of JSON objects we can use pandas, and more specifically, we can use 2 pandas functions: explode() and json_normalize().

Let’s explain each one briefly and then move over to a full example.

This article goes as follows:

  1. Explain briefly the explode() function
  2. Explain briefly the json_normalize() function
  3. Use both of the in a full example to create a cured Data Frame.
  4. Do some transformations
  5. Dump the transformed data as CSV

Please check out the Notebook for the source code.

1. Explode

Like this:

explode() simple example (image by author)

The function uses the following syntax:

df.explode('column_to_explode', ignore_index=False)

we can also send a list of columns to explode:

df.explode(['col_1', 'col_2'], ignore_index=False)

It would look something like this

basic code example before explode (image by author)

Now let’s explode by col_2

basic code example after explode (image by author)

Notice how the index of the exploded row is kept.

2. JSON Normalize

This is the syntax:

data = [
{
"id": 1,
"name": {
"first": "Coleen",
"last": "Volk"
}
},
{
"name": {
"given": "Mark",
"family": "Regner"
}
},
{
"id": 2,
"name": "Faye Raker"
},

]
pd.json_normalize(data)or within a column of your Data Framepd.json_normalize(df['a_column'])

Let’s se it:

json_normalize example (image by author)

As we can see, it created a Data Frame from our raw JSON, taking each property from the root as a column.

Nested properties are the interesting part here. JSON normalize takes each one and creates a column maintaining its original path.

ie.

"name": {
"first": "Coleen",
"last": "Volk"
}
becomesname.first, name.last

We should also notice that normalize takes all sub parameters found of “name” (first, last, given, family and a plain string) and for each one creates a column; filling with NaN for missing values.

We can choose the separator

pd.json_normalize(data, sep="<our_sep>")
custom separator for normalize (image by author)

We can also control the level of the JSON until which we want to normalize

pd.json_normalize(data, max_level=<my_int_level>)

where…

Let’s see that in action

max_level for normalize (image by author)

3. Let’s look at a full example

As an example, we are going to use a list of cart items from an e-commerce store from the dummyjson api.

Suppose we have a list of shopping carts we need to process in some way.

Let’s say we have this JSON in a raw format in some blob we need to extract and transform:

Taken from dummy json (source)

In this example, we have a main object with the carts property, which inside has the list we need.

Let’s say we need to extract the cart products or items and do the following transformations over it and output the result into AWS S3 for further processing or storage:

  • Create a new boolean column called big_sale based on the total of a product and a threshold of what constitutes a big sale.
  • Add a <processed_ts> timestamp to each row.

If we load the cart data into a pandas Data Frame and explode the products column, it would look like this:

example-loading-data (image by author)

We loaded the data using the requests library. Then into pandas:

example-data-frame (image by author)
example-exploded (image by author)

Looks good, now we have a row for each product in each cart. Notice the cart id repeats, thats what we want for now…

We now need to deal with the JSON format in which the products are in

example-explode-info (image by author)

For this, we are going to use the json_normalize from before to normalize the JSON data and join the result to our exploded Data Frame. Let’s take a look at that

example-normalize (image by author)
example-normalize-info (image by author)

Now we have a column for each JSON property of the products column

Next, we could join both exploded and normalized Data Frames to get a full representation of the JSON in one Data Frame. But there is a small issue we need to deal with first:

The index of the exploded_df is wrong, we need to reset it to join by index.

So, we do the following:

  1. remove the old products column from the exploded Data Frame
  2. reset the index on the exploded Data Frame
  3. join both exploded and normalized Data Frames

Let’s go

First we remove the old products column and then we reset the index.

exploded_df.drop(['products'], axis=1, inplace=True)exploded_df.reset_index(inplace=True)
example-drop (image by author)
example-indexes-match (image by author)

Notice the indexes now match.

Before we can join, we need to deal with two columns that overlap.

The columns id and total exist in both exploded and normalized dfs. For this, when we join we will use the suffix options of panda’s join.

Let’s see that

joined_df = exploded_df.join(normalized_df, lsuffix='_cart', rsuffix='_product')
example-joined (image by author)
example-joined-info (image by author)

perfect, now we can do some transformations…

4. Transformations

  • Create a new boolean column called big_sale based on the total of a product and a threshold of what constitutes a big sale.
  • Add a <processed_ts> timestamp to each row.Let’s go…

We will set a threshold to 100 for big sale (≥ is big) and using the apply() function of pandas we will create a new Series for the big_sale column.

Note that the total column for the product is called total_product after the join.

threshold = 100big_sale_col = joined_df.apply(lambda row: row.total_product >= threshold, axis=1)

now we can add it to our joined Data Frame

joined_df['big_sale'] = big_sale_col
example-big-sale (image by author)

Great, now for the next transformation, let’s simply add a new column with the current date as timestamp.

from datetime import datetimenow = datetime.now()
timestamp = datetime.timestamp(now)
joined_df['processed_ts'] = timestamp

Don’t forget to import datetime

example-processed (image by author)
example-processed-info (image by author)

Great! We have everything. Let’s export the data

5. Dump the transformed data as CSV

Still, following the example, let’s output this as a processed.csv file locally

joined_df.to_csv('processed.csv', index=False)
processed.csv file (image by author)

Conclusion

By combining both explode and normalize, we can get a JSON file into a Data Frame for processing. This works great for processing data coming from, for example, an API which is usually in JSON format.

Granted, exploded is only needed when dealing with a list of objects, but this is a fairly common thing to have. Anyway, if your JSON doesn’t need it, the json_normalize() function would be enough for you to process your JSON.

Let’s summarize what we did:

  • Loaded a JSON into a Data Frame
  • Exploded the list like columns
  • Normalized the JSON format columns
  • made some simple transformations
  • exported the data

Thanks for reading!

Please check out the notebook for the source.

References

Must Watch YouTube Videos for Databricks Platform Administrators

  While written word is clearly the medium of choice for this platform, sometimes a picture or a video can be worth 1,000 words. Below are  ...