Skip to content

Solution: Level 7 / Project 01 - API Query Adapter

STOP — Try it yourself first!

You learn by building, not by reading answers. Spend at least 30 minutes attempting this project before looking here.

  • Re-read the README for requirements
  • Try the WALKTHROUGH for guided hints without spoilers

Complete solution

"""Level 7 / Project 01 — API Query Adapter.

Adapts different API response formats into a unified schema.
Uses simulated API responses (no network calls) to teach
normalization patterns.
"""

from __future__ import annotations

import argparse
import json
import logging
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable


# ---------------------------------------------------------------------------
# Unified schema
# ---------------------------------------------------------------------------

# WHY a dataclass for the unified record? -- Each API returns different field
# names (item_id vs id vs sku).  By funnelling everything into one shape,
# downstream code only understands ONE interface.  This is the Adapter pattern:
# many inputs, one output contract.
@dataclass
class UnifiedRecord:
    id: str
    name: str
    value: float
    source: str
    timestamp: str


# ---------------------------------------------------------------------------
# Simulated API responses (mock data)
# ---------------------------------------------------------------------------

# WHY mock data instead of real HTTP? -- At this level we are learning the
# *pattern*, not the network layer.  Mocks let us test adapters in isolation
# without flaky network dependencies or API keys.
MOCK_API_A = [
    {"item_id": "A-001", "item_name": "Widget", "price": 9.99, "ts": "2025-01-15T08:00:00"},
    {"item_id": "A-002", "item_name": "Gadget", "price": 24.99, "ts": "2025-01-15T09:00:00"},
]

MOCK_API_B = [
    {"id": "B-001", "label": "Bolt Pack", "cost": 3.49, "created": "2025-01-15T10:00:00"},
    {"id": "B-002", "label": "Nut Set", "cost": 2.99, "created": "2025-01-15T11:00:00"},
]

MOCK_API_C = [
    {"sku": "C-001", "title": "Spring", "amount": 1.50, "date": "2025-01-15T12:00:00"},
]


# ---------------------------------------------------------------------------
# Adapters — one per source, each maps source fields → UnifiedRecord
# ---------------------------------------------------------------------------

# WHY one function per API? -- Each source has its own quirks (field names,
# nesting, optional fields).  Isolating the mapping into its own function
# means a change to API A's format only touches adapt_api_a — zero risk to
# the other adapters.
def adapt_api_a(raw: list[dict]) -> list[UnifiedRecord]:
    """Adapter for API A: uses item_id, item_name, price, ts."""
    results = []
    for r in raw:
        results.append(UnifiedRecord(
            id=r["item_id"], name=r["item_name"],
            value=r["price"], source="api_a", timestamp=r["ts"],
        ))
    return results


def adapt_api_b(raw: list[dict]) -> list[UnifiedRecord]:
    """Adapter for API B: uses id, label, cost, created."""
    results = []
    for r in raw:
        results.append(UnifiedRecord(
            id=r["id"], name=r["label"],
            value=r["cost"], source="api_b", timestamp=r["created"],
        ))
    return results


def adapt_api_c(raw: list[dict]) -> list[UnifiedRecord]:
    """Adapter for API C: uses sku, title, amount, date."""
    results = []
    for r in raw:
        results.append(UnifiedRecord(
            id=r["sku"], name=r["title"],
            value=r["amount"], source="api_c", timestamp=r["date"],
        ))
    return results


# ---------------------------------------------------------------------------
# Adapter registry
# ---------------------------------------------------------------------------

# WHY a registry dict instead of if/elif? -- Adding a new API means adding
# one dict entry, not modifying control flow.  The registry is also iterable,
# so query_all_sources can loop over it without knowing adapter names in advance.
ADAPTERS: dict[str, Callable[..., Any]] = {
    "api_a": adapt_api_a,
    "api_b": adapt_api_b,
    "api_c": adapt_api_c,
}


def adapt_response(source: str, raw: list[dict]) -> list[UnifiedRecord]:
    """Route raw data to the correct adapter by source name."""
    adapter = ADAPTERS.get(source)
    if adapter is None:
        # WHY raise instead of silent skip? -- A missing adapter is a
        # configuration bug, not a data issue.  Failing loudly prevents
        # silently dropping an entire source of records.
        raise ValueError(f"No adapter for source '{source}'. Available: {list(ADAPTERS.keys())}")
    return adapter(raw)


