Implementing data quality with Databricks

Data quality is one of the key factors that we need to consider when designing our data platform. It is one of the core pillars of data governance and should be at the center of the platform and pipeline design.

The purpose of this article is to provide practical recommendations for implementing data quality, based on samples and my personal experience. The focus is not on discussing the concept of data quality itself. The article is written in the context of a Databricks Lakehouse platform, with Unity Catalog for governance on top. However, the general ideas can be applied to any Lakehouse platform.

Databricks published a comprehensive article that provides a deep dive into data quality principles and how the features of Delta and Databricks can help you achieve them. I highly recommend reading it in full. The article begins with a diagram that showcases the components and Databricks Lakehouse features that address these principles. That diagram is the perfect starting point to understand the complexity of implementing data quality at scale.

According to the article, these are the “six dimensions” of data quality:

  • Consistency — Data values should not conflict with other values across data sets.
  • Accuracy — There should be no errors in the data.
  • Validity — The data should conform to a certain format.
  • Completeness — There should be no missing data.
  • Timeliness — The data should be up to date.
  • Uniqueness — There should be no duplicates.

The best practices promoted by Databricks help enforce most data quality principles. However, some dimensions leave room for custom implementation and design, particularly validity and accuracy. Databricks is introducing Delta Live Tables (DLT) to address some of these gaps. While I recognize the potential of DLT, there are still significant limitations that prevent me from recommending it for enterprise solutions. For instance, according to the Databricks documentation, “You cannot use row filters or column masks with materialized views or streaming tables published to Unity Catalog.” A complete list of limitations can be found in the Databricks documentation.

Scenario

The scenario we are considering for this article is a Databricks lakehouse platform with a classic medallion architecture based on Delta. We are assuming the data comes to the platform from files that arrive at our landing zone or through streaming events via Kafka or a similar system.

The data will be inserted into the Bronze layer with a structured streaming job, using Auto Loader (cloudFiles) for the file-based feeds or reading events directly from the stream. Once the data is in the first layer, it is promoted through the layers with Delta-to-Delta structured streaming jobs, using the change data feed feature of Delta.

With that scenario in mind, let’s go through the six dimensions of data quality and discuss how to implement them:

Consistency

Consistency is one of the principles that will be a free win from the lakehouse architecture. The lakehouse is designed to be the single source of truth, and the delta format provides ACID transactions and concurrency control to ensure consistency for multiple producers and consumers.

Validity

This dimension refers to the correctness of the shape of the data: schema and format. Databricks and the Delta format provide a few options to manage table schemas, mainly schema enforcement and automatic schema evolution. We need to decide where to use them depending on the layer of the data.

In the Bronze layer, our goal is not to drop any data; we want to keep as much as possible. Because of that, I recommend using all the available tools to keep the schema flexible.

First, schema evolution should be enabled, to allow new columns to be added to the Delta table when they appear in the source:

(df.write.format("delta")
    .option("mergeSchema", "true")  # schema evolution
    .mode("append")
    .save(delta_path)
)

For file sources, I encourage everyone to use Auto Loader with the rescued data column enabled and schema hints. This way, we have all the flexibility: if something unexpected arrives, the information is stored in the rescued data column, and we can decide later how to merge it back into the Bronze table.
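A minimal sketch of that ingestion pattern, assuming JSON files arriving in the landing zone; the paths, file format and hint columns are placeholders for your own feed:

bronze_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")                                 # format of the files in the landing zone
    .option("cloudFiles.schemaLocation", "tmp/schemas/sales/")           # where Auto Loader tracks the inferred schema
    .option("cloudFiles.schemaHints", "id BIGINT, eventTime TIMESTAMP")  # hints for the columns we know
    .option("cloudFiles.rescuedDataColumn", "_rescued_data")             # anything that doesn't match lands here
    .load("landing/sales/")                                              # hypothetical landing path
)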

For Silver and Gold, I recommend a different approach. In those layers, the data may already be a product, so we cannot allow the shape of the data to change easily. I advise having a fixed schema and using the ALTER TABLE command for any changes. These changes should be automated and gated: a generic notebook can be used within a CI/CD pipeline with gates, so the relevant stakeholders need to approve the schema change before it is automatically deployed to the different environments.
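As a minimal sketch of such a gated schema change, the notebook can be as simple as a parameterized ALTER TABLE; the table and column names here are hypothetical, and the approval gate itself lives in the CI/CD pipeline, not in the notebook:

# Parameters would typically come from the pipeline (e.g. widgets or job parameters)
table_name = "silver.sales_orders"    # hypothetical table
new_columns = "discount_pct DOUBLE"   # hypothetical column definition

# Explicit, reviewed schema change instead of silent schema evolution
spark.sql(f"ALTER TABLE {table_name} ADD COLUMNS ({new_columns})")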

Accuracy

Accuracy is the data quality dimension that leaves the most room for design and implementation. Data accuracy means that the values of the data are correct for a dataset. There are some Databricks features that help in this space, but they don't solve it completely. DLT aims to close that gap but, as I mentioned before, I don't think it's ready for many enterprise solutions yet.

The first tool that we should use for accuracy is Delta constraints. This is not a Databricks feature, but a Delta feature. Check constraints allow us to verify that the values of a certain column are within an acceptable range:

-- Date example 
ALTER TABLE tableName ADD CONSTRAINT dateWithinRange CHECK (dateColumn > '1900-01-01');

-- Range example
ALTER TABLE tableName ADD CONSTRAINT validIds CHECK (id > 1 and id < 99999999);

The constraints can be found in the metadata of the table:
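For example, a quick way to inspect them is to list the table properties and keep the delta.constraints.* keys (tableName is the sample table from above):

spark.sql("SHOW TBLPROPERTIES tableName") \
    .filter("key LIKE 'delta.constraints.%'") \
    .show(truncate=False)

# Each constraint appears as a table property, e.g. a key like delta.constraints.datewithinrange
# with the check expression as its value.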

Constraints are checked before the data lands in the table. When some of the data doesn't meet the criteria, the whole transaction fails and the table is rolled back to its previous state. So constraints guarantee that invalid data never reaches the table, but they will also make our pipeline stop.

What we would like instead is to analyse the new data, check whether it is valid, push through the records that follow the rules, and automatically quarantine the records that don't. The constraints are stored in the table metadata, so I recommend the following approach:

To get the table constraints and transform them into a filter condition, the following function can be used:

from functools import reduce
from pyspark.sql import Column
from pyspark.sql import functions as F


def get_table_constraints_conditions(table_name: str) -> Column:
    """Build a condition that is True for records that violate any CHECK constraint."""
    constraints = [
        row["value"]
        for row in spark.sql(f"SHOW TBLPROPERTIES {table_name}")
        .filter(F.col("key").startswith("delta.constraints"))
        .select("value")
        .collect()
    ]

    # A record is invalid if it breaks at least one of the constraints
    return reduce(lambda acc, c: acc | c, [~F.expr(c) for c in constraints], F.lit(False))

Using the same sample table as before, we can see how the constraints are turned into a single condition.
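For the two sample constraints defined earlier, the helper returns a condition roughly equivalent to the following (a sketch; Delta may store a normalized version of the expressions):

# True for any record that violates at least one CHECK constraint
invalid_condition = (
    ~F.expr("dateColumn > '1900-01-01'")
    | ~F.expr("id > 1 and id < 99999999")
)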

Taking all that into consideration, we can add the constraint check to our merge function, to validate the records just before the merge inside the foreachBatch operation. This is what needs to be added to the merge:

def merge(df: DataFrame, sink_table: str, key: str, time_col: str):
    ...
    # Quarantine invalid records based on constraints
    constraints_condition = get_table_constraints_conditions(sink_table)
    quarantine_records = df.filter(constraints_condition)
    correct_records = df.filter(~constraints_condition)
    ...

Accuracy example

Let's walk through one merge example to show the quarantine logic. Our Silver table starts with 5 records and a simple schema.

We generate a set of new inserts, in this case 3 new records. One of the records has productId -10, which violates the constraint we set on the table, so it should be quarantined.

Once we have the inserts, we try to merge the new records into the Silver table. As expected, the 2 correct records with productId 5 and 30 are inserted, and the invalid record hasn't been merged.

Lastly, we can check the quarantine folder and display the data in there. As expected, the quarantined record ended up there for us to manage.
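A quick way to inspect the quarantined records, assuming the quarantine path written by the merge function shown later and that sink_table holds the Silver table name:

quarantined = spark.read.parquet(f"tmp/silver_merges/{sink_table}/quarantine_records/")
quarantined.show()   # should contain the record with productId -10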

Timeliness

Timeliness is another easy win of the proposed implementation. Timeliness means that the data should be up to date. Delta allows high concurrency, so multiple writers can efficiently update the data.

Furthermore, the structured streaming approach covers everything from batch-style updates using the availableNow trigger to near real-time applications with an always-on cluster.
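As a minimal sketch of the batch-style end of that spectrum (silver_stream, the checkpoint path and the target table are placeholders), the same streaming job can run on a schedule with the availableNow trigger, or be switched to a continuous trigger when lower latency is needed:

(
    silver_stream.writeStream
    .format("delta")
    .option("checkpointLocation", "tmp/checkpoints/gold_sales/")  # hypothetical checkpoint path
    .trigger(availableNow=True)   # process everything available, then stop
    .toTable("gold.sales")        # hypothetical target table
)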

In my experience, this approach has no inherent limitations on timeliness; the real constraints come from the project's budget. It's important to analyze the use case to propose the most efficient solution.

Uniqueness

A well-designed merge statement with proper deduplication of the microbatches of structured streaming jobs ensures that there are no duplicates past Bronze. Two common approaches are dropDuplicates(), if taking one random value per key from the microbatch is enough, or a ranking window if we care about the order.

def merge(df: DataFrame, sink_table: str, key: str):

    # Dedupe by key (take one random record per key from the batch)
    df_dedupe = df.dropDuplicates([key])
    ...

That is the dropDuplicates() approach: very simple and efficient if we don't care about the order within the microbatch.

def merge(df: DataFrame, sink_table: str, key: str, max_col: str):

    # Dedupe getting the last value by max_col
    w = Window.partitionBy(key).orderBy(col(max_col).desc())
    df_dedupe = (
        df
        .withColumn("rank", F.rank().over(w))
        .filter(col("rank") == 1)
        .drop("rank")
    )
    ...

This version uses max_col and the rank window function to keep the most recent value in the microbatch. It is computationally more expensive, so study your use case carefully.

Completeness

Completeness refers to the presence and availability of all the necessary data for a specific use case. Databricks provides the following features to assist in ensuring the completeness of both data and metadata during the process of data ingestion and transformation.

Delta ACID guarantees serve as a foundation for completeness. The atomic transactions ensure that if a problem occurs during the write, the entire operation can be rolled back. The optimistic concurrency control makes it impossible for deadlocks to occur when managing simultaneous reads and writes at the table level.

  • Metadata management with Unity Catalog.
  • Enrichments — Adding a processing timestamp and the source filename when records enter the lakehouse, for traceability:
def add_metadata(df: DataFrame) -> DataFrame:
    """
    Add the input file name, a date key generated from the path, and a process timestamp
    """
    # Assuming the source batches are partitioned yyyy/mm/dd
    pattern_date = r"[0-9]{4}/[0-9]{2}/[0-9]{2}"

    return (
        df
        .withColumn("process_timestamp", F.current_timestamp())
        # If the data comes from files, read the input file name
        .withColumn("sourcefile", F.input_file_name())
        .withColumn(
            "datekey_file",
            F.date_format(
                F.to_timestamp(F.regexp_extract(col("sourcefile"), pattern_date, 0), "yyyy/MM/dd"),
                "yyyyMMdd",
            ),
        )
    )

Monitoring

While monitoring is not strictly a dimension of data quality, it is a crucial factor in managing it effectively. There are various approaches to monitoring, depending on the cloud provider you are using. Personally, I primarily work with Azure, so I use Application Insights and Azure Monitor in addition to the Databricks monitoring features.

To ensure effective monitoring, it is recommended to extract metrics from your preferred orchestration tool. These metrics can help track job duration, job failures, and so on. Additionally, during job execution, custom metrics can be collected and sent to Application Insights. This allows us to create alerts when specific metrics surpass a certain threshold.

When it comes to data quality, I suggest creating custom metrics for the following aspects:

  • Monitoring the size of the columns ingested by Auto Loader
  • Keeping track of the number of quarantined records during a merge
  • Calculating the ratio of quarantined records versus correct records (see the sketch after this list)
  • Checking the number of columns in Bronze tables to detect any schema evolution changes
  • Ensuring proper enforcement of schema changes
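A minimal sketch of the quarantine metrics, assuming the correct_records and quarantine_records DataFrames from the merge function shown earlier; the helper name is hypothetical, and the plain logging call is a stand-in for whatever metrics client (for example an Application Insights exporter) you use:

import logging

logger = logging.getLogger("data_quality")


def log_quarantine_metrics(sink_table: str, correct_records: DataFrame, quarantine_records: DataFrame) -> None:
    """Compute and log quarantine metrics for a microbatch."""
    n_quarantined = quarantine_records.count()
    n_correct = correct_records.count()
    ratio = n_quarantined / max(n_quarantined + n_correct, 1)

    # Swap this logger for your metrics client to raise alerts on thresholds
    logger.info(
        "table=%s quarantined=%d correct=%d quarantine_ratio=%.4f",
        sink_table, n_quarantined, n_correct, ratio,
    )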

For more general data quality metrics, I recommend using Databricks Lakehouse Monitoring. Please note that, as of the publication date of this article, it is still in preview, but it is already quite useful. This feature allows you to monitor metrics such as the percentage of nulls, counts, size on disk, partitions, etc. You can track the evolution and drift of these metrics over time, avoiding the slow deterioration of data quality. The tool generates a dashboard for each table, providing a comprehensive view of the metrics.

Proposed implementation

Taking all these aspects of data quality into account, this is the proposed architecture. It summarises the features of each layer and of the jobs between the layers.

And this is the complete merge, incorporating the check constraints, deduplication, and the change data feed for the merge operations. The merge logic is quite straightforward, allowing you to include any additional logic that may be required for your specific use case, while still serving as a solid foundation.

from functools import reduce

from delta.tables import DeltaTable
from pyspark.sql import Column, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window


def get_table_constraints_conditions(table_name: str) -> Column:
    """Build a condition that is True for records that violate any CHECK constraint."""
    constraints = [
        row["value"]
        for row in spark.sql(f"SHOW TBLPROPERTIES {table_name}")
        .filter(F.col("key").startswith("delta.constraints"))
        .select("value")
        .collect()
    ]

    # A record is invalid if it breaks at least one of the constraints
    return reduce(lambda acc, c: acc | c, [~F.expr(c) for c in constraints], F.lit(False))


def merge(df: DataFrame, sink_table: str, key: str, max_col: str):

    # Dedupe getting the last value by max_col
    w = Window.partitionBy(key).orderBy(col(max_col).desc())
    df_dedupe = (
        df
        .withColumn("rank", F.rank().over(w))
        .filter(col("rank") == 1)
        .drop("rank")
    )

    # Write the last merged microbatch to temp storage for debugging purposes
    df_dedupe.write.format("parquet").mode("overwrite").save(f"tmp/silver_merges/{sink_table}/batch_log/")

    # Quarantine invalid records based on constraints
    constraints_condition = get_table_constraints_conditions(sink_table)
    quarantine_records = df_dedupe.filter(constraints_condition)
    correct_records = df_dedupe.filter(~constraints_condition)

    quarantine_records.write.format("parquet").mode("append").save(f"tmp/silver_merges/{sink_table}/quarantine_records/")

    delta = DeltaTable.forName(spark, tableOrViewName=sink_table)

    (delta.alias("delta")
        .merge(
            correct_records.alias("updates"),
            f"delta.{key} = updates.{key}"
        )
        .whenMatchedDelete(
            condition="updates._change_type = 'delete'"
        )
        .whenMatchedUpdate(
            set={"column_to_update": "updates.column_to_update"}
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
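As a usage sketch, this merge can be wired into a Delta-to-Delta structured streaming job that reads the change data feed of the Bronze table and applies the merge per microbatch. The table names, key and checkpoint path below are placeholders, and depending on your CDF settings you may also want to filter out update_preimage rows before merging:

(
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .table("bronze.sales_orders")                                   # hypothetical source table
    .writeStream
    .option("checkpointLocation", "tmp/checkpoints/silver_sales/")  # hypothetical checkpoint path
    .foreachBatch(
        lambda batch_df, batch_id: merge(batch_df, "silver.sales_orders", "id", "process_timestamp")
    )
    .start()
)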

Conclusions

To summarize, data quality is an extremely important factor that should receive top priority when building data platforms. It should not be seen as an afterthought, but rather as a fundamental aspect to be integrated from the very beginning. By leveraging the power of Databricks and the lakehouse architecture, together with their best practices, we can unlock numerous advantages in terms of data quality. However, there is still ample room for customization and implementation to ensure that the platform aligns perfectly with your specific use case.

While it is true that Databricks alone cannot completely solve all data quality problems, it does offer a wide array of mechanisms to address them. These mechanisms, combined with the flexibility of the Delta format configuration and the specific features described above, empower us to establish a robust and comprehensive data quality plan for our platform. By leveraging these tools effectively, we can ensure that our data is accurate, reliable, and of the highest quality for the business.
