Snowflake
Getting Started
This section provides instructions on setting up and using UniQual for Snowflake observability.
Setup Steps
- Create Tables: Execute the DDL statements provided below to create the necessary tables in your Snowflake schema.
- Populate Data: Populate the UNIQUAL_ASSETS table with the assets you want to monitor.
- Sync Kestra Flows: Once a GitHub token has been set up as a Kestra secret, create a flow from sync_flows_with_git.yml in the Kestra_flows directory. This flow automatically syncs all flows in the Git repository to the Kestra instance.
- Schedule Flows: Schedule the Kestra flows to run at desired intervals.
- Create Dashboards: Create dashboards in your preferred BI tool to visualize the data quality and observability metrics, using the queries in the "Reporting" section.
Create objects in Snowflake
UNIQUAL_TABLES_DDL
-- Registry of the assets UniQual monitors, one row per table/view.
-- ASSET_ID is generated automatically and is the value that
-- UNIQUAL_OBSERVABILITY_EVENTS.ASSET_ID refers to.
-- NOTE: Snowflake stores PRIMARY KEY / UNIQUE constraints as metadata only and
-- does not enforce them (NOT NULL is enforced); they are declared here to
-- document the intended identity of a row.
CREATE TABLE IF NOT EXISTS UNIQUAL_ASSETS (
ASSET_ID VARCHAR(36) DEFAULT UUID_STRING() NOT NULL,
ASSET_CATALOG string NOT NULL,     -- database that contains the asset
ASSET_SCHEMA string NOT NULL,
ASSET_NAME string NOT NULL,        -- bare table/view name, e.g. 'CUSTOMERS'
ASSET_TYPE string NOT NULL,
ASSET_GROUP string DEFAULT 'DEFAULT',
LOAD_TYPE string,
HAS_PII boolean,
FRESHNESS_THLD_MIN int,            -- compared with minutes since LAST_ALTERED by FRESHNESS_CHECK
VOLUME_THLD_PCT float,             -- NOTE(review): fraction vs. percent is not pinned down by VOLUME_TRACKING - confirm
IS_ACTIVE boolean NOT NULL,
CREATED_AT datetime DEFAULT current_timestamp(),
-- Defaulted like CREATED_AT so that inserts are not forced to supply it;
-- explicitly supplied values still take precedence.
UPDATED_AT datetime DEFAULT current_timestamp() NOT NULL,
CONSTRAINT UNIQUAL_ASSETS_ASSET_ID_PK PRIMARY KEY (ASSET_ID),
CONSTRAINT UNIQUAL_ASSETS_QUALIFIED_NAME_UK UNIQUE (ASSET_CATALOG, ASSET_SCHEMA, ASSET_NAME)
);
-- Log of observability check results (FRESHNESS_CHECK, VOLUME_CHECK, ...).
-- Every row points at an asset in UNIQUAL_ASSETS through ASSET_ID; the ID,
-- timestamp and author columns fill themselves in when omitted on insert.
create table if not exists uniqual_observability_events (
    event_id         varchar(36) default uuid_string() not null,
    event_ts         timestamp   default current_timestamp() not null,
    asset_id         varchar     not null,
    event_type       varchar     not null,
    recorded_value   varchar,    -- raw observed value (e.g. LAST_ALTERED, ROW_COUNT)
    calculated_value varchar,    -- value derived from it for threshold evaluation
    thld_breached    boolean,
    updated_by       varchar     default current_user() not null
);
-- Per-rule data-quality results, appended by the uniqueness-check script
-- (cuallee Check.validate() output, with CHECK renamed to CHECK_NAME and
-- ASSET_NAME added).
-- String columns are unbounded. The previous lengths appear to have been sized
-- from the first sample row: ASSET_NAME VARCHAR(24) fit exactly
-- 'UNIQUAL.PUBLIC.CUSTOMERS' and CHECK_NAME VARCHAR(10) fit exactly
-- 'Uniqueness', so any longer table or check name failed on insert.
-- In Snowflake a smaller VARCHAR length brings no storage or performance benefit.
-- NOTE: because of IF NOT EXISTS, an already-created table keeps its old
-- lengths; widen it with ALTER TABLE ... ALTER COLUMN ... SET DATA TYPE VARCHAR.
CREATE TABLE IF NOT EXISTS UNIQUAL_DATA_QUALITY_RESULTS (
ID NUMBER(38,0) NOT NULL,
TIMESTAMP VARCHAR NOT NULL,        -- run time stored as text; presumably 'YYYY-MM-DD HH24:MI:SS' - confirm
CHECK_NAME VARCHAR NOT NULL,
LEVEL VARCHAR NOT NULL,
COLUMN_CHECKED VARCHAR NOT NULL,
RULE VARCHAR NOT NULL,
VALUE VARCHAR NOT NULL,
TOTAL_ROWS NUMBER(38,0) NOT NULL,
VIOLATIONS FLOAT NOT NULL,
PASS_RATE FLOAT NOT NULL,
PASS_THRESHOLD FLOAT NOT NULL,
STATUS VARCHAR NOT NULL,           -- 'PASS' is reported green by the status view; anything else red
ASSET_NAME VARCHAR NOT NULL        -- as written by the check script, e.g. 'UNIQUAL.PUBLIC.CUSTOMERS'
);
Populating Assets
INSERT_ASSETS
-- Register UNIQUAL.PUBLIC.CUSTOMERS for monitoring. ASSET_ID, ASSET_GROUP and
-- CREATED_AT are left to their column defaults.
INSERT INTO UNIQUAL_ASSETS (
    ASSET_CATALOG,
    ASSET_SCHEMA,
    ASSET_NAME,
    ASSET_TYPE,
    LOAD_TYPE,
    HAS_PII,
    FRESHNESS_THLD_MIN,
    VOLUME_THLD_PCT,
    IS_ACTIVE,
    UPDATED_AT
)
VALUES (
    'UNIQUAL',            -- ASSET_CATALOG
    'PUBLIC',             -- ASSET_SCHEMA
    'CUSTOMERS',          -- ASSET_NAME
    'TABLE',              -- ASSET_TYPE
    'INCREMENTAL',        -- LOAD_TYPE
    TRUE,                 -- HAS_PII
    120,                  -- FRESHNESS_THLD_MIN (minutes)
    1,                    -- VOLUME_THLD_PCT
    TRUE,                 -- IS_ACTIVE
    CURRENT_TIMESTAMP()   -- UPDATED_AT
);
Data Quality Checks
Primary Key/Uniqueness Check
- When Kestra is configured, execute the flow "sf_uniqueness_check" to run the uniqueness check.
- Otherwise, the Python script below can be executed locally (intended for testing only).
"""Local (testing-only) uniqueness check for a Snowflake table using cuallee.

Validates that ``primary_key_cols`` form a composite key of ``tableName`` and
appends the resulting per-rule rows to ``target_audit_table``.

Credentials come from the environment variables SNOWFLAKE_ACCOUNT,
SNOWFLAKE_USERNAME and SNOWFLAKE_PASSWORD. The previous
``{{ secret('...') }}`` values are Kestra templates that are only rendered
inside a Kestra flow; run locally they were sent to Snowflake verbatim.
SNOWFLAKE_ROLE and SNOWFLAKE_WAREHOUSE are optional and default to the
previously hard-coded values.
"""
import os

