Databricks DQX to Generate DLT Expectations

David Ogden
8 min read · Jan 31, 2025


How to Automatically Generate 100’s or 1000’s of Delta Live Table Expectations

I love Databricks Delta Live Tables (“DLT”). It’s a great ETL tool. Of course, as with any software used on a regular basis, there’s the inevitable wishlist of improvements. “Why doesn’t it do this?” One of those wishlist items for me: How to generate DLT data quality “Expectations” when you have 100’s or 1000’s of columns, without having to manually code the logic for 100’s or 1000’s of quality checks?

With all the amazing news and developments in the world of Artificial Intelligence, if I were reading this article, my first thought would be that the answer here is GenAI. I’m sure that’s also a viable answer to this question. But this article is not about GenAI :-).

Enter Databricks DQX: https://databrickslabs.github.io/dqx/

DQX provides a powerful tool for performing data quality checks, anomaly detection, real-time validation…all via in-line scripting for PySpark applications. As soon as it came out, I wanted to “kick the tires”. And I love DQX! Buuuuuut….

There was one issue: when I ran the demo for integrating DQX with DLT (https://github.com/databrickslabs/dqx/blob/main/demos/dqx_dlt_demo.py), I expected to see the data quality Expectations in the DLT UI. But they weren’t there:

no Expectations in the DLT UI via the DQX DLT demo

DQX generated quality checks for the data, applied them, and produced tables in DLT that were the direct result of those checks (including a “quarantine” table that isolated the records failing the checks); but the DLT UI gave no visibility into the Expectations used to isolate those anomalous records. To see the data quality rules, I had to go back and look at notebook results outside the DLT UI.

Thus was born this simple hack, leveraging the power of DQX to automatically generate data quality checks, and to see those as Expectations in the DLT UI.

The code below is run outside DLT (a Databricks Python notebook) to create a table of Expectations that will be used within DLT.

Step 1: Leverage the Power of DQX to Produce DLT Expectations

Installing the awesome sauce that is DQX:

%pip install databricks-labs-dqx

And you may need to restart the kernel to have access to the installed libraries:

dbutils.library.restartPython()

Set Spark config keys to designate some relevant tables. One table is the baseline input that DQX will profile, to produce the quality checks. The other table is a new output table where you will store the resulting Expectations:

# pointing to the baseline input data to be used by dqx for profiling distributions
# using sample data available in UC, for example:
spark.conf.set("c.input_table", "samples.tpch.lineitem")

# change this to your own schema and your own desired table name for output constraints
spark.conf.set("c.saved_constraints", "main.default.bronze_constraints")

Read in the baseline table that DQX will profile to establish the quality checks:

input_df = spark.read.table(spark.conf.get("c.input_table"))

Straight from DQX documentation and demos, use DQX to profile the data:

from databricks.labs.dqx.profiler.profiler import DQProfiler
from databricks.labs.dqx.profiler.generator import DQGenerator
from databricks.labs.dqx.profiler.dlt_generator import DQDltGenerator
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient
import yaml

ws = WorkspaceClient()

# profile the input data
profiler = DQProfiler(ws)
summary_stats, profiles = profiler.profile(input_df)

Now for the really cool part! DQX generates SQL code in the format that DLT expects, for defining Expectations:

# generate DLT expectations in SQL CONSTRAINT format
dlt_generator = DQDltGenerator(ws)
dlt_expectations = dlt_generator.generate_dlt_rules(profiles)

print("DQX checks to be used in DLT:")
print(dlt_expectations)
partial output of the DLT Expectations produced by DQX
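
In case the screenshot is hard to read: each generated rule is a plain string in DLT’s SQL expectation syntax, CONSTRAINT <name> EXPECT (<condition>). The entries below are purely illustrative (hypothetical names and thresholds, not actual profiler output):

# purely illustrative examples of the generated format; real rules depend on your data's profile
example_rules = [
    "CONSTRAINT l_orderkey_is_not_null EXPECT (l_orderkey is not null)",
    "CONSTRAINT l_quantity_min_max EXPECT (l_quantity >= 1 and l_quantity <= 50)",
]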

Step 2: Convert the SQL CONSTRAINTs to Pythonic DLT Expectations

There’s one little problem with the Expectations produced by DQX: these Expectations are in SQL format. I could copy-and-paste those into a SQL declaration for DLT, but I really don’t want to manually copy-and-paste. And, though I tried, I could not figure out how to automatically plug those SQL CONSTRAINTs into a SQL-based approach to DLT table declarations. So, I abandoned that path and embraced the Python approach for DLT declarations. I just needed to parse and format the SQL CONSTRAINTs syntax into something digestible by Python. I used this doc as a guide for what final format I wanted these Python Expectations to look like: https://docs.databricks.com/en/delta-live-tables/expectation-patterns.html#portable-and-reusable-expectations
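
For reference, the end state we’re aiming for is simply a Python dictionary mapping each Expectation name to its SQL condition, which can be handed straight to @dlt.expect_all. A hypothetical example of that target shape (not actual profiler output):

# hypothetical example of the target format: {expectation_name: sql_condition}
rules = {
    "l_orderkey_is_not_null": "l_orderkey is not null",
    "l_quantity_min_max": "l_quantity >= 1 and l_quantity <= 50",
}
# later, inside DLT: @dlt.expect_all(rules)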

So, the next steps are to parse the SQL CONSTRAINTs into a Pythonic representation for DLT.

Creating a dataframe with one row per Expectation, still in the SQL CONSTRAINT format:

from pyspark.sql.types import StringType

# creating a spark dataframe of CONSTRAINTs
constraints_df = spark.createDataFrame(dlt_expectations, StringType())
display(constraints_df)
partial output showing the dataframe of CONSTRAINTs

Parsing the CONSTRAINTs into a format conducive to Python declaration of Expectations in DLT:

from pyspark.sql.functions import split, trim

# drop the leading "CONSTRAINT" keyword
split_df = constraints_df.withColumn("split_col", split(constraints_df["value"], "CONSTRAINT").getItem(1))
# split the remainder into the Expectation name and its condition, trimming stray whitespace
split_df = split_df.withColumn("name", trim(split(split_df["split_col"], "EXPECT").getItem(0)))
split_df = split_df.withColumn("constraint", trim(split(split_df["split_col"], "EXPECT").getItem(1)))
split_df = split_df.drop("value", "split_col")
display(split_df)
partial output showing the dataframe of Pythonic DLT Expectation declarations

Update to this article:

Shortly after I wrote this article, one of the DQX developers used it as inspiration to enhance DQX so that the generate_dlt_rules function can output the quality expectations as a Python dictionary, making the parsing steps above simpler and more streamlined. The code above still works fine (generating quality checks in SQL CONSTRAINT format), but an alternative is to output the DLT Expectations as follows:

# update to DQX allows for rules to be produced as Python Dict
dlt_expectations = dlt_generator.generate_dlt_rules(profiles, language="Python_Dict")

…which collapses the parsing steps above into a single line, prior to saving to a table:

split_df = spark.createDataFrame(dlt_expectations.items(), ["name", "constraint"])
display(split_df)

Saving the dataframe of Pythonic Expectations to a Delta table to be used within DLT:

# Create a temporary view of the dataframe to enable SQL
split_df.createOrReplaceTempView("constraints_view")

Then, in a separate SQL cell, save the view to the output table and confirm the contents:

%sql
CREATE OR REPLACE TABLE ${c.saved_constraints}
AS SELECT * FROM constraints_view;

SELECT * FROM ${c.saved_constraints};
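
If you’d rather stay in Python instead of switching to a SQL cell, an equivalent sketch (using the same config key set earlier) is to write the dataframe straight to the target table:

# pure-Python alternative (sketch): write the Expectations directly to the target Delta table
constraints_table = spark.conf.get("c.saved_constraints")
split_df.write.mode("overwrite").saveAsTable(constraints_table)
display(spark.read.table(constraints_table))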

Step 3: Create a notebook for Python declarations in DLT

At this point, we’ve produced a table of Expectations (i.e. data quality checks) suitable for Pythonic table declarations in DLT. The code above could be run once, just to produce a static table of Expectations. Or it could be re-run periodically, as data changes over time, to ensure that new data profiles are reflected in your ongoing DLT processing.

There are some key questions to ask here, in terms of turning this into a production-ready process: “What baseline data should be used to establish the profiles that seed these Expectations? Is there a notion of ground-truth? How often should this process be re-run? Should there be a human-in-the-loop to review, validate, and perhaps alter these Expectations?” Etc.

I’m not going to attempt to answer those questions, simply because each one will have the oh-so-loved “it all depends” as part of the answer. I’d love to see some comments on this article that propose answers to these questions, and additional considerations to be taken before putting this into production. For now, I’ll just stick with providing the basic starting-point code :-).

Now that we have our table of Expectations to be digested by DLT, the next step is to create a new Python notebook with the following code.

Declare a couple of tables to be used by DLT:

# pointing to the input data to which DLT will apply the Expectations from DQX
# Note: This is the same data that was used in the part1 notebook...just to keep things simple, but...
# this is NOT how this would be done "for real".
# In a realistic scenario, we'd be reading in new input data (e.g. new transactions just processed today).
# using sample data available in UC, for example:
spark.conf.set("c.input_table", "samples.tpch.lineitem")

# change this to point to the table where you saved your constraints in part1
spark.conf.set("c.saved_constraints", "main.default.bronze_constraints")

The code above tells DLT what data we are using as input. In this example, we’re using the same data that DQX profiled. This is not ideal, and not realistic. But it’s an easy way to show how this process works. In a realistic scenario, the input to DLT would be some source files or a source database that provides our pipeline with new records to process each time it executes.

The code above also tells DLT where to find the table of Expectations that we saved in the previous step.

Defining a function that reads in the table of Expectations and returns it in the dictionary format expected by DLT:

def get_rules(constraints_table):
    """
    Loads data quality rules (Expectations) from a Delta table.
    :param constraints_table: fully qualified name of the table holding the saved rules
                              (a tag/filter parameter could be added later to select subsets of rules)
    :return: dictionary mapping each Expectation name to its constraint expression
    """
    rows = spark.read.table(constraints_table).collect()
    return {row['name']: row['constraint'] for row in rows}
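
Before wiring this into a pipeline, it’s easy to sanity-check what the function returns from a regular notebook (the names and conditions in the comment are hypothetical; yours will reflect whatever DQX generated):

# quick sanity check of the loaded rules (run in a regular notebook, outside DLT)
rules = get_rules(spark.conf.get("c.saved_constraints"))
print(f"Loaded {len(rules)} Expectations")
# e.g. {'l_orderkey_is_not_null': 'l_orderkey is not null', ...}  <-- hypothetical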

Defining a DLT table that reads from the input_table and applies the Expectations read from the saved_constraints table:

import dlt

@dlt.table
@dlt.expect_all(get_rules(spark.conf.get("c.saved_constraints")))
def bronze_MV():
    df = spark.read.table(spark.conf.get("c.input_table"))
    return df

Step 4: Create a new DLT pipeline that uses the notebook created in Step 3

As the Databricks platform evolves, the precise steps for creating a DLT pipeline will evolve. As it stands today, the process is:

`Workflows` > `Pipelines` > `Create pipeline`

When you are creating the DLT pipeline and defining the Settings, point to the notebook created in step 3 as the “Source code”. After you Start your new DLT pipeline, if all goes well, you’ll see one materialized view in your pipeline…and you’ll also see all the Expectations right where you expect them to be:

There are several improvements and alterations that could, or should, be made to this process. For example, the approach above produced only the “warn” action for the Expectations, meaning that failing records are not dropped, nor does the process fail when violating records are observed. Some customization of the Expectations could also be warranted. Perhaps there’s a primary key in the data, and any time that primary key is null we want the whole pipeline to fail; to accomplish that, all you’d need to do is manually edit the Delta table of Expectations, adding a new Expectation in the proper format. Additionally, the table of Expectations could include a third column (a tag or flag) that designates different actions for different Expectations (warn vs. drop vs. fail). This article really just provides the basic approach, and more sophistication would be expected before this could support a production-ready process.
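
As a sketch of that third-column idea: assuming a hypothetical action column (with values warn, drop, or fail) were added to the constraints table, the rules could be split by action and routed to the corresponding DLT decorators. The column name and helper below are illustrative, not part of DQX or the code above:

import dlt

# Sketch only: assumes a hypothetical "action" column was added to the constraints table,
# and that each action value has at least one rule.
def get_rules_by_action(constraints_table, action):
    rows = spark.read.table(constraints_table).where(f"action = '{action}'").collect()
    return {row["name"]: row["constraint"] for row in rows}

constraints_table = spark.conf.get("c.saved_constraints")

@dlt.table
@dlt.expect_all(get_rules_by_action(constraints_table, "warn"))          # report violations only
@dlt.expect_all_or_drop(get_rules_by_action(constraints_table, "drop"))  # drop violating records
@dlt.expect_all_or_fail(get_rules_by_action(constraints_table, "fail"))  # fail the update
def bronze_MV():
    return spark.read.table(spark.conf.get("c.input_table"))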

This simple hack, offered in the approach and code above, is designed to facilitate automatic generation of DLT Expectations for tables that have 100’s or 1000’s of columns. Because, as it stands today, the alternative would be hand-coding those Expectations. I’m certain that as DLT evolves, this hack will become entirely unnecessary. At some point, there will inevitably be the “easy button” that produces quality Expectations automatically for all columns in a DLT table. For now, I hope this approach makes someone’s life a little easier :-).
