Day 82 – BI ETL and Pipeline Automation
Day 82 extends the roadmap by transforming the ETL and automation nodes into a workshop on orchestrated analytics delivery. The lesson groups the roadmap material into three facilitation threads:
- Pipeline foundations – Revisit the pillars of extract, transform, and load so that cross-functional stakeholders can align on service-level expectations, data contracts, and refresh cadences.
- Automation toolkit – Show how dedicated tooling (Airflow, dbt, and vendor ETL platforms) codifies business logic, introduces observability, and makes deployments repeatable.
- Delivery lifecycle – Tie the upstream mechanics to the end-to-end analytics project lifecycle, from source audits through dashboard refreshes and stakeholder communications.
The accompanying notebook-style script assembles a canonical pipeline outline, projects it into an Airflow DAG stub, and demonstrates how dbt models and exposures consume those assets. Use the walkthrough to frame automation practices such as dependency management, retries, lineage tracking, and BI handoffs without requiring access to a live orchestrator.
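For orientation, the sketch below shows how the lesson's canonical pipeline could look as a real Airflow DAG. It is illustrative rather than part of the lesson script: it assumes Airflow 2.x (where EmptyOperator and the schedule argument are available), and the task ids and retry counts mirror the stub built in solutions.py.

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Illustrative DAG mirroring the lesson's canonical pipeline outline.
with DAG(
    dag_id="analytics_etl",
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = EmptyOperator(task_id="extract_sources", retries=2)
    validate = EmptyOperator(task_id="validate_raw", retries=1)
    stage = EmptyOperator(task_id="stage_clean", retries=1)
    load = EmptyOperator(task_id="load_warehouse", retries=2)
    run_dbt = EmptyOperator(task_id="run_dbt_models", retries=1)
    refresh = EmptyOperator(task_id="refresh_dashboards", retries=1)

    # Each task blocks on its single upstream dependency.
    extract >> validate >> stage >> load >> run_dbt >> refresh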
Additional Topic: Industry Applications
This lesson is part of the Phase 5 Business Intelligence specialization. Use the Phase 5 overview to see how the developer-roadmap topics align across Days 68–84.
Why it matters
The ETL and automation playbooks in this lesson pay off when translated into concrete industry verticals, each with its own data sources, refresh cadences, and delivery expectations.
Developer-roadmap alignment
- Retail & E-commerce
- Finance
- Healthcare
- Manufacturing
Next steps
- Draft case studies and notebooks that exercise these roadmap nodes.
- Update the Phase 5 cheat sheet with the insights you capture here.
Additional Materials
lesson.py
# %%
"""Day 82 β BI ETL and Pipeline Automation classroom script."""
# %%
from __future__ import annotations
from Day_82_BI_ETL_and_Pipeline_Automation.solutions import (
build_airflow_dag_stub,
build_dbt_project_stub,
build_pipeline_outline,
load_topics,
)
# %%
TOPIC_GROUPS = load_topics()
PIPELINE_OUTLINE = build_pipeline_outline()
AIRFLOW_DAG = build_airflow_dag_stub()
DBT_PROJECT = build_dbt_project_stub()
# %%
def summarize_topics() -> None:
"""Print the roadmap groupings that frame the ETL automation lesson."""
print("\nDay 82 roadmap groupings\n")
for section, topics in TOPIC_GROUPS.items():
titles = ", ".join(topic.title for topic in topics)
print(f"- {section}: {titles}")
# %%
def outline_pipeline() -> None:
"""Print the canonical pipeline steps and ownership model."""
print("\nPipeline outline\n")
for task in PIPELINE_OUTLINE:
upstream = ", ".join(task.upstream) if task.upstream else "start"
print(f"{task.task_id} -> depends on [{upstream}] ({task.owner})")
print(f" {task.description}")
# %%
def review_airflow_stub() -> None:
"""Explain how the pipeline outline maps into an Airflow DAG."""
print("\nAirflow DAG stub\n")
print(f"DAG id: {AIRFLOW_DAG['dag_id']} | schedule: {AIRFLOW_DAG['schedule']}")
for task_id, config in AIRFLOW_DAG["tasks"].items():
upstream = ", ".join(config["upstream"]) if config["upstream"] else "start"
print(
f"- {task_id}: upstream [{upstream}], owner={config['owner']}, retries={config['retries']}"
)
# %%
def review_dbt_stub() -> None:
"""Highlight the downstream dbt manifest that consumes the Airflow outputs."""
print("\ndbt project stub\n")
print("Sources:")
for source, config in DBT_PROJECT["sources"].items():
print(f"- {source} <- {config['loaded_by']}")
print("\nStaging models:")
for model, config in DBT_PROJECT["staging"].items():
deps = ", ".join(config["depends_on"])
print(f"- {model} ({config['materialized']}) depends on [{deps}]")
print("\nMart models:")
for model, config in DBT_PROJECT["marts"].items():
deps = ", ".join(config["depends_on"])
print(f"- {model} ({config['materialized']}) depends on [{deps}]")
print("\nExposures:")
for exposure, config in DBT_PROJECT["exposures"].items():
deps = ", ".join(config["depends_on"])
print(
f"- {exposure} ({config['type']}) depends on [{deps}] -> owner {config['owner']}"
)
print(f"\nPipeline completion task: {DBT_PROJECT['final_task']}")
# %%
def main() -> None:
"""Run the Day 82 classroom walkthrough."""
summarize_topics()
outline_pipeline()
review_airflow_stub()
review_dbt_stub()
# %%
if __name__ == "__main__":
main()
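# %%
# For reference, with the default PIPELINE_STEPS the outline_pipeline() call
# above prints output along these lines (spacing approximate):
#
#   Pipeline outline
#
#   extract_sources -> depends on [start] (analytics_engineering)
#     Land CRM, product, and finance extracts into a controlled raw zone.
#   validate_raw -> depends on [extract_sources] (analytics_engineering)
#     Run schema and row-count checks to gate downstream transformations.
#   ...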
solutions.py
"""Utilities for the Day 82 BI ETL and Pipeline Automation lesson."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Mapping, Sequence
from mypackage.bi_curriculum import BiTopic, group_topics_by_titles
# --- Roadmap groupings -----------------------------------------------------
TOPIC_GROUP_TITLES: Mapping[str, Sequence[str]] = {
"Pipeline foundations": (
"ETL basics",
"Data Pipeline Design",
),
"Automation toolkit": (
"ETL Tools",
"Airflow",
"dbt",
),
"Delivery lifecycle": ("End-to-end Analytics Project",),
}
def load_topics(
groups: Mapping[str, Sequence[str]] = TOPIC_GROUP_TITLES,
) -> dict[str, list[BiTopic]]:
"""Return roadmap topics grouped for the ETL automation lesson."""
return group_topics_by_titles(groups)
# --- Pipeline sketch helpers -----------------------------------------------
@dataclass(frozen=True, slots=True)
class PipelineTask:
"""Representation of a pipeline task and its upstream dependencies."""
task_id: str
description: str
upstream: tuple[str, ...] = ()
owner: str = "analytics_engineering"
PIPELINE_STEPS: tuple[PipelineTask, ...] = (
PipelineTask(
"extract_sources",
"Land CRM, product, and finance extracts into a controlled raw zone.",
),
PipelineTask(
"validate_raw",
"Run schema and row-count checks to gate downstream transformations.",
("extract_sources",),
),
PipelineTask(
"stage_clean",
"Normalize field names, cast types, and deduplicate business keys.",
("validate_raw",),
),
PipelineTask(
"load_warehouse",
"Persist curated tables into the analytics warehouse for modeling.",
("stage_clean",),
),
PipelineTask(
"run_dbt_models",
"Execute dbt models to assemble marts and publish metrics.",
("load_warehouse",),
owner="analytics_engineering",
),
PipelineTask(
"refresh_dashboards",
"Trigger downstream dashboards and notify stakeholders of completion.",
("run_dbt_models",),
owner="bi_operations",
),
)
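# Note: PIPELINE_STEPS is listed in topological order (each task appears after
# all of its upstream dependencies), so the helpers below can safely treat the
# final entry as the pipeline's terminal task.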
def build_pipeline_outline(
steps: Sequence[PipelineTask] = PIPELINE_STEPS,
) -> list[PipelineTask]:
"""Return a mutable outline of the canonical ETL pipeline tasks."""
return list(steps)
def build_airflow_dag_stub(
*,
dag_id: str = "analytics_etl",
schedule: str = "@daily",
steps: Sequence[PipelineTask] = PIPELINE_STEPS,
) -> dict[str, object]:
"""Return a minimal Airflow DAG definition mapping tasks to dependencies."""
task_definitions: dict[str, dict[str, object]] = {}
for task in steps:
task_definitions[task.task_id] = {
"upstream": task.upstream,
"owner": task.owner,
"retries": (
2 if task.task_id in {"extract_sources", "load_warehouse"} else 1
),
}
return {
"dag_id": dag_id,
"schedule": schedule,
"default_args": {
"start_date": "2024-01-01",
"email_on_failure": True,
},
"tasks": task_definitions,
}
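# The stub above stays dict-shaped so the lesson can run without Airflow
# installed. A quick dependency check needs only the standard library; this
# sketch relies on the tasks being defined in topological order:
#
#     stub = build_airflow_dag_stub()
#     seen: set[str] = set()
#     for task_id, config in stub["tasks"].items():
#         missing = set(config["upstream"]) - seen
#         assert not missing, f"{task_id} waits on undefined tasks: {missing}"
#         seen.add(task_id)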
def build_dbt_project_stub(
*,
steps: Sequence[PipelineTask] = PIPELINE_STEPS,
) -> dict[str, object]:
"""Return a lightweight dbt manifest showing staging and mart dependencies."""
staging_models = {
"stg_crm_contacts": {
"materialized": "view",
"depends_on": ("raw_crm_contacts", "raw_product_users"),
},
"stg_finance_transactions": {
"materialized": "incremental",
"depends_on": ("raw_finance_ledger",),
},
"stg_product_events": {
"materialized": "view",
"depends_on": ("raw_product_events",),
},
}
mart_models = {
"fct_customer_lifecycle": {
"materialized": "table",
"depends_on": ("stg_crm_contacts", "stg_finance_transactions"),
},
"dim_product_usage": {
"materialized": "table",
"depends_on": ("stg_product_events",),
},
}
exposures = {
"weekly_revenue_review": {
"type": "dashboard",
"depends_on": ("fct_customer_lifecycle",),
"owner": "finance_lead",
}
}
return {
"name": "analytics_etl",
"sources": {
"raw_crm_contacts": {"loaded_by": "extract_sources"},
"raw_product_users": {"loaded_by": "extract_sources"},
"raw_finance_ledger": {"loaded_by": "extract_sources"},
"raw_product_events": {"loaded_by": "extract_sources"},
},
"staging": staging_models,
"marts": mart_models,
"exposures": exposures,
"final_task": steps[-1].task_id if steps else "refresh_dashboards",
}
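# In a full dbt project the exposure above would live in a schema.yml file.
# A minimal sketch (the owner email is a placeholder, not part of this lesson):
#
#     exposures:
#       - name: weekly_revenue_review
#         type: dashboard
#         owner:
#           name: finance_lead
#           email: finance_lead@example.com
#         depends_on:
#           - ref('fct_customer_lifecycle')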
__all__ = [
"PIPELINE_STEPS",
"PipelineTask",
"TOPIC_GROUP_TITLES",
"build_airflow_dag_stub",
"build_dbt_project_stub",
"build_pipeline_outline",
"load_topics",
]