Snowflake
Before You Begin
Access to your Snowflake Console is required to complete this integration guide.
Configure a Snowflake Storage Integration and Stored Procedure
Step 1
Create a storage integration using the CREATE STORAGE INTEGRATION command:
CREATE STORAGE INTEGRATION clario_integration
  type = external_stage
  storage_provider = 'GCS'
  enabled = true
  storage_allowed_locations = ('gcs://<bucket>');
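Storage integrations are typically created by ACCOUNTADMIN. If the unload procedure and tasks will run under a different role, you will likely also need to grant that role usage on the integration; a minimal sketch, assuming a hypothetical role named CLARIO_ROLE:
GRANT USAGE ON INTEGRATION clario_integration TO ROLE CLARIO_ROLE;  -- CLARIO_ROLE is a placeholder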
Retrieve the Cloud Storage Service Account for your Snowflake Account. Provide the value of STORAGE_GCP_SERVICE_ACCOUNT
to your Clario Customer Success team.
DESC STORAGE INTEGRATION clario_integration;
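Optionally, you can pull just the service account value out of the DESC output by querying the last result set with RESULT_SCAN (the SELECT must run immediately after the DESC statement):
DESC STORAGE INTEGRATION clario_integration;
SELECT "property_value"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" = 'STORAGE_GCP_SERVICE_ACCOUNT';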
Step 2
Create a stored procedure to handle the export:
create or replace procedure clario_unloader_sp(brand varchar, sourceType varchar, feedType varchar, db varchar, dbschema varchar, dbtable varchar)
returns string not null
language javascript
as
$$
  // Build a YYYYMMDD date stamp for the export path.
  var today = new Date().toISOString().slice(0, 10).replace(/-/g, '');

  // Assemble the COPY INTO statement that unloads the table to GCS as
  // Snappy-compressed Parquet. Procedure arguments are exposed to
  // JavaScript in upper case (BRAND, SOURCETYPE, and so on).
  var sql_command = 'copy into '.concat(
    "'gcs://<bucket>/", FEEDTYPE, "/", BRAND, "/", SOURCETYPE, "/", today, "/'",
    ' from "', DB, '"."', DBSCHEMA, '"."', DBTABLE, '"',
    " file_format = (type='PARQUET', compression='SNAPPY')",
    " header = true",
    " include_query_id = true",
    " storage_integration = clario_integration");

  try {
    var statement = snowflake.createStatement({sqlText: sql_command});
    statement.execute();
    // On success, return the COPY statement that was executed.
    return sql_command;
  } catch (err) {
    return "failed " + err;
  }
$$;
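Before scheduling anything, you can sanity-check the procedure with a one-off call (placeholder values shown; substitute your own). On success it returns the generated COPY statement; on an error it returns a string beginning with "failed".
CALL clario_unloader_sp('<brand>', '<sourceType>', '<feedType>', '<db>', '<schema>', '<table>');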
Schedule Unload Jobs
For each table you wish to unload, create a task using a schedule that fits your needs.
CREATE TASK clario_unload_<brand>_<sourceType>_<feedType>
  warehouse = <your warehouse name>
  schedule = 'USING CRON <cron definition>'
AS
  CALL clario_unloader_sp('<brand>', '<sourceType>', '<feedType>', '<db>', '<schema>', '<table>');
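Note that Snowflake creates tasks in a suspended state; each task must be resumed before it will run (as in the examples below):
ALTER TASK clario_unload_<brand>_<sourceType>_<feedType> RESUME;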
The inputs for clario_unloader_sp are as follows:
brand
A meaningful abbreviation of the brand you are sending data for (e.g. Avalanche Clothing would be ac).
sourceType
The type of data you are sending (e.g. customer_master, product_master, esp_activity, esp_subscriber).
feedType
ongoing (for recurring feeds) or backfill (for one-time historical loads).
db
The name of the Snowflake database your table lives in.
schema
The schema your Snowflake table is organized under.
table
The Snowflake table you are unloading to GCS.
Example Unload Job
You work on the data team for Avalanche Clothing. You are in the process of onboarding with Clario and need to establish a feed that pulls transaction header data out of your Snowflake warehouse. Clario is asking for a historical backfill and an ongoing feed. The following assumptions can be made:
Warehouse: AVALANCHE_WH
Brand: ac (Avalanche Clothing)
Source Type: transaction_header
Schema: ERP
Table: TR_HEADER
Scheduled tasks will run at 3:00 AM daily, UTC.
The data team maintains two databases in the warehouse: HISTORICAL and CURRENT.
Schedule and Enable the Backfill
CREATE TASK clario_unload_ac_transaction_header_backfill
  warehouse = AVALANCHE_WH
  schedule = 'USING CRON 0 3 * * * UTC'
AS
  CALL clario_unloader_sp('ac', 'transaction_header', 'backfill', 'HISTORICAL', 'ERP', 'TR_HEADER');
ALTER TASK IF EXISTS clario_unload_ac_transaction_header_backfill RESUME;
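Because the backfill is a one-time historical load while the task's cron schedule recurs, you will likely want to suspend (or drop) the backfill task once it has run successfully:
ALTER TASK IF EXISTS clario_unload_ac_transaction_header_backfill SUSPEND;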
Schedule and Enable the Ongoing Feed
CREATE TASK clario_unload_ac_transaction_header_ongoing
  warehouse = AVALANCHE_WH
  schedule = 'USING CRON 0 3 * * * UTC'
AS
  CALL clario_unloader_sp('ac', 'transaction_header', 'ongoing', 'CURRENT', 'ERP', 'TR_HEADER');
ALTER TASK IF EXISTS clario_unload_ac_transaction_header_ongoing RESUME;
Working With Scheduled Jobs
List
SHOW TASKS;
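SHOW TASKS also accepts a LIKE pattern, which is handy for narrowing the listing to just the unload tasks created above:
SHOW TASKS LIKE 'clario_unload%';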
Delete
DROP TASK IF EXISTS <task name>;
View History
select *
from table(information_schema.task_history())
order by scheduled_time DESC;
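task_history() also accepts a task_name argument if you want the history of a single task (shown here with the ongoing task from the example; task names are stored in upper case):
select *
from table(information_schema.task_history(task_name => 'CLARIO_UNLOAD_AC_TRANSACTION_HEADER_ONGOING'))
order by scheduled_time desc;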