Snowflake

Before You Begin

Access to your Snowflake Console is required to complete this integration guide.

Configure a Snowflake Storage Integration and Stored Procedure

Step 1

Create a storage integration using the CREATE STORAGE INTEGRATION command:

CREATE STORAGE INTEGRATION clario_integration
type = external_stage
storage_provider = gcs
enabled = true
storage_allowed_locations = ('gcs://<bucket>');
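
Creating a storage integration requires the CREATE INTEGRATION privilege (held by ACCOUNTADMIN by default). If a different role will own the stored procedure and tasks below, it also needs usage on the integration; the role name here is a placeholder:

GRANT USAGE ON INTEGRATION clario_integration TO ROLE <your role name>;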

Retrieve the Cloud Storage Service Account for your Snowflake Account. Provide the value of STORAGE_GCP_SERVICE_ACCOUNT to your Clario Customer Success team.

DESC STORAGE INTEGRATION clario_integration;
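
If you prefer to pull out just that value, one option is to filter the DESC output with RESULT_SCAN (assuming the DESC statement was the most recent query in your session):

-- DESC output column names are lowercase, hence the quoted identifiers
SELECT "property_value"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" = 'STORAGE_GCP_SERVICE_ACCOUNT';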

Step 2

Create a stored procedure to handle the export:

create or replace procedure clario_unloader_sp(brand varchar, sourceType varchar, feedType varchar, db varchar, dbschema varchar, dbtable varchar)
returns string not null
language javascript
as
$$
// Build a YYYYMMDD date stamp for the export path.
var today = new Date().toISOString().slice(0, 10).replace(/-/g, '');
// Assemble the COPY INTO <location> statement that unloads the table to the
// Clario GCS bucket as Snappy-compressed Parquet. Note that JavaScript
// procedure arguments are exposed as uppercase variables (BRAND, FEEDTYPE, etc.).
var sql_command = 'copy into '.concat(
    "'gcs://<bucket>/", FEEDTYPE, "/", BRAND, "/", SOURCETYPE, "/", today, "/'",
    ' from "', DB, '"."', DBSCHEMA, '"."', DBTABLE, '"',
    " file_format = (type='PARQUET', compression='SNAPPY')",
    " header = true",
    " include_query_id = true",
    " storage_integration = clario_integration");
try {
    var statement = snowflake.createStatement({sqlText: sql_command});
    statement.execute();
    // On success, return the generated COPY statement for easy auditing.
    return sql_command;
} catch (err) {
    return "failed " + err;
}
$$;
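
Before scheduling anything, you can smoke-test the procedure with a one-off call. The database, schema, and table below are placeholders; substitute objects that exist in your account:

CALL clario_unloader_sp('ac', 'customer_master', 'backfill', 'MY_DB', 'MY_SCHEMA', 'MY_TABLE');

On success the call returns the generated COPY statement; on failure it returns a string beginning with "failed".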

Schedule Unload Jobs

For each table you wish to unload, create a task using a schedule that fits your needs.

CREATE TASK clario_unload_<brand>_<sourceType>_<feedType>
warehouse = <your warehouse name>
schedule = 'USING CRON <cron definition>'
AS
CALL clario_unloader_sp('<brand>', '<sourceType>', '<feedType>', '<db>', '<schema>', '<table>');

The inputs for clario_unloader_sp are as follows:

  • brand: a meaningful abbreviation of the brand you are sending data for (e.g., Avalanche Clothing would be ac).

  • sourceType: the type of data you are sending (e.g., customer_master, product_master, esp_activity, esp_subscriber).

  • feedType: ongoing (for recurring feeds) or backfill (for one-time historical loads).

  • db: the name of the Snowflake database your table lives in.

  • dbschema: the schema your Snowflake table is organized under.

  • dbtable: the Snowflake table you are unloading to GCS.

Note:

All unload tasks are suspended upon creation. Resume each task to enable it:

ALTER TASK IF EXISTS clario_unload_<brand>_<sourceType>_<feedType> RESUME;
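
After resuming, you can confirm that a task is enabled (its state should read started rather than suspended) and optionally trigger a one-off run without waiting for the cron schedule. The LIKE pattern below assumes the task naming convention used in this guide:

SHOW TASKS LIKE 'clario_unload%';
EXECUTE TASK clario_unload_<brand>_<sourceType>_<feedType>;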

Example Unload Job

You work on the data team for Avalanche Clothing. You are in the process of onboarding with Clario and need to establish a feed that pulls transaction header data out of your Snowflake warehouse. Clario is asking for a historical backfill and an ongoing feed. The following assumptions can be made:

  • Warehouse: AVALANCHE_WH

  • Brand: ac (Avalanche Clothing)

  • Source Type: transaction_header

  • Schema: ERP

  • Table: TR_HEADER

  • Scheduled tasks will run at 3:00 AM daily, UTC

  • The data team maintains two databases in the warehouse: HISTORICAL and CURRENT.

Schedule and Enable the Backfill

CREATE TASK clario_unload_ac_transaction_header_backfill
warehouse = AVALANCHE_WH
schedule = 'USING CRON 0 3 * * * UTC'
AS
CALL clario_unloader_sp('ac', 'transaction_header', 'backfill', 'HISTORICAL', 'ERP', 'TR_HEADER');
ALTER TASK IF EXISTS clario_unload_ac_transaction_header_backfill RESUME;
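
Because the backfill is a one-time historical load, you will likely want to pause its task once it has run successfully:

ALTER TASK IF EXISTS clario_unload_ac_transaction_header_backfill SUSPEND;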

Schedule and Enable the Ongoing Feed

CREATE TASK clario_unload_ac_transaction_header_ongoing
warehouse = AVALANCHE_WH
schedule = 'USING CRON 0 3 * * * UTC'
AS
CALL clario_unloader_sp('ac', 'transaction_header', 'ongoing', 'CURRENT', 'ERP', 'TR_HEADER');
ALTER TASK IF EXISTS clario_unload_ac_transaction_header_ongoing RESUME;

Working With Scheduled Jobs

List

SHOW TASKS;

Delete

DROP TASK IF EXISTS <task name>;
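
Suspend

To pause a task without deleting it (for example, the backfill task once its one-time load has completed):

ALTER TASK IF EXISTS <task name> SUSPEND;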

View History

select *
from table(information_schema.task_history())
order by scheduled_time DESC;
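
task_history also accepts a task name if you want the run history of a single unload job; here it is shown with the example backfill task from above:

select *
from table(information_schema.task_history(task_name => 'CLARIO_UNLOAD_AC_TRANSACTION_HEADER_BACKFILL'))
order by scheduled_time DESC;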
