Project: ETL Data Pipeline
Build a Python ETL pipeline: extract data from CSV files and APIs, transform it with Pandas, load it into SQLite, and run it from a CLI that can be scheduled.
ETL (Extract, Transform, Load) pipelines move data from sources into a structured store for analysis. This project builds a reusable pipeline that ingests sales data and produces daily summaries.
What You’ll Build
etl/extract.py Read CSV files and fetch API data
etl/transform.py Clean, validate, aggregate with Pandas
etl/load.py Write to SQLite database
cli.py Run pipeline on demand or by date range
Output tables:
raw_sales: cleaned individual records
daily_summary: revenue and order count per day per region
Prerequisites
Setup
mkdir etl-pipeline && cd etl-pipeline
python -m venv .venv && source .venv/bin/activate
pip install pandas sqlalchemy requests click python-dotenv pytest
Project Structure
etl-pipeline/
├── data/
│   └── sample_sales.csv   # input data
├── etl/
│   ├── __init__.py
│   ├── extract.py
│   ├── transform.py
│   ├── load.py
│   └── pipeline.py
├── cli.py
├── tests/
│   └── test_transform.py
└── .env
Sample Input Data
# data/sample_sales.csv
order_id,date,region,product,quantity,unit_price
1001,2026-06-01,North,Widget,2,19.99
1002,2026-06-01,South,Gadget,1,49.99
1003,2026-06-02,North,Widget,5,19.99
1004,2026-06-02,East,Gadget,3,49.99
1005,2026-06-03,North,Widget,1,19.99
1006,2026-06-03,South,Widget,10,19.99
Extract
# etl/extract.py
import pandas as pd
import requests
from pathlib import Path


def extract_csv(path: str | Path) -> pd.DataFrame:
    """Read a sales CSV, parsing the date column as datetimes."""
    df = pd.read_csv(path, parse_dates=["date"])
    return df


def extract_api(url: str, params: dict | None = None) -> pd.DataFrame:
    """Fetch JSON array from an API and return as DataFrame."""
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    return pd.DataFrame(response.json())
For local development, use the CSV. In production, replace or supplement with API extraction.
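One way to supplement the CSV with API data is to concatenate the two frames and drop duplicate order IDs. The sketch below is a minimal illustration rather than part of the project code: `combine_sources` is a hypothetical helper, and it assumes the API returns records with the same columns as the CSV.

```python
import pandas as pd


def combine_sources(frames: list[pd.DataFrame], key: str = "order_id") -> pd.DataFrame:
    """Stack frames from several sources, keeping the first row seen for each key."""
    combined = pd.concat(frames, ignore_index=True)
    # Earlier frames win when the same key appears in more than one source.
    return combined.drop_duplicates(subset=key, keep="first").reset_index(drop=True)


# Stand-ins for the results of extract_csv(...) and extract_api(...).
csv_df = pd.DataFrame({"order_id": [1001, 1002], "quantity": [2, 1]})
api_df = pd.DataFrame({"order_id": [1002, 1003], "quantity": [9, 5]})

merged = combine_sources([csv_df, api_df])
```

Listing the CSV frame first makes it the source of truth for overlapping orders; reversing the list would let the API win instead.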
Transform
# etl/transform.py
import pandas as pd
REQUIRED_COLUMNS = {"order_id", "date", "region", "product", "quantity", "unit_price"}


def validate(df: pd.DataFrame) -> pd.DataFrame:
    """Check required columns, drop invalid rows, and normalize text fields."""
    missing = REQUIRED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    df = df.dropna(subset=["order_id", "date", "quantity", "unit_price"])
    df = df[df["quantity"] > 0]
    # Copy after filtering so the assignments below do not write to a view
    # of the caller's frame (avoids SettingWithCopyWarning).
    df = df[df["unit_price"] >= 0].copy()
    df["order_id"] = df["order_id"].astype(int)
    df["region"] = df["region"].str.strip().str.title()
    df["product"] = df["product"].str.strip()
    return df


def enrich(df: pd.DataFrame) -> pd.DataFrame:
    """Add a revenue column and reduce timestamps to calendar dates."""
    df = df.copy()
    df["revenue"] = df["quantity"] * df["unit_price"]
    df["date"] = pd.to_datetime(df["date"]).dt.date
    return df


def aggregate_daily(df: pd.DataFrame) -> pd.DataFrame:
    """Summarize orders, quantity, and revenue per date and region."""
    summary = (
        df.groupby(["date", "region"], as_index=False)
        .agg(
            order_count=("order_id", "count"),
            total_quantity=("quantity", "sum"),
            total_revenue=("revenue", "sum"),
        )
        .sort_values(["date", "region"])
    )
    return summary


def transform(raw: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Return the cleaned detail rows and the daily summary."""
    cleaned = enrich(validate(raw))
    summary = aggregate_daily(cleaned)
    return cleaned, summary
Load
# etl/load.py
import pandas as pd
from sqlalchemy import create_engine, inspect, text


def get_engine(db_url: str = "sqlite:///data/warehouse.db"):
    return create_engine(db_url)


def load(df: pd.DataFrame, table: str, engine, if_exists: str = "replace"):
    df.to_sql(table, engine, if_exists=if_exists, index=False)


def load_incremental(df: pd.DataFrame, table: str, engine, key_column: str = "order_id"):
    """Append only records not already in the table; return the number of rows written."""
    # First run: the table does not exist yet, so write everything.
    if not inspect(engine).has_table(table):
        df.to_sql(table, engine, if_exists="replace", index=False)
        return len(df)
    # table and key_column are interpolated into SQL: pass trusted identifiers only.
    with engine.connect() as conn:
        existing = pd.read_sql(text(f"SELECT {key_column} FROM {table}"), conn)
    new_rows = df[~df[key_column].isin(existing[key_column])]
    if not new_rows.empty:
        new_rows.to_sql(table, engine, if_exists="append", index=False)
    return len(new_rows)
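The anti-join in `load_incremental` reads every existing key into memory before filtering. As an alternative sketch for SQLite targets only, a primary key combined with `INSERT OR IGNORE` lets the database skip duplicates itself. The reduced table layout and the `insert_new_orders` helper below are illustrative assumptions, not part of the project code.

```python
import sqlite3


def insert_new_orders(conn: sqlite3.Connection, rows: list[tuple]) -> int:
    """Insert rows, silently skipping any order_id that already exists."""
    before = conn.total_changes
    conn.executemany(
        "INSERT OR IGNORE INTO raw_sales (order_id, region, revenue) VALUES (?, ?, ?)",
        rows,
    )
    conn.commit()
    # total_changes counts only rows actually written, so ignored rows are excluded.
    return conn.total_changes - before


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE raw_sales (order_id INTEGER PRIMARY KEY, region TEXT, revenue REAL)"
)

first = insert_new_orders(conn, [(1001, "North", 39.98), (1002, "South", 49.99)])
# Re-running with an overlapping batch writes only the unseen order.
second = insert_new_orders(conn, [(1002, "South", 49.99), (1003, "North", 99.95)])
row_count = conn.execute("SELECT COUNT(*) FROM raw_sales").fetchone()[0]
```

Note that `DataFrame.to_sql` does not declare a primary key on the tables it creates, so this approach would require creating the table up front.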
Pipeline Orchestration
# etl/pipeline.py
import logging
from pathlib import Path
from etl.extract import extract_csv
from etl.transform import transform
from etl.load import get_engine, load
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)


def run_pipeline(csv_path: str, db_url: str = "sqlite:///data/warehouse.db"):
    logger.info("Starting ETL pipeline")

    # Extract
    logger.info("Extracting from %s", csv_path)
    raw = extract_csv(csv_path)
    logger.info("Extracted %d rows", len(raw))

    # Transform
    logger.info("Transforming data")
    sales, summary = transform(raw)
    logger.info("Cleaned %d rows, %d summary rows", len(sales), len(summary))

    # Load
    # Make sure the directory for the default SQLite file exists before writing.
    Path("data").mkdir(exist_ok=True)
    engine = get_engine(db_url)
    load(sales, "raw_sales", engine)
    load(summary, "daily_summary", engine)
    logger.info("Loaded to %s", db_url)
    return {"raw_rows": len(sales), "summary_rows": len(summary)}
CLI Entry Point
# cli.py
import click
from etl.pipeline import run_pipeline


@click.command()
@click.option("--input", "-i", default="data/sample_sales.csv", help="CSV input path")
@click.option("--db", default="sqlite:///data/warehouse.db", help="Database URL")
def main(input: str, db: str):
    """Run the sales ETL pipeline."""
    result = run_pipeline(input, db)
    click.echo(f"Done: {result['raw_rows']} sales rows, {result['summary_rows']} summary rows")


if __name__ == "__main__":
    main()
Run:
python cli.py
python cli.py --input data/june_sales.csv --db sqlite:///data/warehouse.db
Query Results
import pandas as pd
from sqlalchemy import create_engine
engine = create_engine("sqlite:///data/warehouse.db")
sales = pd.read_sql("SELECT * FROM raw_sales LIMIT 5", engine)
print(sales)
summary = pd.read_sql("""
SELECT date, region, order_count, total_revenue
FROM daily_summary
ORDER BY total_revenue DESC
""", engine)
print(summary)
Expected summary output:
         date region  order_count  total_revenue
0  2026-06-03  South            1         199.90
1  2026-06-02   East            1         149.97
...
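The expected figures can be checked by hand from the sample CSV. The snippet below recomputes revenue per date and region in plain Python, using `Decimal` so the cents come out exact. It is a verification aid, not part of the pipeline.

```python
from collections import defaultdict
from decimal import Decimal

# (date, region, quantity, unit_price) copied from data/sample_sales.csv
rows = [
    ("2026-06-01", "North", 2, "19.99"),
    ("2026-06-01", "South", 1, "49.99"),
    ("2026-06-02", "North", 5, "19.99"),
    ("2026-06-02", "East", 3, "49.99"),
    ("2026-06-03", "North", 1, "19.99"),
    ("2026-06-03", "South", 10, "19.99"),
]

# Sum quantity * unit_price for each (date, region) pair.
totals: dict[tuple[str, str], Decimal] = defaultdict(Decimal)
for day, region, quantity, unit_price in rows:
    totals[(day, region)] += quantity * Decimal(unit_price)

# Highest revenue first, matching ORDER BY total_revenue DESC.
ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
for (day, region), revenue in ranked:
    print(day, region, revenue)
```

The first two lines printed correspond to the South row for 2026-06-03 (10 × 19.99 = 199.90) and the East row for 2026-06-02 (3 × 49.99 = 149.97).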
Tests
# tests/test_transform.py
import pandas as pd
import pytest
from etl.transform import validate, enrich, aggregate_daily, transform


@pytest.fixture
def sample_df():
    return pd.DataFrame({
        "order_id": [1, 2, 3],
        "date": ["2026-06-01", "2026-06-01", "2026-06-02"],
        "region": ["north", " south ", "East"],
        "product": ["Widget", "Gadget", "Widget"],
        "quantity": [2, 0, 5],  # row 2 should be filtered
        "unit_price": [10.0, 20.0, 10.0],
    })


def test_validate_filters_bad_rows(sample_df):
    result = validate(sample_df)
    assert len(result) == 2  # quantity=0 row removed


def test_enrich_adds_revenue(sample_df):
    cleaned = validate(sample_df)
    enriched = enrich(cleaned)
    assert "revenue" in enriched.columns
    assert enriched.iloc[0]["revenue"] == 20.0


def test_aggregate_daily(sample_df):
    cleaned = enrich(validate(sample_df))
    summary = aggregate_daily(cleaned)
    assert "total_revenue" in summary.columns
    assert summary["order_count"].sum() == 2
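Run from the project root: python -m pytest tests/ -v
Invoking pytest through python -m puts the current directory on sys.path, so the etl package can be imported by the tests.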
Pipeline Architecture
CSV / API
    │
    ▼
Extract ──► raw DataFrame
    │
    ▼
Transform ──► validate → enrich → aggregate
    │
    ├──► raw_sales (detail table)
    └──► daily_summary (aggregated table)
              │
              ▼
      SQLite / PostgreSQL
Scheduling in Production
Run daily via cron:
# crontab -e
0 2 * * * cd /app/etl-pipeline && .venv/bin/python cli.py >> /var/log/etl.log 2>&1
Or trigger from CI/CD / Airflow / cloud scheduler after new files land in S3.
Switch to PostgreSQL for production (this needs a PostgreSQL driver installed, for example psycopg2-binary):
python cli.py --db postgresql://user:pass@localhost/warehouse
Concepts Applied
- Pandas: DataFrames, groupby, validation
- Databases: SQLAlchemy, to_sql
- CLI Applications: Click commands
- Logging: pipeline observability
- Kafka: event-driven ETL (bonus: trigger on new events)
Bonus Challenges
- Idempotent runs: use load_incremental to skip duplicate order_ids
- Data quality report: log counts of dropped rows and reasons
- Parquet output: write intermediate files to data/staging/ for audit
- API extract: pull live data from a public REST API and merge with CSV
- Great Expectations: add formal data validation rules
- Docker + cron: containerize and schedule with DevOps
- dbt integration: move SQL transformations to dbt models
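As a starting point for the data quality report challenge, the sketch below counts how many rows each rule in `validate` would reject. The `quality_report` helper and its key names are hypothetical. A row can fail more than one rule, so the counts may overlap.

```python
import pandas as pd


def quality_report(df: pd.DataFrame) -> dict[str, int]:
    """Count rows that each validation rule would reject (rules may overlap)."""
    required = ["order_id", "date", "quantity", "unit_price"]
    return {
        "total_rows": len(df),
        "missing_required": int(df[required].isna().any(axis=1).sum()),
        "non_positive_quantity": int((df["quantity"] <= 0).sum()),
        "negative_unit_price": int((df["unit_price"] < 0).sum()),
    }


raw = pd.DataFrame(
    {
        "order_id": [1, 2, 3, 4],
        "date": ["2026-06-01", "2026-06-01", None, "2026-06-02"],
        "quantity": [2, 0, 5, 1],
        "unit_price": [10.0, 20.0, 10.0, -5.0],
    }
)
report = quality_report(raw)
```

Logging this dictionary before calling `transform` would record why the cleaned row count differs from the extracted one.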
ETL pipelines are the backbone of data engineering — this project gives you the core Extract → Transform → Load pattern used everywhere from startups to enterprise data warehouses.