Skip to content

Solution: Level 6 / Project 01 - SQL Connection Simulator

STOP -- Have you attempted this project yourself first?

Learning happens in the struggle, not in reading answers. Spend at least 20 minutes trying before reading this solution. If you are stuck, try the README hints or re-read the relevant concept docs first.


Complete solution

"""Level 6 / Project 01 — SQL Connection Simulator.

Teaches SQLite connection management with context managers,
connection pooling simulation, retry logic, and health checks.
"""

from __future__ import annotations

import argparse
import json
import logging
import random
import sqlite3
import time
from dataclasses import dataclass, field
from pathlib import Path

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Default ceiling on idle connections the pool keeps alive.
MAX_POOL_SIZE = 5
# Default number of connect attempts before _connect_with_retry gives up.
MAX_RETRIES = 3
BASE_BACKOFF_SEC = 0.01  # kept small for fast demo runs


@dataclass(frozen=True)
class ConnectionConfig:
    """Immutable connection configuration.

    WHY frozen=True? -- The docstring promised immutability but the
    original dataclass was mutable; freezing enforces the contract and
    makes configs safe to share between components.

    Attributes:
        db_path: SQLite database path (":memory:" for an in-memory DB).
        timeout: Seconds sqlite3.connect() waits on a locked database.
        max_retries: Connection attempts before giving up (must be >= 1).
        pool_size: Maximum idle connections kept by the pool (must be >= 1).

    Raises:
        ValueError: if pool_size or max_retries is less than 1.
    """

    db_path: str = ":memory:"
    timeout: float = 5.0
    max_retries: int = MAX_RETRIES
    pool_size: int = MAX_POOL_SIZE

    def __post_init__(self) -> None:
        # WHY validate here? -- pool_size=0 silently turns the pool into a
        # no-op (every release closes the connection), and max_retries < 1
        # would make the retry loop raise without ever attempting a connect.
        if self.pool_size < 1:
            raise ValueError(f"pool_size must be >= 1, got {self.pool_size}")
        if self.max_retries < 1:
            raise ValueError(f"max_retries must be >= 1, got {self.max_retries}")


# ---------------------------------------------------------------------------
# Connection pool
# ---------------------------------------------------------------------------


class ConnectionPool:
    """Simple SQLite connection pool.

    WHY pool connections? -- Opening a database connection is not free:
    for client/server databases it costs TCP handshakes and
    authentication; even for SQLite it means reopening the file and
    rebuilding caches. A pool keeps idle connections ready to reuse,
    cutting per-query overhead.
    """

    def __init__(self, config: ConnectionConfig) -> None:
        self.config = config
        # WHY a list for the pool? -- A list acts as a LIFO stack; pop()
        # gives the most recently released connection, which is most likely
        # to still be alive and warm in the OS cache.
        self._pool: list[sqlite3.Connection] = []
        self._created = 0  # connections opened via sqlite3.connect()
        self._reused = 0   # acquisitions served from the idle pool

    def acquire(self) -> sqlite3.Connection:
        """Return an existing connection or create a new one."""
        # WHY try the pool first? -- Reusing an idle connection avoids the
        # overhead of sqlite3.connect() entirely. Only when the pool is
        # empty do we pay the cost of opening a new connection.
        if self._pool:
            self._reused += 1
            logging.info("pool_reuse  total=%d reused=%d", self._created, self._reused)
            return self._pool.pop()

        conn = self._connect_with_retry()
        self._created += 1
        logging.info("pool_create total=%d reused=%d", self._created, self._reused)
        return conn

    def release(self, conn: sqlite3.Connection) -> None:
        """Return a connection to the pool (or close it if the pool is full)."""
        # WHY check pool_size? -- Without a cap, the pool could grow
        # unboundedly during burst traffic, holding open file descriptors
        # that the OS eventually runs out of.
        if len(self._pool) < self.config.pool_size:
            self._pool.append(conn)
        else:
            conn.close()

    def close_all(self) -> None:
        """Drain the pool and close every idle connection."""
        while self._pool:
            self._pool.pop().close()

    def stats(self) -> dict:
        """Return pool health metrics (created/reused counters, idle count)."""
        return {
            "created": self._created,
            "reused": self._reused,
            "idle": len(self._pool),
            "pool_size": self.config.pool_size,
        }

    def _connect_with_retry(self) -> sqlite3.Connection:
        """Open a connection, retrying on transient errors.

        Raises:
            ConnectionError: when all max_retries attempts fail; the final
                underlying error is chained as ``__cause__``.
        """
        last_err: Exception | None = None
        for attempt in range(1, self.config.max_retries + 1):
            try:
                conn = sqlite3.connect(
                    self.config.db_path, timeout=self.config.timeout
                )
                # WHY SELECT 1? -- A connection can be "open" but the
                # database might be locked or corrupted. This lightweight
                # query verifies the connection works end-to-end.
                conn.execute("SELECT 1")
                return conn
            except sqlite3.OperationalError as exc:
                last_err = exc
                # BUG FIX: the original also slept AFTER the final failed
                # attempt, delaying the ConnectionError for no benefit.
                if attempt == self.config.max_retries:
                    break
                # WHY exponential backoff? -- A fixed delay hammers a
                # struggling server at constant rate. Doubling the wait
                # gives the server progressively more breathing room.
                wait = BASE_BACKOFF_SEC * (2 ** (attempt - 1))
                logging.warning(
                    "connect_retry attempt=%d wait=%.3fs err=%s",
                    attempt, wait, exc,
                )
                time.sleep(wait)
        # WHY `from last_err`? -- Exception chaining preserves the original
        # sqlite3 error for debugging instead of discarding it.
        raise ConnectionError(
            f"Failed after {self.config.max_retries} retries: {last_err}"
        ) from last_err


# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------


def health_check(conn: sqlite3.Connection) -> dict:
    """Ping *conn* with a trivial query and report pass/fail status.

    WHY a dedicated health check? -- In production, load balancers and
    monitoring systems poll /health endpoints. This function gives them
    a quick pass/fail signal without running real business queries.
    """
    try:
        (version,) = conn.execute("SELECT sqlite_version()").fetchone()
    except sqlite3.Error as exc:
        # Any sqlite3 failure (closed connection, corruption, lock) maps
        # to a single "unhealthy" verdict with the error attached.
        return {"status": "unhealthy", "error": str(exc)}
    return {"status": "healthy", "sqlite_version": version}


# ---------------------------------------------------------------------------
# Demo workload
# ---------------------------------------------------------------------------


def run_demo_queries(pool: ConnectionPool, labels: list[str]) -> list[dict]:
    """Simulate a workload: create a table, insert rows, query them back.

    WHY try/finally for pool.release? -- If an exception occurs mid-query,
    the connection must still be returned to the pool. Without finally,
    a leaked connection would exhaust the pool under repeated failures.
    """
    conn = pool.acquire()
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS events "
            "(id INTEGER PRIMARY KEY, label TEXT NOT NULL)"
        )
        conn.executemany(
            "INSERT INTO events (label) VALUES (?)",
            [(label,) for label in labels],
        )
        conn.commit()

        cursor = conn.execute("SELECT id, label FROM events ORDER BY id")
        return [{"id": row_id, "label": text} for row_id, text in cursor.fetchall()]
    finally:
        pool.release(conn)


# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------


def run(input_path: Path, output_path: Path, config: ConnectionConfig | None = None) -> dict:
    """Full demo: read labels -> pool -> queries -> stats -> JSON output.

    WHY a single orchestrator? -- Centralizing the pipeline in one function
    makes the flow visible at a glance and gives tests a single entry
    point to exercise the complete happy path.

    Args:
        input_path: Text file with one label per line (blank lines skipped).
        output_path: Destination for the JSON summary (parents created).
        config: Optional connection settings; defaults to ConnectionConfig().

    Returns:
        The summary dict that was also written to *output_path*.

    Raises:
        FileNotFoundError: if *input_path* does not exist.
    """
    # BUG FIX: validate the input BEFORE allocating the pool -- the
    # original created the pool first, so a missing file raised after
    # resources were already set up.
    if not input_path.exists():
        raise FileNotFoundError(f"Input not found: {input_path}")
    labels = [
        ln.strip()
        for ln in input_path.read_text(encoding="utf-8").splitlines()
        if ln.strip()
    ]

    config = config or ConnectionConfig()
    pool = ConnectionPool(config)
    try:
        rows = run_demo_queries(pool, labels)

        # Second acquire to demonstrate pool reuse: the LIFO pool hands
        # back the connection run_demo_queries just released.
        conn2 = pool.acquire()
        try:
            hc = health_check(conn2)
        finally:
            pool.release(conn2)
    finally:
        # WHY finally? -- Guarantees no file descriptors leak even if a
        # query raises mid-demo (the original skipped close_all on error).
        pool.close_all()

    summary = {
        "rows_inserted": len(rows),
        "rows": rows,
        "health": hc,
        "pool_stats": pool.stats(),
    }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    logging.info("output=%s rows=%d", output_path, len(rows))
    return summary


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------


