Python data pipeline development
Patterns for building production-quality data processing pipelines with Python.
Targeted at Python 3.11+ for asyncio.TaskGroup and exception groups; Python 3.12+ for the lighter type X = ... type-alias syntax. Pin a 3.13+ runtime if you want the experimental JIT or the experimental free-threaded build; the patterns here don't depend on either.
Choosing a DataFrame engine: pandas vs polars vs DuckDB
For a long time pandas was the default for any tabular work in Python. As of 2026 the default has shifted: polars is the right pick for multi-GB pipelines on a single machine, DuckDB is the right pick when SQL or larger-than-RAM scans are involved, and pandas stays useful for small data and the ML/notebook ecosystem (scikit-learn, statsmodels, plotnine all speak it natively).
| Tool | When | Why |
|---|---|---|
| pandas | < ~1 GB data, ML interop, single-threaded familiarity | Mature, ubiquitous, eager DataFrame model. Slowest in benchmarks but most ecosystem support. |
| polars | 1 GB - tens of GB on one box, performance-critical pipelines | Multithreaded by default, lazy query engine, Arrow-native. ~5x speedup over pandas on filter / aggregate at 100M rows. |
| DuckDB | SQL workflows, larger-than-RAM, parquet/CSV scanning, joins across many files | Vectorized + pipelined execution, cost-based optimizer, streaming scans. Works great as a thin wrapper over a directory of parquet files. |
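All three speak Apache Arrow, so handing data between them through Arrow is the pragmatic answer most of the time. DuckDB-to-polars hand-offs are close to zero-copy; converting to pandas may still copy unless Arrow-backed dtypes are used: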
import polars as pl
import duckdb
# Polars: read a directory of CSVs, filter, group
# scan_csv starts a lazy query; .collect() below is what executes it.
df = (
pl.scan_csv('data/articles_*.csv')
# NOTE(review): published_at is compared against a date string; this assumes
# the column holds ISO-8601 text so lexical order matches date order — confirm.
.filter(pl.col('published_at') >= '2026-01-01')
.group_by('source')
# One output row per source: the row count plus the mean of word_count.
.agg(pl.len().alias('count'), pl.col('word_count').mean())
.collect()
)
# DuckDB: same shape with SQL, no intermediate copy
# Note: this rebinds df, replacing the Polars result above, and the average
# column is named avg_wc here (the Polars version keeps the name word_count).
con = duckdb.connect()
df = con.execute("""
SELECT source, COUNT(*) AS count, AVG(word_count) AS avg_wc
FROM 'data/articles_*.csv'
WHERE published_at >= '2026-01-01'
GROUP BY source
""").pl() # returns a Polars DataFrame; use .df() for pandas
# Hand off to pandas only at the boundary that needs it (e.g. scikit-learn)
import pandas as pd
# df is the DuckDB-produced Polars frame at this point; pdf is its pandas copy.
pdf = df.to_pandas()
If your pipeline already uses pandas everywhere, don't pre-emptively rewrite. Migrate the bottleneck stages first — typically the CSV-load + filter step.
Architecture patterns
Modular processor architecture
src/
├── workflow.py # Main orchestrator
├── dispatcher.py # Content-type router
├── processors/
│ ├── __init__.py
│ ├── base.py # Abstract base class
│ ├── article_processor.py
│ ├── video_processor.py
│ └── audio_processor.py
├── services/
│ ├── sheets_service.py # Google Sheets integration
│ ├── drive_service.py # Google Drive integration
│ └── ai_service.py # Gemini API wrapper
├── utils/
│ ├── logger.py
│ └── rate_limiter.py
└── config.py # Environment configuration
Dispatcher pattern
from typing import Protocol
from urllib.parse import urlparse
class Processor(Protocol):
    """Structural interface that every content processor must satisfy.

    Any object exposing these two methods can be registered with the
    dispatcher; no inheritance is required.
    """

    def can_process(self, url: str) -> bool:
        """Return True when this processor is able to handle *url*."""
        ...

    def process(self, url: str, metadata: dict) -> dict:
        """Process *url* together with its *metadata* and return a result dict."""
        ...
class Dispatcher:
    """Route a URL to the first registered processor that accepts it."""

    def __init__(self, processors: "list[Processor] | None" = None):
        """Set up the processor chain.

        Args:
            processors: Processors to consult, in priority order. When omitted
                (the original behaviour) the built-in article, video, audio
                and social processors are instantiated. Passing an explicit
                list allows custom or test processors to be plugged in.
        """
        if processors is None:
            processors = [
                ArticleProcessor(),
                VideoProcessor(),
                AudioProcessor(),
                SocialProcessor(),
            ]
        # Copy so later mutation of the caller's list cannot change routing.
        self.processors: list[Processor] = list(processors)

    def dispatch(self, url: str, metadata: dict) -> dict:
        """Hand *url* and *metadata* to the first processor that claims the URL.

        Processors are asked in list order, so earlier entries win when more
        than one would accept the URL.

        Returns:
            Whatever the selected processor's ``process`` returns.

        Raises:
            ValueError: If no registered processor accepts *url*.
        """
        for processor in self.processors:
            if processor.can_process(url):
                return processor.process(url, metadata)
        raise ValueError(f"No processor found for URL: {url}")
# Pattern-based routing
class ArticleProcessor:
    """Processor selected for URLs hosted on known article domains.

    Only the routing half (``can_process``) is shown in this snippet.
    """

    # Domains handled by this processor; any subdomain of these also matches.
    DOMAINS = ['nytimes.com', 'washingtonpost.com', 'medium.com']

    def can_process(self, url: str) -> bool:
        """Return True when *url*'s host is one of DOMAINS or a subdomain of one.

        The host is matched exactly or on a ``.``-delimited suffix, so
        look-alike hosts such as ``notnytimes.com`` or
        ``nytimes.com.evil.example`` are rejected. URLs without a network
        location (for example a bare path) never match.
        """
        # .hostname is lower-cased and excludes any port or userinfo; the
        # rstrip drops the trailing dot of a fully-qualified name.
        host = (urlparse(url).hostname or '').rstrip('.')
        return any(host == d or host.endswith('.' + d) for d in self.DOMAINS)
CSV-based pipeline workflow
import csv
from pathlib import Path
from dataclasses import dataclass, asdict
from typing import Iterator
@dataclass
class Record:
id: str
url: str
title: str | None = None
content: str | None = None
status: str = 'pending'
def read_input(path: Path) -> Iterator[Record]:
    """Lazily yield one Record per data row of the CSV file at *path*.

    Columns that are not Record fields are ignored. Values arrive as the
    strings the csv module produces, so an empty cell is ``''`` rather than
    ``None``.

    Raises:
        TypeError: If a row lacks a required Record field (``id`` or ``url``),
            e.g. because the header does not contain that column.
    """
    known_fields = Record.__annotations__
    # newline='' is what the csv module requires so that line breaks inside
    # quoted fields are parsed correctly. utf-8-sig reads plain UTF-8 as well
    # as files with a BOM (common in spreadsheet exports); a BOM would
    # otherwise be glued onto the first header name and hide that column.
    with open(path, 'r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            yield Record(**{k: v for k, v in row.items() if k in known_fields})
def write_output(records: list[Record], path: Path):
    """Write *records* to *path* as UTF-8 CSV, replacing any existing file.

    The header row lists the Record fields in declaration order; each record
    becomes one row.
    """
    columns = list(Record.__annotations__)
    with open(path, 'w', encoding='utf-8', newline='') as out:
        sink = csv.DictWriter(out, fieldnames=columns)
        sink.writeheader()
        for rec in records:
            sink.writerow(asdict(rec))
def process_batch(input_path: Path, output_path: Path):
    """Run every record of the input CSV through the dispatcher and write the results.

    Each record ends up in the output regardless of outcome: on success its
    status is 'completed' and title/content come from the processor's result;
    on any error its status is 'failed: <error message>'.
    """
    router = Dispatcher()
    finished = []
    for rec in read_input(input_path):
        try:
            outcome = router.dispatch(rec.url, asdict(rec))
            rec.status = 'completed'
            rec.title = outcome.get('title')
            rec.content = outcome.get('content')
        except Exception as exc:
            # Per-record boundary: one bad URL must not abort the whole batch,
            # so the error is recorded on the row instead of propagating.
            rec.status = f'failed: {exc}'
        finished.append(rec)
    write_output(finished, output_path)
Google Sheets integration
import gspread
from google.oauth2.service_account import Credentials
# OAuth scopes requested for the service-account credentials; SheetsService
# passes this list to Credentials.from_service_account_file.
# NOTE(review): the second entry requests Drive access in addition to Sheets —
# confirm it is actually needed before shipping.
SCOPES = [
'https://www.googleapis.com/auth/spreadsheets',
'https://www.googleapis.com/auth/drive'
]
class SheetsService:
def __init__(self, credentials_path: str):
creds = Credentials.from_service_account_file(credentials_path, scopes=SCOPES)
self.client = gspread.authorize(creds)
def get_worksheet(self, spreadsheet_id: str, sheet_name: str):
spreadsheet = self.client.open_by_key(spreadsheet_id)
return spreadsheet.worksheet(sheet_name)
def read_all(self, worksheet) -> list[dict]:
return worksheet.get_all_records()
def append_row(self, worksheet, row: list):
worksheet.append_row(row, value_input_option='USER_ENTERED')
def batch_update(self, worksheet, updates: list[dict]):
"""Update multiple cells efficiently."""
# Format: [{'range': 'A1', 'values': [[value]]}]
worksheet.batch_update(updates, value_input_option='USER_ENTERED')
def find_row_by_id(self, worksheet, id_value: str, id_column: int = 1) -> int | None:
"""Find row number by ID value."""
try:
cell = worksheet.find(id_value, in_column=id_column)
return cell.row
except gspread.CellNotFound:
return None
Rate limiting
import time
from functools import wraps
from ratelimit import limits, sleep_and_retry
# Simple rate limiter
@sleep_and_retry
@limits(calls=10, period=60)  # 10 calls per minute
def rate_limited_api_call(url: str, timeout: float = 30.0):
    """GET *url* while staying within 10 calls per 60 seconds.

    When the limit is exceeded the decorators sleep and retry instead of
    failing, so the call may block before the request is sent.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server before giving up. requests
            applies no timeout by default, which lets one stalled connection
            hang the whole pipeline.

    Returns:
        The response object returned by ``requests.get``.

    Raises:
        requests.exceptions.Timeout: If the server does not respond in time.
    """
    return requests.get(url, timeout=timeout)
# Custom rate limiter: enforces a minimum delay between calls (no backoff)
class RateLimiter:
    """Enforce a minimum interval between successive ``wait()`` calls.

    The limiter keeps no lock, so share one instance across threads only
    with external synchronisation.
    """

    def __init__(self, calls_per_minute: int = 10):
        """Configure the limiter.

        Args:
            calls_per_minute: Maximum call rate; must be positive.

        Raises:
            ValueError: If *calls_per_minute* is zero or negative.
        """
        if calls_per_minute <= 0:
            raise ValueError("calls_per_minute must be positive")
        # Minimum number of seconds between two calls.
        self.delay = 60 / calls_per_minute
        # -inf guarantees the first wait() never sleeps, whatever origin the
        # monotonic clock happens to have.
        self.last_call = float('-inf')

    def wait(self):
        """Block until at least ``delay`` seconds have passed since the previous call."""
        # monotonic() cannot jump backwards or forwards when the system clock
        # is adjusted, unlike time.time(), so the computed sleep is reliable.
        remaining = self.delay - (time.monotonic() - self.last_call)
        if remaining > 0:
            time.sleep(remaining)
        self.last_call = time.monotonic()
# Usage
# Shared by every fetch_with_rate_limit call so they are throttled together.
limiter = RateLimiter(calls_per_minute=10)


def fetch_with_rate_limit(url: str, timeout: float = 30.0):
    """GET *url* after waiting for the shared limiter's next free slot.

    Args:
        url: Address to fetch.
        timeout: Seconds to wait for the server before giving up. requests
            applies no timeout by default, which lets one stalled connection
            hang the whole pipeline.

    Returns:
        The response object returned by ``requests.get``.

    Raises:
        requests.exceptions.Timeout: If the server does not respond in time.
    """
    limiter.wait()
    return requests.get(url, timeout=timeout)
Concurrent fetching with asyncio.TaskGroup (3.11+)
For I/O-bound stages (HTTP fetches, API calls), asyncio.TaskGroup plus httpx.AsyncClient runs many requests concurrently without the boilerplate of asyncio.gather. TaskGroup's structured-concurrency model means an exception in one task cancels the rest and surfaces as an ExceptionGroup — easier to reason about than gather(return_exceptions=True).
import asyncio
import httpx
async