# Walkthrough: Level 1 Mini Automation
This guide walks through the thinking process for building this project. It does NOT give you the complete solution. For that, see SOLUTION.md.
## Before reading this
Try the project yourself first. Spend at least 20 minutes. If you have not tried yet, close this file and open the project README.
## Understanding the problem
You need to build a multi-step data pipeline that reads pipe-delimited records from a file, filters out inactive ones, transforms the data, and produces a summary report. This is a simplified version of an ETL (Extract-Transform-Load) workflow -- the kind of automation that runs behind the scenes in every data-driven company.
Each line of the input file has three fields separated by |: name, status, and value.
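The project's actual sample file has ten records; purely for illustration (these names and values are made up, not the real data), records follow this shape:

```text
alice | active | 10.5
bob | INACTIVE | 7.25
carol | ok | N/A
```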
## Planning before code

```mermaid
flowchart LR
    A[Step 1\nRead Lines] --> B[Step 2\nParse Records]
    B --> C[Step 3\nFilter Active]
    C --> D[Step 4\nTransform]
    D --> E[Step 5\nSummarise]
```
The pipeline pattern: each step takes data in, processes it, and passes results to the next step. Five steps, five functions:
- step_read_lines() -- read and clean lines from the file
- step_parse_records() -- split pipe-delimited lines into dicts
- step_filter_active() -- keep only records with active-like statuses
- step_transform() -- normalise names, parse numeric values
- step_summarise() -- aggregate totals and averages
Plus an orchestrator function run_pipeline() that calls them in sequence.
## Step 1: Read and clean lines

```python
from pathlib import Path

def step_read_lines(path: Path) -> list[str]:
    raw = path.read_text(encoding="utf-8").splitlines()
    return [line.strip() for line in raw if line.strip()]
```
This reads the file, splits into lines, strips whitespace, and filters out blank lines. The list comprehension does three things in one line -- but each is simple:
- line.strip() removes leading/trailing spaces
- if line.strip() skips blank lines
- The result is a clean list
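Here is the same comprehension in isolation, on a made-up list of raw lines:

```python
raw = ["  alice | active | 10  ", "", "   ", "bob | ok | 5"]
cleaned = [line.strip() for line in raw if line.strip()]
# cleaned == ["alice | active | 10", "bob | ok | 5"]
```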
## Step 2: Parse records
Each line is pipe-delimited. Split on | and build a dictionary:
```python
def step_parse_records(lines: list[str]) -> list[dict[str, str]]:
    records = []
    for line in lines:
        parts = [p.strip() for p in line.split("|")]
        if len(parts) < 3:
            continue  # skip malformed lines
        records.append({
            "name": parts[0],
            "status": parts[1].lower(),
            "value": parts[2],
        })
    return records
```
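A quick check of the behaviour on an illustrative line -- note that only the status is lowercased:

```python
step_parse_records(["Alice | ACTIVE | 10.5"])
# [{"name": "Alice", "status": "active", "value": "10.5"}]
```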
### Predict before you scroll
Why does the code check len(parts) < 3? What would happen if a line had only one | separator?
## Step 3: Filter active records
Not all records should be processed. Filter down to ones with "active", "ok", "pass", or "success" statuses:
```python
def step_filter_active(records: list[dict]) -> list[dict]:
    active_statuses = {"active", "ok", "pass", "success"}
    return [r for r in records if r["status"] in active_statuses]
```
Using a set for active_statuses is intentional: checking if x in some_set is faster than if x in some_list because sets use hashing. For a small list it does not matter much, but it is a good habit.
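To make the membership check concrete:

```python
active_statuses = {"active", "ok", "pass", "success"}
print("ok" in active_statuses)        # True -- constant-time hash lookup
print("inactive" in active_statuses)  # False
```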
### Predict before you scroll
Looking at the sample data, which records have statuses that will pass the filter? How many of the 10 records will survive?
## Step 4: Transform records
Raw data rarely matches the format you need. Transform names to title case and values to floats:
```python
def step_transform(records: list[dict]) -> list[dict]:
    transformed = []
    for r in records:
        name = r["name"].strip().title()
        try:
            value = float(r["value"])
        except ValueError:
            value = 0.0
        transformed.append({"name": name, "status": r["status"], "value": value})
    return transformed
```
The try/except ValueError is a safety net: if a value is not a valid number (like "N/A"), use 0.0 instead of crashing.
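The same pattern in isolation -- safe_float here is a hypothetical helper for demonstration, not part of the project:

```python
def safe_float(raw: str) -> float:
    try:
        return float(raw)
    except ValueError:
        return 0.0  # fall back instead of crashing on messy input

print(safe_float("19.99"))  # 19.99
print(safe_float("N/A"))    # 0.0
```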
## Step 5: Summarise
The final step produces aggregate statistics:
```python
def step_summarise(records: list[dict]) -> dict:
    if not records:
        return {"count": 0, "total_value": 0.0, "average_value": 0.0, "names": []}
    values = [r["value"] for r in records]
    total = round(sum(values), 2)
    return {
        "count": len(records),
        "total_value": total,
        "average_value": round(total / len(records), 2),
        "names": [r["name"] for r in records],
    }
```
The guard at the top prevents division by zero if all records were filtered out.
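You can exercise the guard directly:

```python
step_summarise([])
# {"count": 0, "total_value": 0.0, "average_value": 0.0, "names": []}
```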
## The orchestrator
run_pipeline() calls each step in sequence and tracks counts at each stage:
```python
def run_pipeline(input_path: Path) -> dict:
    lines = step_read_lines(input_path)
    records = step_parse_records(lines)
    active = step_filter_active(records)
    transformed = step_transform(active)
    summary = step_summarise(transformed)
    return {
        "total_lines": len(lines),
        "parsed_records": len(records),
        "active_records": len(active),
        "summary": summary,
    }
```
```mermaid
flowchart TD
    A["10 lines"] -->|parse| B["10 records"]
    B -->|filter| C["7 active"]
    C -->|transform| D["7 transformed"]
    D -->|summarise| E["Summary: count=7, total=$X"]
```
Tracking the count at each stage is a debugging aid: if the final count looks wrong, you can see exactly where records were lost.
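For instance, a quick way to print the stage counts after a run (the filename here is hypothetical):

```python
from pathlib import Path

result = run_pipeline(Path("input.txt"))  # substitute the project's data file
print(f"lines={result['total_lines']} parsed={result['parsed_records']} "
      f"active={result['active_records']} summarised={result['summary']['count']}")
```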
## Common mistakes

| Mistake | Why it happens | How to fix |
|---|---|---|
| line.split("\|") leaves extra whitespace | "alice \| active" splits to "alice " and " active" | Strip each part: [p.strip() for p in line.split("\|")] |
| Division by zero in summarise | All records were filtered out (none were "active") | Check if not records at the start |
| Non-numeric value crashes float() | Data is messy -- not all values are numbers | Wrap in try/except ValueError, default to 0.0 |
| Pipeline order is wrong | Transforming before filtering, or summarising before transforming | Follow the logical sequence: read, parse, filter, transform, summarise |
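The first mistake is easy to see in a REPL:

```python
print("alice | active | 10".split("|"))
# ['alice ', ' active ', ' 10']  <- stray spaces survive the split
print([p.strip() for p in "alice | active | 10".split("|")])
# ['alice', 'active', '10']
```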
## Testing your solution

Run the test suite from the project directory (the project README gives the exact command). The eight tests verify each step individually and the pipeline as a whole (a sketch of one such test follows this list):
- step_parse_records() correctly splits pipe-delimited lines
- step_filter_active() keeps only active/ok/pass/success records
- step_transform() normalises names and parses values
- step_summarise() computes correct totals
- run_pipeline() chains everything together correctly
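A minimal sketch of what one such test might look like, assuming pytest-style assertions (the real tests may differ):

```python
def test_filter_active_keeps_only_active_like_statuses():
    records = [
        {"name": "alice", "status": "active", "value": "10"},
        {"name": "bob", "status": "inactive", "value": "5"},
    ]
    assert step_filter_active(records) == [records[0]]
```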
## What to explore next

- Add a Step 6: step_export_csv() that writes the active, transformed records to a CSV file
- Add a --verbose flag that prints the intermediate result of each pipeline step as it executes