Implementing Incremental Loading of Data Using Streams and Merge in Snowflake

Understanding Incremental Loading

Incremental loading, the process of updating a target table with only the newest or modified data since the last load, provides several advantages over full data loads:

  • Reduced processing time: Only processing new data significantly improves performance compared to full loads.
  • Reduced network bandwidth: Transferring only new data decreases the bandwidth required for data transfer.
  • Lower resource consumption: Processing smaller datasets reduces the resources required for storage and processing.

Understanding Stream and Merge

Stream:

  1. In Snowflake, Streams are objects that capture changes to a table, such as inserts, updates, and deletes. They provide a reliable mechanism for tracking modifications to the data, which is essential for implementing incremental loading.
  2. A stream exposes three additional metadata columns alongside the source table's columns (see the example query after this list):
    • METADATA$ACTION: has only two possible values, INSERT or DELETE.
    • METADATA$ISUPDATE: flagged as TRUE when the change record is part of an update (a standard stream records an update as a DELETE/INSERT pair).
    • METADATA$ROW_ID: a unique, immutable ID that tracks changes to a specific row over time.
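
For illustration, querying a stream returns the tracked table's columns together with these metadata columns. The names below are placeholders, not objects from this article's example:

SELECT id, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM my_stream;   -- my_stream is any stream created on a table with an id column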

Merge: This command takes two inputs: a target table and a source, which is typically a stream or staging table. It compares the rows in the source with the rows in the target table based on a specified join condition and, depending on the WHEN clauses you define, performs the following actions (a general sketch follows this list):

  1. Inserts new rows from the source into the target table.
  2. Updates existing rows in the target table that have been modified in the source.
  3. Deletes rows from the target table whose source rows have been deleted (for example, when the stream's METADATA$ACTION is DELETE).
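
For orientation, here is a minimal sketch of the general MERGE shape. The table, column, and flag names are placeholders and not part of the example that follows:

---- general MERGE shape (placeholder names) ----
MERGE INTO target_table AS t
USING source_data AS s
  ON t.id = s.id
WHEN MATCHED AND s.is_deleted THEN DELETE                         -- remove rows deleted in the source
WHEN MATCHED THEN UPDATE SET value = s.value                      -- overwrite modified rows
WHEN NOT MATCHED THEN INSERT (id, value) VALUES (s.id, s.value);  -- add new rows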

Steps for Implementing Incremental Loading

  1. Create a source table and a target table. The source table receives the incoming data, while the target table is kept in sync with the new or changed rows identified by the stream.
  2. Create a stream on the source table. This stream will capture all changes made to the source table.
  3. Set up a Merge statement. The Merge statement will join the stream with the target table, identify the differences, and perform the necessary actions on the target table.

Let’s see how we can implement an incremental load using Streams and the Merge command with an example:

1. Create the source and target tables as shown below. Here I am using RandomTrees as the database and Sales as the schema.

---- creating source and target tables -----
CREATE OR REPLACE TABLE SALES(
    sale_id INT PRIMARY KEY,
    sale_date DATE,
    customer_id INT,
    product_id INT,
    quantity INT,
    total_amount DECIMAL(10,2)
);

CREATE OR REPLACE TABLE SALES_TARGET(
    sale_id INT PRIMARY KEY,
    sale_date DATE,
    customer_id INT,
    product_id INT,
    quantity INT,
    total_amount DECIMAL(10,2),
    inserted_date TIMESTAMP,
    updated_date TIMESTAMP
);

2. Create a stream object on top of the source table (SALES) using the query below:

CREATE OR REPLACE STREAM SALES_STREAM ON TABLE SALES;
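
Optionally, you can confirm whether the stream has captured any changes yet; this check is not part of the original walkthrough:

SELECT SYSTEM$STREAM_HAS_DATA('SALES_STREAM');   -- returns FALSE until changes land in SALES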

3. Now implement the following MERGE command to perform the incremental load using the SCD Type-1 concept and the stream. Run at this point, the MERGE reports zero rows inserted, updated, or deleted because the stream has not captured any changes yet.
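
The MERGE statement itself is not reproduced in this write-up, so the version below is a reconstructed sketch based on the tables and stream defined above; the exact predicates and timestamp handling are assumptions:

---- SCD Type-1 merge from the stream into the target (reconstructed sketch) ----
MERGE INTO SALES_TARGET AS tgt
USING SALES_STREAM AS src
  ON tgt.sale_id = src.sale_id
-- source row was deleted: remove it from the target
WHEN MATCHED AND src.METADATA$ACTION = 'DELETE' AND src.METADATA$ISUPDATE = FALSE THEN
  DELETE
-- source row was updated: overwrite the target row (SCD Type-1)
WHEN MATCHED AND src.METADATA$ACTION = 'INSERT' AND src.METADATA$ISUPDATE = TRUE THEN
  UPDATE SET sale_date = src.sale_date,
             customer_id = src.customer_id,
             product_id = src.product_id,
             quantity = src.quantity,
             total_amount = src.total_amount,
             updated_date = CURRENT_TIMESTAMP()
-- brand-new source row: insert it into the target
WHEN NOT MATCHED AND src.METADATA$ACTION = 'INSERT' THEN
  INSERT (sale_id, sale_date, customer_id, product_id, quantity, total_amount, inserted_date, updated_date)
  VALUES (src.sale_id, src.sale_date, src.customer_id, src.product_id, src.quantity, src.total_amount,
          CURRENT_TIMESTAMP(), CURRENT_TIMESTAMP());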

4. Let’s insert 5 rows into the source table and see how the stream captures the changes.

INSERT INTO sales (sale_id, sale_date, customer_id, product_id, quantity, total_amount) VALUES
(1, '2024-04-22', 1, 101, 2, 50.00),
(2, '2024-04-22', 2, 102, 1, 30.50),
(3, '2024-04-23', 1, 103, 3, 75.30),
(4, '2024-04-23', 3, 101, 1, 25.00),
(5, '2024-04-24', 2, 104, 2, 46.80);

SELECT * FROM sales_stream;

5. Now execute the MERGE command again and check whether these 5 records are inserted into the target table.
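
After re-running the MERGE from step 3, a quick check of the target table (this verification query is an assumption, not from the original) should show the five new rows:

SELECT * FROM SALES_TARGET ORDER BY sale_id;   -- expect sale_id 1-5 with inserted_date populated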

6. Let’s see whether updated and deleted records in the source are replicated to the target table. Apply an update and a delete to the source, check the stream, and then run the MERGE command again.

UPDATE sales SET total_amount = 48.80 WHERE sale_id = 5;
DELETE FROM sales WHERE sale_id = 1;
SELECT * FROM sales_stream;

7. Now let’s verify the concept with insert and update commands together.

UPDATE sales SET total_amount = 40.50 WHERE product_id = 102;
INSERT INTO sales (sale_id, sale_date, customer_id, product_id, quantity, total_amount) VALUES
(6, '2024-04-20', 4, 105, 1, 15.1),
(7, '2024-04-20', 4, 104, 1, 23.4);
SELECT * FROM sales_stream;
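
After running the MERGE once more, the row for product 102 should be overwritten in place (SCD Type-1) and sales 6 and 7 should appear as new inserts. One way to confirm this (a verification query assumed for illustration, not from the original):

---- assumed verification query ----
SELECT sale_id, total_amount, inserted_date, updated_date
FROM SALES_TARGET
WHERE sale_id IN (2, 6, 7)
ORDER BY sale_id;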

Here are some best practices for implementing incremental loading in Snowflake:

  • Identify a unique identifier: Choose a unique column in the source data that can be used to identify records and track changes.
  • Implement data quality checks: Before loading data, perform data quality checks to ensure accuracy and consistency.
  • Optimize performance: Use features like clustering and materialized views to optimize the performance of your incremental loading process.
  • Monitor and troubleshoot: Regularly monitor your incremental loading pipelines and troubleshoot any issues that arise.

By following these guidelines and using Streams and the MERGE command effectively, you can achieve efficient and reliable incremental loading in Snowflake.

Conclusion:

Snowflake’s Stream and Merge features offer powerful tools for implementing efficient incremental loading strategies. This approach can significantly improve performance and reduce the overall processing time, making it ideal for managing large, frequently changing datasets.