import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
import snowflake.snowpark.window as W
from snowflake.snowpark.session import Session
from cuallee import Check, CheckLevel

tableName = 'UNIQUAL.PUBLIC.CUSTOMERS'
sf_database = 'UNIQUAL'
primary_key_cols = ["C_CUSTKEY"]
target_audit_table = "UNIQUAL.PUBLIC.UNIQUAL_DATA_QUALITY_RESULTS"

SNOWFLAKE_ENVIRONMENT = {
    "account": os.environ["SNOWFLAKE_ACCOUNT"],
    "user": os.environ["SNOWFLAKE_USERNAME"],
    "password": os.environ["SNOWFLAKE_PASSWORD"],
    # NOTE(review): ACCOUNTADMIN is far more privilege than a read + append job
    # needs; prefer setting SNOWFLAKE_ROLE to a dedicated role.
    "role": os.environ.get("SNOWFLAKE_ROLE", "ACCOUNTADMIN"),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "COMPUTE_WH"),
    "database": sf_database,
    "schema": "PUBLIC",
}

snowpark = Session.builder.configs(SNOWFLAKE_ENVIRONMENT).create()
try:
    dataframe = snowpark.table(tableName)
    check = Check(CheckLevel.WARNING, "Uniqueness", session=snowpark)
    check.is_composite_key(primary_key_cols)
    # CHECK is a reserved word in Snowflake, so the column is renamed to the
    # CHECK_NAME column of the audit table; ASSET_NAME tags every row with the
    # fully qualified table name.
    result_df = (
        check.validate(dataframe)
        .withColumnRenamed("CHECK", "CHECK_NAME")
        .withColumn("ASSET_NAME", F.lit(tableName))
    )
    result_df.write.mode("append").save_as_table(target_audit_table)
