OLAP in Snowflake
Intro
Snowflake is a platform delivered as a service, which means we don't have to worry about patches, upgrades or maintenance. I like the way they describe it: it combines the power of data warehousing, the flexibility of big data platforms, and the elasticity of the cloud.
Snowflake architecture
- In Snowflake's architecture, the key is a separation of storage, compute and services:
source: Snowflake
- S3 as the storage layer (or Google Cloud Storage / Azure Blob Storage),
- EC2 virtual warehouses as the compute layer,
- metadata, security, transactions and concurrency management as the services layer.
- Building pipelines with Snowflake:
source: Snowflake
- Snowflake objects (a minimal DDL sketch follows this list):
- Database - contains schemas, tables, views, stages, file formats, sequences. All tables are created in the PUBLIC schema by default.
- Stage - a data-lake-like place from which data files are copied into tables.
- File format - defines the format of the files stored at a stage.
- Warehouse - the computing engine.
- Worksheet - a place where we can write or load ready-made SQL scripts.
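- As a rough, minimal sketch (all object names below are made-up placeholders), the objects above map to DDL like this:
-- hypothetical object names, for illustration only
CREATE DATABASE my_db;
CREATE SCHEMA my_db.my_schema;
CREATE FILE FORMAT my_db.my_schema.my_csv_format TYPE = CSV SKIP_HEADER = 1;
CREATE STAGE my_db.my_schema.my_stage FILE_FORMAT = (FORMAT_NAME = 'my_csv_format');
CREATE WAREHOUSE my_wh WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 300 AUTO_RESUME = TRUE;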
Snowflake mechanism:
- Snowflake for OLAP:
- Snowflake supports Online Analytical Processing (OLAP): scanning all of the data in selected columns to compute aggregated values, as opposed to Online Transactional Processing (OLTP), where we want to retrieve one or a few rows from the database. The main purpose of OLAP is data analysis, while the main purpose of OLTP is data processing (inserting, updating).
source: Data Council
- While classic databases like PostgreSQL or MySQL are row stores (OLTP) that keep data linearly row by row, from left to right, from top to bottom, Snowflake uses a columnar store (OLAP), which allows traversing entire columns one by one.
source: Data Council
- Using columnar storage makes sense when we want to analyze one or two specific columns and skip reading the other ones.
- Snowflake workflow:
- Dividing big tabular data into segments of consecutive rows called micropartitions.
- Reorganizing the data of each micropartition by flipping the row store into a columnar one - the values of each column get stored together.
- Adding a header to each micropartition. The header contains metadata about the given micropartition and the columns it contains.
- Micropartitions are stored in S3 as immutable files.
- All the information about micropartitions is stored in Snowflake's metadata layer.
- When a query comes in, the metadata layer knows which micropartitions in S3 belong to the queried table.
- Reading an S3 file, it searches for the header first and then reads only the columns we specified in the query.
- It reads only the queried data, ignoring the rest of the S3 files.
- Every action on the database can be performed with a SQL script, including setting up stages and file formats, configuring a warehouse, and copying data to a table from a stage.
- Data Loading consists of two parts:
1. Staging the files in a place from which Snowflake can retrieve the data files. We can have both internal and external (S3, GCS) storage.
2. Loading the data into a specific table.
Querying a Snowflake table:
- We can either query a table like this:
USE DATABASE my_db;
USE SCHEMA my_schema;
SELECT * FROM my_table;
- or like that:
SELECT * FROM my_db.my_schema.my_table
SnowSQL
- The command line interface client for connecting to Snowflake.
- Allows executing SQL queries to perform DDL and DML operations.
- We can install it from the Snowflake UI, and there is info on how to connect at the end:
- Example:
USE DATABASE my_db;
CREATE OR REPLACE TABLE my_tbl (
    ...
);
CREATE STAGE csvfiles;
PUT file:///tmp/load/abc*.csv @csvfiles;
LIST @csvfiles;
CREATE WAREHOUSE IF NOT EXISTS dataload WAREHOUSE_SIZE = 'LARGE'
    AUTO_SUSPEND = 300 AUTO_RESUME = TRUE;
USE WAREHOUSE dataload;
COPY INTO my_tbl
    FROM @csvfiles
    PATTERN = '.*file0[1-4].csv.gz'
    ON_ERROR = 'skip_file';
SELECT * FROM my_tbl LIMIT 10;
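- One way to verify such a load afterwards (a sketch; MY_TBL is the table from the example above) is the COPY_HISTORY table function:
-- load history for the last hour of the example table
SELECT file_name, status, row_count, row_parsed, first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'MY_TBL',
    START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())));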
- We can learn a lot from the documentation: Snowflake documentation
SnowPipe:
- It loads data from a source like an AWS S3 bucket into a target table.
- With SnowPipe we can perform the whole ETL process:
CREATE PIPE my_db.my_stage.my_pipe AUTO_INGEST = FALSE AS
COPY INTO "MY_DB"."MY_STAGE"."tbl_my_table" (col1, col2, col3, col4, col5, col6, col7, col8)
FROM (
    SELECT substr(stage_alias.$3, 1, 7),
           iff(stage_alias.$1 = 100, replace(stage_alias.$1, '100', 'Done'), 'In progress'),
           stage_alias.$2,
           stage_alias.$3,
           replace(stage_alias.$5, ',', '.'),
           'Artur',
           metadata$filename,
           CURRENT_TIMESTAMP()
    FROM @"MY_DB"."MY_STAGE"."s3_bucket_name" stage_alias
)
FILE_FORMAT = (format_name = "MY_DB"."MY_STAGE"."SAP_X")
FORCE = FALSE;
- first, we create a pipe name and assign the COPY INTO statement to it,
- each pipe definition starts with COPY INTO; we state the destination table according to the path database.stage.destination_table, and then we list all the destination columns that we want to feed with the data load.
- In the SELECT statement, we perform the entire data manipulation, where each line corresponds to a single column listed in the COPY INTO line,
- In the SELECT statement, we work with indexes, which refer to column numbers in the data source,
- In the SELECT statement, we can skip an index if we want to omit some columns, or we can change the order of the data source columns,
- In the SELECT statement, we can either use a source column (original or transformed) or provide a hard-coded value,
- For example, the first line in the SELECT takes the 3rd column from the data source and transforms it with substr(),
- Using metadata$filename gives us the current file's name,
- with FILE_FORMAT we tell Snowflake what the structure of the data source is, what the delimiter is, etc.,
- with FORCE = FALSE we tell Snowflake not to import a file again if it was imported before,
- with AUTO_INGEST = TRUE we ensure automatic data ingestion when a new file appears in S3,
- Integrating S3 with Snowflake:
1. Creating the Snowpipe.
2. Displaying pipes with SHOW PIPES; (a status-check sketch is included at the end of this section).
3. Taking the value from the notification_channel field:
4. Pasting it as the SQS queue ARN in an S3 Event Notification - one of the S3 properties where we can set up the connection with Snowflake.
5. Selecting all object-creation event types:
6. Providing the SQS ARN:
- AWS stage:
- in the example above we use the external stage of AWS S3,
- to create the stage we need to provide S3 bucket's url along with credentials,
CREATE OR REPLACE STAGE "MY_DB"."MY_SCHEMA"."stage_name"
    URL = 's3://bucket_name/folder_name'
    CREDENTIALS = (aws_key_id='.....' aws_secret_key='.....')
    ENCRYPTION = (type='AWS_SSE_KMS' kms_key_id = '......');
- encryption is used to protect the confidential data.
- instead of passing credentials to each stage, we can use a so-called STORAGE INTEGRATION:
CREATE STORAGE INTEGRATION "MY_DB"."MY_SCHEMA"."AWS_111111_Integration"
    TYPE = EXTERNAL_STAGE
    STORAGE_PROVIDER = S3
    ENABLED = TRUE
    STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::111111:role/my_role'
    STORAGE_ALLOWED_LOCATIONS = ('s3://bucket_name/', 's3://another_bucket_name/');
- which we can use in the CREATE OR REPLACE STAGE script:
CREATE OR REPLACE STAGE "MY_DB"."MY_STAGE"."stage_name"
    URL = 's3://bucket_name/folder_name'
    STORAGE_INTEGRATION = "MY_DB"."MY_SCHEMA"."AWS_111111_Integration";
- ARN stands for Amazon Resource Name.
- With the Snowpipe source code itself, we can understand all the dependencies and references to the Snowflake objects.
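- Once the pipe and the S3 event notification are wired up, here is a quick sketch of how to check that the pipe is healthy (using the pipe name from the example above):
SHOW PIPES;
-- returns a JSON document with, among others, the execution state and the pending file count
SELECT SYSTEM$PIPE_STATUS('my_db.my_stage.my_pipe');
-- manually refresh the pipe to pick up files staged before the notification was set up
ALTER PIPE my_db.my_stage.my_pipe REFRESH;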
Time Travel:
- Every time a DML operation is executed on a table, Snowflake saves previous versions of the table data. This allows querying earlier versions of the data using the AT | BEFORE clause.
- The data retention period specifies the number of days for which this historical data is preserved and, therefore, Time Travel operations (SELECT, CREATE … CLONE, UNDROP) can be performed on the data. The standard retention period is 1 day (24 hours). When the retention period ends for an object, the historical data is moved into Snowflake Fail-safe (background process).
- The data retention period can be extended up to 90 days. It can be overridden using the DATA_RETENTION_TIME_IN_DAYS parameter while creating a table, schema or database.
create table mytable(col1 number, col2 date) data_retention_time_in_days=90;
alter table mytable set data_retention_time_in_days=30;
- If a retention period is specified for a database or schema, the period is inherited by default by all objects created in the database/schema.
- We can query a specified point in the table's history within the retention period by giving a timestamp, an offset from the current time, or a statement query ID:
-- timestamp
select * from my_table at(timestamp => 'Mon, 01 May 2015 16:20:00 -0700'::timestamp_tz);
-- time offset from current date
select * from my_table at(offset => -60*5);
-- statement id
select * from my_table before(statement => '8e5d0ca9-005e-44e6-b858-a8f5b37c5726');
- When querying outside the data retention period for the table, the query fails and returns an error.
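- A short sketch of the other Time Travel operations mentioned above (object names are placeholders):
-- hypothetical table names, for illustration only
-- restore a table dropped within the retention period
DROP TABLE mytable;
UNDROP TABLE mytable;
-- clone the table as it looked 60 minutes ago
CREATE TABLE mytable_restored CLONE mytable AT(OFFSET => -60*60);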
Bulk Unloading
- We can use the COPY INTO <location> FROM <table> command to copy data from a Snowflake database table into one or more files in a Snowflake or external stage.
- We can also unload the data into an S3 bucket directly:
copy into 's3://bucket_name/folder_name/'
from mytable_name
storage_integration = s3_int_name;
- we must specify the URI for the S3 bucket and the storage integration or credentials for accessing the bucket in the COPY command:
COPY INTO 's3://mybucket/folder_name/'
from my_table
credentials = (aws_key_id='xxxx' aws_secret_key='xxxxx' aws_token='xxxxxx')
- Unloading to a File:
copy into @mystage/myfile.csv.gz
from mytable
file_format = (type=csv compression='gzip')
single=true
max_file_size=4900000000;
source: docs.snowflake.com
- unloading table data to a single file named myfile.csv.gz in a named stage.
- Bulk unloading with a stored procedure:
CREATE OR REPLACE PROCEDURE "db_name"."db_schema".unload_procedure() RETURNS string LANGUAGE javascript AS $$ var tbl_arr = [ 'tbl_clients', 'tbl_products', 'tbl_vendors', 'tbl_employees', 'tbl_discounts', 'tbl_premium_products', ]; var tbl_arr_length = tbl_arr.length; var today = new Date(); var date = today.getFullYear()+'-'+(today.getMonth()+1)+'-'+today.getDate(); for (var i = 0; i < tbl_arr_length; i++){ var table_name = tbl_arr[i]; var file_name = date + "_" + table_name; var s3_bucket_url = 's3://my_backup/db-backup/' + file_name; var full_table_name = '"db_name"."db_schema"."' + table_name + '"'; var storage_integration_name = '"db_name.db_schema.AWS_12345_Integration"' var sql = ""; sql += "COPY INTO " + s3_bucket_url + " "; sql += "FROM " + full_table_name + " "; sql += "STORAGE_INTEGRATION = " + storage_integration_name + " "; sql += "FILE_FORMAT = ( TYPE = CSV null_if=('Null') field_optionally_enclosed_by='\"');"; var sql_command = snowflake.createStatement({sqlText: sql}); try { var sql_result = sql_command.execute(); } catch (e) { return = 'bulk unloading failed: ' + e; }; }; result = 'DONE' return result ; $$;
- the procedure loops through the listed tables in the database,
- within the loop, it builds a file name from the date concatenated with the table name, which is then used to build the S3 bucket URL,
- the rest of the loop is responsible for building the SQL statement and executing it within a try-catch block.
Update statement:
- When an update statement is sent to a cloud database, there is no change to the existing record. Instead, a new record is inserted with the updated value(s). There is also metadata that controls which of those records is the most recent. The old records are kept in storage and remain available so that we can go back to them if needed.
- This is unlike a SQL Server database, for example, where an update statement changes the record's value in place.
Stored procedures:
- A stored procedure returns a single value only.
- Although we run SELECT inside a stored procedure, the results must be used within the stored procedure, or be narrowed to a single value and then returned.
- The language used is JavaScript, which provides the control structures (branching and looping); SQL statements are executed by calling functions in a JavaScript API.
- Example procedure that takes one variable from a user:
CREATE OR REPLACE PROCEDURE procedure_name (project_name varchar)
RETURNS ARRAY
LANGUAGE javascript
AS
$$
    var query = "SELECT MANAGER_NAME FROM PROJECTS WHERE PROJECT_NAME = ?";
    var statement = snowflake.createStatement({sqlText: query, binds: [PROJECT_NAME]});
    var resultSet = statement.execute();
    var arr = [];
    while (resultSet.next()) {
        arr.push(resultSet.getColumnValue(1));
    }
    return arr;
$$;

CALL procedure_name('asap_project');
- When defining a stored procedure, we need to declare the type of the returned value, here ARRAY.
- If we want to return a different type depending on flow control, we can declare the returned value as VARIANT.
- $$ is the delimiter for the beginning and end of the stored procedure.
- With the JavaScript API global object's function snowflake.createStatement(), we pass a JSON object with two key-value pairs.
- With the binds key we bind the '?' in the SELECT statement to the passed variables.
- We can either bind variables to the query or concatenate variables into the SELECT statement like: "SELECT MANAGER_NAME FROM PROJECTS WHERE PROJECT_NAME = '" + PROJECT_NAME + "';"
- The variable resultSet holds the outcome of the SELECT statement.
- In order to access each row of the resultSet, we need to iterate over it, calling resultSet.next() once for each record.
- As long as there are records still available, resultSet.next() returns TRUE in the while loop.
- With arr.push(resultSet) we put all of the record's columns into the array.
- We can narrow down the result to one column only with the following: arr.push(resultSet.getColumnValue(1)).
- Result of the above: ['Artur', 'Johnny']
- IMPORTANT to note: we can pass variable names into a procedure in either upper or lower case, but when using these variables inside the procedure we need to reference them in upper case. This is because SQL converts identifiers to upper case by default.
- We can execute many sql statements within one procedure:
CREATE OR REPLACE PROCEDURE "db_name"."schema_name".prcedure_name() RETURNS string LANGUAGE javascript AS $$ // Creating 2 temporary tables: // 1st one contains all ITEM with a single category assigned (there may be duplicates) // 2nd one contains unique ITEM with order_status defined // Creating temporary table with all categories per ITEM var tbl_products = '"db_name"."schema_name"."tbl_products"'; var tbl_tools = '"db_name"."schema_name"."tbl_tools"'; var tbl_items = '"db_name"."schema_name"."tbl_items"'; var tbl_things = '"db_name"."schema_name"."tbl_things"'; var tbl_spare_parts = '"db_name"."schema_name"."tbl_spare_parts"'; var tbl_temporary_name = '"db_name"."schema_name"."tbl_all_stuff"'; var tbl_temporary_definition = ""; tbl_temporary_definition += "CREATE OR REPLACE TABLE " + tbl_temporary_name + " AS "; tbl_temporary_definition += "SELECT *, SUM(CASE WHEN CATEGORY = 'Spare' THEN 1 ELSE 0 END) OVER(PARTITION BY ITEM ORDER BY ITEM) AS IF_SPARE_SOMEWHERE, COUNT(*) OVER(PARTITION BY ITEM ORDER BY ITEM) ITEM_COUNT FROM(" tbl_temporary_definition += "SELECT ITEM, CATEGORY, PART_NUMBER, DATA_SRC, DATA_SRC_FILENAME FROM " + tbl_products + " "; tbl_temporary_definition += "UNION "; tbl_temporary_definition += "SELECT ITEM, CATEGORY, PART_NUMBER, DATA_SRC, DATA_SRC_FILENAME FROM " + tbl_tools + " "; tbl_temporary_definition += "UNION "; tbl_temporary_definition += "SELECT ITEM, CATEGORY, PART_NUMBER, DATA_SRC, DATA_SRC_FILENAME FROM " + tbl_items + " "; tbl_temporary_definition += "UNION "; tbl_temporary_definition += "SELECT ITEM, CATEGORY, PART_NUMBER, DATA_SRC, DATA_SRC_FILENAME FROM " + tbl_things + " "; tbl_temporary_definition += "UNION "; tbl_temporary_definition += "SELECT ITEM, 'Spare', PART_NUMBER, DATA_SRC, DATA_SRC_FILENAME FROM " + tbl_spare_parts; tbl_temporary_definition += ") ORDER BY 1;"; // example // ------------------------------------------------------------------------------ // ITEM | CATEGORY | ... | DATA_SRC | ... | IF_SPARE_SOMEWHERE | // ------------------------------------------------------------------------------ // 111 | Spare | ... | ERP | ... | 0 | // ------------------------------------------------------------------------------ // 123 | Spare | ... | ERP | ... | 0 | // 123 | different | ... | ERP | ... | 0 | // 123 | different_2 | ... | ERP | ... | 0 | // ------------------------------------------------------------------------------ // 999 | something | ... | ERP | ... | 1 | // 999 | something_2 | ... | ERP | ... | 1 | // 999 | Spare | ... | SSYSTEM | ... 
| 1 | var sql_command = snowflake.createStatement({sqlText: tbl_temporary_definition}); try { var sql_result = sql_command.execute(); } catch (e) { return 'creating temporary table failed: ' + e; } // deleting duplicated ITEM when coming from both SAP and SUPER_SYSTEM (deleting SAP ones) // prioritizing SUPER_SYSTEM entires // due to example - it will remove two records with ITEM of 999 where DATA_SRC is ERP var delete_sql_statement = ""; delete_sql_statement += "DELETE "; delete_sql_statement += "FROM " + tbl_temporary_name + " "; delete_sql_statement += "WHERE ITEM_COUNT > 1 AND CATEGORY != 'Spare' AND IF_SPARE_SOMEWHERE = 1;"; var sql_command = snowflake.createStatement({sqlText: delete_sql_statement}); try { var sql_result = sql_command.execute(); } catch (e) { return 'deleting ERP duplicates from tbl_all_stuff failed: ' + e; } // dropping ITEM_COUNT_COLUMN and IF_CURRIMA_SOMEWHERE from tbl_all_stuff var drop_column = ""; drop_column += "ALTER TABLE " + tbl_temporary_name + " "; drop_column += "DROP COLUMN ITEM_COUNT, IF_SPARE_SOMEWHERE;"; var sql_command = snowflake.createStatement({sqlText: drop_column}); try { var sql_result = sql_command.execute(); } catch (e) { return 'droping column from tbl_all_stuff failed: ' + e; } // example: // -------------------------------------------------- // ITEM | CATEGORY | ... | DATA_SRC | ... // -------------------------------------------------- // 111 | Spare | ... | ERP | ... // -------------------------------------------------- // 123 | Spare | ... | ERP | ... // 123 | different | ... | ERP | ... // 123 | different_2 | ... | ERP | ... // -------------------------------------------------- // 999 | Spare | ... | SSYSTEM | ... // Insert for testing -> ITEM 666111 should appear in the tbl_items_master at the end of procedure - if so then test passed - omment out when want to skip // var insert_text = "INSERT INTO " + tbl_temporary_name + // " (ITEM, CATEGORY, PART_NUMBER, DATA_SRC) VALUES('666111', 'Spare', '12345', 'SSYSTEM_B')"; // var stmt_insertIntoTblProjectsMaster_command = snowflake.createStatement({sqlText: insert_text}); // var res = stmt_insertIntoTblProjectsMaster_command.execute(); // Creating table with unique item and order_status var tbl_temporary_unique_item_name = '"db_name"."schema_name"."tbl_temp_unique_item"'; // everything from super_system is open and has higher priority than SAP entieris // (sap ones should be dropped when diplicate super_system ones) var case_logic = "" case_logic += "CASE "; case_logic += "WHEN CATEGORY = 'Spare' THEN 'open' "; case_logic += "WHEN if_spare >= 1 AND count > 1 THEN 'open' "; case_logic += "WHEN if_spare = 1 AND count = 1 THEN 'to be checked' "; case_logic += "WHEN if_spare = 0 AND count > 0 THEN 'open' "; case_logic += "WHEN if_spare = 0 AND count = 0 THEN 'closed' "; case_logic += "END "; var tbl_temporary_definition = ""; tbl_temporary_definition += "CREATE OR REPLACE TABLE " + tbl_temporary_unique_item_name + " AS "; tbl_temporary_definition += "SELECT DISTINCT ITEM, "; tbl_temporary_definition += "SUM(CASE WHEN CATEGORY = 'Spare' THEN 1 ELSE 0 END) OVER(PARTITION BY ITEM ORDER BY ITEM) AS if_spare, "; tbl_temporary_definition += "COUNT(*) OVER(PARTITION BY ITEM ORDER BY ITEM) AS count, "; tbl_temporary_definition += case_logic + " AS order_status "; tbl_temporary_definition += "FROM " + tbl_temporary_name; var sql_command = snowflake.createStatement({sqlText: tbl_temporary_definition}); try { var sql_result = sql_command.execute(); } catch (e) { return 'creating temporary table wiht unique 
ITEM failed: ' + e; } // example // ------------------------------------------------------- // ITEM | if_spare | count | order_status // ------------------------------------------------------- // 111 | 1 | 1 | to be checked // ------------------------------------------------------- // 123 | 1 | 3 | open // ------------------------------------------------------- // 999 | 0 | 1 | open // ------------------------------------------------------- // Main var tbl_items_master = '"db_name"."schema_name"."tbl_items_master"'; var tbl_erpdata = "ERP "; var insert_text = "INSERT INTO " + tbl_items_master + " (ITEM, PART_NUMBER, ORDER_STATUS, ABC, BUDGET, USER_ID, EDIT_TIMESTAMP) "; var outer_query = ""; outer_query += "SELECT DISTINCT erpdata.ITEM, "; outer_query += "'none (this record is system-generted based on " + tbl_erpdata + " data import)', "; outer_query += "erpdata.ORDER_STATUS, "; // logic to define ABC: // ERP_A - A100 (based on DATA_SRC) // ERP_B - B300 (based on DATA_SRC) // SSYSTEM_A - A100 (based on DATA_SRC_FILENAME) // SSYSTEM_B - B300 (based on DATA_SRC_FILENAME) // else - tbd outer_query += "iff(erpdata.DATA_SRC like 'ERP%', iff(erpdata.DATA_SRC like 'ERP_A', 'A100', 'B300'), iff(erpdata.DATA_SRC_FILENAME like 'SSYSTEM%', iff(erpdata.DATA_SRC_FILENAME like '%SSYSTEM_A%', 'A100', 'B300'), 'tbd')), "; outer_query += "left(erpdata.PART_NUMBER,5), "; outer_query += "'system' user_id, "; outer_query += 'current_timestamp()' + " "; var inner_query = ""; inner_query += "SELECT a.*, p.ORDER_STATUS "; inner_query += "FROM " + tbl_temporary_name + " a "; inner_query += "JOIN " + tbl_temporary_unique_psp_name + " p "; inner_query += "ON a.ITEM = p.ITEM"; // inner_query example result: // --------------------------------------------------------------------------- // ITEM | CATEGORY | PART_NUMBER | DATA_SRC | ORDER_STATUS // --------------------------------------------------------------------------- // 123 | Spare | ... | ... | open // --------------------------------------------------------------------------- // 123 | different | ... | ... | open // --------------------------------------------------------------------------- // 123 | different_2 | ... | ... 
| open // --------------------------------------------------------------------------- var sub_from_text = "FROM (" + inner_query + ") erpdata "; var sub_join_text = "FULL OUTER JOIN " + tbl_items_master + " projects on projects.ITEM = erpdata.ITEM "; var sub_where_text = "WHERE projects.ITEM is null and erpdata.ITEM is not null;"; var values_text = outer_query + sub_from_text + sub_join_text + sub_where_text; var insertIntoTblProjectsMaster_command = insert_text + values_text; var stmt_insertIntoTblProjectsMaster_command = snowflake.createStatement( {sqlText: insertIntoTblProjectsMaster_command}); try { var res = stmt_insertIntoTblProjectsMaster_command.execute(); } catch (e) { return 'inserting to master table failed: ' + e; } // Dropping tbl_temp_unique_psp // comment these out if you want to keep the table for further investigation var drop_definition = "DROP TABLE " + tbl_temporary_unique_psp_name; var sql_command = snowflake.createStatement({sqlText: drop_definition}); var sql_result = sql_command.execute(); // Dropping temporary table // comment these out if you want to keep the table for further investigation var drop_definition = "DROP TABLE " + tbl_temporary_name; var sql_command = snowflake.createStatement({sqlText: drop_definition}); var sql_result = sql_command.execute(); result = "Done" return result; $$; CALL "db_name"."schema_name".prcedure_name()
Variables
- We can make use of variables:
SET name = 'Artur';
SHOW VARIABLES;
SELECT * FROM employees WHERE employee_name = $name;
- when calling a variable, we need to precede it with a dollar sign.
- When a variable stores the name of a database object, we need to wrap it in the IDENTIFIER() function:
SET my_table_name = 'employees';
SELECT * FROM IDENTIFIER($my_table_name);
DROP TABLE IDENTIFIER($my_table_name);
Streams:
- Streams make it easy to grab just the new data in a table, as they take a snapshot of all rows in a table at a point in time.
- A stream records any DML changes to a table as well as metadata about each change:
- METADATA$ACTION
- METADATA$ISUPDATE
- METADATA$ROW_ID
- A stream itself doesn't contain any data. It stores only an offset for the source table and returns Change Data Capture (CDC) records using the versioning history.
source: Joyce Kay Avila
- Here is how to create a stream based on a table:
USE DATABASE my_db;
USE SCHEMA my_schema;
CREATE OR REPLACE STREAM my_stream ON TABLE my_table
- A stream can be queried just like a table:
SELECT * FROM my_stream;
- if there hasn't been any data upload yet, then my_stream returns nothing.
- when we upload 3 records to my_table, then my_stream returns exactly the same 3 records along with the metadata.
- so the stream will always be empty as long as there has been no DML since the stream was created.
- If we recreate/replace the stream called my_stream:
CREATE OR REPLACE STREAM my_stream ON TABLE my_table
and there was a data upload to my_table, then my_stream will be empty again because of the replacement.
- We can use streams in continuous processing as queues: places where we put work to be taken from the queue and done.
Taking data from that queue is as simple as selecting from a regular table:
- we put a stream on a table that is being updated regularly,
CREATE OR REPLACE TABLE RATING_EVENTS (RAW_DATA VARIANT);

CREATE OR REPLACE STREAM STREAM_A ON TABLE RATING_EVENTS;
CREATE OR REPLACE STREAM STREAM_B ON TABLE RATING_EVENTS;

INSERT INTO RATING_EVENTS(RAW_DATA)
SELECT PARSE_JSON('{"event_id": 1, "rating": 10}');

SELECT * FROM STREAM_A;

Row | RAW_DATA                      | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|-------------------------------|-----------------|-------------------|----------------
1   | {"event_id": 1, "rating": 10} | INSERT          | FALSE             | 123456

INSERT INTO MAIN_TABLE(EVENT_ID, EVENT_TITLE, RATING) (
    SELECT STREAM_A.RAW_DATA:"event_id" AS EVENT_ID,
           'Random_title' AS EVENT_TITLE,
           STREAM_A.RAW_DATA:"rating" AS RATING
    FROM STREAM_A
);

SELECT * FROM STREAM_A;

Row | RAW_DATA | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|----------|-----------------|-------------------|----------------
    |          |                 |                   |

SELECT * FROM STREAM_B;

Row | RAW_DATA                      | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|-------------------------------|-----------------|-------------------|----------------
1   | {"event_id": 1, "rating": 10} | INSERT          | FALSE             | 123456

INSERT INTO RATING_EVENTS(RAW_DATA)
SELECT PARSE_JSON('{"event_id": 2, "rating": 9}');

INSERT INTO RATING_EVENTS(RAW_DATA)
SELECT PARSE_JSON('{"event_id": 3, "rating": 5}');

SELECT * FROM STREAM_A;

Row | RAW_DATA                      | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|-------------------------------|-----------------|-------------------|----------------
2   | {"event_id": 2, "rating": 9}  | INSERT          | FALSE             | 123457
3   | {"event_id": 3, "rating": 5}  | INSERT          | FALSE             | 123458

SELECT * FROM STREAM_B;

Row | RAW_DATA                      | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|-------------------------------|-----------------|-------------------|----------------
1   | {"event_id": 1, "rating": 10} | INSERT          | FALSE             | 123456
2   | {"event_id": 2, "rating": 9}  | INSERT          | FALSE             | 123457
3   | {"event_id": 3, "rating": 5}  | INSERT          | FALSE             | 123458

INSERT INTO MAIN_TABLE(EVENT_ID, EVENT_TITLE, RATING) (
    SELECT STREAM_A.RAW_DATA:"event_id" AS EVENT_ID,
           'Random_title' AS EVENT_TITLE,
           STREAM_A.RAW_DATA:"rating" AS RATING
    FROM STREAM_A
);

SELECT * FROM STREAM_A;

Row | RAW_DATA | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|----------|-----------------|-------------------|----------------
    |          |                 |                   |

DELETE FROM RATING_EVENTS WHERE RAW_DATA:"rating" IS NOT NULL;

SELECT * FROM STREAM_A;

Row | RAW_DATA                      | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|-------------------------------|-----------------|-------------------|----------------
1   | {"event_id": 1, "rating": 10} | DELETE          | FALSE             | 123456
2   | {"event_id": 2, "rating": 9}  | DELETE          | FALSE             | 123457
3   | {"event_id": 3, "rating": 5}  | DELETE          | FALSE             | 123458

SELECT * FROM STREAM_B;

Row | RAW_DATA | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
----|----------|-----------------|-------------------|----------------
    |          |                 |                   |
- only the freshest data that has been loaded into the RATING_EVENTS table will exist in the STREAM_A stream,
- when reading from a stream (STREAM_A) and inserting the data somewhere else (MAIN_TABLE), the stream gets cleared out,
- that way we can keep inserting into MAIN_TABLE and we will never insert any duplicated data,
- this is what the stream is for: catching the delta (newly inserted data), being a temporary placeholder for it, and clearing itself out once that delta is inserted into the destination table,
- very powerful where there is a non-stop stream of data and there is no way to identify a new record (no timestamp).
- each record in the stream has its own metadata row id.
- When updating a record in the main table, here is what happens in the related stream:
- two records appear in the stream - the first record with the old value(s) labeled as DELETE, the second record with the new value(s) labeled as INSERT.
- When I make another update, but with yet another new value (different than previously), the stream still holds two records - the first with the original value(s) labeled as DELETE, the second with the latest value(s) labeled as INSERT.
- When I make another update, but reverting to the old value(s) (the same as they were originally), the stream gets cleared of those records.
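- A sketch of that update behavior, reusing the RATING_EVENTS / STREAM_A example (assuming RATING_EVENTS again holds event_id 1 and STREAM_A has already been consumed; the METADATA$ROW_ID value is made up):
-- update an existing record in the source table
UPDATE RATING_EVENTS
SET RAW_DATA = PARSE_JSON('{"event_id": 1, "rating": 7}')
WHERE RAW_DATA:"event_id"::int = 1;

SELECT * FROM STREAM_A;

RAW_DATA                      | METADATA$ACTION | METADATA$ISUPDATE | METADATA$ROW_ID
{"event_id": 1, "rating": 10} | DELETE          | TRUE              | 123456
{"event_id": 1, "rating": 7}  | INSERT          | TRUE              | 123456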
Tasks:
- An object that can schedule a SQL statement to be automatically executed as a recurring event.
- A task can either execute a single SQL statement or call a stored procedure.
- There is no event that triggers a task; it runs relying on its schedule:
CREATE OR REPLACE TASK EVENT_RATINGS_TASK
    WAREHOUSE = My_Warehouse
    SCHEDULE = '5 minute' -- we can even set it to every minute
    WHEN SYSTEM$STREAM_HAS_DATA('STREAM_A') -- checking if the stream has data
AS
INSERT INTO MAIN_TABLE(EVENT_ID, EVENT_TITLE, RATING) (
    SELECT STREAM_A.RAW_DATA:"event_id" AS EVENT_ID,
           'Random_title' AS EVENT_TITLE,
           STREAM_A.RAW_DATA:"rating" AS RATING
    FROM STREAM_A
);

ALTER TASK EVENT_RATINGS_TASK RESUME;
- The line SYSTEM$STREAM_HAS_DATA('STREAM_A') checks if the stream has data - if so, the task gets triggered.
- When the task runs, it executes the insert query.
- When tasks are created, they are suspended - we need to resume them to activate them (see the management sketch at the end of this section).
- A task can also run in response to another task finishing - so tasks can be chained together:
CREATE OR REPLACE TASK TASK_2
    WAREHOUSE = My_Warehouse
    AFTER TASK_1
AS
INSERT INTO MAIN_TABLE(EVENT_ID, EVENT_TITLE, RATING)
SELECT * FROM my_table;
- this way we can create a sequence of tasks by specifying the existing TASK_1 as the predecessor that triggers TASK_2 if it runs successfully.
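- A few commands that help manage and monitor tasks (a sketch; EVENT_RATINGS_TASK is the task from the example above):
-- tasks are created suspended; RESUME activates them, SUSPEND pauses them
ALTER TASK EVENT_RATINGS_TASK SUSPEND;
ALTER TASK EVENT_RATINGS_TASK RESUME;
-- run a task once on demand, outside of its schedule
EXECUTE TASK EVENT_RATINGS_TASK;
-- inspect recent runs and their outcomes
SELECT name, state, scheduled_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
ORDER BY scheduled_time DESC;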
Parse JSON
- Inserting parsed JSON doesn't work with the standard VALUES syntax:
INSERT INTO table1 (id, varchar1, variant1) VALUES (4, 'Fourier', parse_json('{ "key1": "value1", "key2": "value2" }'));
- it has to be implemented with a SELECT query instead:
INSERT INTO table1 (id, varchar1, variant1)
SELECT 4, 'Fourier', parse_json('{ "key1": "value1", "key2": "value2" }');
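- Once the JSON sits in the VARIANT column, it can be queried with path notation and casts (a sketch based on the table above):
-- extract individual keys from the VARIANT column and cast them to plain strings
SELECT id,
       variant1:key1::string AS key1,
       variant1:key2::string AS key2
FROM table1
WHERE variant1:key1::string = 'value1';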
Example integration with Amazon services:
- We can build the entire ETL process flow within AWS and load the processed data into a Snowflake table:
1. S3 (raw data).
2. Lambda function (ETL).
3. S3 (processed data).
4. Event notification (triggers the Snowpipe).
5. Snowpipe (loads data into the database table).
6. Snowflake table.
- Ideally, the S3 region should be the same as the Snowflake account's region.
- We can keep all the code along with its dependencies (modules) in another S3 bucket so that we can run it as a Lambda function.
The whole package should be encapsulated in a zip file.
- We need to set up a trust relationship between our AWS account and Snowflake's AWS account (a sketch of the Snowflake side follows the quote below):
- with the trust relationship, Snowflake can use (assume) a role to access the S3 data,
- a role that we stay in full control of in AWS - we simply allow another AWS account to use that role,
- according to Amazon's documentation:
You can use IAM roles to delegate access to your AWS resources. With IAM roles, you can establish trust relationships between your trusting account and other AWS trusted accounts. The trusting account owns the resource to be accessed and the trusted account contains the users who need access to the resource (...) the trusting account might allow the trusted account to create new resources, such as creating new objects in an Amazon S3 bucket (...) After you create the trust relationship, an IAM user or an application from the trusted account can use the AWS Security Token Service (AWS STS). This operation provides temporary security credentials that enable access to AWS resources.
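- On the Snowflake side, the values needed for that trust policy can be read from the storage integration (a sketch, using the integration from the earlier example and referencing it by its bare name; double-check the exact wiring against the current Snowflake and AWS docs):
-- shows STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID,
-- which go into the trust policy of the IAM role on the AWS side
DESC STORAGE INTEGRATION "AWS_111111_Integration";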
Integration with SAP
- Here is the whole integration path, from SAP flat files to data visualization:
I. SAP:
   1. Proxy.
II. VPC - AWS account:
   1. AWS Transfer Family SFTP server (SFTP users for each connection).
   2. IAM write role (one for all buckets).
   3. KMS encryption (individual for each bucket).
   4. S3 buckets.
   5. SQS notification (event of a new file).
   6. KMS encryption (individual for each bucket).
   7. IAM read role (individual for each bucket).
III. Snowflake AWS account:
   1. IAM read role from Trust Relationship.
   2. Storage Integration object.
   3. Stage object.
   4. Snowpipe.
   5. Table object.