Skip to content

Solution: Level 10 / Project 08 - Zero Downtime Migration Lab

STOP -- Have you attempted this project yourself first?

Learning happens in the struggle, not in reading answers. Spend at least 20 minutes trying before reading this solution. Check the README for requirements and the Walkthrough for guided hints.


Complete solution

"""Zero-Downtime Migration Lab -- Expand-migrate-contract pattern for schema changes."""
from __future__ import annotations

import json
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any


# WHY five phases? -- The expand-migrate-contract pattern ensures zero downtime.
# EXPANDING adds new schema alongside old (both work). MIGRATING backfills data.
# CONTRACTING removes old schema. At every phase, the system is fully functional.
# ROLLED_BACK exists because any phase can fail and must be reversible.
class MigrationPhase(Enum):
    PENDING = auto()
    EXPANDING = auto()
    MIGRATING = auto()
    CONTRACTING = auto()
    COMPLETE = auto()
    ROLLED_BACK = auto()


class ColumnType(Enum):
    TEXT = "TEXT"
    INTEGER = "INTEGER"
    BOOLEAN = "BOOLEAN"
    TIMESTAMP = "TIMESTAMP"
    JSON = "JSON"


@dataclass(frozen=True)
class Column:
    name: str
    col_type: ColumnType
    nullable: bool = True
    default: str | None = None


@dataclass
class Table:
    name: str
    columns: dict[str, Column] = field(default_factory=dict)
    rows: list[dict[str, Any]] = field(default_factory=list)

    def add_column(self, col: Column) -> None:
        if col.name in self.columns:
            raise ValueError(f"Column '{col.name}' already exists in '{self.name}'")
        self.columns[col.name] = col
        # WHY backfill default on add? -- Existing rows must have a value for
        # the new column. Using the column's default value ensures reads work
        # immediately after the expand step, before the migrate step runs.
        for row in self.rows:
            row[col.name] = col.default

    def drop_column(self, col_name: str) -> None:
        if col_name not in self.columns:
            raise ValueError(f"Column '{col_name}' not found in '{self.name}'")
        del self.columns[col_name]
        for row in self.rows:
            row.pop(col_name, None)

    def insert(self, data: dict[str, Any]) -> None:
        row = {}
        for col_name, col in self.columns.items():
            if col_name in data:
                row[col_name] = data[col_name]
            elif col.nullable or col.default is not None:
                row[col_name] = col.default
            else:
                raise ValueError(f"Missing required column: {col_name}")
        self.rows.append(row)

    @property
    def row_count(self) -> int:
        return len(self.rows)

    @property
    def column_names(self) -> list[str]:
        return list(self.columns.keys())


@dataclass
class MigrationStep:
    phase: MigrationPhase
    description: str
    forward_fn: str
    rollback_fn: str
    executed: bool = False
    rolled_back: bool = False


@dataclass
class MigrationPlan:
    migration_id: str
    title: str
    steps: list[MigrationStep] = field(default_factory=list)
    current_phase: MigrationPhase = MigrationPhase.PENDING

    @property
    def progress_pct(self) -> float:
        if not self.steps:
            return 0.0
        executed = sum(1 for s in self.steps if s.executed and not s.rolled_back)
        return (executed / len(self.steps)) * 100

    @property
    def is_complete(self) -> bool:
        return self.current_phase == MigrationPhase.COMPLETE


class MigrationError(Exception):
    pass


class MigrationExecutor:
    """State machine: PENDING -> EXPANDING -> MIGRATING -> CONTRACTING -> COMPLETE."""

    PHASE_ORDER = [MigrationPhase.PENDING, MigrationPhase.EXPANDING,
                   MigrationPhase.MIGRATING, MigrationPhase.CONTRACTING,
                   MigrationPhase.COMPLETE]

    def __init__(self, table: Table) -> None:
        self._table = table
        self._history: list[dict[str, str]] = []

    @property
    def history(self) -> list[dict[str, str]]:
        return list(self._history)

    # WHY auto-rollback on failure? -- A partially applied migration leaves the
    # schema in an inconsistent state. Rolling back all executed steps restores
    # the original schema, which is safe because both old and new code paths
    # work during the expand/migrate phases.
    def execute_plan(self, plan: MigrationPlan) -> MigrationPlan:
        for step in plan.steps:
            try:
                self._execute_step(plan, step)
            except MigrationError:
                self._rollback_plan(plan)
                return plan
        plan.current_phase = MigrationPhase.COMPLETE
        self._log(plan.migration_id, "COMPLETE", "Migration finished successfully")
        return plan

    def _execute_step(self, plan: MigrationPlan, step: MigrationStep) -> None:
        plan.current_phase = step.phase
        self._log(plan.migration_id, step.phase.name, step.description)
        step.executed = True

    # WHY reverse order for rollback? -- Steps are rolled back in reverse to
    # undo the most recent changes first, like unwinding a stack.
    def _rollback_plan(self, plan: MigrationPlan) -> None:
        for step in reversed(plan.steps):
            if step.executed and not step.rolled_back:
                step.rolled_back = True
                self._log(plan.migration_id, "ROLLBACK", f"Rolled back: {step.description}")
        plan.current_phase = MigrationPhase.ROLLED_BACK

    def _log(self, migration_id: str, phase: str, message: str) -> None:
        self._history.append({"migration_id": migration_id, "phase": phase, "message": message})


