公開日 2026-05-18

PHP + PostgreSQLでジョブキューを作る(FOR UPDATE SKIP LOCKED 最小構成)

PHP(PDO)から PostgreSQL のテーブルをジョブキューとして使い、`FOR UPDATE SKIP LOCKED` で複数ワーカーから安全に 1 件ずつ取る流れと最小リトライを再現する。

目次

  1. 前提環境
  2. 1. ゴールと非対象
  3. 2. 最小デモ環境を作成する
  4. コードのポイント
  5. 3. キューの初期化とジョブ投入スクリプトを用意する
  6. コードのポイント
  7. 4. FOR UPDATE SKIP LOCKED で 1 件だけ取るワーカーを作る
  8. コードのポイント
  9. 5. 2 ワーカーで別々の ready job を取ることを確認する
  10. 6. 成功 / 失敗 / リトライの最小状態管理を確認する
  11. 7. 実務へ戻すときの勘所と詰まりどころ
  12. 8. まとめと次の一歩

この記事は、PDO + PostgreSQLSELECT ... FOR UPDATE を使った排他制御の基本は分かるが、複数ワーカーに広げたときの安全なジョブ取得方法にはまだ不安がある人を対象にしています。 題材はメール送信やレポート生成のような軽いバックグラウンドジョブです。PHP(PDO)から PostgreSQL のテーブルをキューとして使い、FOR UPDATE SKIP LOCKED で 1 ワーカー 1 ジョブを安全に取るところから、成功 / 失敗 / リトライの最小状態管理までを追います。

先に読んでおくと入りやすい記事:

前提環境

  • Windows 11
  • WSL2(Ubuntu)
  • VS Code(Remote - WSL)
  • Docker Desktop(WSL 連携有効)

以降のコマンドは、特記がない限り WSL 側ターミナルで実行します。
サービス名は appdb に固定しています。

1. ゴールと非対象

到達する状態:

  • PostgreSQL の jobs テーブルを、最小のジョブキューとして再現できる
  • FOR UPDATE SKIP LOCKED で 1 ワーカー 1 ジョブを claim し、複数ワーカーが同じジョブを取らないことを確認できる
  • queued / processing / done / failed と、available_at を使った最小リトライを実装できる

扱わない内容:

  • worker crash 後の stuck job 回収
  • LISTEN / NOTIFY や cron 連携
  • dead letter queue、指数バックオフ、優先度制御
  • Laravel Queue / Symfony Messenger の書き方
  • Redis / RabbitMQ / Kafka との詳細比較

前の記事では SELECT ... FOR UPDATE で「後続を待たせる」流れを見ました。
今回は逆に、別の ready job があるなら後続ワーカーを待たせずに次へ進めます。その違いを作るのが SKIP LOCKED です。

2. 最小デモ環境を作成する

記事用の最小デモから始めます。

mkdir -p ~/projects/postgresql-job-queue-skip-locked-demo
cd ~/projects/postgresql-job-queue-skip-locked-demo
mkdir -p bin docker/php docker/db
code .

compose.yml を用意します。

services:
  app:
    build:
      context: .
      dockerfile: docker/php/Dockerfile
    working_dir: /workspace
    volumes:
      - ./:/workspace
    env_file:
      - .env
    command: ["sleep", "infinity"]
    depends_on:
      db:
        condition: service_healthy

  db:
    image: postgres:17
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - db-data:/var/lib/postgresql/data
      - ./docker/db/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
      interval: 5s
      timeout: 3s
      retries: 20

volumes:
  db-data:

docker/php/Dockerfile は次の内容です。

FROM php:8.5-cli

RUN apt-get update \
    && apt-get install -y --no-install-recommends libpq-dev \
    && docker-php-ext-install pdo_pgsql \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /workspace

.env.example は次の内容です。

POSTGRES_DB=app
POSTGRES_USER=app
POSTGRES_PASSWORD=app

APP_DB_HOST=db
APP_DB_PORT=5432
APP_DB_NAME=app
APP_DB_USER=app
APP_DB_PASS=app
APP_RETRY_DELAY_SECONDS=5

docker/db/init.sql には次を入れます。

