Skip to content

Solution: Level 6 / Project 15 - Level 6 Mini Capstone

STOP -- Have you attempted this project yourself first?

Learning happens in the struggle, not in reading answers. Spend at least 20 minutes trying before reading this solution. If you are stuck, try the README hints or re-read the relevant concept docs first.


Complete solution

"""Level 6 / Project 15 — Mini Capstone: Full Database ETL Pipeline.

Combines all Level 6 skills into a single ETL pipeline: staging load,
validation, upsert, lineage tracking, incremental watermark, and
health metrics — all backed by SQLite.
"""

from __future__ import annotations

import argparse
import csv
import hashlib
import json
import logging
import math
import sqlite3
import time
from dataclasses import dataclass, field
from io import StringIO
from pathlib import Path

# ---------------------------------------------------------------------------
# Schema
# ---------------------------------------------------------------------------

# WHY separate staging and target tables? -- Staging is a scratch area
# for raw ingest; target is the clean, deduplicated production table.
# This separation lets you validate and transform in staging without
# risking corruption of production data.
#
# Tables created by this script:
#   staging      -- validated rows waiting to be loaded (autoincrement id,
#                   so the same key may appear more than once).
#   target       -- one row per key (key is the PRIMARY KEY); updated_at
#                   defaults to the insert time.
#   dead_letters -- rejected records: the raw payload plus the error code.
#   lineage      -- one row per (record_key, step) event, with optional detail.
#   watermarks   -- named high-water marks (name is the PRIMARY KEY).
#   run_log      -- one row per pipeline run with counts and duration_ms.
#
# Every statement uses CREATE TABLE IF NOT EXISTS, so running the script
# against an already-initialised database leaves existing tables alone.
SCHEMA_DDL = """\
CREATE TABLE IF NOT EXISTS staging (
    id   INTEGER PRIMARY KEY AUTOINCREMENT,
    key  TEXT NOT NULL,
    name TEXT NOT NULL,
    value REAL NOT NULL,
    ts   TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS target (
    key        TEXT PRIMARY KEY,
    name       TEXT NOT NULL,
    value      REAL NOT NULL,
    ts         TEXT NOT NULL,
    updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS dead_letters (
    id      INTEGER PRIMARY KEY AUTOINCREMENT,
    raw     TEXT NOT NULL,
    error   TEXT NOT NULL,
    created TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS lineage (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    record_key TEXT NOT NULL,
    step       TEXT NOT NULL,
    detail     TEXT,
    created    TEXT NOT NULL DEFAULT (datetime('now'))
);

CREATE TABLE IF NOT EXISTS watermarks (
    name  TEXT PRIMARY KEY,
    value TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS run_log (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    status     TEXT NOT NULL,
    staged     INTEGER DEFAULT 0,
    loaded     INTEGER DEFAULT 0,
    rejected   INTEGER DEFAULT 0,
    duration_ms INTEGER DEFAULT 0,
    created    TEXT NOT NULL DEFAULT (datetime('now'))
);
"""


def init_db(conn: sqlite3.Connection) -> None:
    """Create all pipeline tables on *conn* and commit.

    WHY executescript? -- SCHEMA_DDL holds several semicolon-separated
    CREATE TABLE statements; execute() accepts only a single statement,
    whereas executescript() runs the whole script in one call.
    """
    cursor = conn.cursor()
    cursor.executescript(SCHEMA_DDL)
    conn.commit()


# ---------------------------------------------------------------------------
# Watermark
# ---------------------------------------------------------------------------


def get_watermark(conn: sqlite3.Connection, name: str) -> str | None:
    """Look up the stored high-water mark called *name*.

    Returns the stored value, or None when no row exists for *name*.

    WHY None for a missing row? -- On the very first run nothing has been
    recorded yet; None lets the caller treat that as "load everything"
    without needing a separate flag.
    """
    cursor = conn.execute(
        "SELECT value FROM watermarks WHERE name = ?",
        (name,),
    )
    found = cursor.fetchone()
    if not found:
        return None
    return found[0]


