wshobson / dbt-transformation-patterns
Install for your project team
Run this command in your project directory to install the skill for your entire team:
mkdir -p .claude/skills/dbt-transformation-patterns && curl -L -o skill.zip "https://fastmcp.me/Skills/Download/452" && unzip -o skill.zip -d .claude/skills/dbt-transformation-patterns && rm skill.zip
Project Skills
This skill will be saved in .claude/skills/dbt-transformation-patterns/ and checked into git. All team members will have access to it automatically.
Important: Please verify the skill by reviewing its instructions before using it.
Skill Content
---
name: dbt-transformation-patterns
description: Master dbt (data build tool) for analytics engineering with model organization, testing, documentation, and incremental strategies. Use when building data transformations, creating data models, or implementing analytics engineering best practices.
---
# dbt Transformation Patterns
Production-ready patterns for dbt (data build tool) including model organization, testing strategies, documentation, and incremental processing.
## When to Use This Skill
- Building data transformation pipelines with dbt
- Organizing models into staging, intermediate, and marts layers
- Implementing data quality tests
- Creating incremental models for large datasets
- Documenting data models and lineage
- Setting up dbt project structure
## Core Concepts
### 1. Model Layers (Medallion-Style Architecture)
```
sources/          Raw data definitions
    ↓
staging/          1:1 with source, light cleaning
    ↓
intermediate/     Business logic, joins, aggregations
    ↓
marts/            Final analytics tables
```
### 2. Naming Conventions
| Layer | Prefix | Example |
|-------|--------|---------|
| Staging | `stg_` | `stg_stripe__payments` |
| Intermediate | `int_` | `int_payments_pivoted` |
| Marts | `dim_`, `fct_` | `dim_customers`, `fct_orders` |
## Quick Start
```yaml
# dbt_project.yml
name: 'analytics'
version: '1.0.0'
profile: 'analytics'
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
vars:
  start_date: '2020-01-01'
models:
  analytics:
    staging:
      +materialized: view
      +schema: staging
    intermediate:
      +materialized: ephemeral
    marts:
      +materialized: table
      +schema: analytics
```
```
# Project structure
models/
├── staging/
│   ├── stripe/
│   │   ├── _stripe__sources.yml
│   │   ├── _stripe__models.yml
│   │   ├── stg_stripe__customers.sql
│   │   └── stg_stripe__payments.sql
│   └── shopify/
│       ├── _shopify__sources.yml
│       └── stg_shopify__orders.sql
├── intermediate/
│   └── finance/
│       └── int_payments_pivoted_to_customer.sql
└── marts/
    ├── core/
    │   ├── _core__models.yml
    │   ├── dim_customers.sql
    │   └── fct_orders.sql
    └── finance/
        └── fct_revenue.sql
```
## Patterns
### Pattern 1: Source Definitions
```yaml
# models/staging/stripe/_stripe__sources.yml
version: 2
sources:
  - name: stripe
    description: Raw Stripe data loaded via Fivetran
    database: raw
    schema: stripe
    loader: fivetran
    loaded_at_field: _fivetran_synced
    freshness:
      warn_after: {count: 12, period: hour}
      error_after: {count: 24, period: hour}
    tables:
      - name: customers
        description: Stripe customer records
        columns:
          - name: id
            description: Primary key
            tests:
              - unique
              - not_null
          - name: email
            description: Customer email
          - name: created
            description: Account creation timestamp
      - name: payments
        description: Stripe payment transactions
        columns:
          - name: id
            tests:
              - unique
              - not_null
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: source('stripe', 'customers')
                  field: id
```
### Pattern 2: Staging Models
```sql
-- models/staging/stripe/stg_stripe__customers.sql
with source as (
    select * from {{ source('stripe', 'customers') }}
),
renamed as (
    select
        -- ids
        id as customer_id,
        -- strings
        lower(email) as email,
        name as customer_name,
        -- timestamps
        created as created_at,
        -- metadata
        _fivetran_synced as _loaded_at
    from source
)
select * from renamed
```
```sql
-- models/staging/stripe/stg_stripe__payments.sql
{{
    config(
        materialized='incremental',
        unique_key='payment_id',
        on_schema_change='append_new_columns'
    )
}}
with source as (
    select * from {{ source('stripe', 'payments') }}
    {% if is_incremental() %}
    where _fivetran_synced > (select max(_loaded_at) from {{ this }})
    {% endif %}
),
renamed as (
    select
        -- ids
        id as payment_id,
        customer_id,
        invoice_id,
        -- amounts (convert cents to dollars)
        amount / 100.0 as amount,
        amount_refunded / 100.0 as amount_refunded,
        -- status
        status as payment_status,
        -- timestamps
        created as created_at,
        -- metadata
        _fivetran_synced as _loaded_at
    from source
)
select * from renamed
```
### Pattern 3: Intermediate Models
```sql
-- models/intermediate/finance/int_payments_pivoted_to_customer.sql
with payments as (
    select * from {{ ref('stg_stripe__payments') }}
),
customers as (
    select * from {{ ref('stg_stripe__customers') }}
),
payment_summary as (
    select
        customer_id,
        count(*) as total_payments,
        count(case when payment_status = 'succeeded' then 1 end) as successful_payments,
        sum(case when payment_status = 'succeeded' then amount else 0 end) as total_amount_paid,
        min(created_at) as first_payment_at,
        max(created_at) as last_payment_at
    from payments
    group by customer_id
)
select
    customers.customer_id,
    customers.email,
    customers.created_at as customer_created_at,
    coalesce(payment_summary.total_payments, 0) as total_payments,
    coalesce(payment_summary.successful_payments, 0) as successful_payments,
    coalesce(payment_summary.total_amount_paid, 0) as lifetime_value,
    payment_summary.first_payment_at,
    payment_summary.last_payment_at
from customers
left join payment_summary using (customer_id)
```
### Pattern 4: Mart Models (Dimensions and Facts)
```sql
-- models/marts/core/dim_customers.sql
-- unique_key only affects incremental models, so it is omitted for a table
{{
    config(
        materialized='table'
    )
}}
with customers as (
    select * from {{ ref('int_payments_pivoted_to_customer') }}
),
orders as (
    select * from {{ ref('stg_shopify__orders') }}
),
order_summary as (
    select
        customer_id,
        count(*) as total_orders,
        sum(total_price) as total_order_value,
        min(created_at) as first_order_at,
        max(created_at) as last_order_at
    from orders
    group by customer_id
),
final as (
    select
        -- surrogate key
        {{ dbt_utils.generate_surrogate_key(['customers.customer_id']) }} as customer_key,
        -- natural key
        customers.customer_id,
        -- attributes
        customers.email,
        customers.customer_created_at,
        -- payment metrics
        customers.total_payments,
        customers.successful_payments,
        customers.lifetime_value,
        customers.first_payment_at,
        customers.last_payment_at,
        -- order metrics
        coalesce(order_summary.total_orders, 0) as total_orders,
        coalesce(order_summary.total_order_value, 0) as total_order_value,
        order_summary.first_order_at,
        order_summary.last_order_at,
        -- calculated fields
        case
            when customers.lifetime_value >= 1000 then 'high'
            when customers.lifetime_value >= 100 then 'medium'
            else 'low'
        end as customer_tier,
        -- timestamps
        current_timestamp as _loaded_at
    from customers
    left join order_summary using (customer_id)
)
select * from final
```
```sql
-- models/marts/core/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge'
    )
}}
with orders as (
    select * from {{ ref('stg_shopify__orders') }}
    {% if is_incremental() %}
    where updated_at > (select max(updated_at) from {{ this }})
    {% endif %}
),
customers as (
    select * from {{ ref('dim_customers') }}
),
final as (
    select
        -- keys
        orders.order_id,
        customers.customer_key,
        orders.customer_id,
        -- dimensions
        orders.order_status,
        orders.fulfillment_status,
        orders.payment_status,
        -- measures
        orders.subtotal,
        orders.tax,
        orders.shipping,
        orders.total_price,
        orders.total_discount,
        orders.item_count,
        -- timestamps
        orders.created_at,
        orders.updated_at,
        orders.fulfilled_at,
        -- metadata
        current_timestamp as _loaded_at
    from orders
    left join customers on orders.customer_id = customers.customer_id
)
select * from final
```
### Pattern 5: Testing and Documentation
```yaml
# models/marts/core/_core__models.yml
version: 2
models:
  - name: dim_customers
    description: Customer dimension with payment and order metrics
    columns:
      - name: customer_key
        description: Surrogate key for the customer dimension
        tests:
          - unique
          - not_null
      - name: customer_id
        description: Natural key from source system
        tests:
          - unique
          - not_null
      - name: email
        description: Customer email address
        tests:
          - not_null
      - name: customer_tier
        description: Customer value tier based on lifetime value
        tests:
          - accepted_values:
              values: ['high', 'medium', 'low']
      - name: lifetime_value
        description: Total amount paid by customer
        tests:
          - dbt_utils.expression_is_true:
              expression: ">= 0"
  - name: fct_orders
    description: Order fact table with all order transactions
    tests:
      - dbt_utils.recency:
          datepart: day
          field: created_at
          interval: 1
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_key
        tests:
          - not_null
          - relationships:
              to: ref('dim_customers')
              field: customer_key
```
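Generic tests in YAML cover single-column rules well. For rules that span several columns, dbt also supports singular tests: a SQL file under the configured `test-paths` directory that selects the rows violating the rule, with the test failing if any rows come back. The following is a minimal sketch, assuming a hypothetical file name and the `dim_customers` columns built in Pattern 4:

```sql
-- tests/assert_successful_payments_within_total.sql (hypothetical file name)
-- Singular test: dbt marks this test as failed if the query returns any rows.
select
    customer_id,
    total_payments,
    successful_payments
from {{ ref('dim_customers') }}
-- a customer cannot have more successful payments than payments overall
where successful_payments > total_payments
```

Because the file lives in the project, it runs with `dbt test` and `dbt build` alongside the generic tests.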
### Pattern 6: Macros and DRY Code
```sql
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, precision=2) %}
    round({{ column_name }} / 100.0, {{ precision }})
{% endmacro %}

-- macros/generate_schema_name.sql
{% macro generate_schema_name(custom_schema_name, node) %}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ default_schema }}_{{ custom_schema_name }}
    {%- endif -%}
{% endmacro %}

-- macros/limit_data_in_dev.sql (dateadd() is Snowflake/Redshift syntax)
{% macro limit_data_in_dev(column_name, days=3) %}
    {% if target.name == 'dev' %}
        where {{ column_name }} >= dateadd(day, -{{ days }}, current_date)
    {% endif %}
{% endmacro %}

-- Usage in model
select * from {{ ref('stg_shopify__orders') }}
{{ limit_data_in_dev('created_at') }}
```
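The `cents_to_dollars` macro above is defined but not called anywhere in the patterns. As an illustrative sketch (not the original staging model), the amount columns in `stg_stripe__payments` could use it in place of the inline `/ 100.0` arithmetic. Note that the macro also rounds, so results can differ slightly from the unrounded division:

```sql
-- Sketch: calling cents_to_dollars from a staging model
select
    id as payment_id,
    -- renders as: round(amount / 100.0, 2)
    {{ cents_to_dollars('amount') }} as amount,
    -- the optional second argument overrides the default precision of 2
    {{ cents_to_dollars('amount_refunded', 4) }} as amount_refunded
from {{ source('stripe', 'payments') }}
```

Running `dbt compile` shows the rendered SQL, which is a quick way to check what a macro expands to.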
### Pattern 7: Incremental Strategies
```sql
-- Delete+Insert (availability and the default strategy vary by adapter; check your warehouse's dbt docs)
{{
    config(
        materialized='incremental',
        unique_key='id',
        incremental_strategy='delete+insert'
    )
}}

-- Merge (best for late-arriving data)
{{
    config(
        materialized='incremental',
        unique_key='id',
        incremental_strategy='merge',
        merge_update_columns=['status', 'amount', 'updated_at']
    )
}}

-- Insert Overwrite (partition-based; this partition_by dict and the date functions are BigQuery syntax)
{{
    config(
        materialized='incremental',
        incremental_strategy='insert_overwrite',
        partition_by={
            "field": "created_date",
            "data_type": "date",
            "granularity": "day"
        }
    )
}}
select
    *,
    date(created_at) as created_date
from {{ ref('stg_events') }}
{% if is_incremental() %}
-- filter on the source column: a select alias is not visible in WHERE
where date(created_at) >= date_sub(current_date(), interval 3 day)
{% endif %}
```
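The merge strategy is described above as suited to late-arriving data, but the incremental filter that makes that work is not shown. One common approach is a lookback window: reprocess a few days before the newest loaded row and let the merge on `unique_key` update rows that already exist. This is a minimal sketch, assuming `stg_shopify__orders` exposes `order_id` and `updated_at` (as `fct_orders` implies), an arbitrary 3-day window, and Snowflake/Redshift-style `dateadd()`:

```sql
-- Sketch: merge with a lookback window for late-arriving rows
-- Assumes order_id and updated_at exist on stg_shopify__orders.
{{
    config(
        materialized='incremental',
        unique_key='order_id',
        incremental_strategy='merge'
    )
}}
select *
from {{ ref('stg_shopify__orders') }}
{% if is_incremental() %}
-- Reprocess the 3 days before the newest row already loaded;
-- merging on unique_key keeps reprocessed rows from duplicating.
-- dateadd() is Snowflake/Redshift syntax; adapt for other warehouses.
where updated_at >= (
    select dateadd(day, -3, max(updated_at)) from {{ this }}
)
{% endif %}
```

A wider window catches later arrivals at the cost of scanning more rows on each run.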
## dbt Commands
```bash
# Development
dbt run                                # Run all models
dbt run --select staging               # Run staging models only
dbt run --select +fct_orders           # Run fct_orders and its upstream
dbt run --select fct_orders+           # Run fct_orders and its downstream
dbt run --full-refresh                 # Rebuild incremental models
# Testing
dbt test                               # Run all tests
dbt test --select stg_stripe__payments # Test a specific model
dbt build                              # Run models, tests, seeds, and snapshots in DAG order
# Documentation
dbt docs generate                      # Generate docs
dbt docs serve                         # Serve docs locally
# Debugging
dbt compile                            # Compile SQL without running
dbt debug                              # Test connection
dbt ls --select tag:critical           # List models by tag
```
## Best Practices
### Do's
- **Use staging layer** - Clean data once, use everywhere
- **Test aggressively** - Not null, unique, relationships
- **Document everything** - Column descriptions, model descriptions
- **Use incremental** - For large tables (rule of thumb: > 1M rows) where full rebuilds are slow
- **Version control** - dbt project in Git
### Don'ts
- **Don't skip staging** - Raw → mart is tech debt
- **Don't hardcode dates** - Use `{{ var('start_date') }}`
- **Don't repeat logic** - Extract to macros
- **Don't develop in prod** - Build and test changes against a dev target first
- **Don't ignore freshness** - Monitor source data
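
The advice against hardcoded dates can be illustrated with the `start_date` var declared in `dbt_project.yml` in the Quick Start. This is a minimal sketch, assuming `stg_shopify__orders` has a `created_at` column and that the warehouse coerces the string literal to a date or timestamp:

```sql
-- Sketch: filter on the project-level start_date var instead of a literal.
-- The second argument to var() is a fallback used if the var is not set.
select *
from {{ ref('stg_shopify__orders') }}
where created_at >= '{{ var("start_date", "2020-01-01") }}'
```

The value can then be overridden per run without editing the model, for example with `dbt run --vars '{"start_date": "2024-01-01"}'`.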
## Resources
- [dbt Documentation](https://docs.getdbt.com/)
- [dbt Best Practices](https://docs.getdbt.com/guides/best-practices)
- [dbt-utils Package](https://hub.getdbt.com/dbt-labs/dbt_utils/latest/)
- [dbt Discourse](https://discourse.getdbt.com/)