Skip to content

Snowflake

Getting Started

This section provides instructions on setting up and using UniQual for Snowflake observability.

Setup Steps

  1. Create Tables: Execute the DDL statements provided below to create the necessary tables in your Snowflake schema.
  2. Populate Data: Populate the UNIQUAL_ASSETS table with the assets you want to monitor.
  3. Sync Kestra Flows: Once the GitHub token is set up as a Kestra secret, create the Git-sync flow from sync_flows_with_git.yml in the Kestra_flows directory. This flow automatically syncs all flows in the Git repository to the Kestra instance.
  4. Schedule Flows: Schedule the Kestra flows to run at desired intervals.
  5. Create Dashboards: Create dashboards in your preferred BI tool to visualize the data quality and observability metrics, using the views defined in the "Reporting" section.

Create objects in Snowflake

UNIQUAL_TABLES_DDL
-- Registry of the assets UniQual monitors, one row per asset.
-- ASSET_ID is the key that UNIQUAL_OBSERVABILITY_EVENTS.ASSET_ID refers to.
CREATE TABLE IF NOT EXISTS UNIQUAL_ASSETS (
    ASSET_ID           VARCHAR(36) DEFAULT UUID_STRING() NOT NULL,
    ASSET_CATALOG      STRING NOT NULL,            -- database (catalog) name, e.g. 'UNIQUAL'
    ASSET_SCHEMA       STRING NOT NULL,            -- e.g. 'PUBLIC'
    ASSET_NAME         STRING NOT NULL,            -- unqualified object name, e.g. 'CUSTOMERS'
    ASSET_TYPE         STRING NOT NULL,            -- e.g. 'TABLE'
    ASSET_GROUP        STRING DEFAULT 'DEFAULT',
    LOAD_TYPE          STRING,                     -- e.g. 'INCREMENTAL'
    HAS_PII            BOOLEAN,
    FRESHNESS_THLD_MIN INT,                        -- minutes since LAST_ALTERED after which the freshness check flags a breach
    VOLUME_THLD_PCT    FLOAT,                      -- threshold the volume check compares its row-count drop ratio against
    IS_ACTIVE          BOOLEAN NOT NULL,
    CREATED_AT         DATETIME DEFAULT current_timestamp(),
    -- Defaulted like CREATED_AT so writers are not forced to supply it;
    -- an explicit value (as in the sample INSERT) still takes precedence.
    UPDATED_AT         DATETIME DEFAULT current_timestamp() NOT NULL,
    -- Snowflake records but does not enforce PRIMARY KEY (only NOT NULL is enforced);
    -- it documents the key and is visible to BI/modelling tools.
    CONSTRAINT UNIQUAL_ASSETS_PK PRIMARY KEY (ASSET_ID)
);

-- Append-only log of observability check results (freshness, volume, ...).
-- Each row references an asset by ASSET_ID (UNIQUAL_ASSETS.ASSET_ID).
create table if not exists UNIQUAL_OBSERVABILITY_EVENTS (
    EVENT_ID          varchar(36) default UUID_STRING()        not null,
    EVENT_TS          timestamp   default current_timestamp()  not null,
    ASSET_ID          string                                   not null,
    EVENT_TYPE        string                                   not null,  -- e.g. 'FRESHNESS_CHECK', 'VOLUME_CHECK'
    RECORDED_VALUE    string,                                             -- raw observation, e.g. LAST_ALTERED or ROW_COUNT
    CALCULATED_VALUE  string,                                             -- derived metric or label computed from the observation
    THLD_BREACHED     boolean,
    UPDATED_BY        string      default CURRENT_USER()       not null
);