def set_watermark(conn: sqlite3.Connection, name: str, value: str) -> None:
    """Store *value* as the watermark *name*, replacing any previous value.

    The statement is executed on *conn* but not committed here; the caller
    decides when to commit.
    """
    upsert_sql = (
        "INSERT INTO watermarks (name, value) VALUES (?, ?) "
        "ON CONFLICT(name) DO UPDATE SET value = excluded.value"
    )
    params = (name, value)
    conn.execute(upsert_sql, params)


# ---------------------------------------------------------------------------
# Pipeline stages
# ---------------------------------------------------------------------------


@dataclass
class PipelineResult:
    """Counters and error messages accumulated while running the pipeline."""

    # Number of records that passed validation and were inserted into staging.
    staged: int = 0
    # Number of staged rows upserted into target (assigned by the orchestrator
    # after the load step; the staging step leaves it at 0).
    loaded: int = 0
    # Number of records that failed validation and were dead-lettered.
    rejected: int = 0
    # One "<key>: <error code>" message per rejected record.
    errors: list[str] = field(default_factory=list)


def _is_blank_text(value: object) -> bool:
    """Return True unless *value* is a string with non-whitespace content."""
    return not isinstance(value, str) or not value.strip()


def validate_record(rec: dict) -> str | None:
    """Return an error code if *rec* is invalid, or None if it is loadable.

    Checks run in a fixed order and the first failure wins:

    * ``"missing_key"``       -- ``key`` absent, not a string, or blank
    * ``"missing_name"``      -- ``name`` absent, not a string, or blank
    * ``"invalid_value"``     -- ``value`` absent, not convertible to float,
      or NaN
    * ``"missing_timestamp"`` -- ``ts`` absent, not a string, or blank

    WHY validate all four fields? -- Each field is required by the target
    table schema. Catching missing/invalid fields here means the staging
    INSERT never fails with a database constraint error, which would be
    harder to diagnose.

    WHY the isinstance checks? -- Records come from JSON, where a field may
    be ``null`` or a number. Calling ``.strip()`` on those would raise
    AttributeError and abort the whole run instead of rejecting one record.

    WHY reject NaN? -- SQLite stores a bound NaN as NULL, so a NaN would
    violate the ``value REAL NOT NULL`` constraint at insert time.
    """
    if _is_blank_text(rec.get("key")):
        return "missing_key"
    if _is_blank_text(rec.get("name")):
        return "missing_name"
    try:
        number = float(rec["value"])
    except (KeyError, ValueError, TypeError, OverflowError):
        # OverflowError: JSON integers too large to represent as a float.
        return "invalid_value"
    if math.isnan(number):
        return "invalid_value"
    if _is_blank_text(rec.get("ts")):
        return "missing_timestamp"
    return None


def _lineage_key(rec: object) -> str:
    """Return a non-null text label for *rec* to store in lineage.record_key."""
    if not isinstance(rec, dict):
        return "?"
    key = rec.get("key", "?")
    if key is None:
        return "?"
    return key if isinstance(key, str) else str(key)


