Day 50 introduced model persistence. Day 65 expands that foundation into
production-grade automation that glues together feature engineering,
training, registration, and deployment inside a repeatable delivery
pipeline. `solutions.py` ships a lightweight pipeline simulator that
mirrors a feature store refresh, a model training job, a model registry
promotion, and a GitHub Actions deployment stage. The tasks are wired
together with a miniature DAG executor inspired by Airflow and Prefect
semantics, so you can experiment with dependency resolution locally.
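To get a feel for the executor's API before reading the full listing, here is a minimal sketch of a two-task DAG. It assumes you run it from the repository root so the `Day_65_MLOps_Pipelines_and_CI.solutions` import resolves:

```python
from Day_65_MLOps_Pipelines_and_CI.solutions import PipelineDAG, Task

# "train" declares "extract" as upstream, so the executor runs extract
# first and exposes its result to train via the shared context dict.
dag = PipelineDAG(
    [
        Task(name="extract", run=lambda ctx: [1, 2, 3]),
        Task(
            name="train",
            run=lambda ctx: sum(ctx["extract"]) / len(ctx["extract"]),
            upstream=["extract"],
        ),
    ]
)
context = dag.execute()
print(context["execution_order"])  # ['extract', 'train']
print(context["train"])            # 2.0
```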
Run the module to see the orchestration trace:

```bash
python Day_65_MLOps_Pipelines_and_CI/solutions.py
```
The included tests (`tests/test_day_65.py`) stub raw feature inputs and
assert that the DAG executes in topological order, promoting a versioned
model artefact only after automated evaluation passes.
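The actual test file may organise things differently, but a minimal sketch of that style of assertion, reusing the `run_pipeline` helper from `solutions.py`, looks like this:

```python
from Day_65_MLOps_Pipelines_and_CI.solutions import run_pipeline


def test_pipeline_promotes_only_after_evaluation() -> None:
    # Stubbed raw feature inputs, mirroring the module's __main__ example.
    rows = [
        {"entity_id": 1, "feature_value": 0.42},
        {"entity_id": 2, "feature_value": 0.58},
    ]
    results = run_pipeline(rows)

    # Tasks must run in dependency (topological) order.
    assert results["execution_order"] == [
        "feature_store",
        "model_training",
        "model_registry",
        "deployment",
    ]

    # The artefact is versioned and staged, and the deploy gate passed.
    assert results["model_registry"]["stage"] == "Staging"
    assert results["deployment"]["status"] == "success"
```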
Run this lesson's code interactively in your browser:

!!! tip "About JupyterLite"
    JupyterLite runs entirely in your browser using WebAssembly. No
    installation or server required! Note: First launch may take a moment
    to load.

???+ example "solutions.py"
    View on GitHub

```python title="solutions.py"
"""Utility helpers for orchestrating an end-to-end MLOps pipeline.
The module intentionally mirrors the stages that appear in a production
GitHub Actions workflow: refresh a feature store, train and evaluate a
model, register the resulting artefact, and perform a deployment gate.
Instead of depending on heavy external services, the code uses
lightweight, deterministic stubs so unit tests can simulate an Apache
Airflow or Prefect DAG locally. Each task receives a consolidated
context dictionary (similar to Airflow's XCom or Prefect's task result)
and may add new keys for downstream tasks.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import (
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
)
FeatureRow = Mapping[str, Any]
@dataclass
class Task:
"""Represents a node in an orchestration graph.
Attributes
----------
name:
Unique identifier for the task. Names are used to resolve
dependencies and to expose results in the execution context.
run:
Callable that receives the merged execution context and returns a
value that is stored under ``name`` for downstream tasks.
upstream:
Optional list of task names that must finish before this task
executes. The dependency semantics align with Airflow DAGs and
Prefect flows.
"""
name: str
run: Callable[[MutableMapping[str, Any]], Any]
upstream: List[str] = field(default_factory=list)
class PipelineDAG:
"""A minimal directed acyclic graph executor for ML pipelines."""
def __init__(self, tasks: Iterable[Task]):
self._tasks: Dict[str, Task] = {}
for task in tasks:
if task.name in self._tasks:
raise ValueError(f"Duplicate task name detected: {task.name}")
self._tasks[task.name] = task
for task in self._tasks.values():
for dependency in task.upstream:
if dependency not in self._tasks:
raise ValueError(
f"Task '{task.name}' references unknown dependency '{dependency}'"
)
@property
def tasks(self) -> Dict[str, Task]:
return self._tasks

    def topological_order(self) -> List[str]:
        """Return a deterministic topological ordering of the tasks."""
        # Depth-first search with temporary/permanent marks: revisiting a
        # node that is still temporarily marked means the traversal looped
        # back on itself, i.e. the graph contains a cycle.
        temporary_marks: set[str] = set()
        permanent_marks: set[str] = set()
        ordered: List[str] = []

        def visit(node_name: str) -> None:
            if node_name in permanent_marks:
                return
            if node_name in temporary_marks:
                raise ValueError("Cycle detected in DAG definition")
            temporary_marks.add(node_name)
            node = self._tasks[node_name]
            for dependency in node.upstream:
                visit(dependency)
            permanent_marks.add(node_name)
            temporary_marks.remove(node_name)
            ordered.append(node_name)

        # Iterating over sorted names keeps the ordering deterministic
        # even when several valid topological orders exist.
        for name in sorted(self._tasks):
            if name not in permanent_marks:
                visit(name)
        return ordered

    def execute(
        self, base_context: Optional[MutableMapping[str, Any]] = None
    ) -> Dict[str, Any]:
        """Execute tasks respecting dependencies.

        Parameters
        ----------
        base_context:
            Optional dictionary containing static inputs (for example raw
            features or configuration). Tasks may mutate this dictionary,
            mimicking orchestration platforms that provide shared context
            objects.
        """
        context: MutableMapping[str, Any]
        if base_context is None:
            context = {}
        else:
            context = base_context
        ordered = self.topological_order()
        for name in ordered:
            task = self._tasks[name]
            context[name] = task.run(context)
        context["execution_order"] = ordered
        return context
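
# A task that depends on itself (or any dependency loop) is rejected: for
# example, PipelineDAG([Task(name="a", run=lambda ctx: None, upstream=["a"])])
# raises "Cycle detected in DAG definition" once topological_order() runs.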


def upsert_feature_store(rows: Iterable[FeatureRow]) -> Dict[str, FeatureRow]:
    """Materialise feature rows into an in-memory feature store.

    The function keeps the most recent row for each primary key and
    stamps the ingestion time. Production feature stores (Feast, Tecton,
    Vertex AI Feature Store) provide similar semantics.
    """
    feature_store: Dict[str, FeatureRow] = {}
    for row in rows:
        entity_id = str(row.get("entity_id"))
        feature_store[entity_id] = {
            **row,
            "ingested_at": datetime.now(UTC).isoformat(timespec="seconds"),
        }
    return feature_store
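
# Upsert semantics in action: passing two rows that share entity_id 1 leaves
# a single entry in the store (keyed "1") holding the later row's values
# plus its "ingested_at" timestamp.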


def train_model_from_store(store: Mapping[str, FeatureRow]) -> Dict[str, Any]:
    """Train and evaluate a trivial model using feature store contents."""
    feature_values = [row.get("feature_value", 0.0) for row in store.values()]
    if not feature_values:
        raise ValueError("Feature store is empty; cannot train model")
    avg_feature = sum(feature_values) / len(feature_values)
    # The "model" is encoded as a slope anchored by the mean feature value.
    model_artifact = {
        "parameters": {"slope": avg_feature / (1 + abs(avg_feature))},
        "metrics": {"validation_accuracy": 0.8 + (avg_feature % 0.2)},
    }
    return model_artifact
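
# Worked example: feature values 0.42 and 0.58 average to 0.5, giving a
# slope of 0.5 / 1.5 ≈ 0.33 and a validation_accuracy of
# 0.8 + (0.5 % 0.2) ≈ 0.9, enough to clear the 0.85 deployment gate below.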


def register_model(
    model: Mapping[str, Any], *, name: str, stage: str
) -> Dict[str, Any]:
    """Record model metadata as if interacting with an MLflow-style registry."""
    if "metrics" not in model:
        raise KeyError("Model metadata must include 'metrics'")
    version = datetime.now(UTC).strftime("%Y%m%d%H%M%S")
    registry_entry = {
        "name": name,
        "version": version,
        "stage": stage,
        "metrics": model["metrics"],
    }
    return registry_entry


def github_actions_deploy(entry: Mapping[str, Any]) -> Dict[str, Any]:
    """Simulate a GitHub Actions job that deploys a registered model."""
    if entry.get("stage") != "Staging":
        return {
            "status": "skipped",
            "reason": "Only staging models deploy automatically",
        }
    if entry.get("metrics", {}).get("validation_accuracy", 0.0) < 0.85:
        return {
            "status": "failed",
            "reason": "Quality gate failed",
        }
    return {
        "status": "success",
        "environment": "production",
        "commit_sha": "demo-sha",
    }


def build_mlops_pipeline(raw_rows: Iterable[FeatureRow]) -> PipelineDAG:
    """Construct the pipeline DAG with deterministic task wiring."""
    # Persist raw rows in the base context so the feature-store task can
    # consume them. The orchestrator will attach results by task name.
    base_context = {"raw_rows": list(raw_rows)}

    def feature_task(context: MutableMapping[str, Any]) -> Dict[str, FeatureRow]:
        return upsert_feature_store(context["raw_rows"])

    def training_task(context: MutableMapping[str, Any]) -> Dict[str, Any]:
        return train_model_from_store(context["feature_store"])

    def registry_task(context: MutableMapping[str, Any]) -> Dict[str, Any]:
        return register_model(
            context["model_training"], name="churn_model", stage="Staging"
        )

    def deployment_task(context: MutableMapping[str, Any]) -> Dict[str, Any]:
        return github_actions_deploy(context["model_registry"])

    tasks = [
        Task(name="feature_store", run=feature_task),
        Task(name="model_training", run=training_task, upstream=["feature_store"]),
        Task(name="model_registry", run=registry_task, upstream=["model_training"]),
        Task(name="deployment", run=deployment_task, upstream=["model_registry"]),
    ]
    dag = PipelineDAG(tasks)
    # Attach the base context so callers can re-use it between runs.
    dag.base_context = base_context  # type: ignore[attr-defined]
    return dag


def run_pipeline(raw_rows: Iterable[FeatureRow]) -> Dict[str, Any]:
    """Helper for scripts/tests: build the DAG and execute it."""
    dag = build_mlops_pipeline(raw_rows)
    context = getattr(dag, "base_context", {})
    return dag.execute(context)


if __name__ == "__main__":
    rows = [
        {"entity_id": 1, "feature_value": 0.42},
        {"entity_id": 2, "feature_value": 0.58},
    ]
    results = run_pipeline(rows)
    print("Execution order:", results["execution_order"])  # noqa: T201
    print("Deployment status:", results["deployment"])  # noqa: T201
```
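
With the sample rows above, the mean feature value is 0.5, so the simulated
evaluation clears the 0.85 quality gate and the staged model deploys. The
printed trace is deterministic, and a run should end with output along
these lines:

```text
Execution order: ['feature_store', 'model_training', 'model_registry', 'deployment']
Deployment status: {'status': 'success', 'environment': 'production', 'commit_sha': 'demo-sha'}
```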