def parse_args() -> argparse.Namespace:
    """Define and evaluate the command-line interface."""
    parser = argparse.ArgumentParser(
        description="SQL Connection Simulator — connection pooling & retry demo"
    )
    # Option table: (flag, add_argument keyword arguments).
    option_specs = (
        ("--input", {"default": "data/sample_input.txt"}),
        ("--output", {"default": "data/output_summary.json"}),
        ("--db", {"default": ":memory:", "help": "SQLite database path"}),
        (
            "--pool-size",
            {"type": int, "default": MAX_POOL_SIZE, "help": "Max idle connections"},
        ),
    )
    for flag, kwargs in option_specs:
        parser.add_argument(flag, **kwargs)
    return parser.parse_args()


def main() -> None:
    """CLI entry point: configure logging, run the demo, print the summary."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(message)s",
    )
    cli = parse_args()
    cfg = ConnectionConfig(db_path=cli.db, pool_size=cli.pool_size)
    result = run(Path(cli.input), Path(cli.output), cfg)
    print(json.dumps(result, indent=2))


if __name__ == "__main__":
    main()

Design decisions

| Decision | Why | Alternative considered |
| --- | --- | --- |
| LIFO pool (list with pop/append) | Most-recently-used connection is warmest in OS cache and most likely still valid | FIFO queue — fairer rotation, but older connections are more likely stale |
| Exponential backoff on retry | Gives a struggling database progressively more breathing room between attempts | Fixed delay — simpler, but can overwhelm a recovering server |
| ConnectionConfig as a dataclass | Groups related settings into one immutable object, easier to pass around and test | Bare kwargs — flexible, but easy to mis-spell or forget a parameter |
| Separate health_check function | Decouples monitoring from business logic; can be reused by different callers | Inline check inside run — couples monitoring to the demo workload |

Alternative approaches

Approach B: Context manager protocol

class PooledConnection:
    """Borrow a pooled connection inside a 'with' block; auto-release on exit.

    Usage::

        with PooledConnection(pool) as conn:
            conn.execute("SELECT ...")
    """

    def __init__(self, pool: ConnectionPool) -> None:
        self._owner = pool
        self._active: sqlite3.Connection | None = None

    def __enter__(self) -> sqlite3.Connection:
        self._active = self._owner.acquire()
        return self._active

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Release exactly once, even under nested/erroneous exits.
        conn, self._active = self._active, None
        if conn:
            self._owner.release(conn)

Trade-off: The context manager guarantees release even if the caller forgets finally, which is safer in production code. The manual acquire/release approach used in the primary solution is more explicit and better for learning what happens under the hood.

Approach C: Thread-safe pool with queue.Queue

import queue

class ThreadSafePool:
    """Thread-safe connection pool backed by a bounded queue.

    WHY queue.Queue? -- Its get/put operations are internally locked, so
    multiple threads can share the pool without extra synchronization.
    """

    def __init__(self, config: ConnectionConfig) -> None:
        # BUG FIX: the original never stored ``config``, so acquire()
        # crashed with AttributeError whenever the queue was empty.
        self.config = config
        self._pool = queue.Queue(maxsize=config.pool_size)

    def acquire(self, timeout: float = 5.0) -> sqlite3.Connection:
        """Take an idle connection, or open a fresh one on pool miss."""
        try:
            return self._pool.get(timeout=timeout)
        except queue.Empty:
            # Pool exhausted within `timeout`: fall back to a new connection.
            return sqlite3.connect(self.config.db_path)

Trade-off: Required when multiple threads share the pool (e.g., a web server). Adds complexity that is unnecessary for single-threaded scripts like this demo.

Common pitfalls

| Scenario | What happens | Prevention |
| --- | --- | --- |
| Forgetting to release a connection | Pool drains to zero idle connections; every subsequent acquire creates a new connection, defeating the pool's purpose | Always use try/finally or a context manager around acquire()/release() |
| Setting pool_size=0 | Every released connection gets closed immediately — the pool becomes a no-op and you pay full connect cost every time | Validate pool_size >= 1 in ConnectionConfig.__post_init__ |
| Returning a closed connection to the pool | Next acquire() returns a dead connection; the caller's first query raises ProgrammingError | Ping the connection (SELECT 1) in release() before putting it back |