def stage_records(
    conn: sqlite3.Connection,
    records: list[dict],
    watermark: str | None,
) -> PipelineResult:
    """Validate and stage records, filtering by watermark.

    For each record, in one pass:

    1. If a watermark is set and the record has a usable timestamp that is
       ``<=`` the watermark, skip it silently (already processed).
    2. Otherwise validate it. Invalid records are written to
       ``dead_letters`` and get a ``'reject'`` lineage row.
    3. Valid records are inserted into ``staging`` and get a ``'stage'``
       lineage row.

    All inserts are committed once at the end.

    Args:
        conn: Open connection whose schema already contains the staging,
            dead_letters and lineage tables.
        records: Parsed input records. Elements that are not dicts are
            rejected with the error code ``"invalid_record"``.
        watermark: Current high-water mark, or None/"" to disable filtering.
            Timestamps are compared as plain strings, so this presumably
            relies on a lexicographically sortable format such as ISO-8601
            -- confirm against the input data.

    Returns:
        A PipelineResult with ``staged``, ``rejected`` and ``errors``
        filled in (``loaded`` is left at 0).
    """
    result = PipelineResult()

    for rec in records:
        is_mapping = isinstance(rec, dict)
        ts = rec.get("ts", "") if is_mapping else None
        has_ts = isinstance(ts, str) and bool(ts.strip())

        # WHY filter by watermark first? -- Skipping old records before
        # validation avoids dead-lettering records that are merely stale,
        # not actually invalid.
        # WHY require has_ts? -- A record with no usable timestamp cannot
        # be judged stale. Comparing "" (or a non-string) against the
        # watermark would either drop the record silently or raise
        # TypeError; sending it to validation dead-letters it instead.
        if watermark and has_ts and ts <= watermark:
            continue

        error = validate_record(rec) if is_mapping else "invalid_record"
        if error:
            # lineage.record_key is NOT NULL, so never bind a raw None key.
            label = _lineage_key(rec)
            conn.execute(
                "INSERT INTO dead_letters (raw, error) VALUES (?, ?)",
                (json.dumps(rec), error),
            )
            conn.execute(
                "INSERT INTO lineage (record_key, step, detail) VALUES (?, 'reject', ?)",
                (label, error),
            )
            result.rejected += 1
            result.errors.append(f"{label}: {error}")
            continue

        conn.execute(
            "INSERT INTO staging (key, name, value, ts) VALUES (?, ?, ?, ?)",
            (rec["key"], rec["name"], float(rec["value"]), ts),
        )
        conn.execute(
            "INSERT INTO lineage (record_key, step, detail) VALUES (?, 'stage', 'validated and staged')",
            (rec["key"],),
        )
        result.staged += 1

    conn.commit()
    return result


def load_to_target(conn: sqlite3.Connection) -> int:
    """Upsert every staged row into the target table, then empty staging.

    Each staged row also gets a ``'load'`` lineage entry. The work is
    committed once at the end.

    WHY upsert (ON CONFLICT) instead of plain INSERT? -- The same key
    may appear in multiple runs. ON CONFLICT DO UPDATE merges the new
    data into the existing row, preserving the target as a deduplicated
    current-state table.

    Returns:
        The number of staged rows processed. A key that was staged more
        than once is counted once per staged row.
    """
    # WHY ORDER BY id? -- When one key is staged more than once, the last
    # upsert wins. Without an ORDER BY the row order is unspecified;
    # ordering by the autoincrement id makes "last" mean "staged last".
    rows = conn.execute(
        "SELECT key, name, value, ts FROM staging ORDER BY id"
    ).fetchall()
    loaded = 0

    for key, name, value, ts in rows:
        conn.execute(
            "INSERT INTO target (key, name, value, ts) VALUES (?, ?, ?, ?) "
            "ON CONFLICT(key) DO UPDATE SET "
            "name = excluded.name, value = excluded.value, "
            "ts = excluded.ts, updated_at = datetime('now')",
            (key, name, value, ts),
        )
        conn.execute(
            "INSERT INTO lineage (record_key, step, detail) VALUES (?, 'load', 'upserted to target')",
            (key,),
        )
        loaded += 1

    # WHY clear staging after load? -- Staging is a scratch area. If
    # we leave old rows, the next run's load_to_target would re-upsert
    # them unnecessarily (idempotent but wasteful).
    conn.execute("DELETE FROM staging")
    conn.commit()
    return loaded


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------


