Skip to content

Solution: Level 7 / Project 08 - Ingestion Observability Kit

STOP — Try it yourself first!

You learn by building, not by reading answers. Spend at least 30 minutes attempting this project before looking here.

  • Re-read the README for requirements
  • Try the WALKTHROUGH for guided hints without spoilers

Complete solution

"""Level 7 / Project 08 — Ingestion Observability Kit.

Structured logging and metrics for data-ingestion pipelines.
Every record gets a correlation ID so operators can trace failures
back to specific input rows across pipeline stages.
"""

from __future__ import annotations

import argparse
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path


# -- Data model ----------------------------------------------------------

# WHY structured log entries with correlation IDs? -- In a multi-stage
# pipeline, a single input row passes through extract -> validate -> load.
# When something fails, the correlation_id lets you trace that specific
# row's journey across all stages -- essential for debugging production issues.
@dataclass
class LogEntry:
    """One structured log line."""
    timestamp: float        # epoch seconds when the entry was created
    correlation_id: str     # ties the entry to one input row (or a random 8-char id for system events)
    stage: str              # pipeline stage that emitted the entry, e.g. "ingest"
    level: str          # INFO | WARN | ERROR
    message: str            # human-readable event description
    extra: dict = field(default_factory=dict)  # optional structured context; factory avoids a shared mutable default


@dataclass
class StageMetrics:
    """Counters for a single pipeline stage."""
    stage: str
    rows_in: int = 0
    rows_out: int = 0
    errors: int = 0
    start_time: float = 0.0
    end_time: float = 0.0

    @property
    def duration(self) -> float:
        """Wall-clock seconds between start and end, rounded to 4 places."""
        return round(self.end_time - self.start_time, 4)

    @property
    def error_rate(self) -> float:
        """Fraction of input rows that errored, rounded to 4 places.

        Guard clause: with zero input rows there is nothing to divide by,
        and 0.0 is semantically right — no rows processed means no errors.
        """
        if not self.rows_in:
            return 0.0
        return round(self.errors / self.rows_in, 4)


# -- Core logic ----------------------------------------------------------

class ObservabilityKit:
    """Collect structured logs and per-stage metrics across a pipeline run.

    Stages call ``start_stage``/``end_stage`` around their work; row-level
    problems are reported via ``log_error``/``log_warn`` with a correlation
    ID so operators can trace a single record across stages.
    """

    # WHY a class constant? -- The string->logging-constant mapping never
    # changes; building it once avoids re-creating the dict on every log call.
    _LEVELS = {"INFO": logging.INFO, "WARN": logging.WARNING, "ERROR": logging.ERROR}

    def __init__(self) -> None:
        self.logs: list[LogEntry] = []              # every structured entry, in order
        self.metrics: dict[str, StageMetrics] = {}  # per-stage counters, keyed by stage name

    def start_stage(self, stage: str, rows_in: int) -> StageMetrics:
        """Begin timing *stage* and record its input row count."""
        m = StageMetrics(stage=stage, rows_in=rows_in, start_time=time.time())
        self.metrics[stage] = m
        self._log(stage, "INFO", f"stage started with {rows_in} rows")
        return m

    def end_stage(self, stage: str, rows_out: int, errors: int = 0) -> None:
        """Finish *stage*, recording output rows and error count.

        Raises:
            KeyError: if the stage was never started.  Intentional -- ending
                an unstarted stage is a programming bug that should fail
                loudly, hence direct dict access rather than ``.get()``.
        """
        m = self.metrics[stage]
        m.rows_out = rows_out
        m.errors = errors
        m.end_time = time.time()
        self._log(stage, "INFO", f"stage completed: {rows_out} out, {errors} errors")

    def log_error(self, stage: str, correlation_id: str, message: str) -> None:
        """Record a row-level ERROR tied to *correlation_id*."""
        self._log(stage, "ERROR", message, correlation_id=correlation_id)

    def log_warn(self, stage: str, correlation_id: str, message: str) -> None:
        """Record a row-level WARN tied to *correlation_id*."""
        self._log(stage, "WARN", message, correlation_id=correlation_id)

    def _log(self, stage: str, level: str, message: str,
             correlation_id: str = "") -> None:
        """Append a structured LogEntry and mirror it to the logging module.

        WHY generate a UUID if no correlation_id? -- System-level logs
        (stage start/end) do not belong to a specific row but still need
        a unique ID for log aggregation tools.
        """
        entry = LogEntry(
            timestamp=time.time(),
            correlation_id=correlation_id or str(uuid.uuid4())[:8],
            stage=stage,
            level=level,
            message=message,
        )
        self.logs.append(entry)
        # BUG FIX: the original format string "[%s] %s%s" fused stage and
        # message together ("ingeststage started ..."); separate them.
        logging.log(self._LEVELS[level], "[%s] %s: %s",
                    entry.correlation_id, stage, message)

    def summary(self) -> dict:
        """Return a JSON-serialisable roll-up of all stage metrics and logs."""
        stages = {
            name: {
                "rows_in": m.rows_in,
                "rows_out": m.rows_out,
                "errors": m.errors,
                "duration": m.duration,
                "error_rate": m.error_rate,
            }
            for name, m in self.metrics.items()
        }
        return {
            "stages": stages,
            "total_logs": len(self.logs),
            "total_errors": sum(1 for e in self.logs if e.level == "ERROR"),
        }