# ---------------------------------------------------------------------------
# Query engine
# ---------------------------------------------------------------------------

def query_all_sources(
    sources: dict[str, list[dict]] | None = None,
) -> list[UnifiedRecord]:
    """Query all configured sources and merge into unified records."""
    if sources is None:
        sources = {"api_a": MOCK_API_A, "api_b": MOCK_API_B, "api_c": MOCK_API_C}

    all_records: list[UnifiedRecord] = []
    for source_name, raw_data in sources.items():
        try:
            records = adapt_response(source_name, raw_data)
            all_records.extend(records)
            logging.info("adapted source=%s records=%d", source_name, len(records))
        except (KeyError, ValueError) as exc:
            # WHY catch and continue? -- One broken source should not prevent
            # the other sources from being processed.  Log the error so
            # operators can investigate, but keep the pipeline running.
            logging.warning("skip source=%s error=%s", source_name, exc)

    return all_records


def filter_records(
    records: list[UnifiedRecord],
    min_value: float | None = None,
    source: str | None = None,
) -> list[UnifiedRecord]:
    """Filter unified records by optional criteria."""
    result = records
    if min_value is not None:
        result = [r for r in result if r.value >= min_value]
    if source is not None:
        result = [r for r in result if r.source == source]
    return result


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------

def run(input_path: Path, output_path: Path) -> dict:
    """Load source config, adapt all APIs, write unified output."""
    if input_path.exists():
        config = json.loads(input_path.read_text(encoding="utf-8"))
        sources = config.get("sources", None)
    else:
        sources = None  # WHY fallback? -- Use built-in mocks when no config file

    start = time.perf_counter()
    records = query_all_sources(sources)
    elapsed_ms = round((time.perf_counter() - start) * 1000, 1)

    summary = {
        "total_records": len(records),
        "sources_queried": len(sources) if sources else 3,
        "elapsed_ms": elapsed_ms,
        "records": [
            {"id": r.id, "name": r.name, "value": r.value,
             "source": r.source, "timestamp": r.timestamp}
            for r in records
        ],
    }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    logging.info("adapted %d records in %.1fms", len(records), elapsed_ms)
    return summary


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="API Query Adapter — normalize multiple API formats"
    )
    parser.add_argument("--input", default="data/sample_input.json")
    parser.add_argument("--output", default="data/output_summary.json")
    parser.add_argument("--run-id", default="manual-run")
    return parser.parse_args()


def main() -> None:
    logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
    args = parse_args()
    summary = run(Path(args.input), Path(args.output))
    print(json.dumps(summary, indent=2))


if __name__ == "__main__":
    main()

Design decisions

Decision Why Alternative considered
Dataclass for UnifiedRecord Typed fields catch mismatches at construction time; immutable-feeling shape communicates the contract clearly Plain dict -- flexible but no field-name typo protection
Registry dict for adapters Open/closed principle -- add new sources without modifying dispatch logic if/elif chain -- works but every new source modifies the same function
try/except around each source in query_all_sources One broken source should not take down the whole pipeline Fail-fast -- simpler but less resilient in production
Mock data instead of HTTP Focuses on the pattern (normalization), not the transport layer responses or httpx mock -- realistic but adds dependencies

Alternative approaches

Approach B: Class-based adapters with a common Protocol

from typing import Protocol

class SourceAdapter(Protocol):
    def adapt(self, raw: list[dict]) -> list[UnifiedRecord]: ...

class ApiAAdapter:
    def adapt(self, raw: list[dict]) -> list[UnifiedRecord]:
        return [UnifiedRecord(id=r["item_id"], ...) for r in raw]

Trade-off: Class-based adapters are better when each source needs its own state (auth tokens, pagination cursors). The function-based approach here is simpler because we have no per-source state.

Common pitfalls

Scenario What happens Prevention
Source returns a field with a new name after an API update KeyError crashes the adapter for that source Wrap field access in .get() with a default, or catch KeyError per record
Two sources return records with the same id Downstream consumers silently get duplicate IDs Add a dedup step or prefix IDs with the source name (e.g. api_a:A-001)
A source returns an empty list No crash, but total_records may be misleadingly low Log a warning when a source returns zero records so operators notice