def run(input_path: Path, output_path: Path, db_path: str = ":memory:") -> dict:
    """Full pipeline: read -> validate -> stage -> upsert -> track.

    Reads a JSON array of records from *input_path*, runs the staging and
    load steps against the SQLite database at *db_path*, advances the
    ``events_ts`` watermark, appends a ``run_log`` row, and writes a JSON
    summary to *output_path* (parent directories are created as needed).

    WHY measure duration? -- The run_log table tracks how long each
    pipeline execution takes. Duration regressions (a run that used
    to take 200ms now takes 5000ms) are an early warning that
    something changed — data volume, missing indexes, etc.

    Args:
        input_path: JSON file whose top-level value is an array of records.
        output_path: Where the JSON summary is written.
        db_path: SQLite database path; the default ``":memory:"`` keeps no
            state between calls.

    Returns:
        The summary dict that was written to *output_path*.

    Raises:
        FileNotFoundError: *input_path* does not exist.
        ValueError: the file is not valid JSON (json.JSONDecodeError is a
            ValueError subclass) or its top-level value is not an array.
    """
    if not input_path.exists():
        raise FileNotFoundError(f"Input not found: {input_path}")

    records = json.loads(input_path.read_text(encoding="utf-8"))
    # WHY check the shape here? -- Iterating a JSON object would yield its
    # keys and a scalar is not iterable at all; both would otherwise fail
    # deep inside the staging step with an unhelpful error.
    if not isinstance(records, list):
        raise ValueError(
            f"Input must be a JSON array of records, got "
            f"{type(records).__name__}: {input_path}"
        )

    conn = sqlite3.connect(db_path)
    try:
        init_db(conn)

        # Timing starts after schema creation, so it covers only the
        # stage/load/watermark work.
        start = time.perf_counter()

        watermark = get_watermark(conn, "events_ts")
        stage_result = stage_records(conn, records, watermark)
        loaded = load_to_target(conn)
        stage_result.loaded = loaded

        # WHY set watermark from MAX(ts) in target? -- The target table
        # contains all successfully loaded records. Its max timestamp is
        # the authoritative watermark, even if some staged records had
        # higher timestamps but failed to load.
        max_ts = conn.execute("SELECT MAX(ts) FROM target").fetchone()[0]
        if max_ts:
            set_watermark(conn, "events_ts", max_ts)

        elapsed = int((time.perf_counter() - start) * 1000)

        # Log the run for health monitoring. The commit below also
        # persists the watermark written just above.
        conn.execute(
            "INSERT INTO run_log (status, staged, loaded, rejected, duration_ms) "
            "VALUES (?, ?, ?, ?, ?)",
            ("success", stage_result.staged, loaded, stage_result.rejected, elapsed),
        )
        conn.commit()

        # Gather final stats. These are table totals, so with a persistent
        # database they include rows written by earlier runs.
        target_count = conn.execute("SELECT COUNT(*) FROM target").fetchone()[0]
        dl_count = conn.execute("SELECT COUNT(*) FROM dead_letters").fetchone()[0]
        lineage_count = conn.execute("SELECT COUNT(*) FROM lineage").fetchone()[0]

    finally:
        conn.close()

    summary = {
        "input_records": len(records),
        "staged": stage_result.staged,
        "loaded": stage_result.loaded,
        "rejected": stage_result.rejected,
        "errors": stage_result.errors,
        "target_rows": target_count,
        "dead_letters": dl_count,
        "lineage_entries": lineage_count,
        "watermark": max_ts,
        "duration_ms": elapsed,
    }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    logging.info(
        "capstone: staged=%d loaded=%d rejected=%d in %dms",
        stage_result.staged, loaded, stage_result.rejected, elapsed,
    )
    return summary


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the pipeline.

    Recognised options (all optional, each with a default):
    ``--input``, ``--output`` and ``--db``.
    """
    cli = argparse.ArgumentParser(
        description="Level 6 Mini Capstone — full database ETL pipeline"
    )
    option_defaults = (
        ("--input", "data/sample_input.json"),
        ("--output", "data/output_summary.json"),
        ("--db", ":memory:"),
    )
    for flag, fallback in option_defaults:
        cli.add_argument(flag, default=fallback)
    return cli.parse_args()


def main() -> None:
    """CLI entry point: configure logging, run the pipeline, print the summary."""
    logging.basicConfig(
        format="%(asctime)s | %(levelname)s | %(message)s",
        level=logging.INFO,
    )
    cli = parse_args()
    source_file = Path(cli.input)
    summary_file = Path(cli.output)
    report = run(source_file, summary_file, cli.db)
    # Echo the same summary that run() wrote to the output file.
    print(json.dumps(report, indent=2))


# Execute only when run as a script, not when imported.
if __name__ == "__main__":
    main()

Design decisions

| Decision | Why | Alternative considered |
| --- | --- | --- |
| Six tables in one schema (staging, target, dead_letters, lineage, watermarks, run_log) | Each table has a single responsibility; combining all Level 6 patterns into one cohesive pipeline | Fewer tables with merged concerns -- simpler schema but each table tries to do too much |
| Watermark filter before validation | Skipping stale records early avoids dead-lettering records that are merely old, not invalid | Validate first, then filter -- wastes effort validating records that will be skipped anyway |
| Staging table cleared after load | Prevents stale records from being re-upserted on the next run; staging is a scratch area | Keep staging as an archive -- useful for debugging but requires a separate cleanup strategy |
| run_log for pipeline execution history | Enables health monitoring (Project 12 pattern); duration tracking catches performance regressions | No logging -- simpler but you lose visibility into pipeline health over time |
| Single-pass pipeline (stage -> load -> watermark -> log) | Each step feeds the next; clear data flow with no circular dependencies | Multi-pass with intermediate files -- more resilient to crashes but much more complex |

Alternative approaches

Approach B: Conditional upsert (only update if newer timestamp)

def load_to_target_conditional(conn: sqlite3.Connection) -> int:
    """Only overwrite target rows if the staged data is newer.

    Prevents stale data from overwriting fresher data when records
    arrive out of order across multiple pipeline runs.

    A staged row is written when its key is new, or when its ``ts`` is
    strictly greater than the ``ts`` already in target. Otherwise the
    target row is left untouched. Every staged row gets a lineage entry:
    ``'load'`` when it was written, ``'skip'`` when it was stale. Staging
    is emptied and the work committed at the end.

    Returns:
        The number of staged rows that actually inserted or updated a
        target row (stale rows are not counted).
    """
    # Process in staging order so that, for a key staged several times,
    # rows are compared against target in the order they arrived.
    rows = conn.execute(
        "SELECT key, name, value, ts FROM staging ORDER BY id"
    ).fetchall()
    loaded = 0

    for key, name, value, ts in rows:
        cursor = conn.execute(
            "INSERT INTO target (key, name, value, ts) VALUES (?, ?, ?, ?) "
            "ON CONFLICT(key) DO UPDATE SET "
            "  name = excluded.name, "
            "  value = excluded.value, "
            "  ts = excluded.ts, "
            "  updated_at = datetime('now') "
            "WHERE excluded.ts > target.ts",  # only if newer
            (key, name, value, ts),
        )
        # WHY check rowcount? -- When the WHERE clause is false the upsert
        # is a no-op and changes zero rows; counting it as loaded would
        # over-report what reached the target table.
        if cursor.rowcount > 0:
            step, detail = "load", "upserted to target"
            loaded += 1
        else:
            step, detail = "skip", "stale: target already has an equal or newer ts"
        conn.execute(
            "INSERT INTO lineage (record_key, step, detail) VALUES (?, ?, ?)",
            (key, step, detail),
        )

    conn.execute("DELETE FROM staging")
    conn.commit()
    return loaded

Trade-off: The conditional upsert prevents stale data from overwriting newer data, which is critical when records arrive out of chronological order (common with distributed systems). The primary solution unconditionally overwrites, which is simpler but assumes records always arrive in order. In production, the conditional approach is strongly preferred.

Common pitfalls

| Scenario | What happens | Prevention |
| --- | --- | --- |
| All records are rejected (100% failure rate) | `stage_records` returns staged=0; `load_to_target` processes zero rows; watermark does not advance; pipeline completes with no error | This is correct behavior -- but add a warning log when staged == 0 and rejected > 0 so operators notice |
| Corrupted watermark (set to a far-future date) | Every record is filtered out as "already loaded"; pipeline runs but loads nothing | Validate that the new watermark is reasonable; compare against the max timestamp in the source data |
| Running twice with `:memory:` database | The database is fresh each time (no persistence); watermark starts at None; every record is re-loaded | Use `--db data/pipeline.db` for persistent state across runs; `:memory:` is for testing only |