Use Cases Mar 8, 2026 8 min read

Build an AI Data Pipeline with Python and Claude

EzAI Team

Traditional ETL pipelines break on messy data. Inconsistent date formats, mixed languages, free-text fields with typos — regex and rule-based parsers choke on all of it. An AI data pipeline uses Claude to handle the ambiguity that deterministic code can't, turning chaotic inputs into structured, enriched outputs ready for your database or analytics layer.

In this guide, you'll build a production-ready data pipeline in Python that uses Claude via the EzAI API to clean, normalize, and enrich CSV data. We'll process records in batches, handle failures gracefully, and keep costs under control with smart prompting.

Why AI in Your Data Pipeline?

Consider a CSV dump from a CRM. The company_name field contains "Google LLC", "google", "GOOGLE INC.", and "Alphabet (Google)". The address column mixes formats across countries. The notes field has free-text descriptions you need categorized.

A traditional pipeline needs hand-written rules for each edge case. With Claude in the loop, you describe the desired output once and let the model handle the variations. The tradeoff is latency and cost — which is why batching and caching matter.

Architecture Overview

Pipeline flow: raw CSV → batch chunking → Claude API (via EzAI) → validated JSON → clean output

The pipeline follows four stages: ingest raw data from CSV or API, chunk records into batches of 10-20 for efficient API usage, transform each batch through Claude with a structured prompt, and validate the output before writing to your destination. Each stage is isolated, so failures in one batch don't kill the whole run.

Setting Up the Project

You'll need Python 3.10+ and an EzAI API key. If you don't have one yet, grab a key here — new accounts get 15 free credits.

bash
pip install anthropic pandas

Building the Pipeline Core

The heart of the pipeline is a class that reads raw records, sends them to Claude in batches, and collects structured results. Here's the complete implementation:

python
import json, csv, time
from pathlib import Path
import anthropic

client = anthropic.Anthropic(
    api_key="sk-your-key",
    base_url="https://ezaiapi.com"
)

TRANSFORM_PROMPT = """You are a data cleaning pipeline. For each record, return a JSON object with:
- company_name: normalized legal name (e.g., "Google LLC")
- industry: one of [tech, finance, healthcare, retail, manufacturing, other]
- country_code: ISO 3166-1 alpha-2 (e.g., "US")
- confidence: float 0-1 for your classification confidence

Input records:
{records}

Return ONLY a JSON array. No markdown, no explanation."""

def chunk_records(records, size=15):
    for i in range(0, len(records), size):
        yield records[i:i + size]

def transform_batch(batch, retries=3):
    records_str = json.dumps(batch, indent=2)
    for attempt in range(retries):
        try:
            response = client.messages.create(
                model="claude-sonnet-4-5",
                max_tokens=4096,
                messages=[{
                    "role": "user",
                    "content": TRANSFORM_PROMPT.format(records=records_str)
                }]
            )
            return json.loads(response.content[0].text)
        except json.JSONDecodeError:
            print(f"JSON parse failed, attempt {attempt + 1}/{retries}")
            time.sleep(2 ** attempt)
        except anthropic.RateLimitError:
            print("Rate limited, backing off...")
            time.sleep(30)
    return None

The chunk_records generator splits your data into manageable batches. Sending around 15 records per request is a good starting point for balancing throughput and response quality: with too many records Claude starts dropping fields, and with too few you spend tokens resending the same prompt instructions on every request.
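To see how the batches fall out, here is a small standalone check of the same chunking logic. The 32 dummy records are made up for illustration; the last batch is simply shorter than the rest.

```python
def chunk_records(records, size=15):
    # Same generator as in the pipeline core: yield consecutive slices
    for i in range(0, len(records), size):
        yield records[i:i + size]

# Hypothetical input: 32 placeholder records
dummy_records = [{"id": n} for n in range(32)]
batch_sizes = [len(batch) for batch in chunk_records(dummy_records)]
print(batch_sizes)  # [15, 15, 2]
```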

Running the Full Pipeline

Wire it together with a runner function that processes the entire CSV and writes clean output:

python
import pandas as pd

def run_pipeline(input_csv, output_csv):
    df = pd.read_csv(input_csv)
    raw_records = df.to_dict(orient="records")

    results = []
    failed_batches = []

    for i, batch in enumerate(chunk_records(raw_records)):
        print(f"Processing batch {i + 1}...")
        cleaned = transform_batch(batch)

        if cleaned:
            results.extend(cleaned)
        else:
            failed_batches.append(i)
            print(f"Batch {i + 1} failed after retries, skipping")

        time.sleep(0.5)  # gentle rate limiting

    out_df = pd.DataFrame(results)
    out_df.to_csv(output_csv, index=False)

    print(f"\n✅ Processed {len(results)}/{len(raw_records)} records")
    if failed_batches:
        print(f"⚠️ Failed batches: {failed_batches}")

    return out_df

# Run it
run_pipeline("messy_crm_export.csv", "clean_companies.csv")

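Each batch gets up to three attempts. A JSON parse failure triggers exponential backoff (1s, 2s, then 4s), while a rate-limit error triggers a fixed 30-second pause before the next attempt. Failed batches are logged but don't halt the pipeline, so you can reprocess them later or inspect the raw data manually. The half-second delay between batches is a simple throttle; tune it to the rate limits on your account.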
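One way to reprocess failed batches later is a second pass over just the failed indices. This is a sketch rather than part of the pipeline above: retry_failed is a hypothetical helper, it assumes you keep the failed_batches list around (for example by returning it from run_pipeline), and it takes the transform function as a parameter so it can be tried without an API key.

```python
def chunk_records(records, size=15):
    for i in range(0, len(records), size):
        yield records[i:i + size]

def retry_failed(raw_records, failed_batches, transform, size=15):
    """Re-run only the batches whose indices appear in failed_batches.

    Returns (recovered_records, still_failed_indices).
    The batch size must match the one used in the original run,
    otherwise the indices point at different records.
    """
    batches = list(chunk_records(raw_records, size))
    recovered, still_failed = [], []
    for idx in failed_batches:
        cleaned = transform(batches[idx])
        if cleaned:
            recovered.extend(cleaned)
        else:
            still_failed.append(idx)
    return recovered, still_failed
```

In the real pipeline you would pass transform_batch as the transform argument and append the recovered records to the output CSV.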

Adding Data Enrichment

Cleaning is just the start. The real power comes from enrichment — extracting structured information that doesn't exist in the raw data. Here's an enrichment prompt that pulls insights from free-text notes:

python
ENRICH_PROMPT = """Analyze these customer notes and extract structured data.

For each record, return:
- sentiment: positive | neutral | negative
- topics: list of up to 3 keywords
- action_required: boolean
- priority: low | medium | high | urgent
- summary: one-sentence summary (max 100 chars)

Notes:
{notes}

Return ONLY a JSON array."""

def enrich_notes(notes_batch):
    notes_str = json.dumps(notes_batch, indent=2)
    response = client.messages.create(
        model="claude-haiku-3-5",  # cheaper model for classification
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": ENRICH_PROMPT.format(notes=notes_str)
        }]
    )
    return json.loads(response.content[0].text)

Notice we use claude-haiku-3-5 for enrichment tasks. Classification and entity extraction don't need the reasoning depth of Sonnet — Haiku handles them at 10x lower cost. Use the right model for each stage. Check our model routing guide for more strategies.

Cost comparison: Sonnet vs Haiku for different pipeline stages

Model selection impact: using Haiku for classification stages cuts pipeline costs by 60-80%

Production Hardening

Before shipping this to production, add three things: output validation, checkpointing, and cost tracking.

Validate every Claude response against your expected schema. JSON parsing can succeed but still return garbage — missing fields, wrong types, values outside your allowed set. A quick validation function catches these before they hit your database:

python
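VALID_INDUSTRIES = {"tech", "finance", "healthcare", "retail", "manufacturing", "other"}

def validate_record(record):
    # The model may return non-dict items, so check the type first
    if not isinstance(record, dict):
        return False
    required = ["company_name", "industry", "country_code", "confidence"]
    if not all(k in record for k in required):
        return False
    if record["industry"] not in VALID_INDUSTRIES:
        return False
    # confidence must be a number in [0, 1]; bool is excluded because it subclasses int
    confidence = record["confidence"]
    if isinstance(confidence, bool) or not isinstance(confidence, (int, float)):
        return False
    if not 0 <= confidence <= 1:
        return False
    # country_code must be a two-character string (ISO 3166-1 alpha-2)
    code = record["country_code"]
    if not isinstance(code, str) or len(code) != 2:
        return False
    return True

# Filter valid results; transform_batch returns None when every attempt fails
cleaned = [r for r in (transform_batch(batch) or []) if validate_record(r)]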
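The same idea applies to the enrichment stage. The sketch below checks each enriched record against the fields listed in ENRICH_PROMPT; validate_enrichment and the two constant sets are hypothetical names introduced here, not part of the original code.

```python
VALID_SENTIMENTS = {"positive", "neutral", "negative"}
VALID_PRIORITIES = {"low", "medium", "high", "urgent"}

def validate_enrichment(record):
    """Return True only if the record matches the schema in ENRICH_PROMPT."""
    if not isinstance(record, dict):
        return False
    sentiment = record.get("sentiment")
    if not isinstance(sentiment, str) or sentiment not in VALID_SENTIMENTS:
        return False
    priority = record.get("priority")
    if not isinstance(priority, str) or priority not in VALID_PRIORITIES:
        return False
    # topics: a list of at most 3 keyword strings
    topics = record.get("topics")
    if not isinstance(topics, list) or len(topics) > 3:
        return False
    if not all(isinstance(t, str) for t in topics):
        return False
    # action_required must be a real boolean, not "yes" or 1
    if not isinstance(record.get("action_required"), bool):
        return False
    # summary: one sentence, at most 100 characters
    summary = record.get("summary")
    if not isinstance(summary, str) or len(summary) > 100:
        return False
    return True
```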

For checkpointing, write results to disk after each batch. If the pipeline crashes at batch 47 of 100, you resume from 47 instead of starting over. Store the last processed index in a .checkpoint file and read it on startup.
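A minimal sketch of that checkpointing scheme, assuming results are appended to a JSON Lines file and the .checkpoint file stores the index of the next batch to process. The function names, the results.jsonl path, and the transform parameter are illustrative choices, not part of the original pipeline.

```python
import json
from pathlib import Path

def load_checkpoint(path=".checkpoint"):
    """Return the index of the next batch to process (0 if no checkpoint exists)."""
    p = Path(path)
    if not p.exists():
        return 0
    return int(p.read_text().strip() or 0)

def save_checkpoint(next_batch, path=".checkpoint"):
    Path(path).write_text(str(next_batch))

def append_results(records, path="results.jsonl"):
    # Append mode, so results from earlier batches survive a crash
    with open(path, "a", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record) + "\n")

def run_with_checkpoints(batches, transform,
                         checkpoint=".checkpoint", out="results.jsonl"):
    start = load_checkpoint(checkpoint)
    for i, batch in enumerate(batches):
        if i < start:
            continue  # already processed in a previous run
        cleaned = transform(batch)
        if cleaned:
            append_results(cleaned, out)
        # Record progress only after the batch's results are on disk
        save_checkpoint(i + 1, checkpoint)
```

Because the checkpoint is written after the results, a crash between the two writes can duplicate one batch on resume; deduplicating on a record key when loading the output is one way to handle that.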

Cost Breakdown

Running 1,000 records through this pipeline with a 15-record batch size costs roughly:

  • Transform stage (Sonnet): ~67 batches × ~2K tokens each ≈ 134K tokens → ~$0.40
  • Enrichment stage (Haiku): ~67 batches × ~1.5K tokens each ≈ 100K tokens → ~$0.03
  • Total: Under $0.50 for 1,000 records — less with prompt caching

Through EzAI's pricing, that same run costs roughly 30-50% less than hitting Anthropic directly. The pricing page has the exact per-model rates.
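The arithmetic behind the estimates above can be reproduced in a few lines. The per-million-token rates used here are back-derived from the figures in the list (about $3 for the Sonnet stage and $0.30 for the Haiku stage); they are illustrative assumptions, not official prices, and estimate_stage_cost is a hypothetical helper.

```python
import math

def estimate_stage_cost(n_records, batch_size, tokens_per_batch, usd_per_million_tokens):
    """Return (batches, total_tokens, cost_usd) for one pipeline stage."""
    batches = math.ceil(n_records / batch_size)
    tokens = batches * tokens_per_batch
    cost = tokens * usd_per_million_tokens / 1_000_000
    return batches, tokens, cost

# Assumed blended rates in USD per million tokens (illustrative only)
transform = estimate_stage_cost(1000, 15, 2000, 3.00)
enrich = estimate_stage_cost(1000, 15, 1500, 0.30)
total = transform[2] + enrich[2]

print(f"Transform: {transform[0]} batches, {transform[1]} tokens, ${transform[2]:.2f}")
print(f"Enrich:    {enrich[0]} batches, {enrich[1]} tokens, ${enrich[2]:.2f}")
print(f"Total:     ${total:.2f}")
```

Swap in the rates from the pricing page and your own measured tokens per batch to get a figure for your data.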

When Not to Use AI in Your Pipeline

AI pipelines aren't always the right call. Skip them when:

  • Your data follows strict, predictable formats — regex is faster and free
  • Latency matters more than accuracy — Claude adds 500ms-2s per batch
  • You're processing millions of records daily — cost scales linearly
  • The transformation is purely structural (CSV → JSON, date reformatting)

Use AI for the messy 20% of your data that breaks deterministic rules. Chain it after your regex/rule-based stages so you're only sending the hard cases to Claude.
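A minimal sketch of that chaining, assuming a rule-based stage that returns None when it cannot handle a record, so only the leftovers are forwarded to Claude. The lookup table and the names normalize_with_rules and split_easy_and_hard are hypothetical and exist only for illustration.

```python
# Illustrative lookup table; a real one would come from your own reference data
KNOWN_COMPANIES = {
    "google llc": "Google LLC",
    "google": "Google LLC",
    "google inc.": "Google LLC",
}

def normalize_with_rules(record):
    """Return a cleaned copy of the record, or None if no rule applies."""
    key = str(record.get("company_name", "")).strip().lower()
    name = KNOWN_COMPANIES.get(key)
    if name is None:
        return None
    return {**record, "company_name": name}

def split_easy_and_hard(records):
    """Resolve what the rules can handle; collect the rest for the AI stage."""
    easy, hard = [], []
    for record in records:
        cleaned = normalize_with_rules(record)
        if cleaned is None:
            hard.append(record)
        else:
            easy.append(cleaned)
    return easy, hard

raw = [
    {"company_name": "Google LLC"},
    {"company_name": "google"},
    {"company_name": "GOOGLE INC."},
    {"company_name": "Alphabet (Google)"},
]
easy, hard = split_easy_and_hard(raw)
print(len(easy), "resolved by rules;", len(hard), "sent to Claude")
```

Only the records in hard would then go through chunk_records and transform_batch, which keeps token spend proportional to the share of data the rules cannot resolve.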

Next Steps

You now have a working AI data pipeline that handles real-world messy data. Extend it further by adding a log analysis stage for monitoring pipeline health, or connect it to a FastAPI endpoint for on-demand processing.

The full source code is on GitHub. If you're running into rate limits on large datasets, check our guide on handling rate limits — it covers retry strategies, concurrent batching, and how EzAI's built-in queue management helps.