finally:
    # Release the Snowflake connection even when validation or the write fails.
    snowpark.close()
Observability
FRESHNESS_CHECK
-- Record one FRESHNESS_CHECK event per active asset. RECORDED_VALUE is the
-- table's LAST_ALTERED time and CALCULATED_VALUE the minutes elapsed since
-- then. THLD_BREACHED is TRUE when that exceeds FRESHNESS_THLD_MIN and FALSE
-- otherwise, including when no threshold is set.
-- INFORMATION_SCHEMA is scoped to the current database, so only assets whose
-- ASSET_CATALOG is the current database are checked.
insert into UNIQUAL_OBSERVABILITY_EVENTS(ASSET_ID,EVENT_TYPE,RECORDED_VALUE,CALCULATED_VALUE,THLD_BREACHED)
select ast.ASSET_ID,
       'FRESHNESS_CHECK' as EVENT_TYPE,
       ist.LAST_ALTERED as RECORDED_VALUE,
       datediff(minute, ist.LAST_ALTERED, current_timestamp()) as CALCULATED_VALUE,
       case when datediff(minute, ist.LAST_ALTERED, current_timestamp()) > ast.FRESHNESS_THLD_MIN then TRUE
            else FALSE
       end as THLD_BREACHED
from information_schema.tables ist
join UNIQUAL_ASSETS ast
  -- Match the fully qualified name: matching on table name alone paired an
  -- asset with every same-named table in other schemas/databases.
  on  ist.table_catalog = ast.asset_catalog
  and ist.table_schema  = ast.asset_schema
  and ist.table_name    = ast.asset_name
where ist.table_type = 'BASE TABLE'
  and ast.IS_ACTIVE;
VOLUME_TRACKING
-- Record one VOLUME_CHECK event per active asset. RECORDED_VALUE is the
-- current ROW_COUNT, which is compared with the ROW_COUNT recorded by the
-- asset's previous VOLUME_CHECK event.
-- INFORMATION_SCHEMA is scoped to the current database, so only assets whose
-- ASSET_CATALOG is the current database are checked.
insert into UNIQUAL_OBSERVABILITY_EVENTS(ASSET_ID,EVENT_TYPE,RECORDED_VALUE,CALCULATED_VALUE,THLD_BREACHED)
with prv_result as (
    -- Most recent VOLUME_CHECK per asset. RECORDED_VALUE is a STRING column, so
    -- it is converted once here instead of relying on implicit casts below
    -- (a non-numeric value becomes NULL and is treated as "no previous count").
    select asset_id,
           try_to_number(RECORDED_VALUE) as PRV_ROW_COUNT
    from UNIQUAL_OBSERVABILITY_EVENTS
    where event_type = 'VOLUME_CHECK'
    qualify row_number() over (partition by asset_id order by event_ts desc) = 1
)
select ast.ASSET_ID,
       'VOLUME_CHECK' as EVENT_TYPE,
       ist.ROW_COUNT as RECORDED_VALUE,
       case when ist.ROW_COUNT = 0 then 'TABLE_EMPTY'
            when ist.ROW_COUNT = prv.PRV_ROW_COUNT then 'NO_CHANGE'
            else 'NA'
       end as CALCULATED_VALUE,
       case when CALCULATED_VALUE = 'NO_CHANGE' then FALSE
            when CALCULATED_VALUE = 'TABLE_EMPTY' then TRUE
            -- First observation for this asset: nothing to compare against.
            when prv.PRV_ROW_COUNT is null then FALSE
            -- ROW_COUNT cannot be 0 here (handled by TABLE_EMPTY above).
            -- NOTE(review): this measures (previous - current) / current, so
            -- only a shrinking table can breach, and the ratio is a fraction
            -- while the threshold column is named *_PCT - confirm intent.
            when (prv.PRV_ROW_COUNT - ist.ROW_COUNT) / ist.ROW_COUNT > ast.VOLUME_THLD_PCT then TRUE
            -- Changed but within the threshold: report FALSE explicitly
            -- (previously NULL), matching FRESHNESS_CHECK.
            else FALSE
       end as THLD_BREACHED
