Cross-File Joiner — Annotated Solution¶
STOP! Try solving this yourself first. Use the project README and walkthrough before reading the solution.
Complete Solution¶
"""Level 5 / Project 08 — Cross-File Joiner.
Joins records across multiple CSV/JSON files by a shared key field.
Supports inner join, left join, and full outer join strategies.
"""
from __future__ import annotations
import argparse
import csv
import json
import logging
from pathlib import Path
from typing import Any, Callable
# ---------- logging ----------
def configure_logging() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
)
# ---------- file loading ----------
def load_file(path: Path) -> list[dict]:
"""Load a CSV or JSON file into a list of dicts based on extension."""
if not path.exists():
raise FileNotFoundError(f"File not found: {path}")
# WHY: Support both formats so the joiner works with heterogeneous
# data sources (one team exports CSV, another exports JSON).
if path.suffix == ".json":
data = json.loads(path.read_text(encoding="utf-8"))
if not isinstance(data, list):
raise ValueError(f"JSON file must contain an array: {path}")
return data
# Default: treat as CSV
text = path.read_text(encoding="utf-8")
return list(csv.DictReader(text.splitlines()))
def validate_key_exists(records: list[dict], key: str, source_name: str) -> None:
"""Verify that at least one record contains the join key.
WHY: A typo in the --key flag would silently produce zero matches
instead of the expected join. This validation catches the mistake
early with a helpful error message listing available columns.
"""
if records and not any(key in r for r in records):
available = sorted(records[0].keys()) if records else []
raise ValueError(
f"Join key '{key}' not found in {source_name}. "
f"Available columns: {available}"
)
# ---------- indexing ----------
def index_by_key(records: list[dict], key: str) -> dict[str, dict]:
"""Build a lookup dictionary indexed by the value of *key*.
WHY index first, then join? -- Indexing both sides into dicts makes
lookups O(1) instead of scanning the entire right side for every
left row (O(n*m) down to O(n+m)). This is the same technique
databases use with hash joins.
If duplicate key values exist, the last record wins.
"""
index: dict[str, dict] = {}
duplicates = 0
for record in records:
# WHY: str() + strip() normalizes the key so "101" matches "101 "
# and integer 101 matches string "101".
key_value = str(record.get(key, "")).strip()
if not key_value:
continue
if key_value in index:
duplicates += 1
index[key_value] = record
if duplicates:
logging.warning("Found %d duplicate key values (last-wins policy)", duplicates)
return index
# ---------- join strategies ----------
def join_inner(left: dict[str, dict], right: dict[str, dict]) -> list[dict]:
"""Inner join: only keys present in both sides are included."""
result: list[dict] = []
for key in left:
if key in right:
# WHY: {**left, **right} merges both dicts. If both sides
# have a column with the same name, the right side wins.
result.append({**left[key], **right[key]})
return result
def join_left(left: dict[str, dict], right: dict[str, dict]) -> list[dict]:
"""Left join: all records from left, matched fields from right."""
result: list[dict] = []
for key, row in left.items():
if key in right:
result.append({**row, **right[key]})
else:
# WHY: Unmatched left rows are still included so no left-side
# data is lost. This is useful for "show all employees even
# if their department is unknown."
result.append(dict(row))
return result
def join_full(left: dict[str, dict], right: dict[str, dict]) -> list[dict]:
"""Full outer join: all records from both sides."""
# WHY: sorted(set union) ensures deterministic output order regardless
# of which keys come from which side.
all_keys = sorted(set(left.keys()) | set(right.keys()))
result: list[dict] = []
for key in all_keys:
merged: dict = {}
if key in left:
merged.update(left[key])
if key in right:
merged.update(right[key])
result.append(merged)
return result
# WHY: A strategy map lets the join type be specified via CLI flag
# without if/elif chains. Adding a new strategy (e.g., "right") only
# requires writing the function and adding one entry.
JOIN_STRATEGIES: dict[str, Callable[..., Any]] = {
"inner": join_inner,
"left": join_left,
"full": join_full,
}
# ---------- pipeline ----------
def run(
left_path: Path,
right_path: Path,
output_path: Path,
key: str,
strategy: str = "inner",
) -> dict:
left_data = load_file(left_path)
right_data = load_file(right_path)
validate_key_exists(left_data, key, str(left_path))
validate_key_exists(right_data, key, str(right_path))
left_idx = index_by_key(left_data, key)
right_idx = index_by_key(right_data, key)
join_func = JOIN_STRATEGIES.get(strategy, join_inner)
joined = join_func(left_idx, right_idx)
# WHY: Match statistics help the user verify the join worked correctly.
matched_keys = set(left_idx.keys()) & set(right_idx.keys())
left_only = set(left_idx.keys()) - set(right_idx.keys())
right_only = set(right_idx.keys()) - set(left_idx.keys())
report = {
"left_records": len(left_data),
"right_records": len(right_data),
"joined_records": len(joined),
"strategy": strategy,
"matched_keys": len(matched_keys),
"left_only_keys": len(left_only),
"right_only_keys": len(right_only),
"data": joined,
}
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(json.dumps(report, indent=2), encoding="utf-8")
logging.info("%s join: %d matched rows on key '%s'",
strategy.capitalize(), len(joined), key)
return report
# ---------- CLI ----------
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Join records across files by a shared key")
parser.add_argument("--left", default="data/employees.csv")
parser.add_argument("--right", default="data/departments.csv")
parser.add_argument("--output", default="data/joined.json")
parser.add_argument("--key", default="dept_id")
parser.add_argument("--join", choices=["inner", "left", "full"], default="inner")
return parser.parse_args()
def main() -> None:
configure_logging()
args = parse_args()
report = run(Path(args.left), Path(args.right), Path(args.output), args.key, args.join)
print(f"{args.join.capitalize()} join: {report['joined_records']} matched rows on key '{args.key}'")
if __name__ == "__main__":
main()
Design Decisions¶
| Decision | Why |
|---|---|
| Index both sides into dictionaries before joining | Hash-based indexing makes lookups O(1). Without indexing, an inner join on two lists of 1000 rows each would require up to 1,000,000 comparisons. With indexing, it takes about 2,000. |
| Last-wins policy for duplicate keys | This matches SQL's behavior when joining on a non-unique key with DISTINCT. The alternative (collecting all duplicates) would require list-of-lists and complicate the merge logic. |
| Key validation before joining | A typo in --key produces zero matches and empty output with no error. Validating early catches this with a helpful message listing available columns. |
| Support both CSV and JSON input | Real data pipelines often combine sources in different formats. Detecting the format by file extension makes the tool more versatile. |
Alternative Approaches¶
Using pandas merge¶
import pandas as pd
def join_with_pandas(left_path, right_path, key, how="inner"):
left = pd.read_csv(left_path)
right = pd.read_csv(right_path)
merged = pd.merge(left, right, on=key, how=how)
return merged.to_dict(orient="records")
Pandas handles duplicate keys, type mismatches, column name conflicts (with suffixes), and many join types out of the box. For production data pipelines, pandas is the standard tool. Building the join manually teaches the underlying algorithm.
Common Pitfalls¶
- Join key exists in only one file — Using
--key dept_idwhen the right file usesdepartment_idproduces zero matches. Always verify column names in both files before joining. - Duplicate keys with different data — If the left file has two rows with
dept_id=1, only the last one appears in the index. This silently drops data. Log the duplicate count so the user is aware. - Column name collisions — If both files have a column called
name, the right side's value overwrites the left side's in the merged dict. In production, you would add suffixes (name_left,name_right) to preserve both.