Snowflake Streams and Tasks: Your Path to Real-Time Data Excellence

Introduction:

In today’s fast-paced world of data analytics and cloud computing, businesses and data professionals need solutions that enable them to efficiently process, analyze, and manage data. Snowflake, a leading cloud data platform, has been at the forefront of providing innovative tools and features to meet these demands. Two such features that have gained prominence are Snowflake Tasks and Streams.

In this comprehensive guide, we will delve deep into the world of Snowflake Tasks and Streams, exploring what they are, how they work, and how they can be harnessed to enhance your data workflows. Whether you’re a data engineer, analyst, or business decision-maker, understanding these powerful Snowflake features can revolutionize the way you handle data, from real-time data ingestion to automation of routine tasks.

Let’s embark on a journey to unlock the full potential of Snowflake Tasks and Streams and discover how they can empower your data initiatives.

Prerequisites:

  1. Snowflake account: You need access to a Snowflake account. If you don’t have one, you’ll need to sign up.
  2. Database and warehouse: You should have a Snowflake database and a virtual warehouse created. These are the foundational components where your data and tasks will operate.
  3. Table(s): You’ll need one or more tables in your Snowflake database to capture data changes. Streams are typically created on existing tables.
Example:

Imagine you’re managing an e-commerce platform, and you want to monitor real-time sales data and generate an hourly sales report.

Sample sales table

Use the ACCOUNTADMIN role (or any role with the required privileges) and set your context:

USE ROLE ACCOUNTADMIN;

-- Use your own database name instead of DEV
USE DATABASE DEV;

Let’s create the sales table with sample data:

USE SCHEMA RAW_DATA;

-- Create the sales table
CREATE OR REPLACE TABLE sales (
    sale_id INT AUTOINCREMENT PRIMARY KEY,
    order_date TIMESTAMP,
    product_name STRING,
    sales_amount FLOAT
);

-- Insert sample data into the sales table
INSERT INTO sales (order_date, product_name, sales_amount)
VALUES
    ('2023-09-10 10:00:00', 'Product A', 100.00),
    ('2023-09-10 10:15:00', 'Product B', 75.50),
    ('2023-09-10 10:30:00', 'Product A', 120.75),
    ('2023-09-10 10:45:00', 'Product C', 50.25),
    ('2023-09-10 10:15:00', 'Product B', 85.00),
    ('2023-09-10 10:30:00', 'Product A', 110.50);

-- Check the sales table
SELECT * FROM sales;

[Screenshot: sales data query output]

Snowflake Streams: The Real-Time Data Channel

Before we dive into Snowflake Tasks, let’s start with Snowflake Streams. A stream in Snowflake records the ordered sequence of changes (inserts, updates, and deletes) made to a table from the point the stream is created. It acts as a real-time change data capture (CDC) channel, making it easier to track and react to data changes as they occur.

Creating a Stream

-- Create a stream on the sales table
CREATE OR REPLACE STREAM sales_stream
ON TABLE sales;

-- Check the stream metadata
SHOW STREAMS;
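
You can query a stream like a table to inspect the change records it has captured. Alongside the table’s own columns, each record carries metadata columns describing the change. Note that a stream only tracks DML that happens after its creation, so at this point the query below returns no rows:

-- Inspect captured changes; the METADATA$ columns describe each change
SELECT
    sale_id,
    product_name,
    sales_amount,
    METADATA$ACTION,    -- INSERT or DELETE
    METADATA$ISUPDATE,  -- TRUE when the change is part of an UPDATE
    METADATA$ROW_ID     -- unique, immutable identifier for the row
FROM sales_stream;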

Snowflake Tasks: Automating Real-Time Workflows

Snowflake Tasks are automated units of work that run on a defined schedule or after a predecessor task completes. You can use Tasks to act on the data changes captured by Streams. For example, a Task can refresh a reporting table, send notifications, or perform ETL (Extract, Transform, Load) operations in response to new data.

Creating a Task

To create a Snowflake Task, you need to define the SQL statement(s) you want to execute when the task is triggered. Here’s an example of creating a simple task:

Let’s create the reporting table before creating the task:

-- Create the hourly_sales_report table
CREATE OR REPLACE TABLE hourly_sales_report (
    report_date DATE,
    report_hour TIMESTAMP,
    total_sales FLOAT
);

Task:

CREATE OR REPLACE TASK load_hourly_sales_data
    WAREHOUSE = 'COMPUTE_WH'  -- Specify your warehouse
    SCHEDULE = '60 MINUTE'    -- Specify how often the task runs
    COMMENT = 'Loads hourly sales totals captured by sales_stream'
AS
BEGIN
    -- Aggregate the changes captured by the stream into the report table
    INSERT INTO hourly_sales_report (report_date, report_hour, total_sales)
    SELECT
        DATE_TRUNC(DAY, CURRENT_TIMESTAMP()) AS report_date,
        DATE_TRUNC(HOUR, CURRENT_TIMESTAMP()) AS report_hour,
        SUM(sales_amount) AS total_sales
    FROM sales_stream;
END;
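
As an optional refinement (not part of the task above), the schedule can be gated on the stream so that a run is skipped when no new changes have arrived, avoiding empty report rows and unnecessary warehouse usage. A minimal sketch using the same names:

CREATE OR REPLACE TASK load_hourly_sales_data
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE = '60 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('SALES_STREAM')  -- skip the run if the stream is empty
AS
    INSERT INTO hourly_sales_report (report_date, report_hour, total_sales)
    SELECT
        DATE_TRUNC(DAY, CURRENT_TIMESTAMP()),
        DATE_TRUNC(HOUR, CURRENT_TIMESTAMP()),
        SUM(sales_amount)
    FROM sales_stream;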

By default, a newly created Snowflake task is suspended. To start its schedule, you need to explicitly resume it using the ALTER TASK statement. Here’s how to resume your task:

-- Enable the task
ALTER TASK load_hourly_sales_data RESUME;
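
While testing, you don’t have to wait for the next scheduled run; assuming your role has the EXECUTE TASK privilege, you can trigger the task manually:

-- Run the task once, immediately, outside its schedule
EXECUTE TASK load_hourly_sales_data;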

Let’s test the flow by inserting some records into the sales table:

INSERT INTO sales (order_date, product_name, sales_amount)
VALUES
    ('2023-09-10 11:00:00', 'Product A', 200.00),
    ('2023-09-10 11:15:00', 'Product B', 750.50);

To check that the changes were captured by the stream, execute the query below:

SELECT * FROM sales_stream;

[Screenshot: CDC stream data]

Once the task has run, check the sales report table:

SELECT * FROM hourly_sales_report;
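
Note that when the task’s INSERT reads from sales_stream, it advances the stream’s offset: changes that have been consumed no longer appear in the stream. You can check whether the stream still holds unconsumed changes with:

-- Returns TRUE if the stream contains change records not yet consumed
SELECT SYSTEM$STREAM_HAS_DATA('SALES_STREAM');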

Monitoring:

The query below shows the task’s current status:

SHOW TASKS LIKE '%load_hourly_sales_data%' IN dev.raw_data;

The query below provides access to your task history and scheduled runs, allowing you to keep a close eye on error statuses and performance:

SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    scheduled_time_range_start => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
));
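
TASK_HISTORY also accepts a TASK_NAME argument if you want to narrow the history to a single task (task names are stored in uppercase unless quoted at creation):

SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    task_name => 'LOAD_HOURLY_SALES_DATA',
    scheduled_time_range_start => DATEADD(HOUR, -6, CURRENT_TIMESTAMP())
));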

Conclusion:

Snowflake Streams and Tasks offer a powerful combination for real-time data processing and automation. By capturing data changes in Streams and triggering Tasks based on those changes, you can build dynamic and responsive data workflows that keep your organization informed and agile in an increasingly fast-paced data landscape.

Whether it’s real-time analytics, automated notifications, or ETL processes, Snowflake provides the tools to make it happen, opening up a world of possibilities for real-time data processing.

Source: https://medium.com/bi3-technologies/snowflake-streams-and-tasks-your-path-to-real-time-data-excellence-fd2d2b1ca63a