from information_schema.tables ist
join UNIQUAL_ASSETS ast
  -- Match the fully qualified name, not just the table name.
  on  ist.table_catalog = ast.asset_catalog
  and ist.table_schema  = ast.asset_schema
  and ist.table_name    = ast.asset_name
left join prv_result prv
  on ast.asset_id = prv.asset_id
where ist.table_type = 'BASE TABLE'
  and ast.IS_ACTIVE
;
Reporting
UNIQUAL_ASSET_STATUS
-- One row per monitored asset with a red ('R') / green ('G') status for its
-- most recent FRESHNESS_CHECK and VOLUME_CHECK events and when they ran.
-- An asset with no event of a given type shows 'G' with a NULL timestamp.
-- ASSET_ID is exposed as a trailing column so that same-named assets from
-- different schemas can be told apart.
create or replace view VW_UNIQUAL_HEALTH_SUMMARY
as
with latest_events as (
    -- Latest event of each type for each asset.
    select ua.asset_id,
           ua.asset_name,
           uoe.event_type,
           uoe.thld_breached,
           uoe.event_ts
    from UNIQUAL_ASSETS ua
    join UNIQUAL_OBSERVABILITY_EVENTS uoe on ua.asset_id = uoe.asset_id
    qualify row_number() over (partition by uoe.asset_id, uoe.event_type order by uoe.event_ts desc) = 1
)
select
    asset_name,
    -- 'R' sorts after 'G', so MAX() yields 'R' when the latest event of that
    -- type breached its threshold; a NULL THLD_BREACHED counts as green.
    max(case when event_type = 'FRESHNESS_CHECK' and thld_breached then 'R' else 'G' end) as FRESHNESS_STATUS,
    max(case when event_type = 'FRESHNESS_CHECK' then event_ts end) as LAST_FRESHNESS_CHECK_TS,
    max(case when event_type = 'VOLUME_CHECK' and thld_breached then 'R' else 'G' end) as VOLUME_STATUS,
    max(case when event_type = 'VOLUME_CHECK' then event_ts end) as LAST_VOLUME_CHECK_TS,
    asset_id
from latest_events
-- Grouping by ASSET_NAME alone merged distinct assets that share a table name.
group by asset_id, asset_name;
UNIQUAL_OBSERVABILITY_EVENTS
-- Latest event of every type for every asset, labelled with the asset name.
create or replace view VW_UNIQUAL_OBSERVABILITY_EVENTS
as
with ranked_events as (
    select ua.asset_name,
           uoe.event_type,
           uoe.recorded_value,
           uoe.calculated_value,
           uoe.thld_breached,
           uoe.event_ts,
           row_number() over (partition by uoe.asset_id, uoe.event_type
                              order by uoe.event_ts desc) as event_rank
    from UNIQUAL_ASSETS ua
    inner join UNIQUAL_OBSERVABILITY_EVENTS uoe
        on uoe.asset_id = ua.asset_id
)
select asset_name,
       event_type,
       recorded_value,
       calculated_value,
       thld_breached,
       event_ts
from ranked_events
where event_rank = 1;
UNIQUAL_DATA_QUALITY_RESULTS
-- Latest data-quality result per asset, check, checked column and rule.
-- DQRESULT is 'G' when STATUS = 'PASS' and 'R' otherwise.
create or replace view VW_UNIQUAL_DATAQUALITY_STATUS
as
select ua.asset_name,
       udq.check_name,
       udq.column_checked,
       udq.total_rows,
       udq.violations,
       case when udq.status = 'PASS' then 'G'
            else 'R'
       end as DQRESULT,
       udq.timestamp
from UNIQUAL_ASSETS ua
join uniqual_data_quality_results udq
  -- The check script writes the fully qualified name (e.g.
  -- 'UNIQUAL.PUBLIC.CUSTOMERS') while UNIQUAL_ASSETS stores the bare name
  -- ('CUSTOMERS'), so a plain name equality never matched those rows.
  -- Accept either form; a bare name still matches every asset with that name.
  on udq.asset_name in (ua.asset_name,
                        ua.asset_catalog || '.' || ua.asset_schema || '.' || ua.asset_name)
-- Partition by rule as well as check: a cuallee Check can hold several rules
-- whose rows share CHECK_NAME, and those previously collapsed into one row.
-- TIMESTAMP is text; presumably 'YYYY-MM-DD HH24:MI:SS', which sorts
-- chronologically - confirm.
qualify row_number() over (partition by ua.asset_id, udq.check_name, udq.column_checked, udq.rule
                           order by udq.timestamp desc) = 1;