-- Results appended by the cuallee-based data-quality checks
-- (e.g. the uniqueness check script in the "Data Quality Checks" section).
--
-- Text columns are unbounded rather than sized to the sample values
-- ('Uniqueness' is exactly 10 chars, 'UNIQUAL.PUBLIC.CUSTOMERS' exactly 24):
-- fixed caps made any longer check or table name fail the append.
-- A VARCHAR without a length has no storage or performance cost in Snowflake.
CREATE TABLE IF NOT EXISTS UNIQUAL_DATA_QUALITY_RESULTS (
    ID              NUMBER(38,0) NOT NULL,
    TIMESTAMP       VARCHAR NOT NULL,        -- stored as text; the reporting view orders it lexicographically
    CHECK_NAME      VARCHAR NOT NULL,
    LEVEL           VARCHAR NOT NULL,        -- e.g. 'WARNING'
    COLUMN_CHECKED  VARCHAR NOT NULL,
    RULE            VARCHAR NOT NULL,
    VALUE           VARCHAR NOT NULL,
    TOTAL_ROWS      NUMBER(38,0) NOT NULL,
    VIOLATIONS      FLOAT NOT NULL,
    PASS_RATE       FLOAT NOT NULL,
    PASS_THRESHOLD  FLOAT NOT NULL,
    STATUS          VARCHAR NOT NULL,        -- compared against 'PASS' by the reporting view
    ASSET_NAME      VARCHAR NOT NULL         -- as written by the check job; the sample script writes 'CATALOG.SCHEMA.TABLE'
);

Populating Assets

INSERT_ASSETS
-- Register one asset to monitor: UNIQUAL.PUBLIC.CUSTOMERS.
-- ASSET_ID, ASSET_GROUP and CREATED_AT take their column defaults.
INSERT INTO UNIQUAL_ASSETS (
    ASSET_CATALOG,
    ASSET_SCHEMA,
    ASSET_NAME,
    ASSET_TYPE,
    LOAD_TYPE,
    HAS_PII,
    FRESHNESS_THLD_MIN,
    VOLUME_THLD_PCT,
    IS_ACTIVE,
    UPDATED_AT
)
VALUES (
    'UNIQUAL',              -- ASSET_CATALOG
    'PUBLIC',               -- ASSET_SCHEMA
    'CUSTOMERS',            -- ASSET_NAME
    'TABLE',                -- ASSET_TYPE
    'INCREMENTAL',          -- LOAD_TYPE
    TRUE,                   -- HAS_PII
    120,                    -- FRESHNESS_THLD_MIN (minutes)
    1,                      -- VOLUME_THLD_PCT
    TRUE,                   -- IS_ACTIVE
    current_timestamp()     -- UPDATED_AT
);

Data Quality Checks

Primary Key/Uniqueness Check

  1. When Kestra is configured, run the "sf_uniqueness_check" flow to perform the uniqueness check.
  2. Otherwise, the Python script below can be run locally (intended for testing only).
    import os

    import snowflake.snowpark.functions as F
    import snowflake.snowpark.types as T
    import snowflake.snowpark.window as W
    from snowflake.snowpark.session import Session
    from cuallee import Check, CheckLevel
    # Local (testing-only) uniqueness check: uses cuallee to verify that
    # `primary_key_cols` form a unique key of `tableName`, then appends the
    # result rows to the UniQual data-quality results table.
    tableName = 'UNIQUAL.PUBLIC.CUSTOMERS'
    sf_database = 'UNIQUAL'
    primary_key_cols = ["C_CUSTKEY"]
    target_audit_table = "UNIQUAL.PUBLIC.UNIQUAL_DATA_QUALITY_RESULTS"
    # Kestra's "{{ secret('...') }}" placeholders are only rendered inside a
    # Kestra flow; run locally they would be sent as literal strings. Read the
    # same values from environment variables instead (a missing one raises
    # KeyError naming the variable).
    # NOTE(review): ACCOUNTADMIN is far broader than this job needs; consider
    # a least-privilege role.
    SNOWFLAKE_ENVIRONMENT = {
        "account": os.environ["SNOWFLAKE_ACCOUNT"],
        "user": os.environ["SNOWFLAKE_USERNAME"],
        "password": os.environ["SNOWFLAKE_PASSWORD"],
        "role": "ACCOUNTADMIN",
        "warehouse": "COMPUTE_WH",
        "database": sf_database,
        "schema": "PUBLIC",
    }
    snowpark = Session.builder.configs(SNOWFLAKE_ENVIRONMENT).create()
    try:
        dataframe = snowpark.table(tableName)
        check = Check(CheckLevel.WARNING, "Uniqueness", session=snowpark)
        check.is_composite_key(primary_key_cols)
        # ASSET_NAME is written fully qualified (catalog.schema.table).
        result_df = (
            check.validate(dataframe)
            .withColumnRenamed("CHECK", "CHECK_NAME")
            .withColumn("ASSET_NAME", F.lit(tableName))
        )
        result_df.write.mode("append").save_as_table(target_audit_table)
    finally:
        # Release the Snowflake connection even if the check or the write fails.
        snowpark.close()
    

