From Raw HTML to Clean Dataset: Data Pipeline Architecture for AI Teams
The full architecture for a production-grade web data pipeline — collection, validation, transformation, storage, and freshness management.
Building a one-off scraper is straightforward. Building a data pipeline that runs reliably for months — with fresh data, validated schemas, and zero silent failures — requires a different level of engineering discipline.
TL;DR: A production web data pipeline has 5 layers: collection (tag every batch with run IDs and item counts), validation (quarantine schema errors rather than passing them downstream), transformation (normalize and enrich before storage), storage (Postgres for queries, S3/GCS for history, vector store for semantic search), and freshness monitoring (alert when a source goes stale). Every layer needs an explicit failure handler — silent failures are the real production threat.
This guide is about building that second kind of pipeline. It covers the full architecture of a production web data pipeline, from collection to consumption.
Architecture Overview
```text
Collector   →   Validator   →   Transformer →   Storage   →     Freshness Monitor
↓               ↓               ↓               ↓               ↓
Failed?         Schema err?     Transform       Time-series     Staleness
Retry or        Alert and       log             DB + lake       alert
dead-letter     quarantine
```
Every step has a failure mode, and every failure mode needs a handler.
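The collector's failure path in the diagram above (retry, then dead-letter) can be sketched as a small generic wrapper. This is a minimal illustration under assumptions: `with_retry`, its backoff parameters, and the in-memory `dead_letter` list are hypothetical, and in production the dead-letter target would be a durable queue or table. Even with a managed scraping service, a wrapper like this is useful around the calls you make yourself, such as starting a run.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

def with_retry(
    fn: Callable[[], T],
    dead_letter: list[dict],
    attempts: int = 3,
    base_delay: float = 1.0,
    label: str = "task",
) -> Optional[T]:
    """Call fn, retrying with exponential backoff; dead-letter it if all attempts fail."""
    last_error: Optional[Exception] = None
    for attempt in range(attempts):
        try:
            return fn()
        except Exception as exc:  # retried, and dead-lettered below if it keeps failing
            last_error = exc
            if attempt < attempts - 1:
                # Back off base_delay, 2x, 4x, ... between attempts
                time.sleep(base_delay * 2 ** attempt)
    # All attempts failed: record the failure explicitly instead of dropping it
    dead_letter.append({"label": label, "attempts": attempts, "error": repr(last_error)})
    return None
```

Because the wrapper returns `None` after dead-lettering, callers should check for `None` explicitly rather than treat it as an empty result.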
Layer 1: Collection
Use a managed scraper service (Apify actors) rather than self-hosted crawlers for most use cases. The practical reason is failure handling: cloud actors typically take care of retries, proxy rotation, and memory management, all of which you would otherwise have to implement yourself in a self-hosted scraper.
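```python
from datetime import datetime, timezone

from apify_client import ApifyClient

client = ApifyClient('YOUR_API_TOKEN')

def collect_with_metadata(actor_id: str, run_input: dict) -> dict:
    """Run an actor to completion and attach pipeline metadata to the batch."""
    # call() starts the run, waits for it to finish, and returns the run object
    run = client.actor(actor_id).call(run_input=run_input)
    if run is None:
        raise RuntimeError(f"Actor {actor_id} returned no run object")
    # Item counts are tracked on the run's default dataset, not in the run stats
    dataset = client.dataset(run['defaultDatasetId']).get() or {}
    return {
        'run_id': run['id'],
        'actor_id': actor_id,
        'started_at': run['startedAt'],
        'finished_at': run['finishedAt'],
        'status': run['status'],  # e.g. 'SUCCEEDED', 'FAILED', 'TIMED-OUT'
        'dataset_id': run['defaultDatasetId'],
        'item_count': dataset.get('itemCount', 0),
        'collected_at': datetime.now(timezone.utc).isoformat(),
    }
```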
Key collection practices:
- Tag every batch with a run ID and timestamp
- Record expected vs actual item count (enables anomaly detection)
- Log run duration — significant deviations signal site changes
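The expected-vs-actual and duration practices can be automated with a simple baseline. A minimal sketch, assuming you keep the item count and duration (for example, finished_at minus started_at) of recent successful runs per actor in a `history` list. The function name, the median baseline, and the 50% drop and 2x duration thresholds are illustrative choices, not part of Apify.

```python
from statistics import median

def check_run_anomalies(
    item_count: int,
    duration_secs: float,
    history: list[dict],
    max_drop: float = 0.5,
    max_duration_ratio: float = 2.0,
) -> list[str]:
    """Compare one run against recent runs and return anomaly messages."""
    anomalies: list[str] = []
    if item_count == 0:
        anomalies.append("zero items collected")
    if not history:
        return anomalies  # no baseline yet: only the zero-item check applies
    # The median of recent runs serves as the "expected" value
    expected_items = median(r["item_count"] for r in history)
    expected_duration = median(r["duration_secs"] for r in history)
    # Non-zero count that fell by more than max_drop relative to the baseline
    if 0 < item_count < expected_items * (1 - max_drop):
        anomalies.append(f"item count {item_count} vs expected ~{expected_items:g}")
    # Runs much slower or much faster than usual can both signal a site change
    if expected_duration > 0:
        ratio = duration_secs / expected_duration
        if ratio > max_duration_ratio or ratio < 1 / max_duration_ratio:
            anomalies.append(
                f"duration {duration_secs:.0f}s vs expected ~{expected_duration:.0f}s"
            )
    return anomalies
```

Using the median rather than the mean keeps one unusually large or small past run from shifting the baseline.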
Layer 2: Validation
Never pass raw scraped data directly to downstream consumers. Validate schema, check for data quality signals, and quarantine suspicious records.
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Callable

@dataclass
class ValidationResult:
    record_id: str
    is_valid: bool
    errors: list[str]
    warnings: list[str]

def validate_job_posting(record: dict) -> ValidationResult:
    errors = []
    warnings = []
    # Required fields: missing or empty values are errors
    for field in ['title', 'company_slug', 'url', 'ats']:
        if not record.get(field):
            errors.append(f"Missing required field: {field}")
    # Type checks: published_at must be an ISO 8601 string
    if record.get('published_at'):
        try:
            datetime.fromisoformat(record['published_at'].replace('Z', '+00:00'))
        except (ValueError, AttributeError):
            errors.append(f"Invalid published_at format: {record['published_at']!r}")
    # Business logic checks
    if record.get('title') and len(record['title']) < 3:
        errors.append(f"Title too short: {record['title']!r}")
    if record.get('description_plain') and len(record['description_plain']) < 50:
        warnings.append("Short job description: may be incomplete")
    return ValidationResult(
        record_id=str(record.get('id', 'unknown')),
        is_valid=len(errors) == 0,
        errors=errors,
        warnings=warnings,
    )

def validate_batch(records: list[dict],
                   schema_validator: Callable[[dict], ValidationResult]) -> tuple[list, list]:
    """Split records into valid ones and quarantined ones (with their errors attached)."""
    valid = []
    quarantined = []
    for record in records:
        result = schema_validator(record)
        if result.is_valid:
            valid.append(record)
        else:
            quarantined.append({**record, '_validation_errors': result.errors})
    print(f"Validation: {len(valid)} valid, {len(quarantined)} quarantined")
    return valid, quarantined
```
Layer 3: Transformation
Standardize, enrich, and normalize before loading into your data store.
```python
import re

from html2text import html2text  # third-party package: pip install html2text

def transform_job_posting(raw: dict) -> dict:
    """Normalize a job posting to the canonical schema."""
    # Prefer the plain-text description; fall back to converting the HTML version
    description_plain = raw.get('description_plain')
    if not description_plain and raw.get('description_html'):
        description_plain = html2text(raw['description_html'])
    # Collapse 3+ consecutive newlines (skipped when there is no description at all)
    if description_plain:
        description_plain = re.sub(r'\n{3,}', '\n\n', description_plain).strip()
    # Infer remote status from the location string when the source omits it
    is_remote = raw.get('is_remote')
    if is_remote is None:
        location = (raw.get('location') or '').lower()
        is_remote = any(kw in location for kw in ['remote', 'anywhere', 'distributed'])
    # Extract skills from the description (extract_skills is defined separately)
    skills = extract_skills(description_plain or '')
    return {
        'id': raw['id'],
        'ats': raw['ats'],
        'company_slug': raw['company_slug'],
        'title': raw['title'].strip(),
        'department': (raw.get('department') or '').strip() or None,
        'location': (raw.get('location') or '').strip() or None,
        'is_remote': is_remote,
        'employment_type': raw.get('employment_type'),
        'url': raw['url'],
        'description_plain': description_plain or None,
        'skills': skills,
        'published_at': raw.get('published_at'),
        '_collected_at': raw['_collected_at'],
        '_run_id': raw['_run_id'],
    }
```
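The transformer calls `extract_skills`, which this guide does not define. Here is a hypothetical keyword-matching version, about the simplest thing that could work: it matches a small, illustrative vocabulary on word boundaries and maps synonyms to one canonical name. The vocabulary and names are assumptions; a real pipeline would more likely use a maintained skills taxonomy or an entity-extraction model.

```python
import re

# Illustrative vocabulary: canonical skill -> surface forms to match (assumption)
SKILL_PATTERNS: dict[str, list[str]] = {
    "python": ["python"],
    "sql": ["sql", "postgres", "postgresql"],
    "kubernetes": ["kubernetes", "k8s"],
    "react": ["react", "react.js", "reactjs"],
    "machine learning": ["machine learning", "ml"],
}

# Precompile one case-insensitive, word-bounded regex per canonical skill
_COMPILED = {
    skill: re.compile(
        r"\b(?:" + "|".join(re.escape(form) for form in forms) + r")\b",
        re.IGNORECASE,
    )
    for skill, forms in SKILL_PATTERNS.items()
}

def extract_skills(text: str) -> list[str]:
    """Return canonical skills mentioned in text, sorted for stable output."""
    return sorted(skill for skill, pattern in _COMPILED.items() if pattern.search(text))
```

The word boundaries keep short forms such as `ml` from matching inside unrelated words like "HTML".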
Layer 4: Storage
The storage architecture depends on your query patterns. For most AI data pipelines:
- Raw data lake (S3/GCS): full history, every version, Parquet format
- Operational database (Postgres): current snapshot, queryable by any field, indexed
- Vector store (Pinecone/pgvector): semantic search over descriptions
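For the raw tier, the key property is that every run lands in a new, immutable location, so every version is kept and any run can be reprocessed later. A minimal sketch of one possible layout, partitioned by source and collection date. To stay dependency-free it writes gzipped JSON Lines to a local directory; in a real pipeline you would write Parquet (for example with pyarrow) and upload to S3/GCS under the same key scheme. The `raw/source=.../dt=.../run_id=...` layout and both function names are assumptions.

```python
import gzip
import json
from datetime import datetime
from pathlib import Path

def raw_archive_key(source: str, run_id: str, collected_at: datetime) -> str:
    """Hive-style partition path: one immutable object per run."""
    return f"raw/source={source}/dt={collected_at:%Y-%m-%d}/run_id={run_id}/items.jsonl.gz"

def archive_raw_batch(root: Path, source: str, run_id: str,
                      collected_at: datetime, items: list[dict]) -> Path:
    """Write an untransformed batch; refuse to overwrite an existing run."""
    path = root / raw_archive_key(source, run_id, collected_at)
    path.parent.mkdir(parents=True, exist_ok=True)
    # "xt" mode raises FileExistsError instead of silently replacing history
    with gzip.open(path, "xt", encoding="utf-8") as f:
        for item in items:
            f.write(json.dumps(item, ensure_ascii=False) + "\n")
    return path
```

Partitioning by date means a reprocessing job can select a time range by listing key prefixes instead of scanning the whole archive.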
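```python
# Write to Postgres using an upsert so re-scrapes update rows instead of failing
def upsert_jobs(conn, jobs: list[dict]):
    """conn is a psycopg2 connection; jobs are rows from transform_job_posting().

    The transformer emits '_collected_at' and '_run_id', so the placeholders
    for those two columns use the underscored keys.
    """
    with conn.cursor() as cur:
        cur.executemany("""
            INSERT INTO jobs (id, ats, company_slug, title, department, location,
                              is_remote, url, description_plain, skills, published_at,
                              collected_at, run_id)
            VALUES (%(id)s, %(ats)s, %(company_slug)s, %(title)s, %(department)s,
                    %(location)s, %(is_remote)s, %(url)s, %(description_plain)s,
                    %(skills)s::text[], %(published_at)s,
                    %(_collected_at)s, %(_run_id)s)
            ON CONFLICT (id) DO UPDATE SET
                title = EXCLUDED.title,
                department = EXCLUDED.department,
                location = EXCLUDED.location,
                is_remote = EXCLUDED.is_remote,
                url = EXCLUDED.url,
                description_plain = EXCLUDED.description_plain,
                skills = EXCLUDED.skills,
                published_at = EXCLUDED.published_at,
                collected_at = EXCLUDED.collected_at,
                run_id = EXCLUDED.run_id
        """, jobs)
    conn.commit()
```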
Layer 5: Freshness Monitoring
Stale data is silent corruption. Build explicit staleness detection:
```python
def check_data_freshness(conn) -> list[dict]:
    """Find data sources that haven't been updated recently."""
    alerts = []
    with conn.cursor() as cur:
        # Last update per company: flag companies with nothing new in 7 days
        cur.execute("""
            SELECT company_slug, MAX(collected_at) AS last_seen, COUNT(*) AS job_count
            FROM jobs
            GROUP BY company_slug
            HAVING MAX(collected_at) < NOW() - INTERVAL '7 days'
            ORDER BY last_seen ASC
        """)
        for company, last_seen, job_count in cur.fetchall():
            alerts.append({
                'type': 'stale_source',
                'company': company,
                'last_updated': last_seen.isoformat(),
                'job_count': job_count,
            })
    return alerts
```
The Complete Pipeline Script
```python
def run_pipeline(actor_id: str, run_input: dict, validator, transformer, conn):
    """validator/transformer: e.g. validate_job_posting and transform_job_posting."""
    # 1. Collect
    run_meta = collect_with_metadata(actor_id, run_input)
    if run_meta['status'] != 'SUCCEEDED':
        # Fail loudly: a failed or aborted run may have left a partial dataset
        raise RuntimeError(f"Run {run_meta['run_id']} ended with status {run_meta['status']}")
    raw_items = list(client.dataset(run_meta['dataset_id']).iterate_items())
    # Add pipeline metadata to every item
    for item in raw_items:
        item['_run_id'] = run_meta['run_id']
        item['_collected_at'] = run_meta['collected_at']
    # 2. Validate
    valid, quarantined = validate_batch(raw_items, validator)
    if quarantined:
        save_quarantine(quarantined)  # Persist for review (a helper you supply)
    # 3. Transform
    transformed = [transformer(item) for item in valid]
    # 4. Store
    upsert_jobs(conn, transformed)
    # 5. Log pipeline run (log_pipeline_run is also a helper you supply)
    log_pipeline_run({
        'run_id': run_meta['run_id'],
        'actor': actor_id,
        'raw_count': len(raw_items),
        'valid_count': len(valid),
        'quarantine_count': len(quarantined),
        'loaded_count': len(transformed),
    })
```
The discipline is worth it. A pipeline that reports its own failure rates and flags stale data is one you can trust. One that silently passes bad data downstream is one that surprises you in production.
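The script calls `log_pipeline_run` without defining it. One hypothetical version stores each run's counts in a `pipeline_runs` table (SQLite here, so the sketch is self-contained) and returns alerts when a run looks unhealthy, so failure rates are reported and not just recorded. Unlike the script's one-argument call, this version also takes a connection; the table name, columns, and the 5% quarantine threshold are assumptions.

```python
import sqlite3
from datetime import datetime, timezone

def log_pipeline_run(stats: dict, conn: sqlite3.Connection,
                     max_quarantine_rate: float = 0.05) -> list[str]:
    """Persist one run's counts and return alert messages for unhealthy runs."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS pipeline_runs (
            run_id TEXT PRIMARY KEY, actor TEXT, raw_count INTEGER,
            valid_count INTEGER, quarantine_count INTEGER,
            loaded_count INTEGER, logged_at TEXT)
    """)
    conn.execute(
        "INSERT INTO pipeline_runs VALUES (?, ?, ?, ?, ?, ?, ?)",
        (stats["run_id"], stats["actor"], stats["raw_count"], stats["valid_count"],
         stats["quarantine_count"], stats["loaded_count"],
         datetime.now(timezone.utc).isoformat()),
    )
    conn.commit()

    alerts = []
    if stats["raw_count"] == 0:
        alerts.append("run collected zero items")
    else:
        rate = stats["quarantine_count"] / stats["raw_count"]
        if rate > max_quarantine_rate:
            # A quarantine spike is often the first sign of a source schema change
            alerts.append(f"quarantine rate {rate:.1%} exceeds {max_quarantine_rate:.1%}")
    if stats["loaded_count"] != stats["valid_count"]:
        alerts.append("loaded count differs from valid count")
    return alerts
```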
Frequently Asked Questions
What are the five layers of a production web data pipeline?
The five layers are: collection (scraping with run IDs and item counts per batch), validation (schema checking that quarantines bad records rather than passing them downstream), transformation (normalization and enrichment before storage), storage (Postgres for queries, S3/GCS for history, vector store for semantic search), and freshness monitoring (staleness alerts when a source stops updating). Each layer needs explicit failure handling — a layer that silently succeeds with bad data is worse than one that fails loudly.
How should you handle validation failures in a scraping pipeline?
Quarantine failed records to a separate table or dead-letter queue with the full error context. Never silently drop bad records or substitute null values. A good quarantine table includes: the raw scraped item, the validation error(s), the source URL, the run ID, and a timestamp. A spike in quarantine rate is often the first signal that a source has changed its schema — treat it as a monitoring alert, not just a data issue.
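A quarantine table with those fields might look like this minimal sketch: a hypothetical implementation of the `save_quarantine` helper used in the pipeline script, written against SQLite so it runs anywhere. Unlike the script's one-argument call, it takes an explicit connection, and the table and column names are assumptions. It reads the `url`, `_run_id`, and `_validation_errors` keys that the earlier stages attach to each record.

```python
import json
import sqlite3
from datetime import datetime, timezone

def save_quarantine(quarantined: list[dict], conn: sqlite3.Connection) -> int:
    """Persist rejected records with full error context; returns rows written."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS quarantine (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT,
            source_url TEXT,
            errors TEXT NOT NULL,          -- JSON array of validation errors
            raw_item TEXT NOT NULL,        -- the scraped record, as JSON
            quarantined_at TEXT NOT NULL
        )
    """)
    now = datetime.now(timezone.utc).isoformat()
    rows = []
    for record in quarantined:
        # Store the record without its error list, which gets its own column
        raw = {k: v for k, v in record.items() if k != "_validation_errors"}
        rows.append((
            record.get("_run_id"),
            record.get("url"),
            json.dumps(record.get("_validation_errors", [])),
            json.dumps(raw, default=str),
            now,
        ))
    conn.executemany(
        "INSERT INTO quarantine (run_id, source_url, errors, raw_item, quarantined_at) "
        "VALUES (?, ?, ?, ?, ?)",
        rows,
    )
    conn.commit()
    return len(rows)
```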
What storage architecture works best for web-scraped data?
Use a three-tier approach: (1) Postgres or SQLite for structured queries and recent data access; (2) object storage (S3, GCS) for the raw scraped archive — keep raw data indefinitely so you can reprocess it when transformation logic changes; (3) a vector database (Pinecone, ChromaDB, pgvector) if you need semantic search or RAG. The raw archive is the most important tier: transformations are lossy, and you will need to reprocess historical data when requirements change.
How do you detect when scraped data has gone stale?
Track the last-updated timestamp per source and alert when it exceeds a configurable threshold: 24 hours for daily sources, 7 days for weekly ones. Also track item count per run: a sudden drop of 50% or more often means a source changed its structure or started blocking your scraper, and the count drop can reveal this before schema validation does. Store a rolling item count per source per day and alert on both zero-item runs and significant count drops.
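A minimal sketch of the configurable threshold, assuming each source is tagged with an expected update cadence and that last-updated timestamps are timezone-aware UTC. The 24-hour and 7-day limits follow the answer above; the function name, the cadence labels, and the fallback to the daily limit are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Maximum allowed age per expected update cadence
STALENESS_THRESHOLDS = {
    "daily": timedelta(hours=24),
    "weekly": timedelta(days=7),
}

def find_stale_sources(
    last_updated: dict[str, datetime],
    cadence: dict[str, str],
    now: Optional[datetime] = None,
) -> list[tuple[str, timedelta]]:
    """Return (source, age) for every source older than its cadence allows."""
    if now is None:
        now = datetime.now(timezone.utc)
    stale = []
    for source, seen_at in last_updated.items():
        # Sources without a known cadence fall back to the strictest (daily) limit
        limit = STALENESS_THRESHOLDS.get(cadence.get(source, "daily"),
                                         STALENESS_THRESHOLDS["daily"])
        age = now - seen_at
        if age > limit:
            stale.append((source, age))
    # Most overdue source first
    return sorted(stale, key=lambda pair: pair[1], reverse=True)
```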
What is the most common way production data pipelines fail silently?
Silent validation bypasses are the most common failure mode: exception handlers that catch errors and continue rather than quarantining the bad record. The second most common is type coercion: missing values (None or empty strings) coerced to defaults such as 0 or an empty string pass schema validation but corrupt downstream aggregations. Third are timestamp normalization bugs that mix UTC and local time, which break time-series queries. All three are preventable with strict schema validation that rejects unexpected values rather than coercing them.
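The difference between rejecting and coercing is easiest to see in code. A minimal sketch of two hypothetical strict field checks: one refuses to turn None, empty strings, or booleans into integers, and one refuses timestamps without an explicit UTC offset, so local and UTC times cannot be mixed silently. Both raise `ValueError`, which a validator like `validate_job_posting` could record as an error instead of substituting a default.

```python
from datetime import datetime, timezone

def strict_int(value: object, field: str) -> int:
    """Accept ints or decimal-digit strings; reject None, '', bools, floats, etc."""
    # bool is a subclass of int, so it must be excluded explicitly
    if isinstance(value, bool):
        raise ValueError(f"{field}: boolean is not an integer")
    if isinstance(value, int):
        return value
    if isinstance(value, str) and value.strip().removeprefix("-").isdecimal():
        return int(value)
    raise ValueError(f"{field}: expected integer, got {value!r}")

def strict_utc_timestamp(value: object, field: str) -> datetime:
    """Parse an ISO 8601 string with an explicit offset and normalize it to UTC."""
    if not isinstance(value, str) or not value:
        raise ValueError(f"{field}: expected ISO 8601 string, got {value!r}")
    # fromisoformat() only accepts a trailing 'Z' from Python 3.11, so translate it
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # A naive timestamp could be local time or UTC: refuse to guess
        raise ValueError(f"{field}: timestamp has no UTC offset: {value!r}")
    return parsed.astimezone(timezone.utc)
```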