Solution: Level 7 / Project 15 - Mini-Capstone¶
STOP — Try it yourself first!
You learn by building, not by reading answers. Spend at least 30 minutes attempting this project before looking here.
- Re-read the README for requirements
- Try the WALKTHROUGH for guided hints without spoilers
Complete solution¶
"""Level 7 / Project 15 — Level 7 Mini-Capstone.
Ties together all Level 7 concepts into a unified API integration
pipeline: fetch from multiple sources via adapters, cache results,
validate contracts, check freshness, reconcile, and produce a
combined report — all with feature flags and observability.
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
# -- Adapters ------------------------------------------------------------
# WHY adapters per source type? -- Each upstream API uses different field
# names (uid vs identifier). The adapter normalizes into {id, value, source}
# so all downstream stages (cache, validation, reconciliation) work with
# a single shape. The else branch is a passthrough for unknown sources.
def adapt_source(source_type: str, raw: list[dict]) -> list[dict]:
"""Normalise raw records from different source formats."""
out: list[dict] = []
for rec in raw:
if source_type == "alpha":
out.append({"id": rec.get("uid"), "value": rec.get("data"),
"source": "alpha"})
elif source_type == "beta":
out.append({"id": rec.get("identifier"), "value": rec.get("payload"),
"source": "beta"})
else:
# WHY passthrough for unknown sources? -- Rather than rejecting
# data, we keep it with the original fields and tag the source.
# Downstream validation will catch missing required fields.
out.append({**rec, "source": source_type})
return out
# -- Cache ---------------------------------------------------------------
class SimpleCache:
"""In-memory cache with hit/miss tracking."""
def __init__(self) -> None:
self._store: dict[str, dict] = {}
self.hits = 0
self.misses = 0
def get(self, key: str) -> dict | None:
if key in self._store:
self.hits += 1
return self._store[key]
self.misses += 1
return None
def put(self, key: str, value: dict) -> None:
self._store[key] = value
@property
def stats(self) -> dict:
total = self.hits + self.misses
return {"hits": self.hits, "misses": self.misses,
"hit_rate": round(self.hits / total, 4) if total else 0.0}
# -- Contract validation -------------------------------------------------
def validate_contract(record: dict, required: list[str]) -> list[str]:
"""Return list of missing required fields.
WHY check for None as well as missing? -- A field can be present in
the dict but set to None (e.g. adapter mapped a missing source field).
Treating None as missing catches this common data quality issue.
"""
return [f for f in required if f not in record or record[f] is None]
# -- Freshness -----------------------------------------------------------
def check_freshness(last_updated: float, max_age: float, now: float) -> str:
"""Classify a timestamp as fresh or stale."""
age = now - last_updated
if age > max_age:
return "stale"
return "fresh"
# -- Reconciliation ------------------------------------------------------
def reconcile_sources(groups: dict[str, list[dict]], key: str) -> dict:
"""Compare records across sources by key, return match/mismatch counts.
WHY group by key across all sources? -- This builds a lookup where
each key maps to {source_name: record}. Records appearing in multiple
sources can then be compared; records in only one source are flagged.
"""
all_keys: dict[str, dict[str, dict]] = {}
for source, records in groups.items():
for rec in records:
k = str(rec.get(key, ""))
if k:
all_keys.setdefault(k, {})[source] = rec
matched, mismatched, single = 0, 0, 0
for k, sources in all_keys.items():
if len(sources) < 2:
single += 1
else:
# WHY json.dumps for comparison? -- Serializing to JSON provides
# a simple deep-equality check for the value field regardless of
# its type (string, number, nested dict).
vals = [json.dumps(r.get("value")) for r in sources.values()]
if len(set(vals)) == 1:
matched += 1
else:
mismatched += 1
return {"matched": matched, "mismatched": mismatched, "single_source": single}
# -- Observability -------------------------------------------------------
@dataclass
class PipelineMetrics:
"""Aggregate metrics across all pipeline stages."""
records_in: int = 0
records_out: int = 0
cache_hits: int = 0
cache_misses: int = 0
validation_errors: int = 0
stale_records: int = 0
stages_run: list[str] = field(default_factory=list)
# -- Pipeline orchestrator -----------------------------------------------
def run_pipeline(config: dict) -> dict:
"""Execute the full Level 7 capstone pipeline.
WHY feature flags on each stage? -- Operators can disable any stage at
runtime without redeploying. This is essential for incident response
(skip non-critical stages) and gradual rollouts (test new stages on a
subset of data).
"""
flags = config.get("flags", {})
sources_config = config.get("sources", {})
required_fields = config.get("required_fields", ["id", "value"])
max_age = config.get("max_age_seconds", 600)
now = config.get("now", time.time())
metrics = PipelineMetrics()
cache = SimpleCache()
all_records: list[dict] = []
grouped: dict[str, list[dict]] = {}
# Stage 1: Adapt sources
if flags.get("adapt", True):
metrics.stages_run.append("adapt")
for src_name, src_data in sources_config.items():
adapted = adapt_source(src_name, src_data.get("records", []))
metrics.records_in += len(adapted)
all_records.extend(adapted)
grouped[src_name] = adapted
logging.info("adapted %d records from %s", len(adapted), src_name)
# Stage 2: Cache dedup
if flags.get("cache", True):
metrics.stages_run.append("cache")
deduped: list[dict] = []
for rec in all_records:
# WHY hash the entire record for dedup? -- Two records from
# different sources might have the same id but different values.
# Hashing the full record catches exact duplicates only.
key = hashlib.md5(json.dumps(rec, sort_keys=True).encode()).hexdigest()[:12]
cached = cache.get(key)
if cached:
continue
cache.put(key, rec)
deduped.append(rec)
all_records = deduped
metrics.cache_hits = cache.hits
metrics.cache_misses = cache.misses
# Stage 3: Contract validation
if flags.get("validate", True):
metrics.stages_run.append("validate")
valid: list[dict] = []
for rec in all_records:
missing = validate_contract(rec, required_fields)
if missing:
metrics.validation_errors += 1
logging.warning("contract violation: %s missing %s", rec.get("id"), missing)
else:
valid.append(rec)
all_records = valid
# Stage 4: Freshness check
if flags.get("freshness", True):
metrics.stages_run.append("freshness")
for rec in all_records:
            # WHY default the timestamp to now? -- Records without an
            # explicit timestamp are treated as fresh; only records that
            # carry a timestamp can be flagged stale.
            ts = rec.get("timestamp", now)
status = check_freshness(ts, max_age, now)
rec["freshness"] = status
if status == "stale":
metrics.stale_records += 1
# Stage 5: Reconciliation
recon = {}
if flags.get("reconcile", True) and len(grouped) >= 2:
metrics.stages_run.append("reconcile")
recon = reconcile_sources(grouped, "id")
metrics.records_out = len(all_records)
return {
"records_in": metrics.records_in,
"records_out": metrics.records_out,
"stages_run": metrics.stages_run,
"cache": cache.stats,
"validation_errors": metrics.validation_errors,
"stale_records": metrics.stale_records,
"reconciliation": recon,
}
# -- Entry points --------------------------------------------------------
def run(input_path: Path, output_path: Path) -> dict:
config = json.loads(input_path.read_text(encoding="utf-8")) if input_path.exists() else {}
summary = run_pipeline(config)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
return summary
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Level 7 Mini-Capstone Pipeline")
parser.add_argument("--input", default="data/sample_input.json")
parser.add_argument("--output", default="data/output_summary.json")
return parser.parse_args()
def main() -> None:
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
args = parse_args()
summary = run(Path(args.input), Path(args.output))
print(json.dumps(summary, indent=2))
if __name__ == "__main__":
main()
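To sanity-check the adapter and contract stages in isolation, here is a standalone snippet; the two functions are copied from the solution above so it runs on its own:

```python
def adapt_source(source_type: str, raw: list[dict]) -> list[dict]:
    """Normalize raw records (copied from the solution above)."""
    out: list[dict] = []
    for rec in raw:
        if source_type == "alpha":
            out.append({"id": rec.get("uid"), "value": rec.get("data"),
                        "source": "alpha"})
        elif source_type == "beta":
            out.append({"id": rec.get("identifier"), "value": rec.get("payload"),
                        "source": "beta"})
        else:
            out.append({**rec, "source": source_type})
    return out

def validate_contract(record: dict, required: list[str]) -> list[str]:
    """Return missing required fields (copied from the solution above)."""
    return [f for f in required if f not in record or record[f] is None]

records = adapt_source("alpha", [{"uid": 1, "data": "x"}, {"data": "no uid"}])
print(records[0])  # {'id': 1, 'value': 'x', 'source': 'alpha'}
# The second record has no uid, so the adapter maps id to None and
# contract validation flags it as missing:
print(validate_contract(records[1], ["id", "value"]))  # ['id']
```

This shows the two stages cooperating: the adapter never rejects data, and validation is where a missing source field finally surfaces.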
Design decisions¶
| Decision | Why | Alternative considered |
|---|---|---|
| Feature flags on every stage | Operators can disable any stage without redeploying; essential for incident response and gradual rollouts | Hardcoded pipeline -- simpler but requires a new deploy to change behavior |
| Hash-based dedup in cache stage | Full-record hash catches exact duplicates from different sources without false positives | ID-based dedup -- faster but misses records with same ID and different values |
| PipelineMetrics dataclass | Single object aggregates all metrics; easy to serialize and inspect | Scattered counters across stages -- harder to collect into a single report |
| Reconciliation requires 2+ sources | Comparing one source against itself is meaningless; the guard prevents confusing output | Always run reconciliation -- would produce "0 matched, 0 mismatched" for single source |
| Passthrough adapter for unknown sources | Keeps data flowing; downstream validation catches missing fields rather than rejecting at ingestion | Reject unknown sources -- safer but loses data that might be partially usable |
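The dedup row above is easy to verify in isolation. This standalone check uses the same hashing scheme as the cache stage (MD5 over the JSON-serialized record, truncated to 12 hex characters):

```python
import hashlib
import json

def record_key(rec: dict) -> str:
    # Same scheme as the cache stage: hash the whole record, not just the id.
    return hashlib.md5(json.dumps(rec, sort_keys=True).encode()).hexdigest()[:12]

a = {"id": 1, "value": "x", "source": "alpha"}
b = {"id": 1, "value": "y", "source": "beta"}   # same id, different value
c = {"source": "alpha", "id": 1, "value": "x"}  # exact duplicate of a, reordered

print(record_key(a) == record_key(b))  # False -- not a duplicate, both kept
print(record_key(a) == record_key(c))  # True -- sort_keys makes key order irrelevant
```

ID-based dedup would have collapsed `a` and `b` into one record; full-record hashing keeps both, and `sort_keys=True` ensures that dict key order never produces a false non-match.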
Alternative approaches¶
Approach B: Pipeline as a chain of composable stages¶
from typing import Callable
Stage = Callable[[list[dict], dict], list[dict]]
def compose_pipeline(stages: list[Stage]) -> Stage:
    """Chain stages left to right; the composed pipeline is itself a Stage."""
    def pipeline(records: list[dict], config: dict) -> list[dict]:
        for stage in stages:
            records = stage(records, config)
        return records
    return pipeline

# adapt_stage, cache_stage and validate_stage stand in for stage functions
# with the Stage signature; they are not defined in this solution.
pipeline = compose_pipeline([adapt_stage, cache_stage, validate_stage])
result = pipeline(raw_records, config)
Trade-off: Composable stages make the pipeline extensible -- adding or reordering stages is just changing the list. But the current approach with explicit flag checks per stage is more readable for learners and makes the control flow obvious. In production, you would likely evolve toward the composable pattern.
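As a concrete sketch, here is Approach B made runnable with two toy stages; `drop_invalid` and `tag_run` are illustrative names, not part of the capstone solution:

```python
from typing import Callable

Stage = Callable[[list[dict], dict], list[dict]]

def compose_pipeline(stages: list[Stage]) -> Stage:
    """Chain stage functions; the composed pipeline has the same signature."""
    def pipeline(records: list[dict], config: dict) -> list[dict]:
        for stage in stages:
            records = stage(records, config)
        return records
    return pipeline

# Toy stages: drop records missing an id, then tag each with a run label.
def drop_invalid(records: list[dict], config: dict) -> list[dict]:
    return [r for r in records if r.get("id") is not None]

def tag_run(records: list[dict], config: dict) -> list[dict]:
    return [{**r, "run": config.get("run", "default")} for r in records]

pipeline = compose_pipeline([drop_invalid, tag_run])
result = pipeline([{"id": 1}, {"id": None}], {"run": "demo"})
print(result)  # [{'id': 1, 'run': 'demo'}]
```

Reordering the pipeline is now just reordering the list, which is exactly the extensibility the trade-off note describes.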
Common pitfalls¶
| Scenario | What happens | Prevention |
|---|---|---|
| Adapt stage disabled but downstream stages enabled | All downstream stages process an empty list; report shows 0 records but stages still "ran" | Have downstream stages check for empty input and short-circuit with a clear log message |
| Record has id: None from a source with missing uid field | validate_contract catches it (None treated as missing), but the warning message shows None | Log the record index or other identifying info when id is missing |
| Two sources return conflicting values for the same key | Reconciliation reports the mismatch, but the pipeline keeps both records | Add a conflict-resolution strategy (e.g. prefer source with newer timestamp) |
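The conflict-resolution strategy suggested in the last row can be sketched as follows; `resolve_conflict` is a hypothetical helper, not part of the capstone solution, and it assumes records carry the optional timestamp field used by the freshness stage:

```python
def resolve_conflict(candidates: list[dict]) -> dict:
    """Prefer the record with the newest timestamp.

    Sketch of the 'prefer newer timestamp' strategy from the table above;
    the capstone pipeline itself only reports mismatches. Records without
    a timestamp default to 0.0, so any dated record beats them; ties keep
    the first candidate seen.
    """
    return max(candidates, key=lambda r: r.get("timestamp", 0.0))

winner = resolve_conflict([
    {"id": "a1", "value": "old", "source": "alpha", "timestamp": 100.0},
    {"id": "a1", "value": "new", "source": "beta", "timestamp": 200.0},
])
print(winner["value"])  # new
```

In the solution's `reconcile_sources`, this would slot into the mismatch branch: instead of only incrementing `mismatched`, the pipeline could emit the resolved record.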