Observability

FRESHNESS_CHECK
-- Freshness check: for each active monitored base table in the current database,
-- record its LAST_ALTERED time and the minutes elapsed since then, and flag a
-- breach when that exceeds the asset's FRESHNESS_THLD_MIN (a NULL threshold
-- never breaches).
insert into UNIQUAL_OBSERVABILITY_EVENTS (ASSET_ID, EVENT_TYPE, RECORDED_VALUE, CALCULATED_VALUE, THLD_BREACHED)
select
    ast.ASSET_ID,
    'FRESHNESS_CHECK' as EVENT_TYPE,
    ist.LAST_ALTERED as RECORDED_VALUE,
    datediff(minute, ist.LAST_ALTERED, current_timestamp()) as CALCULATED_VALUE,
    case
        when CALCULATED_VALUE > ast.FRESHNESS_THLD_MIN then TRUE
        else FALSE
    end as THLD_BREACHED
from information_schema.tables ist
inner join UNIQUAL_ASSETS ast
    -- Match on the full name: the same table name can exist in several
    -- schemas/catalogs, and a name-only join would attach the wrong table.
    on  ist.table_catalog = ast.asset_catalog
    and ist.table_schema  = ast.asset_schema
    and ist.table_name    = ast.asset_name
where ist.table_type = 'BASE TABLE'
  and ast.is_active;
VOLUME_TRACKING
-- Volume check: record each active monitored base table's current ROW_COUNT and
-- compare it with the ROW_COUNT stored by that asset's most recent VOLUME_CHECK.
insert into UNIQUAL_OBSERVABILITY_EVENTS (ASSET_ID, EVENT_TYPE, RECORDED_VALUE, CALCULATED_VALUE, THLD_BREACHED)
with prv_result as (
    -- Latest previously recorded row count per asset. RECORDED_VALUE is a
    -- STRING column, so convert explicitly (non-numeric text becomes NULL).
    select
        asset_id,
        try_to_number(RECORDED_VALUE) as PRV_ROW_COUNT
    from UNIQUAL_OBSERVABILITY_EVENTS
    where event_type = 'VOLUME_CHECK'
    qualify row_number() over (partition by asset_id order by event_ts desc) = 1
)
select
    ast.ASSET_ID,
    'VOLUME_CHECK' as EVENT_TYPE,
    ist.ROW_COUNT as RECORDED_VALUE,
    case
        when ist.ROW_COUNT = 0                 then 'TABLE_EMPTY'
        when ist.ROW_COUNT = uoe.PRV_ROW_COUNT then 'NO_CHANGE'
        else 'NA'
    end as CALCULATED_VALUE,
    case
        when CALCULATED_VALUE = 'NO_CHANGE'   then FALSE
        when CALCULATED_VALUE = 'TABLE_EMPTY' then TRUE
        when uoe.PRV_ROW_COUNT is null        then FALSE
        -- NOTE(review): the drop is measured relative to the CURRENT count, not
        -- the previous one; confirm this is the intended definition.
        -- NULLIF guards the division even though ROW_COUNT = 0 is handled above.
        when (uoe.PRV_ROW_COUNT - ist.ROW_COUNT) / nullif(ist.ROW_COUNT, 0) > ast.VOLUME_THLD_PCT then TRUE
        -- A change within the threshold (and any growth, for a non-negative
        -- threshold) is recorded as not breached rather than NULL.
        else FALSE
    end as THLD_BREACHED
from information_schema.tables ist
inner join UNIQUAL_ASSETS ast
    -- Match on the full name: the same table name can exist in several schemas/catalogs.
    on  ist.table_catalog = ast.asset_catalog
    and ist.table_schema  = ast.asset_schema
    and ist.table_name    = ast.asset_name