def build_add_column_migration(migration_id: str, table: Table, new_column: Column,
                                old_column: str | None = None,
                                transform_fn: str = "direct_copy") -> MigrationPlan:
    steps = [
        MigrationStep(MigrationPhase.EXPANDING,
                       f"Add column '{new_column.name}' (nullable) to '{table.name}'",
                       f"ALTER TABLE {table.name} ADD COLUMN {new_column.name} {new_column.col_type.value}",
                       f"ALTER TABLE {table.name} DROP COLUMN {new_column.name}"),
        MigrationStep(MigrationPhase.MIGRATING,
                       f"Backfill '{new_column.name}' from '{old_column or 'default'}'",
                       f"UPDATE {table.name} SET {new_column.name} = transform({old_column or 'default'})",
                       f"UPDATE {table.name} SET {new_column.name} = NULL"),
    ]
    if old_column:
        steps.append(MigrationStep(MigrationPhase.CONTRACTING,
                                    f"Drop old column '{old_column}' from '{table.name}'",
                                    f"ALTER TABLE {table.name} DROP COLUMN {old_column}",
                                    f"ALTER TABLE {table.name} ADD COLUMN {old_column}"))
    return MigrationPlan(migration_id=migration_id, title=f"Add {new_column.name}", steps=steps)


def build_rename_column_migration(migration_id: str, table: Table,
                                   old_name: str, new_name: str,
                                   col_type: ColumnType) -> MigrationPlan:
    new_col = Column(new_name, col_type, nullable=True)
    return build_add_column_migration(migration_id, table, new_col, old_column=old_name)


def validate_migration_safety(plan: MigrationPlan) -> list[str]:
    warnings: list[str] = []
    if not plan.steps:
        warnings.append("Migration plan has no steps")
    has_expand = any(s.phase == MigrationPhase.EXPANDING for s in plan.steps)
    has_contract = any(s.phase == MigrationPhase.CONTRACTING for s in plan.steps)
    if has_contract and not has_expand:
        warnings.append("Contracting without expanding -- data loss risk")
    if len(plan.steps) > 10:
        warnings.append("Migration has many steps -- consider splitting")
    return warnings


def main() -> None:
    users = Table("users", {
        "id": Column("id", ColumnType.INTEGER, nullable=False),
        "username": Column("username", ColumnType.TEXT, nullable=False),
        "email": Column("email", ColumnType.TEXT),
    })
    users.insert({"id": 1, "username": "alice", "email": "alice@example.com"})
    users.insert({"id": 2, "username": "bob", "email": "bob@example.com"})

    new_col = Column("display_name", ColumnType.TEXT, nullable=True, default="")
    plan = build_add_column_migration("MIG-001", users, new_col, old_column=None)

    warnings = validate_migration_safety(plan)
    for w in warnings:
        print(f"WARNING: {w}")

    executor = MigrationExecutor(users)
    executor.execute_plan(plan)

    print(f"Migration: {plan.title}")
    print(f"Phase: {plan.current_phase.name}")
    print(f"Progress: {plan.progress_pct:.0f}%")
    print(f"\nHistory ({len(executor.history)} entries):")
    for entry in executor.history:
        print(f"  [{entry['phase']}] {entry['message']}")


if __name__ == "__main__":
    main()

Design decisions

Decision Why Alternative considered
Expand-migrate-contract as explicit phases Each phase is independently reversible; the system works at every phase boundary "Big bang" migration -- requires downtime and is irreversible once started
State machine with ROLLED_BACK state Makes rollback a first-class state visible in the plan's lifecycle Boolean rolled_back flag -- less expressive, harder to audit
Safety validation as a separate function Can be run before execution to catch issues early without side effects Validation inside the executor -- mixes concerns and makes dry runs harder
Forward and rollback descriptions stored as strings Makes migration plans inspectable and auditable without executing them Executable callables -- more powerful but harder to serialize and review

Alternative approaches

Approach B: Alembic-style versioned migrations

class Migration:
    revision = "001"
    down_revision = None

    def upgrade(self, table: Table) -> None:
        table.add_column(Column("display_name", ColumnType.TEXT, default=""))

    def downgrade(self, table: Table) -> None:
        table.drop_column("display_name")

Trade-off: Alembic-style versioned migrations are the industry standard for real databases. They support branching, dependency chains, and auto-generation from model diffs. However, they do not explicitly model the expand-migrate-contract lifecycle, which is the key learning objective of this project.

Common pitfalls

Scenario What happens Prevention
Adding a duplicate column name ValueError from add_column Make add_column idempotent (skip silently if column exists)
Contracting step without expanding step validate_migration_safety warns about data loss risk Always run safety validation before executing a plan
Dropping a non-existent column ValueError from drop_column Check column_names before dropping, or make drop_column idempotent