The Mine Works
From Raw HTML to Clean Dataset: Data Pipeline Architecture for AI Teams
engineering September 22, 2025 · 7 min read

The full architecture for a production-grade web data pipeline — collection, validation, transformation, storage, and freshness management.

Building a one-off scraper is straightforward. Building a data pipeline that runs reliably for months — with fresh data, validated schemas, and zero silent failures — requires a different level of engineering discipline.

TL;DR: A production web data pipeline has 5 layers: collection (tag every batch with run IDs and item counts), validation (quarantine schema errors rather than passing them downstream), transformation (normalize and enrich before storage), storage (Postgres for queries, S3/GCS for history, vector store for semantic search), and freshness monitoring (alert when a source goes stale). Every layer needs an explicit failure handler — silent failures are the real production threat.

This guide is about the second problem. It covers the full architecture for a production web data pipeline, from collection to consumption.

Architecture Overview

Collector → Validator → Transformer → Storage → Freshness Monitor
    ↓            ↓            ↓           ↓              ↓
  Failed?    Schema err?  Transform    Time-series   Staleness
  Retry or    Alert and     log         DB + lake      alert
  Dead-letter  quarantine

Every step has a failure mode, and every failure mode needs a handler.
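The diagram's collector handler reads "Retry or Dead-letter". Below is a minimal sketch of that handler, assuming a plain in-memory list stands in for a real dead-letter queue. The helper name `with_retry_or_dead_letter`, the backoff schedule, and the record shape are all hypothetical. A return value of `None` means the task was dead-lettered.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def with_retry_or_dead_letter(
    task: Callable[[], T],
    dead_letter: list[dict],
    label: str,
    max_attempts: int = 3,
    base_delay_s: float = 1.0,
) -> Optional[T]:
    """Run task, retrying with exponential backoff; after the last failure, record it in dead_letter."""
    last_error: Optional[Exception] = None
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception as exc:  # any failure is retried, then dead-lettered
            last_error = exc
            if attempt < max_attempts:
                # Wait 1x, 2x, 4x, ... the base delay between attempts
                time.sleep(base_delay_s * 2 ** (attempt - 1))
    # Out of attempts: keep an explicit record instead of swallowing the error
    dead_letter.append({"task": label, "attempts": max_attempts, "error": repr(last_error)})
    return None
```

In the pipeline, this could wrap the collection call, e.g. `with_retry_or_dead_letter(lambda: collect_with_metadata(actor_id, run_input), dead_letter, actor_id)`, so that a failed actor run leaves a trace rather than disappearing.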

Layer 1: Collection

Use a managed scraper service (Apify actors) rather than self-hosted crawlers for most use cases. The practical reason is failure handling: well-maintained actors typically handle request retries, proxy rotation, and memory management for you, while a self-hosted scraper requires you to implement each of these explicitly.

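from datetime import datetime, timezone

from apify_client import ApifyClient

client = ApifyClient('YOUR_API_TOKEN')

def collect_with_metadata(actor_id: str, run_input: dict) -> dict:
    """Run an actor to completion and return pipeline metadata for the batch."""
    # .call() starts the run and blocks until it finishes, returning the run object
    run = client.actor(actor_id).call(run_input=run_input)
    if run is None or run['status'] != 'SUCCEEDED':
        # Fail loudly rather than loading a partial or empty dataset downstream
        status = run['status'] if run else 'UNKNOWN'
        raise RuntimeError(f"Actor {actor_id} did not succeed (status: {status})")

    # The item count is reported on the dataset object, not in the run's stats
    dataset = client.dataset(run['defaultDatasetId']).get() or {}

    return {
        'run_id': run['id'],
        'actor_id': actor_id,
        'started_at': run['startedAt'],
        'finished_at': run['finishedAt'],
        'status': run['status'],
        'dataset_id': run['defaultDatasetId'],
        'item_count': dataset.get('itemCount', 0),
        # Timezone-aware UTC timestamp avoids mixing UTC and local time later
        'collected_at': datetime.now(timezone.utc).isoformat(),
    }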

Key collection practices:

  • Tag every batch with a run ID and timestamp
  • Record expected vs actual item count (enables anomaly detection)
  • Log run duration — significant deviations signal site changes
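The last practice, flagging runs whose duration deviates from normal, can be sketched as a comparison against the median of past runs. This assumes you keep a history of durations per actor; the 50% tolerance and the three-run minimum are illustrative choices, not recommendations from the platform. The helper accepts either ISO-8601 strings or `datetime` objects for the start and finish times.

```python
from datetime import datetime
from statistics import median
from typing import Union


def _as_datetime(value: Union[str, datetime]) -> datetime:
    """Accept a datetime or an ISO-8601 string (a trailing 'Z' is treated as UTC)."""
    if isinstance(value, datetime):
        return value
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def duration_seconds(started_at: Union[str, datetime], finished_at: Union[str, datetime]) -> float:
    """Wall-clock duration of one run in seconds."""
    return (_as_datetime(finished_at) - _as_datetime(started_at)).total_seconds()


def duration_is_anomalous(current_s: float, history_s: list[float], tolerance: float = 0.5) -> bool:
    """True if current_s deviates from the median of past runs by more than tolerance (0.5 = 50%)."""
    if len(history_s) < 3:
        return False  # too little history for a meaningful baseline
    baseline = median(history_s)
    if baseline <= 0:
        return current_s > 0
    return abs(current_s - baseline) / baseline > tolerance
```

The median is used instead of the mean so that a single unusually slow run does not shift the baseline.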

Layer 2: Validation

Never pass raw scraped data directly to downstream consumers. Validate schema, check for data quality signals, and quarantine suspicious records.

from dataclasses import dataclass
from datetime import datetime

@dataclass
class ValidationResult:
    record_id: str
    is_valid: bool
    errors: list[str]
    warnings: list[str]

def validate_job_posting(record: dict) -> ValidationResult:
    errors = []
    warnings = []
    
    # Required fields ('id' is also required because the transformer reads raw['id'])
    for field in ['id', 'title', 'company_slug', 'url', 'ats']:
        if not record.get(field):
            errors.append(f"Missing required field: {field}")
    
    # Type checks
    if record.get('published_at'):
        try:
            datetime.fromisoformat(record['published_at'].replace('Z', '+00:00'))
        except (ValueError, AttributeError):
            errors.append(f"Invalid published_at format: {record['published_at']}")
    
    # Business logic checks
    if record.get('title') and len(record['title']) < 3:
        errors.append(f"Title too short: {record['title']!r}")
    
    if record.get('description_plain') and len(record['description_plain']) < 50:
        warnings.append("Short job description — may be incomplete")
    
    return ValidationResult(
        record_id=record.get('id', 'unknown'),
        is_valid=len(errors) == 0,
        errors=errors,
        warnings=warnings,
    )

def validate_batch(records: list[dict], schema_validator) -> tuple[list, list]:
    valid = []
    quarantined = []
    
    for record in records:
        result = schema_validator(record)
        if result.is_valid:
            valid.append(record)
        else:
            quarantined.append({**record, '_validation_errors': result.errors})
    
    print(f"Validation: {len(valid)} valid, {len(quarantined)} quarantined")
    return valid, quarantined
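The pipeline script calls a `save_quarantine` helper that is not defined in this post. Here is one possible sketch, assuming a JSON Lines file as the dead-letter store; the file path and entry layout are hypothetical, and a production setup would more likely write to a quarantine table or queue. Each entry carries the raw item, the errors, the source URL, the run ID, and a timestamp.

```python
import json
from datetime import datetime, timezone
from pathlib import Path


def save_quarantine(quarantined: list[dict], path: str = "quarantine.jsonl") -> int:
    """Append quarantined records to a JSON Lines dead-letter file; return the number written."""
    quarantined_at = datetime.now(timezone.utc).isoformat()
    with Path(path).open("a", encoding="utf-8") as fh:
        for record in quarantined:
            entry = {
                "quarantined_at": quarantined_at,
                "run_id": record.get("_run_id"),
                "source_url": record.get("url"),
                "errors": record.get("_validation_errors", []),
                "raw": record,  # keep the full raw item so it can be reprocessed later
            }
            # default=str keeps the write from failing on non-JSON values such as datetimes
            fh.write(json.dumps(entry, default=str) + "\n")
    return len(quarantined)
```

Appending rather than overwriting preserves every run's rejects, which is what makes a rising quarantine rate visible over time.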

Layer 3: Transformation

Standardize, enrich, and normalize before loading into your data store.

import re
from html2text import html2text

def transform_job_posting(raw: dict) -> dict:
    """Normalize a job posting to the canonical schema."""
    
    # Clean HTML description if present
    description_plain = raw.get('description_plain')
    if not description_plain and raw.get('description_html'):
        description_plain = html2text(raw['description_html'])
        # Remove excessive whitespace
        description_plain = re.sub(r'\n{3,}', '\n\n', description_plain).strip()
    
    # Normalize remote status
    is_remote = raw.get('is_remote')
    if is_remote is None:
        location = (raw.get('location') or '').lower()
        is_remote = any(kw in location for kw in ['remote', 'anywhere', 'distributed'])
    
    # Extract skills from the description (extract_skills is a project-specific helper)
    skills = extract_skills(description_plain or '')
    
    return {
        'id': raw['id'],
        'ats': raw['ats'],
        'company_slug': raw['company_slug'],
        'title': raw['title'].strip(),
        'department': (raw.get('department') or '').strip() or None,
        'location': (raw.get('location') or '').strip() or None,
        'is_remote': is_remote,
        'employment_type': raw.get('employment_type'),
        'url': raw['url'],
        'description_plain': description_plain,
        'skills': skills,
        'published_at': raw.get('published_at'),
        '_collected_at': raw['_collected_at'],
        '_run_id': raw['_run_id'],
    }
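`transform_job_posting` depends on an `extract_skills` function the post does not define. A minimal sketch is dictionary matching against a fixed vocabulary; the vocabulary below is hypothetical, and a real pipeline might use a maintained skills taxonomy or an NER model instead.

```python
import re

# Hypothetical vocabulary: replace with your own skills taxonomy
SKILL_VOCABULARY = ("python", "sql", "kubernetes", "react", "go", "machine learning", "c++")


def extract_skills(text: str, vocabulary: tuple[str, ...] = SKILL_VOCABULARY) -> list[str]:
    """Return the vocabulary terms that appear in text, case-insensitively, sorted."""
    lowered = text.lower()
    found = []
    for skill in vocabulary:
        # Custom boundaries instead of \b so "c++" can match and "go" does not match inside "good"
        pattern = r"(?<![\w+])" + re.escape(skill) + r"(?![\w+])"
        if re.search(pattern, lowered):
            found.append(skill)
    return sorted(found)
```

Returning a sorted list keeps the output deterministic, so re-scrapes of an unchanged posting produce identical `skills` arrays.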

Layer 4: Storage

The storage architecture depends on your query patterns. For most AI data pipelines:

Raw data lake (S3/GCS):
  → Full history, every version, Parquet format

Operational database (Postgres):
  → Current snapshot, queryable by any field, indexed

Vector store (Pinecone/pgvector):
  → Semantic search over descriptions
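
# Write to Postgres using upsert (handles re-scrapes gracefully)
def upsert_jobs(conn, jobs: list[dict]):
    """Insert new jobs and refresh every mutable field on jobs seen before."""
    with conn.cursor() as cur:
        # Placeholders match the transformer's output keys (_collected_at, _run_id)
        cur.executemany("""
            INSERT INTO jobs (id, ats, company_slug, title, department, location,
                              is_remote, url, description_plain, skills, published_at,
                              collected_at, run_id)
            VALUES (%(id)s, %(ats)s, %(company_slug)s, %(title)s, %(department)s,
                    %(location)s, %(is_remote)s, %(url)s, %(description_plain)s,
                    %(skills)s::text[], %(published_at)s, %(_collected_at)s, %(_run_id)s)
            ON CONFLICT (id) DO UPDATE SET
                title = EXCLUDED.title,
                department = EXCLUDED.department,
                location = EXCLUDED.location,
                is_remote = EXCLUDED.is_remote,
                url = EXCLUDED.url,
                description_plain = EXCLUDED.description_plain,
                skills = EXCLUDED.skills,
                published_at = EXCLUDED.published_at,
                collected_at = EXCLUDED.collected_at,
                run_id = EXCLUDED.run_id
        """, jobs)
    conn.commit()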
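The raw data lake tier keeps full history, but how you lay it out determines how cheaply you can reprocess it. One common convention, assumed here rather than prescribed by S3 or GCS, is Hive-style partitioning by source and UTC date with one file per run. The sketch only builds the object key; writing the Parquet bytes (for example with pyarrow and boto3) is left out.

```python
from datetime import datetime, timezone


def raw_object_key(source: str, run_id: str, collected_at: datetime, prefix: str = "raw") -> str:
    """Object key for one run's raw batch, partitioned Hive-style by source and UTC date."""
    if collected_at.tzinfo is None:
        # A naive timestamp would be interpreted as local time; refuse to guess
        raise ValueError("collected_at must be timezone-aware")
    day = collected_at.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return f"{prefix}/source={source}/dt={day}/run_id={run_id}.parquet"
```

With this layout, reprocessing "all Greenhouse data from last week" becomes a prefix listing over seven `dt=` partitions instead of a scan of the whole bucket.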

Layer 5: Freshness Monitoring

Stale data is silent corruption. Build explicit staleness detection:

def check_data_freshness(conn) -> list[dict]:
    """Find data sources that haven't been updated recently."""
    alerts = []
    
    with conn.cursor() as cur:
        # Check last update per company
        cur.execute("""
            SELECT company_slug, MAX(collected_at) as last_seen, COUNT(*) as job_count
            FROM jobs
            GROUP BY company_slug
            HAVING MAX(collected_at) < NOW() - INTERVAL '7 days'
            ORDER BY last_seen ASC
        """)
        
        for row in cur.fetchall():
            alerts.append({
                'type': 'stale_source',
                'company': row[0],
                'last_updated': row[1].isoformat(),
                'job_count': row[2],
            })
    
    return alerts
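`check_data_freshness` applies a single 7-day cutoff to every source. When sources are scraped on different schedules, a per-source threshold avoids both false alarms and missed staleness. The sketch below assumes you have already fetched each source's latest `collected_at` (e.g. the same `GROUP BY company_slug` query without the `HAVING` clause); the cadence names and thresholds are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical cadences: how old a source may get before it counts as stale
STALENESS_THRESHOLDS = {
    "daily": timedelta(hours=24),
    "weekly": timedelta(days=7),
}


def stale_sources(
    last_seen: dict[str, datetime],
    cadence: dict[str, str],
    now: Optional[datetime] = None,
    default_cadence: str = "weekly",
) -> list[dict]:
    """Return alerts for sources whose last update is older than their cadence allows."""
    now = now or datetime.now(timezone.utc)
    alerts = []
    for source, seen in sorted(last_seen.items()):
        threshold = STALENESS_THRESHOLDS[cadence.get(source, default_cadence)]
        age = now - seen
        if age > threshold:
            alerts.append({
                "type": "stale_source",
                "company": source,
                "last_updated": seen.isoformat(),
                "age_hours": round(age.total_seconds() / 3600, 1),
            })
    return alerts
```

Timestamps must be timezone-aware for the subtraction to work, which also guards against the UTC/local-time mix-ups that break time-series queries.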

The Complete Pipeline Script

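def run_pipeline(actor_id: str, run_input: dict, validator, transformer, conn):
    # 1. Collect
    run_meta = collect_with_metadata(actor_id, run_input)
    raw_items = list(client.dataset(run_meta['dataset_id']).iterate_items())

    # Add pipeline metadata so every record can be traced back to its run
    for item in raw_items:
        item['_run_id'] = run_meta['run_id']
        item['_collected_at'] = run_meta['collected_at']

    # 2. Validate
    valid, quarantined = validate_batch(raw_items, validator)

    # 3. Transform; a record that fails here is quarantined, not silently dropped
    transformed = []
    for item in valid:
        try:
            transformed.append(transformer(item))
        except Exception as exc:  # any transform error routes the record to quarantine
            quarantined.append({**item, '_validation_errors': [f"Transform failed: {exc!r}"]})

    if quarantined:
        save_quarantine(quarantined)  # Persist for review (project-specific helper)

    # 4. Store
    upsert_jobs(conn, transformed)

    # 5. Log pipeline run (log_pipeline_run is a project-specific helper)
    log_pipeline_run({
        'run_id': run_meta['run_id'],
        'actor': actor_id,
        'expected_count': run_meta['item_count'],
        'raw_count': len(raw_items),
        'valid_count': len(valid),
        'quarantine_count': len(quarantined),
        'loaded_count': len(transformed),
    })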
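`log_pipeline_run` is not defined in the post either. A minimal sketch, assuming Python's standard `logging` module and a hypothetical 5% quarantine-rate threshold, logs each run as a JSON record and marks runs whose rejection rate jumps, since that is often the first sign that a source changed its schema.

```python
import json
import logging

logger = logging.getLogger("pipeline")


def log_pipeline_run(stats: dict, max_quarantine_rate: float = 0.05) -> dict:
    """Log one run's counts as JSON; flag it when the quarantine rate exceeds the threshold."""
    raw = stats.get("raw_count", 0)
    rate = stats.get("quarantine_count", 0) / raw if raw else 0.0
    record = {**stats, "quarantine_rate": round(rate, 4)}
    if rate > max_quarantine_rate:
        # A jump in rejected records often means the source changed its schema
        record["alert"] = "quarantine_spike"
        logger.warning("pipeline run %s", json.dumps(record, default=str))
    else:
        logger.info("pipeline run %s", json.dumps(record, default=str))
    return record
```

Returning the enriched record lets the caller also persist it to a run-history table, which is where rolling baselines for counts and durations would come from.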

The discipline is worth it. A pipeline that reports its own failure rates and flags stale data is one you can trust. One that silently passes bad data downstream is one that surprises you in production.

Frequently Asked Questions

What are the five layers of a production web data pipeline?

The five layers are: collection (scraping with run IDs and item counts per batch), validation (schema checking that quarantines bad records rather than passing them downstream), transformation (normalization and enrichment before storage), storage (Postgres for queries, S3/GCS for history, vector store for semantic search), and freshness monitoring (staleness alerts when a source stops updating). Each layer needs explicit failure handling — a layer that silently succeeds with bad data is worse than one that fails loudly.

How should you handle validation failures in a scraping pipeline?

Quarantine failed records to a separate table or dead-letter queue with the full error context. Never silently drop bad records or substitute null values. A good quarantine table includes: the raw scraped item, the validation error(s), the source URL, the run ID, and a timestamp. A spike in quarantine rate is often the first signal that a source has changed its schema — treat it as a monitoring alert, not just a data issue.

What storage architecture works best for web-scraped data?

Use a three-tier approach: (1) Postgres or SQLite for structured queries and recent data access; (2) object storage (S3, GCS) for the raw scraped archive — keep raw data indefinitely so you can reprocess it when transformation logic changes; (3) a vector database (Pinecone, ChromaDB, pgvector) if you need semantic search or RAG. The raw archive is the most important tier: transformations are lossy, and you will need to reprocess historical data when requirements change.

How do you detect when scraped data has gone stale?

Track the last-updated timestamp per source and alert when it exceeds a configurable threshold: 24 hours for daily sources, 7 days for weekly ones. Also track item count per run: a sudden drop of 50% or more often means a source changed its structure or started blocking your scraper, and it can surface the problem before schema validation catches it. Store a rolling item count per source per day and alert on both zero-item runs and significant count drops.
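The count check described above can be sketched as a comparison of the current run against the median of recent counts for the same source. The 50% threshold comes from the answer above; the size of the rolling window is an assumption left to the caller.

```python
from statistics import median
from typing import Optional


def item_count_alert(current: int, recent_counts: list[int], drop_threshold: float = 0.5) -> Optional[str]:
    """Return 'zero_items', 'count_drop', or None for one run versus recent runs of the same source."""
    if current == 0:
        return "zero_items"
    if not recent_counts:
        return None  # no baseline yet
    baseline = median(recent_counts)
    # Alert when the run lost drop_threshold (e.g. 50%) or more of the usual volume
    if baseline > 0 and (baseline - current) / baseline >= drop_threshold:
        return "count_drop"
    return None
```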

What is the most common way production data pipelines fail silently?

Silent validation bypasses are the most common failure mode: exception handlers that catch errors and continue rather than quarantining the bad record. The second most common is type coercion: None values or empty strings silently coerced to 0 or a default pass schema validation but corrupt downstream aggregations. Third is timestamp normalization bugs that mix UTC and local time, which break time-series queries. All three are preventable with strict schema validation that rejects, rather than coerces, unexpected values.
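"Reject rather than coerce" can be made concrete with small strict checkers. The sketch below is one possible approach with hypothetical helper names: it refuses None, empty strings, bools, and numeric strings where an integer is expected, and refuses timestamps with no timezone offset instead of guessing whether they are UTC or local time.

```python
from datetime import datetime


class StrictValidationError(ValueError):
    """Raised when a value would need coercion to pass validation."""


def strict_int(value: object, field: str) -> int:
    """Accept real ints only; reject None, "", bools, floats, and numeric strings."""
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(value, bool) or not isinstance(value, int):
        raise StrictValidationError(f"{field}: expected int, got {value!r}")
    return value


def strict_utc_timestamp(value: object, field: str) -> datetime:
    """Parse an ISO-8601 string and reject timestamps without a timezone offset."""
    if not isinstance(value, str) or not value:
        raise StrictValidationError(f"{field}: expected ISO-8601 string, got {value!r}")
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError as exc:
        raise StrictValidationError(f"{field}: unparseable timestamp {value!r}") from exc
    if parsed.tzinfo is None:
        # A naive timestamp could be UTC or local time; refuse to guess
        raise StrictValidationError(f"{field}: timestamp has no timezone: {value!r}")
    return parsed
```

Because `StrictValidationError` subclasses `ValueError`, a validator can catch it, append the message to its `errors` list, and let the record flow to quarantine rather than into storage.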