Snowflake Development
Snowflake SQL, data pipelines, Cortex AI, and Snowpark Python development. Covers the colon-prefix rule, semi-structured data, MERGE upserts, Dynamic Tables, Streams+Tasks, Cortex AI functions, agent specs, performance tuning, and security hardening.
Originally contributed by James Cha-Earley — enhanced and integrated by the claude-skills team.
Quick Start
# Generate a MERGE upsert template
python scripts/snowflake_query_helper.py merge --target customers --source staging_customers --key customer_id --columns name,email,updated_at
# Generate a Dynamic Table template
python scripts/snowflake_query_helper.py dynamic-table --name cleaned_events --warehouse transform_wh --lag "5 minutes"
# Generate RBAC grant statements
python scripts/snowflake_query_helper.py grant --role analyst_role --database analytics --schemas public,staging --privileges SELECT,USAGE
SQL Best Practices
Naming and Style
- Use snake_case for all identifiers. Avoid double-quoted identifiers -- they force case-sensitive names that require constant quoting.
- Use CTEs (WITH clauses) over nested subqueries.
- Use CREATE OR REPLACE for idempotent DDL.
- Use explicit column lists -- never SELECT * in production. Snowflake's columnar storage scans only referenced columns, so explicit lists reduce I/O.
Stored Procedures -- Colon Prefix Rule
In SQL stored procedures (BEGIN...END blocks), variables and parameters must use the colon : prefix inside SQL statements. Without it, Snowflake treats them as column identifiers and raises "invalid identifier" errors.
-- WRONG: missing colon prefix
SELECT name INTO result FROM users WHERE id = p_id;
-- CORRECT: colon prefix on both variable and parameter
SELECT name INTO :result FROM users WHERE id = :p_id;
This applies to DECLARE variables, LET variables, and procedure parameters when used inside SELECT, INSERT, UPDATE, DELETE, or MERGE.
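The rule can be checked mechanically before a procedure is deployed. The following is a minimal sketch of such a check, not part of any Snowflake tooling: `find_unprefixed` is a hypothetical helper that takes one SQL statement plus the variable and parameter names you declared, and reports any name that appears without a leading colon. It is a plain regex heuristic, so a column that shares a name with a variable, or a name inside a string literal, will be reported as well.

```python
import re

def find_unprefixed(sql, variables):
    """Return the variable names that appear in `sql` without a leading colon."""
    missing = []
    for name in variables:
        # Whole-word match that is not preceded by ':', '.', or an identifier character.
        pattern = rf"(?<![:\w.]){re.escape(name)}\b"
        if re.search(pattern, sql, re.IGNORECASE):
            missing.append(name)
    return missing

wrong = "SELECT name INTO result FROM users WHERE id = p_id;"
right = "SELECT name INTO :result FROM users WHERE id = :p_id;"

print(find_unprefixed(wrong, ["result", "p_id"]))  # ['result', 'p_id']
print(find_unprefixed(right, ["result", "p_id"]))  # []
```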
Semi-Structured Data
- VARIANT, OBJECT, ARRAY for JSON/Avro/Parquet/ORC.
- Access nested fields: src:customer.name::STRING. Always cast with ::TYPE.
- VARIANT null vs SQL NULL: JSON null is stored as the string "null", not as SQL NULL. Use the file format option STRIP_NULL_VALUES = TRUE on load.
- Flatten arrays:
SELECT f.value:name::STRING FROM my_table, LATERAL FLATTEN(input => src:items) f;
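To make the effect of null stripping concrete, here is a rough client-side analogue in plain Python. It is only an illustration of the idea: the real option is applied by Snowflake while loading, and `strip_nulls` and the sample document are hypothetical. The sketch removes object fields and array elements whose value is JSON null, so they never reach the VARIANT column at all.

```python
import json

def strip_nulls(value):
    """Recursively drop object fields and array elements that are JSON null."""
    if isinstance(value, dict):
        return {k: strip_nulls(v) for k, v in value.items() if v is not None}
    if isinstance(value, list):
        return [strip_nulls(v) for v in value if v is not None]
    return value

raw = '{"customer": {"name": "Ada", "email": null}, "items": [{"name": "pen", "note": null}, null]}'
cleaned = strip_nulls(json.loads(raw))
print(json.dumps(cleaned))
# {"customer": {"name": "Ada"}, "items": [{"name": "pen"}]}
```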
MERGE for Upserts
MERGE INTO target t USING source s ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.name = s.name, t.updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (id, name, updated_at) VALUES (s.id, s.name, CURRENT_TIMESTAMP());
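When the same upsert shape is needed for many tables, the statement can be generated from a key and a column list. The function below is a small sketch of that idea under stated assumptions: `build_merge` is a hypothetical name, it is not the implementation of the helper script from Quick Start, and it does no identifier validation, so it should only be fed trusted names.

```python
def build_merge(target, source, key, columns):
    """Build a MERGE upsert statement as a string (identifiers are not validated)."""
    all_columns = [key, *columns]
    set_clause = ", ".join(f"t.{c} = s.{c}" for c in columns)
    insert_cols = ", ".join(all_columns)
    insert_vals = ", ".join(f"s.{c}" for c in all_columns)
    return (
        f"MERGE INTO {target} t USING {source} s ON t.{key} = s.{key}\n"
        f"WHEN MATCHED THEN UPDATE SET {set_clause}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals});"
    )

print(build_merge("customers", "staging_customers", "customer_id",
                  ["name", "email", "updated_at"]))
```

The key column is left out of the UPDATE SET list on purpose, since a matched row already has the same key value.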
See references/snowflake_sql_and_pipelines.md for deeper SQL patterns and anti-patterns.
Data Pipelines
Choosing Your Approach
| Approach | When to Use |
|---|---|
| Dynamic Tables | Declarative transformations. Default choice. Define the query, Snowflake handles refresh. |
| Streams + Tasks | Imperative CDC. Use for procedural logic, stored procedure calls, complex branching. |
| Snowpipe | Continuous file loading from cloud storage (S3, GCS, Azure). |
Dynamic Tables
CREATE OR REPLACE DYNAMIC TABLE cleaned_events
TARGET_LAG = '5 minutes'
WAREHOUSE = transform_wh
AS
SELECT event_id, event_type, user_id, event_timestamp
FROM raw_events
WHERE event_type IS NOT NULL;
Key rules:
- Set TARGET_LAG progressively: tighter at the top of the DAG, looser downstream.
- Incremental DTs cannot depend on Full-refresh DTs.
- SELECT * breaks on upstream schema changes -- use explicit column lists.
- Views cannot sit between two Dynamic Tables in the DAG.
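The TARGET_LAG rule lends itself to a quick consistency check over a pipeline definition. The sketch below reads "top of the DAG" as upstream, which is an assumption, and flags every edge where a downstream table asks for a tighter lag than the table it reads from. The table names other than cleaned_events are made up for illustration, and the special value DOWNSTREAM is not handled.

```python
UNIT_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}

def lag_seconds(lag):
    """Convert a TARGET_LAG string such as '5 minutes' to seconds."""
    number, unit = lag.strip().lower().split()
    return int(number) * UNIT_SECONDS[unit.rstrip("s")]

def lag_violations(lags, edges):
    """Return (upstream, downstream) pairs where the downstream lag is tighter."""
    return [
        (up, down)
        for up, down in edges
        if lag_seconds(lags[down]) < lag_seconds(lags[up])
    ]

# Hypothetical three-table DAG: cleaned_events -> hourly_rollup -> daily_mart
lags = {"cleaned_events": "5 minutes", "hourly_rollup": "1 hour", "daily_mart": "30 minutes"}
edges = [("cleaned_events", "hourly_rollup"), ("hourly_rollup", "daily_mart")]

print(lag_violations(lags, edges))  # [('hourly_rollup', 'daily_mart')]
```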
Streams and Tasks
CREATE OR REPLACE STREAM raw_stream ON TABLE raw_events;
CREATE OR REPLACE TASK process_events
WAREHOUSE = transform_wh
SCHEDULE = 'USING CRON 0 */1 * * * America/Los_Angeles'
WHEN SYSTEM$STREAM_HAS_DATA('raw_stream')
AS INSERT INTO cleaned_events SELECT ... FROM raw_stream;
-- Tasks start SUSPENDED. You MUST resume them.
ALTER TASK process_events RESUME;
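Because a forgotten RESUME fails silently (the task simply never runs), it can be worth scanning deployment scripts for it. This is a minimal sketch assuming unquoted task names and a script held in a single string; `unresumed_tasks` and the `nightly_cleanup` task are hypothetical, and the check looks only for a matching ALTER TASK ... RESUME anywhere in the script, not for its position.

```python
import re

CREATE_TASK = re.compile(
    r"CREATE\s+(?:OR\s+REPLACE\s+)?TASK\s+(?:IF\s+NOT\s+EXISTS\s+)?([\w.$]+)",
    re.IGNORECASE,
)
RESUME_TASK = re.compile(
    r"ALTER\s+TASK\s+(?:IF\s+EXISTS\s+)?([\w.$]+)\s+RESUME",
    re.IGNORECASE,
)

def unresumed_tasks(script):
    """Return names of tasks created in `script` that have no matching RESUME."""
    resumed = {name.lower() for name in RESUME_TASK.findall(script)}
    return [name for name in CREATE_TASK.findall(script) if name.lower() not in resumed]

script = """
CREATE OR REPLACE TASK process_events WAREHOUSE = transform_wh SCHEDULE = '60 MINUTE' AS SELECT 1;
CREATE TASK nightly_cleanup WAREHOUSE = transform_wh SCHEDULE = '60 MINUTE' AS SELECT 1;
ALTER TASK process_events RESUME;
"""

print(unresumed_tasks(script))  # ['nightly_cleanup']
```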
See references/snowflake_sql_and_pipelines.md for DT debugging queries and Snowpipe patterns.
Cortex AI
Function Reference
| Function | Purpose |
|---|---|
| AI_COMPLETE | LLM completion (text, images, documents) |
| AI_CLASSIFY | Classify text into categories (up to 500 labels) |
| AI_FILTER | Boolean filter on text or images |
| AI_EXTRACT | Structured extraction from text/images/documents |
| AI_SENTIMENT | Sentiment score (-1 to 1) |
| AI_PARSE_DOCUMENT | OCR or layout extraction from documents |
| AI_REDACT | PII removal from text |
Deprecated names (do NOT use): COMPLETE, CLASSIFY_TEXT, EXTRACT_ANSWER, PARSE_DOCUMENT, SUMMARIZE, TRANSLATE, SENTIMENT, EMBED_TEXT_768.
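A simple scan can surface the old names in existing SQL. The sketch below assumes the legacy calls are written with the SNOWFLAKE.CORTEX. prefix; it deliberately ignores bare names, because some of them (TRANSLATE, for example) also exist as ordinary SQL functions. `find_deprecated_calls`, the `tickets` table, and the `body` column are hypothetical.

```python
import re

# The deprecated names listed above.
DEPRECATED = [
    "COMPLETE", "CLASSIFY_TEXT", "EXTRACT_ANSWER", "PARSE_DOCUMENT",
    "SUMMARIZE", "TRANSLATE", "SENTIMENT", "EMBED_TEXT_768",
]

def find_deprecated_calls(sql):
    """Return deprecated names called as SNOWFLAKE.CORTEX.<name>(...), sorted."""
    names = "|".join(DEPRECATED)
    pattern = rf"\bSNOWFLAKE\.CORTEX\.({names})\s*\("
    return sorted({match.upper() for match in re.findall(pattern, sql, re.IGNORECASE)})

sql = "SELECT SNOWFLAKE.CORTEX.SUMMARIZE(body), AI_SENTIMENT(body) FROM tickets;"
print(find_deprecated_calls(sql))  # ['SUMMARIZE']
```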
TO_FILE -- Common Pitfall
Stage path and filename are separate arguments:
-- WRONG: single combined argument
TO_FILE('@stage/file.pdf')
-- CORRECT: two arguments
TO_FILE('@db.schema.mystage', 'invoice.pdf')
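If paths arrive in the combined form, they can be split before the call is built. The following sketch assumes the stage name is everything before the first slash and that neither part contains a single quote; `split_stage_path` and `to_file_call` are hypothetical helpers, not Snowflake APIs.

```python
def split_stage_path(combined):
    """Split '@stage/dir/file.pdf' into (stage, relative_path)."""
    if not combined.startswith("@"):
        raise ValueError("stage references start with '@'")
    stage, sep, relative = combined.partition("/")
    if not sep or not relative:
        raise ValueError("expected '@stage/path' with a file path after the stage")
    return stage, relative

def to_file_call(combined):
    """Render the two-argument TO_FILE call for a combined stage path."""
    stage, relative = split_stage_path(combined)
    return f"TO_FILE('{stage}', '{relative}')"

print(to_file_call("@db.schema.mystage/invoice.pdf"))
# TO_FILE('@db.schema.mystage', 'invoice.pdf')
```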
Cortex Agents
Agent specs use a JSON structure with top-level keys: models, instructions, tools, tool_resources.
- Use $spec$ delimiter (not $$).
- models must be an object, not an array.
- tool_resources is a separate top-level key, not nested inside tools.
- Tool descriptions are the single biggest factor in agent quality.
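The two structural rules above are easy to verify on a spec before it is submitted. This is a sketch only: `check_agent_spec` is a hypothetical helper, the sample values are placeholders rather than a real agent definition, and nothing beyond the two rules stated above is checked.

```python
def check_agent_spec(spec):
    """Return a list of structural problems found in an agent spec dict."""
    problems = []
    if "models" in spec and not isinstance(spec["models"], dict):
        problems.append("models must be an object, not an array")
    for tool in spec.get("tools", []):
        if isinstance(tool, dict) and "tool_resources" in tool:
            problems.append("tool_resources must be a top-level key, not nested inside tools")
            break
    return problems

# Placeholder specs: the inner values are illustrative, not a real schema.
bad_spec = {"models": ["placeholder"], "tools": [{"tool_resources": {}}]}
good_spec = {"models": {}, "instructions": {}, "tools": [], "tool_resources": {}}

print(check_agent_spec(bad_spec))
print(check_agent_spec(good_spec))  # []
```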
See references/cortex_ai_and_agents.md for full agent spec examples and Cortex Search patterns.
Snowpark Python
from snowflake.snowpark import Session
import os
session = Session.builder.configs({
"account": os.environ["SNOWFLAKE_ACCOUNT"],
"user": os.environ["SNOWFLAKE_USER"],
"password": os.environ["SNOWFLAKE_PASSWORD"],
"role": "my_role", "warehouse": "my_wh",
"database": "my_db", "schema": "my_schema"
}).create()
- Never hardcode credentials. Use environment variables or key pair auth.
- DataFrames are lazy -- executed on collect() / show().
- Do NOT call collect() on large DataFrames. Process server-side with DataFrame operations.
- Use vectorized UDFs (10-100x faster) for batch and ML workloads.
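One way to follow the credentials rule is to build the connection parameters in a single function that prefers key pair auth when a key file is configured. The sketch below rests on two assumptions that should be checked against your environment: that your connector version accepts the `private_key_file` connection parameter, and that the key path is supplied in a variable named SNOWFLAKE_PRIVATE_KEY_FILE, which is a name invented here. The function only builds a dict, so it runs without a Snowflake connection.

```python
def connection_params(env):
    """Build connection parameters from an environment-style mapping."""
    params = {
        "account": env["SNOWFLAKE_ACCOUNT"],
        "user": env["SNOWFLAKE_USER"],
    }
    key_file = env.get("SNOWFLAKE_PRIVATE_KEY_FILE")  # hypothetical variable name
    if key_file:
        # Key pair auth: assumes the connector supports 'private_key_file'.
        params["private_key_file"] = key_file
    else:
        params["password"] = env["SNOWFLAKE_PASSWORD"]
    return params

# Demonstration with a fake mapping; in real code pass os.environ instead.
fake_env = {
    "SNOWFLAKE_ACCOUNT": "my_account",
    "SNOWFLAKE_USER": "my_user",
    "SNOWFLAKE_PRIVATE_KEY_FILE": "/path/to/rsa_key.p8",
}
print(sorted(connection_params(fake_env)))
# ['account', 'private_key_file', 'user']
```

The resulting dict can be merged with the role, warehouse, database, and schema settings and passed to Session.builder.configs(...) as in the snippet above.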
dbt on Snowflake
-- Dynamic table materialization (streaming/near-real-time marts):
{{ config(materialized='dynamic_table', snowflake_warehouse='transforming', target_lag='1 hour') }}
-- Incremental materialization (large fact tables):
{{ config(materialized='incremental', unique_key='event_id') }}
-- Snowflake-specific configs (combine with any materialization):
{{ config(transient=true, copy_grants=true, query_tag='team_daily') }}
- Do NOT use {{ this }} without an {% if is_incremental() %} guard.
- Use dynamic_table materialization for streaming or near-real-time marts.
Performance
- Cluster keys: Only for multi-TB tables. Apply on WHERE / JOIN / GROUP BY columns.
- Search Optimization: ALTER TABLE t ADD SEARCH OPTIMIZATION ON EQUALITY(col);
- Warehouse sizing: Start X-Small, scale up. Set AUTO_SUSPEND = 60, AUTO_RESUME = TRUE.
- Separate warehouses per workload (load, transform, query).
Security
- Follow least-privilege RBAC. Use database roles for object-level grants.
- Audit ACCOUNTADMIN regularly: SHOW GRANTS OF ROLE ACCOUNTADMIN;
- Use network policies for IP allowlisting.
- Use masking policies for PII columns and row access policies for multi-tenant isolation.
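Least-privilege grants tend to follow a fixed pattern: USAGE on the database, USAGE on each schema, then object privileges on the tables inside. The sketch below generates that pattern for a read-only role. `build_grants` is a hypothetical function, not the Quick Start helper, it covers only existing tables (not future ones), and it performs no identifier validation.

```python
def build_grants(role, database, schemas, table_privileges=("SELECT",)):
    """Return GRANT statements giving `role` read access to tables in `schemas`."""
    privileges = ", ".join(table_privileges)
    statements = [f"GRANT USAGE ON DATABASE {database} TO ROLE {role};"]
    for schema in schemas:
        qualified = f"{database}.{schema}"
        statements.append(f"GRANT USAGE ON SCHEMA {qualified} TO ROLE {role};")
        statements.append(
            f"GRANT {privileges} ON ALL TABLES IN SCHEMA {qualified} TO ROLE {role};"
        )
    return statements

for statement in build_grants("analyst_role", "analytics", ["public", "staging"]):
    print(statement)
```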
Proactive Triggers
Surface these issues without being asked when you notice them in context:
- Missing colon prefix in SQL stored procedures -- flag immediately, this causes "invalid identifier" at runtime.
- SELECT * in Dynamic Tables -- flag as a schema-change time bomb.
- Deprecated Cortex function names (CLASSIFY_TEXT, SUMMARIZE, etc.) -- suggest the current AI_* equivalents.
- Task not resumed after creation -- remind that tasks start SUSPENDED.
- Hardcoded credentials in Snowpark code -- flag as a security risk.