CREATE TABLE IF NOT EXISTS jobs (
  id BIGSERIAL PRIMARY KEY,
  job_type TEXT NOT NULL,
  payload JSONB NOT NULL DEFAULT '{}'::jsonb,
  status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
  attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
  max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
  available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  reserved_by TEXT,
  reserved_at TIMESTAMPTZ,
  last_error TEXT,
  finished_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
  ON jobs (status, available_at, id)
  WHERE status = 'queued';

status は最小の 4 状態に絞りました。 available_at は「今すぐ取れるか」を表し、通常投入と retry 待機の両方に使います。

コードのポイント

CREATE TABLE IF NOT EXISTS jobs (
  id BIGSERIAL PRIMARY KEY,
  job_type TEXT NOT NULL,
  payload JSONB NOT NULL DEFAULT '{}'::jsonb,
  status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
  attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
  max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
  available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  reserved_by TEXT,
  reserved_at TIMESTAMPTZ,
  last_error TEXT,
  finished_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
  ON jobs (status, available_at, id)
  WHERE status = 'queued';
  • 5行目 CHECK (status IN (...)) — 有効な状態値を 4 種に絞る DB 制約。アプリ側の typo やバグで不正な状態が入ることを防ぐ。
  • 8行目 available_at TIMESTAMPTZ NOT NULL DEFAULT NOW() — デフォルトは NOW() なので即時投入はそのまま使える。retry 時は将来時刻をセットすることで、通常投入と待機中ジョブを同じカラムで表現する。
  • 17〜19行目 WHERE status = 'queued'queued 行だけを対象にする部分インデックス。donefailed が積み上がっても dequeue クエリのスキャン対象に含まれない。

起動します。

cp .env.example .env
docker compose up -d --build
docker compose exec db pg_isready -U app -d app
docker compose exec db psql -U app -d app -c "\dt"

jobs テーブルが見えれば準備完了です。

jobs テーブル作成直後の \dt 確認

詰まったとき:

docker compose logs db
docker compose down -v
docker compose up -d --build

3. キューの初期化とジョブ投入スクリプトを用意する

ここからは、初期化、投入、状態確認を毎回同じ手順で回せるようにします。

最初に置くのは bin/bootstrap.php です。

<?php
declare(strict_types=1);

function createPdo(): PDO
{
    $dsn = sprintf(
        'pgsql:host=%s;port=%s;dbname=%s',
        getenv('APP_DB_HOST') ?: 'db',
        getenv('APP_DB_PORT') ?: '5432',
        getenv('APP_DB_NAME') ?: 'app'
    );

    return new PDO(
        $dsn,
        getenv('APP_DB_USER') ?: 'app',
        getenv('APP_DB_PASS') ?: 'app',
        [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        ]
    );
}

function retryDelaySeconds(): int
{
    $value = getenv('APP_RETRY_DELAY_SECONDS') ?: '5';
    $delay = filter_var($value, FILTER_VALIDATE_INT);

    if ($delay === false || $delay < 0) {
        throw new RuntimeException('APP_RETRY_DELAY_SECONDS must be a non-negative integer.');
    }

    return $delay;
}

bin/reset-demo.php を置きます。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$pdo = createPdo();
$result = $pdo->exec('TRUNCATE TABLE jobs RESTART IDENTITY');

if ($result === false) {
    fwrite(STDERR, "failed to truncate jobs\n");
    exit(1);
}

echo "queue reset completed" . PHP_EOL;

bin/enqueue-job.php を用意します。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$jobType = $argv[1] ?? '';
$payloadJson = $argv[2] ?? '{}';
$maxAttempts = isset($argv[3]) ? (int) $argv[3] : 3;

if ($jobType === '' || $maxAttempts < 1) {
    fwrite(
        STDERR,
        "Usage: php bin/enqueue-job.php JOB_TYPE JSON_PAYLOAD [MAX_ATTEMPTS]" . PHP_EOL
    );
    exit(1);
}

try {
    $payload = json_decode($payloadJson, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $e) {
    fwrite(STDERR, 'Invalid JSON payload: ' . $e->getMessage() . PHP_EOL);
    exit(1);
}

if (!is_array($payload)) {
    fwrite(STDERR, "JSON payload must decode to an object or array" . PHP_EOL);
    exit(1);
}

$pdo = createPdo();
$stmt = $pdo->prepare(
    "INSERT INTO jobs (job_type, payload, status, max_attempts)
     VALUES (:job_type, CAST(:payload AS JSONB), 'queued', :max_attempts)
     RETURNING id"
);
$stmt->bindValue(':job_type', $jobType, PDO::PARAM_STR);
$stmt->bindValue(
    ':payload',
    json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR),
    PDO::PARAM_STR
);
$stmt->bindValue(':max_attempts', $maxAttempts, PDO::PARAM_INT);
$stmt->execute();

$jobId = $stmt->fetchColumn();

if ($jobId === false) {
    fwrite(STDERR, "failed to enqueue job\n");
    exit(1);
}

printf("enqueued job #%d" . PHP_EOL, (int) $jobId);

ここでは status'queued' で明示的に入れます。今回の init.sql では statusDEFAULT を付けていないため、省略すると null value in column "status" で投入に失敗します。

bin/show-queue.php は queue 状態を JSON で確認するためのスクリプトです。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$pdo = createPdo();
$rows = $pdo->query(
    'SELECT id,
            job_type,
            status,
            attempts,
            max_attempts,
            reserved_by,
            reserved_at,
            available_at,
            finished_at,
            last_error,
            payload::text AS payload_json
       FROM jobs
      ORDER BY id ASC'
)->fetchAll();

$jobs = array_map(
    static function (array $row): array {
        return [
            'id' => (int) $row['id'],
            'job_type' => (string) $row['job_type'],
            'status' => (string) $row['status'],
            'attempts' => (int) $row['attempts'],
            'max_attempts' => (int) $row['max_attempts'],
            'reserved_by' => $row['reserved_by'] !== null ? (string) $row['reserved_by'] : null,
            'reserved_at' => $row['reserved_at'] !== null ? (string) $row['reserved_at'] : null,
            'available_at' => (string) $row['available_at'],
            'finished_at' => $row['finished_at'] !== null ? (string) $row['finished_at'] : null,
            'last_error' => $row['last_error'] !== null ? (string) $row['last_error'] : null,
            'payload' => json_decode((string) $row['payload_json'], true, 512, JSON_THROW_ON_ERROR),
        ];
    },
    $rows
);

echo json_encode($jobs, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . PHP_EOL;

一度リセットしてから、ジョブを 3 件入れます。

docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"bob@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}' 2
docker compose exec app php bin/show-queue.php

初期状態は次のようになります。

[
    {
        "id": 1,
        "job_type": "send-mail",
        "status": "queued",
        "attempts": 0,
        "max_attempts": 3,
        "reserved_by": null,
        "reserved_at": null,
        "available_at": "...",
        "finished_at": null,
        "last_error": null,
        "payload": {
            "to": "alice@example.com"
        }
    },
    {
        "id": 2,
        "job_type": "send-mail",
        "status": "queued",
        "attempts": 0,
        "max_attempts": 3,
        "reserved_by": null,
        "reserved_at": null,
        "available_at": "...",
        "finished_at": null,
        "last_error": null,
        "payload": {
            "to": "bob@example.com"
        }
    },
    {
        "id": 3,
        "job_type": "build-report",
        "status": "queued",
        "attempts": 0,
        "max_attempts": 2,
        "reserved_by": null,
        "reserved_at": null,
        "available_at": "...",
        "finished_at": null,
        "last_error": null,
        "payload": {
            "report_id": 42,
            "fail_once": true
        }
    }
]

payload.fail_once は 6 章の retry 用です。
ジョブの見え方を固定したいので、ここでは投入側と確認側を先に用意してから worker に進みます。

ジョブ投入直後の queue 状態

コードのポイント

$stmt = $pdo->prepare(
    "INSERT INTO jobs (job_type, payload, status, max_attempts)
     VALUES (:job_type, CAST(:payload AS JSONB), 'queued', :max_attempts)
     RETURNING id"
);
$stmt->bindValue(':job_type', $jobType, PDO::PARAM_STR);
$stmt->bindValue(
    ':payload',
    json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR),
    PDO::PARAM_STR
);
  • 3行目 CAST(:payload AS JSONB) — PDO はプレースホルダーを文字列として渡すため、JSONB カラムへの INSERT には明示的なキャストが必要。省略すると型不一致エラーになる場合がある。
  • 8〜10行目 json_encode(..., PDO::PARAM_STR) — PHP 配列を JSON 文字列にしてからバインドし、SQL 側の CASTJSONB に変換する。JSON_THROW_ON_ERROR を付けることで、エンコード失敗を例外として検出できる。

4. FOR UPDATE SKIP LOCKED で 1 件だけ取るワーカーを作る

次は work-once.php です。1 回の起動で 1 件だけ claim(ジョブを取得して自分の担当にすること)し、結果を done / queued / failed へ反映します。

bin/work-once.php を作成します。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$workerId = $argv[1] ?? '';
$processingSeconds = isset($argv[2]) ? (int) $argv[2] : 0;

if ($workerId === '' || $processingSeconds < 0) {
    fwrite(
        STDERR,
        "Usage: php bin/work-once.php WORKER_ID [PROCESSING_SECONDS]" . PHP_EOL
    );
    exit(1);
}

$pdo = createPdo();

try {
    $pdo->beginTransaction();

    $claim = $pdo->prepare(
        <<<'SQL'
WITH next_job AS (
    SELECT id
      FROM jobs
     WHERE status = 'queued'
       AND available_at <= NOW()
     ORDER BY available_at ASC, id ASC
     FOR UPDATE SKIP LOCKED
     LIMIT 1
)
UPDATE jobs
   SET status = 'processing',
       attempts = attempts + 1,
       reserved_by = :worker_id,
       reserved_at = NOW(),
       updated_at = NOW(),
       last_error = NULL
 WHERE id = (SELECT id FROM next_job)
 RETURNING id,
           job_type,
           payload::text AS payload_json,
           attempts,
           max_attempts
SQL
    );
    $claim->execute([':worker_id' => $workerId]);
    $job = $claim->fetch();

    if (!is_array($job)) {
        $pdo->commit();
        printf("[%s] no ready jobs" . PHP_EOL, $workerId);
        exit(0);
    }

    $pdo->commit();

    $jobId = (int) $job['id'];
    $jobType = (string) $job['job_type'];
    $attempts = (int) $job['attempts'];
    $maxAttempts = (int) $job['max_attempts'];
    $payload = json_decode((string) $job['payload_json'], true, 512, JSON_THROW_ON_ERROR);

    if (!is_array($payload)) {
        throw new RuntimeException('Job payload must decode to an object or array.');
    }

    printf(
        "[%s] claimed job #%d (%s), attempt %d/%d" . PHP_EOL,
        $workerId,
        $jobId,
        $jobType,
        $attempts,
        $maxAttempts
    );

    if ($processingSeconds > 0) {
        printf(
            "[%s] processing for %d second(s)" . PHP_EOL,
            $workerId,
            $processingSeconds
        );
        sleep($processingSeconds);
    }

    $shouldFailOnce = ($payload['fail_once'] ?? false) === true && $attempts === 1;

    if ($shouldFailOnce) {
        $errorMessage = 'simulated failure on first attempt';

        if ($attempts < $maxAttempts) {
            $retry = $pdo->prepare(
                "UPDATE jobs
                    SET status = 'queued',
                        available_at = NOW() + (:retry_delay * INTERVAL '1 second'),
                        reserved_by = NULL,
                        reserved_at = NULL,
                        finished_at = NULL,
                        last_error = :last_error,
                        updated_at = NOW()
                  WHERE id = :id"
            );
            $retry->bindValue(':retry_delay', retryDelaySeconds(), PDO::PARAM_INT);
            $retry->bindValue(':last_error', $errorMessage, PDO::PARAM_STR);
            $retry->bindValue(':id', $jobId, PDO::PARAM_INT);
            $retry->execute();

            printf(
                "[%s] job #%d failed, re-queued after %d second(s)" . PHP_EOL,
                $workerId,
                $jobId,
                retryDelaySeconds()
            );
            exit(0);
        }

        $fail = $pdo->prepare(
            "UPDATE jobs
                SET status = 'failed',
                    finished_at = NOW(),
                    last_error = :last_error,
                    updated_at = NOW()
              WHERE id = :id"
        );
        $fail->bindValue(':last_error', $errorMessage, PDO::PARAM_STR);
        $fail->bindValue(':id', $jobId, PDO::PARAM_INT);
        $fail->execute();

        printf("[%s] job #%d failed permanently" . PHP_EOL, $workerId, $jobId);
        exit(0);
    }

    $done = $pdo->prepare(
        "UPDATE jobs
            SET status = 'done',
                finished_at = NOW(),
                last_error = NULL,
                updated_at = NOW()
          WHERE id = :id"
    );
    $done->bindValue(':id', $jobId, PDO::PARAM_INT);
    $done->execute();

    printf("[%s] job #%d completed" . PHP_EOL, $workerId, $jobId);
} catch (Throwable $e) {
    if ($pdo->inTransaction()) {
        $pdo->rollBack();
    }

    fwrite(STDERR, $e->getMessage() . PHP_EOL);
    exit(1);
}

やりたいことは単純で、まだ取られていない ready job を 1 件選び、その場で processing に変えることです。これを 1 回の claim トランザクションでまとめて行います。

claim 部分の核は次の SQL です。

WITH next_job AS (
    SELECT id
      FROM jobs
     WHERE status = 'queued'
       AND available_at <= NOW()
     ORDER BY available_at ASC, id ASC
     FOR UPDATE SKIP LOCKED
     LIMIT 1
)
UPDATE jobs
   SET status = 'processing',
       attempts = attempts + 1,
       reserved_by = :worker_id,
       reserved_at = NOW()
 WHERE id = (SELECT id FROM next_job)
 RETURNING ...

このデモでは attempts を「失敗回数」ではなく「処理開始回数」として扱うため、claim 時点で 1 増やしています。

この場面で押さえたいのは 2 点です。

  1. SKIP LOCKED は claim の瞬間だけ使う
  2. 実処理の sleep() や本物のメール送信は、コミット後に進める

在庫引当の記事では、後続側は同じ行を待つ形でした。
ジョブキューでは、別の ready job があるなら待たせず次へ進めたい場面が増えます。SKIP LOCKED はその差を作るためのオプションです。

SKIP LOCKED は「競合中の claim で待たない」ための仕組みで、実行中ジョブを表す本体は status = 'processing'reserved_* です。

もう 1 つ重要なのは、コミット後に同じ job が再取得されない理由です。 ロックは claim トランザクションの終了で解放されます。とはいえ、その前に status = 'processing' へ更新しているため、次の worker は queued 条件に引っかかりません。

コードのポイント

        if ($attempts < $maxAttempts) {
            $retry = $pdo->prepare(
                "UPDATE jobs
                    SET status = 'queued',
                        available_at = NOW() + (:retry_delay * INTERVAL '1 second'),
                        reserved_by = NULL,
                        reserved_at = NULL,
                        finished_at = NULL,
                        last_error = :last_error,
                        updated_at = NOW()
                  WHERE id = :id"
            );
  • 4行目 status = 'queued' — 失敗したジョブを再度 queued に戻す。available_at が未来時刻であれば、次の dequeue クエリの available_at <= NOW() 条件でまだ取得されない。
  • 5行目 available_at = NOW() + (:retry_delay * INTERVAL '1 second') — 遅延秒数を INTERVAL に変換してリトライ開始時刻をずらす。available_at を 1 列持つだけで、通常投入とリトライ待機を同じ WHERE 条件で扱える。

5. 2 ワーカーで別々の ready job を取ることを確認する

3 件のジョブを入れ直してから確認します。 この手順でまず確認したいのは、2 ワーカーが同じ job を二重取得せず、別々の ready job を取れることです。 なお、今回の work-once.php は claim 後すぐにコミットするので、SKIP LOCKED による「ロック中の行を飛ばす」瞬間そのものは見えにくい場合があります。ここで見ている中心は、短い claim と processing への状態更新を組み合わせた安全な取得です。

先に流れだけ図で見ると、worker-a が job #1 を claim しても、worker-b は同じ job を再取得せず、次の ready job へ進みます。ここで差を作っているのが FOR UPDATE SKIP LOCKED です。

sequenceDiagram
    participant WA as worker-a
    participant DB as PostgreSQL
    participant WB as worker-b

    WA->>DB: BEGIN
    WA->>DB: claim next queued job with SKIP LOCKED
    DB-->>WA: job #1
    WA->>DB: UPDATE job #1 -> processing
    WA->>DB: COMMIT

    WB->>DB: BEGIN
    WB->>DB: claim next queued job with SKIP LOCKED
    DB-->>WB: job #2
    WB->>DB: UPDATE job #2 -> processing
    WB->>DB: COMMIT

    WA->>DB: UPDATE job #1 -> done
    WB->>DB: UPDATE job #2 -> done

ready job が 1 件しかない場合、後続ワーカーは次の候補を取れないので no ready jobs になります。この記事では、まず「別の ready job があるなら待たずに進む」形を先に確認します。

docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"bob@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}' 2

ターミナルA:

docker compose exec app php bin/work-once.php worker-a 5

ターミナルB:

docker compose exec app php bin/work-once.php worker-b 0

期待する出力:

ターミナルA:

[worker-a] claimed job #1 (send-mail), attempt 1/3
[worker-a] processing for 5 second(s)
[worker-a] job #1 completed

ターミナルB:

[worker-b] claimed job #2 (send-mail), attempt 1/3
[worker-b] job #2 completed

worker-b は job #1 を再取得せず、次の ready job へ進みます。
claim の瞬間に row lock が残っていれば SKIP LOCKED で飛ばし、すでにコミット済みなら status = 'processing' のため候補から外れます。今回のデモでは後者として見えることもありますが、どちらにせよ同じ job #1 は取りません。

実行後の状態を確認します。

docker compose exec app php bin/show-queue.php

期待する結果:

[
    {
        "id": 1,
        "job_type": "send-mail",
        "status": "done",
        "attempts": 1,
        "max_attempts": 3,
        "reserved_by": "worker-a",
        "reserved_at": "...",
        "available_at": "...",
        "finished_at": "...",
        "last_error": null,
        "payload": {
            "to": "alice@example.com"
        }
    },
    {
        "id": 2,
        "job_type": "send-mail",
        "status": "done",
        "attempts": 1,
        "max_attempts": 3,
        "reserved_by": "worker-b",
        "reserved_at": "...",
        "available_at": "...",
        "finished_at": "...",
        "last_error": null,
        "payload": {
            "to": "bob@example.com"
        }
    },
    {
        "id": 3,
        "job_type": "build-report",
        "status": "queued",
        "attempts": 0,
        "max_attempts": 2,
        "reserved_by": null,
        "reserved_at": null,
        "available_at": "...",
        "finished_at": null,
        "last_error": null,
        "payload": {
            "report_id": 42,
            "fail_once": true
        }
    }
]

ORDER BY available_at, id を入れているので、ready な job が複数あるときの取り順も追いやすくなります。

2 ワーカー実行後の queue 状態

6. 成功 / 失敗 / リトライの最小状態管理を確認する

次は job #3 を使って retry を見ます。
この job は 1 回目だけ失敗し、2 回目で成功するようにしてあります。

docker compose exec app php bin/work-once.php worker-c 0
docker compose exec app php bin/show-queue.php

期待する出力:

[worker-c] claimed job #3 (build-report), attempt 1/2
[worker-c] job #3 failed, re-queued after 5 second(s)

この時点の queue 状態:

[
    {
        "id": 1,
        "job_type": "send-mail",
        "status": "done",
        "attempts": 1,
        "max_attempts": 3,
        "reserved_by": "worker-a",
        "reserved_at": "...",
        "available_at": "...",
        "finished_at": "...",
        "last_error": null,
        "payload": {
            "to": "alice@example.com"
        }
    },
    {
        "id": 2,
        "job_type": "send-mail",
        "status": "done",
        "attempts": 1,
        "max_attempts": 3,
        "reserved_by": "worker-b",
        "reserved_at": "...",
        "available_at": "...",
        "finished_at": "...",
        "last_error": null,
        "payload": {
            "to": "bob@example.com"
        }
    },
    {
        "id": 3,
        "job_type": "build-report",
        "status": "queued",
        "attempts": 1,
        "max_attempts": 2,
        "reserved_by": null,
        "reserved_at": null,
        "available_at": "... future ...",
        "finished_at": null,
        "last_error": "simulated failure on first attempt",
        "payload": {
            "report_id": 42,
            "fail_once": true
        }
    }
]

statusqueued に戻っていますが、available_at が未来時刻なので、すぐには再取得されません。
この 1 列があるだけで、遅延実行と retry 待機を同じ形で扱えます。

少し待ってからもう一度 worker を動かします。

sleep 6
docker compose exec app php bin/work-once.php worker-d 0
docker compose exec app php bin/show-queue.php

期待する出力:

[worker-d] claimed job #3 (build-report), attempt 2/2
[worker-d] job #3 completed

成功と retry の確認はここまでです。
次は failed を確認するために、max_attempts = 1 の job を別で 1 件だけ入れます。

docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":99,"fail_once":true}' 1
docker compose exec app php bin/work-once.php worker-e 0
docker compose exec app php bin/show-queue.php

期待する結果:

[
    {
        "id": 1,
        "job_type": "build-report",
        "status": "failed",
        "attempts": 1,
        "max_attempts": 1,
        "reserved_by": "worker-e",
        "reserved_at": "...",
        "available_at": "...",
        "finished_at": "...",
        "last_error": "simulated failure on first attempt",
        "payload": {
            "report_id": 99,
            "fail_once": true
        }
    }
]

この最小構成では、次の 3 つで十分です。

  • 成功したら done
  • 再試行したいなら queued に戻し、available_at を未来へずらす
  • 上限到達なら failedlast_error を残す
max_attempts 到達で failed になった queue 状態

7. 実務へ戻すときの勘所と詰まりどころ

このデモを実務コードへ戻すときに、最低限外したくない点は 5 つです。

  1. claim トランザクションを長く持たない

今回は SELECT ... FOR UPDATE SKIP LOCKEDUPDATE ... SET status = 'processing' までを 1 トランザクションに閉じ、処理本体はコミット後に進める構成にしました。
メール送信や HTTP 呼び出しまで同じトランザクションへ入れると、別 worker の待ち時間が伸びる原因になります。

  1. SKIP LOCKED だけで安全になるわけではない

同じ job を再取得させないためには、claim 直後に processing へ更新する必要があります。
ロックはコミットで解放されるので、その後は状態列で ownership を表現します。

  1. retry は available_atattempts で持つ

ジョブキューの最小要件は「今すぐ取れるか」と「何回やったか」です。
ここを先に持っておくと、あとから指数バックオフや cron 的な遅延実行にも広げやすくなります。

  1. 部分インデックスを先に入れる

今回は status = 'queued' の行だけを探すインデックスを入れました。
行数が増えるほど、queued 以外を毎回なめないで済みます。

  1. 次に足すべきは stuck job 回収

worker が claim 後に落ちると、その job は processing のまま残ります。
この記事では扱いませんが、実務では reserved_at を見て一定時間超過分を戻す仕組みが次の課題になります。

詰まりやすい点と対処は次の表のとおりです。

症状主原因まず見る場所戻る章
2 本目の worker が no ready jobs になる1 件しか投入していない、または available_at が未来になっているdocker compose exec app php bin/show-queue.php3章, 6章
2 本目の worker が待っているように見えるclaim 後の処理を同じトランザクションで持っているsleep() の位置、commit() の位置4章
retry されずにすぐ failed になるmax_attempts = 1 で投入しているenqueue-job.php の第3引数3章, 6章
Invalid JSON payload が出るpayload の JSON 文字列が壊れているenqueue-job.php の引数3章
could not find driverpdo_pgsql が入っていない`docker compose exec app php -mgrep pgsql`

切り分けに使う最小コマンド:

docker compose exec app php bin/show-queue.php
docker compose exec app php bin/reset-demo.php
docker compose exec app php -m | grep pgsql
docker compose logs db
docker compose down -v

8. まとめと次の一歩

本記事では、次の 3 点を確認しました。

  • PostgreSQL の jobs テーブルだけでも、FOR UPDATE SKIP LOCKED で最小のジョブキューを作れる
  • 同じ job を二重に取らないためには、短い claim トランザクションと processing への状態更新をセットで使う
  • retry は available_at、失敗確定は failedlast_error で最小管理できる

SKIP LOCKED を使うのは、「同じ行を待たせたい場面」ではなく、「別の ready job があるなら次へ進ませたい場面」です。

次の一歩としては、次の 2 つを候補にできます。

  1. この work-once.php を常駐ワーカー化し、docker compose 上で回し続ける
  2. processing のまま取り残された job を回収する timeout / heartbeat を足す

今回の最小構成を自分の手で 1 回再現して、2 本目の worker が「待つ」のではなく「別の job を取る」瞬間を確認しておくと、その後にフレームワークのキューへ進んでも挙動を追いやすくなります。