left join prv_result uoe
    on ast.asset_id = uoe.asset_id
where ist.table_type = 'BASE TABLE'
  and ast.is_active;

Reporting

UNIQUAL_ASSET_STATUS
-- One row per monitored asset: red/green status of its latest freshness and
-- volume checks, plus when each of those checks last ran.
create or replace view VW_UNIQUAL_HEALTH_SUMMARY
as
with latest_events as (
    -- Most recent event of each type per asset.
    select
        ua.asset_id,
        ua.asset_name,
        uoe.event_type,
        uoe.recorded_value,
        uoe.calculated_value,
        uoe.thld_breached,
        uoe.event_ts
    from UNIQUAL_ASSETS ua
    inner join UNIQUAL_OBSERVABILITY_EVENTS uoe
        on ua.asset_id = uoe.asset_id
    qualify row_number() over (partition by uoe.asset_id, uoe.event_type order by uoe.event_ts desc) = 1
)
select
    asset_name,
    -- 'R' sorts after 'G', so MAX yields 'R' when the latest check of that
    -- type breached; a NULL THLD_BREACHED counts as 'G'.
    max(case when event_type = 'FRESHNESS_CHECK' and thld_breached then 'R' else 'G' end) as FRESHNESS_STATUS,
    max(case when event_type = 'FRESHNESS_CHECK' then event_ts end) as LAST_FRESHNESS_CHECK_TS,
    max(case when event_type = 'VOLUME_CHECK' and thld_breached then 'R' else 'G' end) as VOLUME_STATUS,
    max(case when event_type = 'VOLUME_CHECK' then event_ts end) as LAST_VOLUME_CHECK_TS
from latest_events
-- Group by ASSET_ID too, so same-named assets in different schemas stay separate rows.
group by asset_id, asset_name;
UNIQUAL_OBSERVABILITY_EVENTS
-- Latest event of each type per asset, labelled with the asset's name.
create or replace view VW_UNIQUAL_OBSERVABILITY_EVENTS
as
with ranked_events as (
    select
        ua.asset_name,
        uoe.event_type,
        uoe.recorded_value,
        uoe.calculated_value,
        uoe.thld_breached,
        uoe.event_ts,
        row_number() over (partition by uoe.asset_id, uoe.event_type order by uoe.event_ts desc) as event_rank
    from UNIQUAL_ASSETS ua
    inner join UNIQUAL_OBSERVABILITY_EVENTS uoe
        on uoe.asset_id = ua.asset_id
)
select
    asset_name,
    event_type,
    recorded_value,
    calculated_value,
    thld_breached,
    event_ts
from ranked_events
where event_rank = 1;
UNIQUAL_DATA_QUALITY_RESULTS
-- Latest data-quality result per monitored asset and check, with a red/green flag.
-- UNIQUAL_ASSETS.ASSET_NAME holds the bare table name (e.g. 'CUSTOMERS'), while
-- the sample uniqueness script writes the fully qualified name
-- (e.g. 'UNIQUAL.PUBLIC.CUSTOMERS'); a bare-name-only join never matched,
-- so accept either form.
create or replace view VW_UNIQUAL_DATAQUALITY_STATUS
as
select
    ua.asset_name,
    udq.check_name,
    udq.column_checked,
    udq.total_rows,
    udq.violations,
    case
        when udq.status = 'PASS' then 'G'
        else 'R'
    end as DQRESULT,
    udq.timestamp
from UNIQUAL_ASSETS ua
inner join UNIQUAL_DATA_QUALITY_RESULTS udq
    on udq.asset_name in (
        ua.asset_name,
        ua.asset_catalog || '.' || ua.asset_schema || '.' || ua.asset_name
    )
-- NOTE(review): TIMESTAMP is text and is ordered lexicographically; that is
-- chronological only for a fixed-width 'YYYY-MM-DD HH:MM:SS' format — confirm.
qualify row_number() over (partition by ua.asset_id, udq.check_name order by udq.timestamp desc) = 1;