# -- Simulated pipeline -------------------------------------------------

def ingest_stage(records: list[dict], kit: ObservabilityKit) -> list[dict]:
    """Parse raw records, flag bad ones."""
    kit.start_stage("ingest", len(records))
    accepted: list[dict] = []
    error_count = 0
    for record in records:
        correlation = record.get("id", str(uuid.uuid4())[:8])
        # Contract validation at the ingestion boundary: records lacking a
        # "value" field are malformed and must not flow downstream; they are
        # logged as errors under the row's correlation ID instead.
        if "value" in record:
            accepted.append(record)
        else:
            kit.log_error("ingest", correlation, "missing 'value' field")
            error_count += 1
    kit.end_stage("ingest", len(accepted), error_count)
    return accepted


def transform_stage(records: list[dict], kit: ObservabilityKit) -> list[dict]:
    """Normalise values to uppercase.

    BUG FIX: the original mutated each caller-owned record in place
    (``rec["value"] = ...``) while also returning it; this builds new
    dicts instead, so upstream stages keep an unmodified view of their
    own output.  The returned list is equivalent for all callers that
    use the return value.
    """
    kit.start_stage("transform", len(records))
    out = [{**rec, "value": str(rec["value"]).upper()} for rec in records]
    kit.end_stage("transform", len(out))
    return out


def run_pipeline(records: list[dict]) -> tuple[list[dict], dict]:
    """Run ingest then transform over *records*; return (rows, summary)."""
    kit = ObservabilityKit()
    staged = ingest_stage(records, kit)
    staged = transform_stage(staged, kit)
    summary = kit.summary()
    return staged, summary


# -- Entry points --------------------------------------------------------

def run(input_path: Path, output_path: Path) -> dict:
    """Load records from *input_path*, run the pipeline, persist the summary.

    A missing input file is treated as an empty config (empty pipeline run)
    rather than an error.
    """
    if input_path.exists():
        config = json.loads(input_path.read_text(encoding="utf-8"))
    else:
        config = {}
    _, summary = run_pipeline(config.get("records", []))
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    return summary


def parse_args() -> argparse.Namespace:
    """Parse CLI flags for the input/output file locations."""
    cli = argparse.ArgumentParser(description="Ingestion Observability Kit")
    cli.add_argument("--input", default="data/sample_input.json")
    cli.add_argument("--output", default="data/output_summary.json")
    return cli.parse_args()


def main() -> None:
    """CLI entry point: configure logging, run the pipeline, print summary."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(message)s",
    )
    args = parse_args()
    summary = run(Path(args.input), Path(args.output))
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()

Design decisions

Decision Why Alternative considered
Correlation IDs on every log entry Enables tracing a single record's journey across all pipeline stages Stage-level logging only -- cannot pinpoint which specific record failed
ObservabilityKit passed into each stage Centralizes all logs and metrics in one object; stages do not need to know about logging infrastructure Global logger -- works but makes testing harder and couples stages to a specific logging backend
StageMetrics as a dataclass with computed properties duration and error_rate always reflect current state; no risk of stale pre-computed values Store computed values at end_stage time -- faster access but requires careful update sequencing
_log maps level strings to logging constants Structured logs use string levels ("ERROR") for readability; the mapping bridges to Python's logging module Use logging.ERROR constants directly -- less readable in structured log output

Alternative approaches

Approach B: Decorator-based stage instrumentation

import functools

def instrumented(stage_name: str, kit: ObservabilityKit):
    """Decorator that wraps a stage function with start/end bookkeeping.

    Guarantees end_stage accounting on success, and logs (then re-raises)
    any exception so the failure stays visible in the kit's logs.
    """
    def decorator(fn):
        # BUG FIX: without functools.wraps the decorated stage loses its
        # __name__/__doc__, which breaks introspection and log readability.
        @functools.wraps(fn)
        def wrapper(records, *args, **kwargs):
            kit.start_stage(stage_name, len(records))
            try:
                result = fn(records, *args, **kwargs)
                kit.end_stage(stage_name, len(result))
                return result
            except Exception as e:
                kit.log_error(stage_name, "system", str(e))
                raise
        return wrapper
    return decorator

Trade-off: Decorators reduce boilerplate (no manual start/end calls) and guarantee end_stage is called even on exceptions. But they hide the observability logic, making it harder for learners to see what metrics are being collected.

Common pitfalls

Scenario What happens Prevention
end_stage() called for a stage never started KeyError on self.metrics[stage] Check if stage exists first, or document that start_stage must precede end_stage
Empty records list passed to pipeline rows_in = 0, error_rate property returns 0.0 (guarded) The zero-division guard handles this; no crash
Two stages with the same name Second start_stage overwrites the first in self.metrics Use unique stage names, or have start_stage detect an existing entry and raise (or suffix the name with a counter) instead of silently overwriting