Get the FREE Ultimate OpenClaw Setup Guide →

postgres-job-queue

npx machina-cli add skill wpank/ai/postgres-job-queue --openclaw
Files (1)
SKILL.md
5.4 KB

PostgreSQL Job Queue

Production-ready job queue using PostgreSQL with priority scheduling, batch claiming, and progress tracking.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install postgres-job-queue

When to Use

  • Need job queue but want to avoid Redis/RabbitMQ dependencies
  • Jobs need priority-based scheduling
  • Long-running jobs need progress visibility
  • Jobs should survive service restarts

Schema Design

CREATE TABLE jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    job_type VARCHAR(50) NOT NULL,
    priority INT NOT NULL DEFAULT 100,
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    data JSONB NOT NULL DEFAULT '{}',
    
    -- Progress tracking
    progress INT DEFAULT 0,
    current_stage VARCHAR(100),
    events_count INT DEFAULT 0,
    
    -- Worker tracking
    worker_id VARCHAR(100),
    claimed_at TIMESTAMPTZ,
    
    -- Timing
    created_at TIMESTAMPTZ DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    
    -- Retry handling
    attempts INT DEFAULT 0,
    max_attempts INT DEFAULT 3,
    last_error TEXT,
    
    CONSTRAINT valid_status CHECK (
        status IN ('pending', 'claimed', 'running', 'completed', 'failed', 'cancelled')
    )
);

-- Critical: Partial index for fast claiming
CREATE INDEX idx_jobs_claimable ON jobs (priority DESC, created_at ASC) 
    WHERE status = 'pending';
CREATE INDEX idx_jobs_worker ON jobs (worker_id) 
    WHERE status IN ('claimed', 'running');

Batch Claiming with SKIP LOCKED

CREATE OR REPLACE FUNCTION claim_job_batch(
    p_worker_id VARCHAR(100),
    p_job_types VARCHAR(50)[],
    p_batch_size INT DEFAULT 10
) RETURNS SETOF jobs AS $$
BEGIN
    RETURN QUERY
    WITH claimable AS (
        SELECT id
        FROM jobs
        WHERE status = 'pending'
          AND job_type = ANY(p_job_types)
          AND attempts < max_attempts
        ORDER BY priority DESC, created_at ASC
        LIMIT p_batch_size
        FOR UPDATE SKIP LOCKED  -- Critical: skip locked rows
    ),
    claimed AS (
        UPDATE jobs
        SET status = 'claimed',
            worker_id = p_worker_id,
            claimed_at = NOW(),
            attempts = attempts + 1
        WHERE id IN (SELECT id FROM claimable)
        RETURNING *
    )
    SELECT * FROM claimed;
END;
$$ LANGUAGE plpgsql;

Go Implementation

const (
    PriorityExplicit   = 150  // User-requested
    PriorityDiscovered = 100  // System-discovered
    PriorityBackfill   = 30   // Background backfills
)

type JobQueue struct {
    db       *pgx.Pool
    workerID string
}

func (q *JobQueue) Claim(ctx context.Context, types []string, batchSize int) ([]Job, error) {
    rows, err := q.db.Query(ctx,
        "SELECT * FROM claim_job_batch($1, $2, $3)",
        q.workerID, types, batchSize,
    )
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var jobs []Job
    for rows.Next() {
        var job Job
        if err := rows.Scan(&job); err != nil {
            return nil, err
        }
        jobs = append(jobs, job)
    }
    return jobs, nil
}

func (q *JobQueue) Complete(ctx context.Context, jobID uuid.UUID) error {
    _, err := q.db.Exec(ctx, `
        UPDATE jobs 
        SET status = 'completed',
            progress = 100,
            completed_at = NOW()
        WHERE id = $1`,
        jobID,
    )
    return err
}

func (q *JobQueue) Fail(ctx context.Context, jobID uuid.UUID, errMsg string) error {
    _, err := q.db.Exec(ctx, `
        UPDATE jobs 
        SET status = CASE 
                WHEN attempts >= max_attempts THEN 'failed' 
                ELSE 'pending' 
            END,
            last_error = $2,
            worker_id = NULL,
            claimed_at = NULL
        WHERE id = $1`,
        jobID, errMsg,
    )
    return err
}

Stale Job Recovery

func (q *JobQueue) RecoverStaleJobs(ctx context.Context, timeout time.Duration) (int, error) {
    result, err := q.db.Exec(ctx, `
        UPDATE jobs 
        SET status = 'pending',
            worker_id = NULL,
            claimed_at = NULL
        WHERE status IN ('claimed', 'running')
          AND claimed_at < NOW() - $1::interval
          AND attempts < max_attempts`,
        timeout.String(),
    )
    if err != nil {
        return 0, err
    }
    return int(result.RowsAffected()), nil
}

Decision Tree

ScenarioApproach
Need guaranteed deliveryPostgreSQL queue
Need sub-ms latencyUse Redis instead
< 1000 jobs/secPostgreSQL is fine
> 10000 jobs/secAdd Redis layer
Need strict orderingSingle worker per type

Related Skills


NEVER Do

  • NEVER use SELECT then UPDATE — Race condition. Use SKIP LOCKED.
  • NEVER claim without SKIP LOCKED — Workers will deadlock.
  • NEVER store large payloads — Store references only.
  • NEVER forget partial index — Claiming is slow without it.

Source

git clone https://github.com/wpank/ai/blob/main/skills/backend/postgres-job-queue/SKILL.mdView on GitHub

Overview

This skill provides a production-ready PostgreSQL job queue featuring priority scheduling, batch claiming, and progress tracking. It enables building reliable background task systems without external dependencies, with retry support and restart resilience.

How This Skill Works

Jobs are stored in a SQL table with status, priority, progress, and retry fields. Batch claiming uses a SELECT ... FOR UPDATE SKIP LOCKED to fetch claimable jobs in priority order, then updates their status to claimed and records worker_id, claimed_at, and updated attempts. A partial index on (priority DESC, created_at ASC) accelerates the claim process.

When to Use It

  • You need a queue without Redis/RabbitMQ dependencies
  • You require priority-based scheduling across jobs
  • Long-running tasks need visible progress and retry tracking
  • Jobs must survive service restarts or crashes
  • You want efficient batch claiming with SKIP LOCKED

Quick Start

  1. Step 1: Install: npx clawhub@latest install postgres-job-queue
  2. Step 2: Create the jobs table as defined and insert initial jobs
  3. Step 3: Run a worker that calls claim_job_batch and processes, then marks completed or failed

Best Practices

  • Make jobs idempotent so retries don't cause duplication
  • Choose a batchSize that balances latency and throughput
  • Expose and monitor progress fields (progress, current_stage)
  • Use max_attempts with backoff to control retries
  • Leverage the claimable partial index for fast selection

Example Use Cases

  • Image processing pipeline with per-image tasks
  • Email delivery queue with retry on transient failures
  • Data import/export jobs with progress updates
  • Analytics batch jobs aggregating large datasets
  • Report generation for scheduled, long-running tasks

Frequently Asked Questions

Add this skill to your agents
Sponsor this space

Reach thousands of developers