How to log all DBT test results into a centralized Snowflake table

DBT has many inbuilt features for automating time-consuming work. One of those features is the DBT test.

DBT test generates generic tests from the source YAML file. Suppose we want to add a NOT NULL check to a column in a source table; we would simply add the following:

version: 2

sources:
  - name: source_name
    database: db_name
    schema: schema_name
    tables:
      - name: table_name
        columns:
          - name: column_name
            tests:
              - not_null

DBT will automatically generate a test which looks like…

select column_name
from db_name.schema_name.table_name
where column_name is null

The result of this query is displayed in the console where the DBT test command is invoked. To save the results in the database instead, we can add the --store-failures flag (dbt test --store-failures), and DBT will create a table of failing records that we can query later.

The problem with this approach is that DBT creates a separate failures table for each test, so the number of tables grows with the number of models and tests.

To overcome this problem, we used Python. With Python, it is possible to consolidate everything into a single table with columns such as model name, test name, test status, number of failures, and so on.

When DBT executes a test command, it generates a run_results.json file in the project's target directory. Each test result appears as an entry like this:

{
    "status": "error",
    "timing": [],
    "thread_id": "Thread-3 (worker)",
    "execution_time": 3.470108985900879,
    "adapter_response": {},
    "message": "",
    "failures": null,
    "unique_id": "test.project.test_name_model_name__column_names"
}

Implementation

This run_results.json file contains all the necessary information. We can parse it with Python and post the results to a database such as Snowflake using the snowflake-connector-python package.

The dependencies for this Python script are:

  1. snowflake-connector-python==2.8.2
  2. PyYAML==6.0.0

First, load the run_results.json and source.yml files:

from json import loads

import snowflake.connector as sf
from yaml import load
from yaml.loader import SafeLoader

# target_path points to the dbt target directory, dbt_path to the dbt project root
with open(rf'{target_path}/run_results.json', 'r') as input_file:
    my_json = input_file.read()

new_json = loads(my_json)

# Collect the table names declared in source.yml
my_tables = []
with open(rf'{dbt_path}/models/source/source.yml') as f:
    my_source = load(f, Loader=SafeLoader)

for i in range(len(my_source['sources'])):
    for j in range(len(my_source['sources'][i]['tables'])):
        schema_name = my_source['sources'][i]['name']
        table_name = my_source['sources'][i]['tables'][j]['name']
        my_tables.append(table_name)

Then add the names of the tests. (It is possible to fetch this information from the source.yml file as well, but it would add unnecessary complexity to this particular solution; a sketch of that approach follows the hard-coded list below.)

my_test_names = ['source_not_null', 'source_table_not_empty', 'not_null', 'relationships']
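If you would rather not hard-code the list, the names can also be pulled from the YAML that was already loaded above. The following is only a sketch: it assumes tests are declared at the table or column level, as in the example at the top of the post.

# Optional sketch: derive the test names from source.yml instead of hard-coding them.
# Assumes tests are declared on tables and/or columns as in the YAML example above.
test_names = set()
for source in my_source['sources']:
    for table in source.get('tables') or []:
        # table-level tests (e.g. a custom source_table_not_empty test)
        entries = list(table.get('tests') or [])
        # column-level tests (e.g. not_null, relationships)
        for column in table.get('columns') or []:
            entries += column.get('tests') or []
        for test in entries:
            # an entry is either a plain string ('not_null') or a dict keyed
            # by the test name ({'relationships': {...}})
            test_names.add(test if isinstance(test, str) else list(test)[0])
my_test_names = list(test_names)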

The parsing logic:

values = ''  # accumulates one "(...)" tuple per test result for the insert statement
for i in range(len(new_json['results'])):
    unique_id = new_json['results'][i]['unique_id']
    test_name_model_name_column_name = ''
    test_name = ''
    ref_test_name = ''
    model_name = ''
    column_name = ''
    severity = 'error'
    if unique_id.split('.')[0] == 'test':
        # unique_id looks like 'test.project.test_name_model_name__column_names'
        test_name_model_name_column_name = unique_id.split('.')[2]
        for test in my_test_names:
            if test in test_name_model_name_column_name:
                test_name = test
        if test_name == 'relationships':
            test_name_model_name_column_name = test_name_model_name_column_name.replace('relationships', '')
            ref_test_name = test_name_model_name_column_name.lstrip('_').replace('__', ',').split(',')[-1]
            test_name_model_name_column_name = test_name_model_name_column_name.replace(ref_test_name, '')
        test_name_model_name_column_name = test_name_model_name_column_name.replace(test_name, '')

        for model in my_tables:
            if model in test_name_model_name_column_name:
                model_name = model
                if '__warn' in test_name_model_name_column_name:
                    severity = 'warn'
                    test_name_model_name_column_name = test_name_model_name_column_name.replace('__warn', '')
                else:
                    test_name_model_name_column_name = test_name_model_name_column_name.replace('__error', '')

                if test_name != 'relationships':
                    column_name = test_name_model_name_column_name.replace(model, '').replace('_amis_', '').lstrip(
                        '_').replace('__', ',').replace('sk', model + '_sk').replace('key', model + '_key')  # has to be in this order
                else:
                    column_name = test_name_model_name_column_name.replace(model, '').lstrip('_').replace('__', ',')

        # print(model_name, '-->', column_name)
        # print(severity)
        execution_time = new_json['results'][i]['execution_time']
        status = new_json['results'][i]['status']
        failures = new_json['results'][i]['failures']
        message = new_json['results'][i]['message']

        values += f"""
        (
            '{'Relationship Integrity test' if ref_test_name != '' else test_name}',
            '{model_name}',
            '{column_name if column_name != '' else 'NULL'}',
            '{status}',
            '{0 if failures is None else failures}',
            '{'NULL' if message is None else message.replace("'", '`')}',
            '{execution_time}',
            current_timestamp::timestamp_ntz
        ),"""

And finally, post the results to a Snowflake table.

def insert_values_into_log():
    conn = sf.connect(user=user, password=password, account=account,
                      warehouse=warehouse, database=database, authenticator=authenticator)

    def run_query(conn, query):
        cursor = conn.cursor()
        cursor.execute(query)
        cursor.close()

    # values[:-1] drops the trailing comma left by the loop above
    insert_values = f"""insert into DATABASE_NAME.SCHEMA_NAME.TABLE_NAME(
        test_name,
        model_name,
        column_names,
        test_status,
        failures,
        message,
        test_execution_time,
        effective_timestamp
    )
    values {values[:-1]}"""

    run_query(conn, insert_values)
    print('Success')

insert_values_into_log()

Note: Don't forget to add your own credentials in sf.connect, and change the insert statement's DATABASE_NAME.SCHEMA_NAME.TABLE_NAME to whatever is appropriate for your table.
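If the log table does not exist yet, the sketch below creates one that matches the insert statement above. The column names come from that insert; the data types are assumptions and may need adjusting for your environment.

# Sketch: create a log table matching the insert statement above.
# Column names are taken from the insert; data types are assumptions.
conn = sf.connect(user=user, password=password, account=account,
                  warehouse=warehouse, database=database, authenticator=authenticator)
conn.cursor().execute("""
    create table if not exists DATABASE_NAME.SCHEMA_NAME.TABLE_NAME (
        test_name            varchar,
        model_name           varchar,
        column_names         varchar,
        test_status          varchar,
        failures             number,
        message              varchar,
        test_execution_time  float,
        effective_timestamp  timestamp_ntz
    )
""")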

The resulting table will contain all the tests with their model names, column names, status, failures, and so on in a single table.
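From there, reviewing the latest problems is a simple query. For example (a sketch, reusing the placeholder table name from above and an open connection such as the one created in the previous sketch):

# Sketch: list the most recent non-passing tests from the centralized log table.
# conn is an open Snowflake connection (see sf.connect above).
for row in conn.cursor().execute("""
    select model_name, test_name, test_status, failures, effective_timestamp
    from DATABASE_NAME.SCHEMA_NAME.TABLE_NAME
    where test_status <> 'pass'
    order by effective_timestamp desc
"""):
    print(row)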

Source: https://medium.com/bi3-technologies/how-to-log-all-dbt-test-results-into-a-centralized-snowflake-table-2cc6941972bf

