The prefect-snowflake integration makes it easy to connect to Snowflake in your Prefect flows. You can run queries both synchronously and asynchronously as Prefect flows and tasks.

Getting started

Prerequisites

Installation

Install prefect-snowflake as a dependency of Prefect. If you don’t already have Prefect installed, this command installs the newest version of prefect as well.

pip install "prefect[snowflake]"

Upgrade to the latest versions of prefect and prefect-snowflake:

pip install -U "prefect[snowflake]"
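
To confirm which versions ended up installed, a minimal check using only the standard library might look like the following. It assumes the distributions are published under the names prefect and prefect-snowflake, as the pip commands above suggest; the helper name installed_version is illustrative only.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Distribution names are assumed from the pip commands above.
    for name in ("prefect", "prefect-snowflake"):
        print(f"{name}: {installed_version(name) or 'not installed'}")
```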

Blocks setup

The prefect-snowflake integration has two blocks: one for storing credentials and one for storing connection information. Register blocks in this module to view and edit them on Prefect Cloud:

prefect block register -m prefect_snowflake

Create the credentials block

The following example saves a SnowflakeCredentials block through code. Log into your Snowflake account to find your credentials. The example uses a user and password combination; refer to the SDK documentation for a full list of authentication and connection options.

from prefect_snowflake import SnowflakeCredentials

credentials = SnowflakeCredentials(
    account="ACCOUNT-PLACEHOLDER",  # resembles nh12345.us-east-2.snowflake
    user="USER-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER"
)
credentials.save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
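
Hardcoding a password in a script is rarely desirable. One possible variation, sketched below, reads the same three fields from environment variables before saving the block. The variable names (SNOWFLAKE_ACCOUNT, SNOWFLAKE_USER, SNOWFLAKE_PASSWORD) and both helper functions are assumptions made for illustration; they are not part of prefect-snowflake.

```python
import os
from typing import Dict, Mapping

# Hypothetical environment variable names; adjust to your own conventions.
ENV_TO_FIELD = {
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
}


def credentials_kwargs(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Collect SnowflakeCredentials keyword arguments from environment variables."""
    missing = [name for name in ENV_TO_FIELD if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {field: env[name] for name, field in ENV_TO_FIELD.items()}


def save_credentials_block(block_name: str) -> None:
    """Build the block from the environment and save it under block_name."""
    # Imported here so credentials_kwargs can be used without prefect-snowflake.
    from prefect_snowflake import SnowflakeCredentials

    SnowflakeCredentials(**credentials_kwargs()).save(block_name)
```

Calling save_credentials_block("CREDENTIALS-BLOCK-NAME-PLACEHOLDER") would then store the same block as the example above, without the secret appearing in source control.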

Create the connection block

Then, to create a SnowflakeConnector block:

  1. After logging in, click on any worksheet.
  2. On the left side, select a database and schema.
  3. On the top right, select a warehouse.
  4. Create a short script, replacing the placeholders below.

from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

connector = SnowflakeConnector(
    credentials=credentials,
    database="DATABASE-PLACEHOLDER",
    schema="SCHEMA-PLACEHOLDER",
    warehouse="COMPUTE_WH",
)
connector.save("CONNECTOR-BLOCK-NAME-PLACEHOLDER")

You can now easily load the saved block, which holds your credentials and connection info:

from prefect_snowflake import SnowflakeConnector

connector = SnowflakeConnector.load("CONNECTOR-BLOCK-NAME-PLACEHOLDER")

Examples

To set up a table, use the execute and execute_many methods. Then, use the fetch_all method to retrieve the results. If the results are too large to fit into memory, use the fetch_many method to retrieve data in chunks.

By using the SnowflakeConnector as a context manager, you can make sure that the Snowflake connection and cursors are closed properly after you’re done with them.

from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            # extend (not append) keeps all_rows a flat list of rows
            all_rows.extend(new_rows)
    return all_rows

@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


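if __name__ == "__main__":
    # snowflake_flow requires the name of the saved connector block
    snowflake_flow("CONNECTOR-BLOCK-NAME-PLACEHOLDER")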
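
The fetch_data task above accumulates every chunk in a list, which brings the whole result set back into memory. When rows can be processed one chunk at a time, the same loop can be written as a generator. The sketch below is only an illustration: iter_chunks and the stand-in FakeConnector are hypothetical, and the fake mimics the behavior described in the comment above (repeated fetch_many calls with the same operation return the next set of results) so that the example runs without a Snowflake account.

```python
from typing import Any, Callable, Iterator, List, Tuple

Row = Tuple[Any, ...]


def iter_chunks(
    fetch_many: Callable[..., List[Row]], operation: str, size: int
) -> Iterator[List[Row]]:
    """Yield successive chunks until fetch_many returns an empty result."""
    while True:
        chunk = fetch_many(operation, size=size)
        if not chunk:
            return
        yield chunk


class FakeConnector:
    """Hypothetical stand-in that serves rows from memory, size rows per call."""

    def __init__(self, rows: List[Row]) -> None:
        self._rows = rows
        self._offset = 0

    def fetch_many(self, operation: str, size: int) -> List[Row]:
        chunk = self._rows[self._offset : self._offset + size]
        self._offset += len(chunk)
        return chunk


if __name__ == "__main__":
    fake = FakeConnector(
        [("Ford", "Highway 42"), ("Unknown", "Space"), ("Me", "Myway 88")]
    )
    for chunk in iter_chunks(fake.fetch_many, "SELECT * FROM customers", size=2):
        print(len(chunk), "rows:", chunk)
```

Inside a task, the fetch_many method of a loaded SnowflakeConnector would take the place of fake.fetch_many, with the loop body doing whatever per-chunk processing is needed.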

If the block’s native methods don’t meet your requirements, you can access the underlying Snowflake connection and use its built-in methods instead.

import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas


@flow
def snowflake_write_pandas_flow():
    connector = SnowflakeConnector.load("my-block")
    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with connection.cursor() as cursor:
            cursor.execute(statement)

        # Case sensitivity matters here: the DataFrame column names must match
        # the table's column names exactly.
        df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=connector.database,
            schema=connector.schema_  # note the "_" suffix
        )

Resources

Refer to the prefect-snowflake SDK documentation to explore other capabilities of the prefect-snowflake library, such as async methods.

For further assistance using Snowflake, consult the Snowflake documentation or the Snowflake Python Connector documentation.