Tutorial Mar 28, 2026 8 min read

Build an AI Cron Job Runner with Python

EzAI Team

Every production system generates noise — log files pile up, error counts tick over, and customer messages go unread overnight. An AI cron job runner turns that noise into actionable intelligence on a schedule. Instead of manually sifting through data each morning, you configure a Python script to wake up at 6 AM, feed the last 24 hours of logs to Claude, and drop a summary in Slack before your first coffee.

This tutorial walks through building a complete cron-based AI task runner using Python, APScheduler, and the Claude API via EzAI. By the end, you'll have a daemon that runs scheduled AI jobs with retry logic, cost tracking, and dead-letter handling for failed runs.

Why Schedule AI Tasks?

Plenty of AI workloads don't need real-time responses. They need reliable, periodic execution:

  • Daily log summaries — condense 50K lines of server logs into a 200-word brief
  • Weekly report generation — pull metrics from your database and write a narrative report
  • Customer feedback digests — classify and summarize incoming support tickets every 4 hours
  • Anomaly detection — compare today's metrics against the past week and flag outliers
  • Content generation — draft social posts, changelog entries, or newsletter sections on schedule

Running these as cron jobs instead of ad-hoc scripts means they execute consistently, failures get logged, and you can track costs per job over time.

Project Setup

Start with a clean Python project. You need three dependencies: the Anthropic SDK (works seamlessly with EzAI's endpoint), APScheduler for cron-style scheduling, and structlog for structured logging.

bash
mkdir ai-cron-runner && cd ai-cron-runner
pip install anthropic apscheduler structlog

Set your EzAI API key as an environment variable. If you don't have one yet, grab a free key — new accounts get 15 credits, enough for thousands of scheduled runs with Sonnet.

bash
export EZAI_API_KEY="sk-your-key-here"

The AI Job Definition

Each scheduled job is a dataclass with a cron expression, a system prompt, a function that gathers input data, and a callback for the AI's output. This keeps job definitions clean and testable.

Job lifecycle: schedule triggers → data collector runs → AI processes → callback delivers result

python
from dataclasses import dataclass
from typing import Callable
import os

import anthropic
import structlog

log = structlog.get_logger()

client = anthropic.Anthropic(
    api_key=os.environ["EZAI_API_KEY"],
    base_url="https://ezaiapi.com",
)

@dataclass
class AIJob:
    name: str
    cron: str                          # "0 6 * * *" = daily at 6 AM
    model: str                         # "claude-sonnet-4-5" or "claude-haiku-3-5"
    system_prompt: str
    collect_data: Callable[[], str]    # gathers input before each run
    on_result: Callable[[str], None]  # handles AI output
    max_tokens: int = 2048
    retries: int = 2
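Because a job is just data plus two callables, you can exercise it without the scheduler or an API key. A minimal sketch (the stub collector and sink are test stand-ins, not real jobs; the dataclass is repeated so the snippet stands alone):

```python
from dataclasses import dataclass
from typing import Callable

# Repeating the AIJob shape from above so this snippet is self-contained
@dataclass
class AIJob:
    name: str
    cron: str
    model: str
    system_prompt: str
    collect_data: Callable[[], str]
    on_result: Callable[[str], None]
    max_tokens: int = 2048
    retries: int = 2

captured = []
test_job = AIJob(
    name="unit-test-job",
    cron="0 6 * * *",
    model="claude-haiku-3-5",
    system_prompt="Summarize.",
    collect_data=lambda: "line1\nline2",   # stub: pretend log input
    on_result=captured.append,             # stub: record output locally
)

# Both callables can be verified without any network call
assert test_job.collect_data() == "line1\nline2"
test_job.on_result("fake summary")
assert captured == ["fake summary"]
```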

Building the Runner Engine

The runner wraps APScheduler and adds retry logic, cost tracking, and structured logging. Each job execution gets a unique run ID, so you can trace failures back to specific triggers in your logs.

python
import uuid, time
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

class AICronRunner:
    def __init__(self):
        self.scheduler = BlockingScheduler()
        self.stats = {"runs": 0, "failures": 0, "total_tokens": 0}

    def register(self, job: AIJob):
        parts = job.cron.split()
        trigger = CronTrigger(
            minute=parts[0], hour=parts[1],
            day=parts[2], month=parts[3],
            day_of_week=parts[4],
        )
        self.scheduler.add_job(
            self._execute, trigger,
            args=[job], id=job.name,
            misfire_grace_time=300,
        )
        log.info("job_registered", name=job.name, cron=job.cron)

    def _execute(self, job: AIJob):
        run_id = str(uuid.uuid4())[:8]
        log.info("job_start", job=job.name, run=run_id)

        # Collect input data
        try:
            user_input = job.collect_data()
        except Exception as e:
            log.error("data_collect_failed", job=job.name, err=str(e))
            self.stats["failures"] += 1
            return

        # Call AI with retries
        for attempt in range(job.retries + 1):
            try:
                t0 = time.monotonic()
                resp = client.messages.create(
                    model=job.model,
                    max_tokens=job.max_tokens,
                    system=job.system_prompt,
                    messages=[{"role": "user", "content": user_input}],
                )
                elapsed = time.monotonic() - t0
                tokens = resp.usage.input_tokens + resp.usage.output_tokens

                self.stats["runs"] += 1
                self.stats["total_tokens"] += tokens
                log.info("job_done", job=job.name, run=run_id,
                     tokens=tokens, seconds=round(elapsed, 2))

                job.on_result(resp.content[0].text)
                return

            except anthropic.RateLimitError:
                wait = 2 ** attempt
                log.warning("rate_limited", job=job.name, retry_in=wait)
                if attempt == job.retries:
                    self.stats["failures"] += 1
                    return
                time.sleep(wait)
            except Exception as e:
                log.error("job_failed", job=job.name, attempt=attempt, err=str(e))
                if attempt == job.retries:
                    self.stats["failures"] += 1
                    return
                time.sleep(1)

    def start(self):
        log.info("runner_starting", jobs=len(self.scheduler.get_jobs()))
        self.scheduler.start()

The retry logic uses exponential backoff specifically for rate limits (which EzAI handles gracefully), and a fixed one-second pause for other transient errors. After exhausting retries, the run gets logged as a failure but doesn't crash the runner.
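If many jobs hit a rate limit at the same moment, the fixed 2^attempt delays can make them all retry in lockstep. A common refinement (not part of the runner above; the 60-second cap is an arbitrary choice) is capped exponential backoff with full jitter:

```python
import random

def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Capped exponential backoff with full jitter."""
    exp = min(cap, base ** attempt)   # 1, 2, 4, 8, ... capped at 60s
    return random.uniform(0, exp)     # jitter spreads simultaneous retries apart

# Delays stay bounded no matter how high attempt climbs
for attempt in range(10):
    assert 0 <= backoff_delay(attempt) <= 60.0
```

Swapping `time.sleep(wait)` for `time.sleep(backoff_delay(attempt))` is a one-line change in `_execute`.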

Real Example: Daily Log Summarizer

Here's a concrete job that reads the last 24 hours of Nginx logs from the systemd journal, feeds them to Claude Haiku (fast and cheap — roughly $0.001 per summary), and writes the result to a file. Haiku is the right model here: log summaries don't need deep reasoning, just pattern recognition.

python
import subprocess
from datetime import datetime

def collect_nginx_logs() -> str:
    # Grab last 24h of nginx service logs, capped at 500 lines
    result = subprocess.run(
        ["journalctl", "-u", "nginx", "--since", "24 hours ago",
         "--no-pager", "-q"],
        capture_output=True, text=True
    )
    lines = result.stdout.strip().split("\n")
    if len(lines) > 500:
        lines = lines[-500:]  # keep most recent
    return f"Nginx logs ({len(lines)} entries):\n" + "\n".join(lines)

def save_summary(text: str):
    date = datetime.now().strftime("%Y-%m-%d")
    os.makedirs("/var/log/ai-summaries", exist_ok=True)
    path = f"/var/log/ai-summaries/nginx-{date}.md"
    with open(path, "w") as f:
        f.write(text)
    log.info("summary_saved", path=path)

log_summarizer = AIJob(
    name="daily-log-summary",
    cron="0 6 * * *",             # 6:00 AM daily
    model="claude-haiku-3-5",
    system_prompt="""You are a DevOps analyst. Summarize server logs into:
1. Traffic overview (total requests, unique IPs, peak hour)
2. Error summary (4xx and 5xx counts, top error paths)
3. Anomalies (unusual patterns, spikes, suspicious IPs)
4. One-line recommendation
Keep it under 200 words. Use markdown.""",
    collect_data=collect_nginx_logs,
    on_result=save_summary,
)

Adding a Webhook Delivery Callback

Most teams want results pushed somewhere — Slack, Discord, or a webhook endpoint. Here's a reusable callback that posts the AI output to any webhook URL, with a fallback to local file storage if the POST fails.

python
import httpx

def webhook_callback(url: str, fallback_dir: str = "/tmp/ai-cron"):
    def deliver(text: str):
        try:
            resp = httpx.post(url, json={
                "text": text,
                "source": "ai-cron-runner",
            }, timeout=10)
            resp.raise_for_status()
            log.info("webhook_sent", status=resp.status_code)
        except Exception as e:
            log.error("webhook_failed", err=str(e))
            # Fallback: save to disk so nothing is lost
            os.makedirs(fallback_dir, exist_ok=True)
            ts = datetime.now().strftime("%Y%m%d-%H%M%S")
            with open(f"{fallback_dir}/{ts}.md", "w") as f:
                f.write(text)
    return deliver

# Example: push AI summaries to Slack
slack_job = AIJob(
    name="error-digest",
    cron="0 */4 * * *",            # every 4 hours
    model="claude-sonnet-4-5",
    system_prompt="Analyze these error logs. List top 5 issues by severity. Be terse.",
    collect_data=collect_nginx_logs,
    on_result=webhook_callback("https://hooks.slack.com/services/YOUR/WEBHOOK/URL"),
)
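The `on_result` hook takes a single callable, but nothing stops you from fanning out to several destinations. A small combinator (a hypothetical helper, not part of the runner above) that calls each callback and keeps going if one fails:

```python
from typing import Callable

def fan_out(*callbacks: Callable[[str], None]) -> Callable[[str], None]:
    """Combine several on_result callbacks; one failing doesn't block the rest."""
    def deliver(text: str):
        for cb in callbacks:
            try:
                cb(text)
            except Exception as e:
                print(f"callback failed: {e}")  # swap for log.error in the runner

    return deliver

# Example: deliver to two sinks, one of which blows up
seen = []
deliver = fan_out(lambda t: seen.append(t), lambda t: 1 / 0)
deliver("summary text")
assert seen == ["summary text"]   # first sink still got the result
```

You could then pass `on_result=fan_out(webhook_callback(url), save_summary)` to get Slack delivery plus a local archive from one job.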

Model Selection per Job

Pick the right model for each job, balancing cost against reasoning depth.

Not every cron job needs Opus-level reasoning. EzAI gives you access to 20+ models through one API key, so match the model to the task:

  • claude-haiku-3-5 — Log parsing, classification, simple summaries. ~$0.001 per run.
  • claude-sonnet-4-5 — Anomaly detection, report writing, multi-step analysis. ~$0.01 per run.
  • claude-opus-4 — Complex reasoning, code review, architecture decisions. ~$0.05 per run.
  • gpt-4o — Alternative for teams wanting OpenAI-compatible outputs. Same EzAI key works.

A team running 10 Haiku jobs daily spends about $0.30/month. That's the cost of a single Starbucks drip — for a month of automated AI intelligence.
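That arithmetic generalizes to any job mix. A quick back-of-envelope estimator, using the approximate per-run figures from the list above (not official pricing):

```python
# Approximate per-run costs from the list above (not official pricing)
COST_PER_RUN = {
    "claude-haiku-3-5": 0.001,
    "claude-sonnet-4-5": 0.01,
    "claude-opus-4": 0.05,
}

def monthly_cost(jobs: dict[str, tuple[str, int]], days: int = 30) -> float:
    """jobs maps job name -> (model, runs per day)."""
    return sum(COST_PER_RUN[model] * runs * days
               for model, runs in jobs.values())

estimate = monthly_cost({
    "daily-log-summary": ("claude-haiku-3-5", 1),    # 1 run/day
    "error-digest":      ("claude-sonnet-4-5", 6),   # every 4 hours
})
print(f"~${estimate:.2f}/month")   # 0.001*1*30 + 0.01*6*30 = $1.83
```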

Putting It All Together

Here's the main entry point that registers jobs and starts the scheduler. Run it as a systemd service or inside a Docker container for production use.

python
#!/usr/bin/env python3
# main.py — AI Cron Runner entry point

if __name__ == "__main__":
    runner = AICronRunner()

    # Register your jobs
    runner.register(log_summarizer)
    runner.register(slack_job)

    # Add a weekly code review summary
    runner.register(AIJob(
        name="weekly-pr-digest",
        cron="0 9 * * 1",           # Monday 9 AM
        model="claude-sonnet-4-5",
        system_prompt="""Summarize this week's merged PRs.
Group by: features, bugfixes, refactors.
Note any breaking changes. Keep under 300 words.""",
        collect_data=lambda: subprocess.run(
            ["gh", "pr", "list", "--state", "merged",
             "--limit", "30", "--json", "title,body,labels"],
            capture_output=True, text=True
        ).stdout,
        on_result=webhook_callback("https://hooks.slack.com/services/YOUR/WEBHOOK"),
    ))

    print("🕐 AI Cron Runner active. Press Ctrl+C to stop.")
    runner.start()

Cost Tracking and Observability

The stats dict in the runner is a starting point. For production, expose these metrics via a /health endpoint or push them to your monitoring stack. Here's a quick addition using Python's built-in HTTP server:

python
import json, threading
from http.server import HTTPServer, BaseHTTPRequestHandler

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(json.dumps({
            "status": "healthy",
            "total_runs": runner.stats["runs"],
            "failures": runner.stats["failures"],
            "tokens_used": runner.stats["total_tokens"],
            "jobs": [j.id for j in runner.scheduler.get_jobs()],
        }).encode())

# Start health server in background thread
threading.Thread(
    target=lambda: HTTPServer(("", 8090), HealthHandler).serve_forever(),
    daemon=True
).start()

Now you can hit curl localhost:8090 to see how many runs completed, how many failed, and your total token consumption across all jobs. Pair this with your EzAI dashboard for a complete cost picture.

Production Deployment Tips

  • Use systemd — wrap the runner as a .service unit with Restart=always so it survives reboots
  • Cap input size — always truncate input data before sending to the AI. A 100K-line log file will burn tokens fast. The collect_data function should enforce a ceiling (500-1000 lines is usually enough)
  • Separate environments — use different EzAI API keys for staging and production so you can track costs independently
  • Dead letter queue — persist failed job inputs to disk. After an outage, you can replay them manually instead of losing data
  • Model fallback — if Sonnet times out, retry with Haiku. EzAI supports multi-model fallback natively through its routing layer
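The dead-letter idea from the list above takes only a few lines: when a job exhausts its retries, persist the collected input so it can be replayed later. A minimal sketch (the directory name and file format are arbitrary choices; use a persistent path in production):

```python
import json
import os
import tempfile
import time

DLQ_DIR = os.path.join(tempfile.gettempdir(), "ai-cron-dlq")  # persistent path in prod

def dead_letter(job_name: str, user_input: str, error: str) -> str:
    """Persist a failed run's input so it can be replayed after an outage."""
    os.makedirs(DLQ_DIR, exist_ok=True)
    path = os.path.join(DLQ_DIR, f"{job_name}-{int(time.time())}.json")
    with open(path, "w") as f:
        json.dump({"job": job_name, "input": user_input, "error": error}, f)
    return path

def replay(path: str) -> dict:
    """Load a dead-lettered run; its 'input' can be fed back through the AI call."""
    with open(path) as f:
        return json.load(f)

p = dead_letter("daily-log-summary", "collected log lines", "timeout")
assert replay(p)["input"] == "collected log lines"
```

Calling `dead_letter(job.name, user_input, str(e))` in `_execute`'s final failure branch wires this into the runner.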

What to Build Next

This runner handles the fundamentals — scheduling, retries, delivery. From here, you could add:

  • A YAML config file for job definitions so non-developers can add jobs
  • A SQLite-backed run history for audit trails
  • Streaming output for long-running analysis jobs
  • Conditional chaining — if the log summary mentions "critical", trigger an immediate follow-up job using multi-step chains

The full source is about 150 lines of Python. No framework lock-in, no complex dependencies. Just a scheduler, an API client, and the AI doing the heavy lifting on autopilot.
