この記事は、PDO + PostgreSQL で SELECT ... FOR UPDATE を使った排他制御の基本は分かるが、複数ワーカーに広げたときの安全なジョブ取得方法にはまだ不安がある人を対象にしています。
題材はメール送信やレポート生成のような軽いバックグラウンドジョブです。PHP(PDO)から PostgreSQL のテーブルをキューとして使い、FOR UPDATE SKIP LOCKED で 1 ワーカー 1 ジョブを安全に取るところから、成功 / 失敗 / リトライの最小状態管理までを追います。
先に読んでおくと入りやすい記事:
前提環境
- Windows 11
- WSL2(Ubuntu)
- VS Code(Remote - WSL)
- Docker Desktop(WSL 連携有効)
以降のコマンドは、特記がない限り WSL 側ターミナルで実行します。
サービス名は app と db に固定しています。
1. ゴールと非対象
到達する状態:
- PostgreSQL の
jobsテーブルを、最小のジョブキューとして再現できる FOR UPDATE SKIP LOCKEDで 1 ワーカー 1 ジョブを claim し、複数ワーカーが同じジョブを取らないことを確認できるqueued/processing/done/failedと、available_atを使った最小リトライを実装できる
扱わない内容:
- worker crash 後の stuck job 回収
LISTEN / NOTIFYや cron 連携- dead letter queue、指数バックオフ、優先度制御
- Laravel Queue / Symfony Messenger の書き方
- Redis / RabbitMQ / Kafka との詳細比較
前の記事では SELECT ... FOR UPDATE で「後続を待たせる」流れを見ました。
今回は逆に、別の ready job があるなら後続ワーカーを待たせずに次へ進めます。その違いを作るのが SKIP LOCKED です。
2. 最小デモ環境を作成する
記事用の最小デモから始めます。
mkdir -p ~/projects/postgresql-job-queue-skip-locked-demo
cd ~/projects/postgresql-job-queue-skip-locked-demo
mkdir -p bin docker/php docker/db
code .
compose.yml を用意します。
services:
app:
build:
context: .
dockerfile: docker/php/Dockerfile
working_dir: /workspace
volumes:
- ./:/workspace
env_file:
- .env
command: ["sleep", "infinity"]
depends_on:
db:
condition: service_healthy
db:
image: postgres:17
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
ports:
- "5432:5432"
volumes:
- db-data:/var/lib/postgresql/data
- ./docker/db/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
interval: 5s
timeout: 3s
retries: 20
volumes:
db-data:
docker/php/Dockerfile は次の内容です。
FROM php:8.5-cli
RUN apt-get update \
&& apt-get install -y --no-install-recommends libpq-dev \
&& docker-php-ext-install pdo_pgsql \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /workspace
.env.example は次の内容です。
POSTGRES_DB=app
POSTGRES_USER=app
POSTGRES_PASSWORD=app
APP_DB_HOST=db
APP_DB_PORT=5432
APP_DB_NAME=app
APP_DB_USER=app
APP_DB_PASS=app
APP_RETRY_DELAY_SECONDS=5
docker/db/init.sql には次を入れます。
CREATE TABLE IF NOT EXISTS jobs (
id BIGSERIAL PRIMARY KEY,
job_type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reserved_by TEXT,
reserved_at TIMESTAMPTZ,
last_error TEXT,
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
ON jobs (status, available_at, id)
WHERE status = 'queued';
status は最小の 4 状態に絞りました。
available_at は「今すぐ取れるか」を表し、通常投入と retry 待機の両方に使います。
コードのポイント
CREATE TABLE IF NOT EXISTS jobs (
id BIGSERIAL PRIMARY KEY,
job_type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reserved_by TEXT,
reserved_at TIMESTAMPTZ,
last_error TEXT,
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
ON jobs (status, available_at, id)
WHERE status = 'queued';
- 5行目
CHECK (status IN (...))— 有効な状態値を 4 種に絞る DB 制約。アプリ側の typo やバグで不正な状態が入ることを防ぐ。 - 8行目
available_at TIMESTAMPTZ NOT NULL DEFAULT NOW()— デフォルトはNOW()なので即時投入はそのまま使える。retry 時は将来時刻をセットすることで、通常投入と待機中ジョブを同じカラムで表現する。 - 17〜19行目
WHERE status = 'queued'—queued行だけを対象にする部分インデックス。doneやfailedが積み上がっても dequeue クエリのスキャン対象に含まれない。
起動します。
cp .env.example .env
docker compose up -d --build
docker compose exec db pg_isready -U app -d app
docker compose exec db psql -U app -d app -c "\dt"
jobs テーブルが見えれば準備完了です。
詰まったとき:
docker compose logs db
docker compose down -v
docker compose up -d --build
3. キューの初期化とジョブ投入スクリプトを用意する
ここからは、初期化、投入、状態確認を毎回同じ手順で回せるようにします。
最初に置くのは bin/bootstrap.php です。
<?php
declare(strict_types=1);
function createPdo(): PDO
{
$dsn = sprintf(
'pgsql:host=%s;port=%s;dbname=%s',
getenv('APP_DB_HOST') ?: 'db',
getenv('APP_DB_PORT') ?: '5432',
getenv('APP_DB_NAME') ?: 'app'
);
return new PDO(
$dsn,
getenv('APP_DB_USER') ?: 'app',
getenv('APP_DB_PASS') ?: 'app',
[
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
]
);
}
function retryDelaySeconds(): int
{
$value = getenv('APP_RETRY_DELAY_SECONDS') ?: '5';
$delay = filter_var($value, FILTER_VALIDATE_INT);
if ($delay === false || $delay < 0) {
throw new RuntimeException('APP_RETRY_DELAY_SECONDS must be a non-negative integer.');
}
return $delay;
}
bin/reset-demo.php を置きます。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$pdo = createPdo();
$result = $pdo->exec('TRUNCATE TABLE jobs RESTART IDENTITY');
if ($result === false) {
fwrite(STDERR, "failed to truncate jobs\n");
exit(1);
}
echo "queue reset completed" . PHP_EOL;
bin/enqueue-job.php を用意します。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$jobType = $argv[1] ?? '';
$payloadJson = $argv[2] ?? '{}';
$maxAttempts = isset($argv[3]) ? (int) $argv[3] : 3;
if ($jobType === '' || $maxAttempts < 1) {
fwrite(
STDERR,
"Usage: php bin/enqueue-job.php JOB_TYPE JSON_PAYLOAD [MAX_ATTEMPTS]" . PHP_EOL
);
exit(1);
}
try {
$payload = json_decode($payloadJson, true, 512, JSON_THROW_ON_ERROR);
} catch (JsonException $e) {
fwrite(STDERR, 'Invalid JSON payload: ' . $e->getMessage() . PHP_EOL);
exit(1);
}
if (!is_array($payload)) {
fwrite(STDERR, "JSON payload must decode to an object or array" . PHP_EOL);
exit(1);
}
$pdo = createPdo();
$stmt = $pdo->prepare(
"INSERT INTO jobs (job_type, payload, status, max_attempts)
VALUES (:job_type, CAST(:payload AS JSONB), 'queued', :max_attempts)
RETURNING id"
);
$stmt->bindValue(':job_type', $jobType, PDO::PARAM_STR);
$stmt->bindValue(
':payload',
json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR),
PDO::PARAM_STR
);
$stmt->bindValue(':max_attempts', $maxAttempts, PDO::PARAM_INT);
$stmt->execute();
$jobId = $stmt->fetchColumn();
if ($jobId === false) {
fwrite(STDERR, "failed to enqueue job\n");
exit(1);
}
printf("enqueued job #%d" . PHP_EOL, (int) $jobId);
ここでは status も 'queued' で明示的に入れます。今回の init.sql では status に DEFAULT を付けていないため、省略すると null value in column "status" で投入に失敗します。
bin/show-queue.php は queue 状態を JSON で確認するためのスクリプトです。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$pdo = createPdo();
$rows = $pdo->query(
'SELECT id,
job_type,
status,
attempts,
max_attempts,
reserved_by,
reserved_at,
available_at,
finished_at,
last_error,
payload::text AS payload_json
FROM jobs
ORDER BY id ASC'
)->fetchAll();
$jobs = array_map(
static function (array $row): array {
return [
'id' => (int) $row['id'],
'job_type' => (string) $row['job_type'],
'status' => (string) $row['status'],
'attempts' => (int) $row['attempts'],
'max_attempts' => (int) $row['max_attempts'],
'reserved_by' => $row['reserved_by'] !== null ? (string) $row['reserved_by'] : null,
'reserved_at' => $row['reserved_at'] !== null ? (string) $row['reserved_at'] : null,
'available_at' => (string) $row['available_at'],
'finished_at' => $row['finished_at'] !== null ? (string) $row['finished_at'] : null,
'last_error' => $row['last_error'] !== null ? (string) $row['last_error'] : null,
'payload' => json_decode((string) $row['payload_json'], true, 512, JSON_THROW_ON_ERROR),
];
},
$rows
);
echo json_encode($jobs, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . PHP_EOL;
一度リセットしてから、ジョブを 3 件入れます。
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"bob@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}' 2
docker compose exec app php bin/show-queue.php
初期状態は次のようになります。
[
{
"id": 1,
"job_type": "send-mail",
"status": "queued",
"attempts": 0,
"max_attempts": 3,
"reserved_by": null,
"reserved_at": null,
"available_at": "...",
"finished_at": null,
"last_error": null,
"payload": {
"to": "alice@example.com"
}
},
{
"id": 2,
"job_type": "send-mail",
"status": "queued",
"attempts": 0,
"max_attempts": 3,
"reserved_by": null,
"reserved_at": null,
"available_at": "...",
"finished_at": null,
"last_error": null,
"payload": {
"to": "bob@example.com"
}
},
{
"id": 3,
"job_type": "build-report",
"status": "queued",
"attempts": 0,
"max_attempts": 2,
"reserved_by": null,
"reserved_at": null,
"available_at": "...",
"finished_at": null,
"last_error": null,
"payload": {
"report_id": 42,
"fail_once": true
}
}
]
payload.fail_once は 6 章の retry 用です。
ジョブの見え方を固定したいので、ここでは投入側と確認側を先に用意してから worker に進みます。
コードのポイント
$stmt = $pdo->prepare(
"INSERT INTO jobs (job_type, payload, status, max_attempts)
VALUES (:job_type, CAST(:payload AS JSONB), 'queued', :max_attempts)
RETURNING id"
);
$stmt->bindValue(':job_type', $jobType, PDO::PARAM_STR);
$stmt->bindValue(
':payload',
json_encode($payload, JSON_UNESCAPED_UNICODE | JSON_THROW_ON_ERROR),
PDO::PARAM_STR
);
- 3行目
CAST(:payload AS JSONB)— PDO はプレースホルダーを文字列として渡すため、JSONBカラムへの INSERT には明示的なキャストが必要。省略すると型不一致エラーになる場合がある。 - 8〜10行目
json_encode(..., PDO::PARAM_STR)— PHP 配列を JSON 文字列にしてからバインドし、SQL 側のCASTでJSONBに変換する。JSON_THROW_ON_ERRORを付けることで、エンコード失敗を例外として検出できる。
4. FOR UPDATE SKIP LOCKED で 1 件だけ取るワーカーを作る
次は work-once.php です。1 回の起動で 1 件だけ claim(ジョブを取得して自分の担当にすること)し、結果を done / queued / failed へ反映します。
bin/work-once.php を作成します。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$workerId = $argv[1] ?? '';
$processingSeconds = isset($argv[2]) ? (int) $argv[2] : 0;
if ($workerId === '' || $processingSeconds < 0) {
fwrite(
STDERR,
"Usage: php bin/work-once.php WORKER_ID [PROCESSING_SECONDS]" . PHP_EOL
);
exit(1);
}
$pdo = createPdo();
try {
$pdo->beginTransaction();
$claim = $pdo->prepare(
<<<'SQL'
WITH next_job AS (
SELECT id
FROM jobs
WHERE status = 'queued'
AND available_at <= NOW()
ORDER BY available_at ASC, id ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs
SET status = 'processing',
attempts = attempts + 1,
reserved_by = :worker_id,
reserved_at = NOW(),
updated_at = NOW(),
last_error = NULL
WHERE id = (SELECT id FROM next_job)
RETURNING id,
job_type,
payload::text AS payload_json,
attempts,
max_attempts
SQL
);
$claim->execute([':worker_id' => $workerId]);
$job = $claim->fetch();
if (!is_array($job)) {
$pdo->commit();
printf("[%s] no ready jobs" . PHP_EOL, $workerId);
exit(0);
}
$pdo->commit();
$jobId = (int) $job['id'];
$jobType = (string) $job['job_type'];
$attempts = (int) $job['attempts'];
$maxAttempts = (int) $job['max_attempts'];
$payload = json_decode((string) $job['payload_json'], true, 512, JSON_THROW_ON_ERROR);
if (!is_array($payload)) {
throw new RuntimeException('Job payload must decode to an object or array.');
}
printf(
"[%s] claimed job #%d (%s), attempt %d/%d" . PHP_EOL,
$workerId,
$jobId,
$jobType,
$attempts,
$maxAttempts
);
if ($processingSeconds > 0) {
printf(
"[%s] processing for %d second(s)" . PHP_EOL,
$workerId,
$processingSeconds
);
sleep($processingSeconds);
}
$shouldFailOnce = ($payload['fail_once'] ?? false) === true && $attempts === 1;
if ($shouldFailOnce) {
$errorMessage = 'simulated failure on first attempt';
if ($attempts < $maxAttempts) {
$retry = $pdo->prepare(
"UPDATE jobs
SET status = 'queued',
available_at = NOW() + (:retry_delay * INTERVAL '1 second'),
reserved_by = NULL,
reserved_at = NULL,
finished_at = NULL,
last_error = :last_error,
updated_at = NOW()
WHERE id = :id"
);
$retry->bindValue(':retry_delay', retryDelaySeconds(), PDO::PARAM_INT);
$retry->bindValue(':last_error', $errorMessage, PDO::PARAM_STR);
$retry->bindValue(':id', $jobId, PDO::PARAM_INT);
$retry->execute();
printf(
"[%s] job #%d failed, re-queued after %d second(s)" . PHP_EOL,
$workerId,
$jobId,
retryDelaySeconds()
);
exit(0);
}
$fail = $pdo->prepare(
"UPDATE jobs
SET status = 'failed',
finished_at = NOW(),
last_error = :last_error,
updated_at = NOW()
WHERE id = :id"
);
$fail->bindValue(':last_error', $errorMessage, PDO::PARAM_STR);
$fail->bindValue(':id', $jobId, PDO::PARAM_INT);
$fail->execute();
printf("[%s] job #%d failed permanently" . PHP_EOL, $workerId, $jobId);
exit(0);
}
$done = $pdo->prepare(
"UPDATE jobs
SET status = 'done',
finished_at = NOW(),
last_error = NULL,
updated_at = NOW()
WHERE id = :id"
);
$done->bindValue(':id', $jobId, PDO::PARAM_INT);
$done->execute();
printf("[%s] job #%d completed" . PHP_EOL, $workerId, $jobId);
} catch (Throwable $e) {
if ($pdo->inTransaction()) {
$pdo->rollBack();
}
fwrite(STDERR, $e->getMessage() . PHP_EOL);
exit(1);
}
やりたいことは単純で、まだ取られていない ready job を 1 件選び、その場で processing に変えることです。これを 1 回の claim トランザクションでまとめて行います。
claim 部分の核は次の SQL です。
WITH next_job AS (
SELECT id
FROM jobs
WHERE status = 'queued'
AND available_at <= NOW()
ORDER BY available_at ASC, id ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs
SET status = 'processing',
attempts = attempts + 1,
reserved_by = :worker_id,
reserved_at = NOW()
WHERE id = (SELECT id FROM next_job)
RETURNING ...
このデモでは attempts を「失敗回数」ではなく「処理開始回数」として扱うため、claim 時点で 1 増やしています。
この場面で押さえたいのは 2 点です。
SKIP LOCKEDは claim の瞬間だけ使う- 実処理の
sleep()や本物のメール送信は、コミット後に進める
在庫引当の記事では、後続側は同じ行を待つ形でした。
ジョブキューでは、別の ready job があるなら待たせず次へ進めたい場面が増えます。SKIP LOCKED はその差を作るためのオプションです。
SKIP LOCKED は「競合中の claim で待たない」ための仕組みで、実行中ジョブを表す本体は status = 'processing' と reserved_* です。
もう 1 つ重要なのは、コミット後に同じ job が再取得されない理由です。
ロックは claim トランザクションの終了で解放されます。とはいえ、その前に status = 'processing' へ更新しているため、次の worker は queued 条件に引っかかりません。
コードのポイント
if ($attempts < $maxAttempts) {
$retry = $pdo->prepare(
"UPDATE jobs
SET status = 'queued',
available_at = NOW() + (:retry_delay * INTERVAL '1 second'),
reserved_by = NULL,
reserved_at = NULL,
finished_at = NULL,
last_error = :last_error,
updated_at = NOW()
WHERE id = :id"
);
- 4行目
status = 'queued'— 失敗したジョブを再度queuedに戻す。available_atが未来時刻であれば、次の dequeue クエリのavailable_at <= NOW()条件でまだ取得されない。 - 5行目
available_at = NOW() + (:retry_delay * INTERVAL '1 second')— 遅延秒数をINTERVALに変換してリトライ開始時刻をずらす。available_atを 1 列持つだけで、通常投入とリトライ待機を同じWHERE条件で扱える。
5. 2 ワーカーで別々の ready job を取ることを確認する
3 件のジョブを入れ直してから確認します。
この手順でまず確認したいのは、2 ワーカーが同じ job を二重取得せず、別々の ready job を取れることです。
なお、今回の work-once.php は claim 後すぐにコミットするので、SKIP LOCKED による「ロック中の行を飛ばす」瞬間そのものは見えにくい場合があります。ここで見ている中心は、短い claim と processing への状態更新を組み合わせた安全な取得です。
先に流れだけ図で見ると、worker-a が job #1 を claim しても、worker-b は同じ job を再取得せず、次の ready job へ進みます。ここで差を作っているのが FOR UPDATE SKIP LOCKED です。
sequenceDiagram
participant WA as worker-a
participant DB as PostgreSQL
participant WB as worker-b
WA->>DB: BEGIN
WA->>DB: claim next queued job with SKIP LOCKED
DB-->>WA: job #1
WA->>DB: UPDATE job #1 -> processing
WA->>DB: COMMIT
WB->>DB: BEGIN
WB->>DB: claim next queued job with SKIP LOCKED
DB-->>WB: job #2
WB->>DB: UPDATE job #2 -> processing
WB->>DB: COMMIT
WA->>DB: UPDATE job #1 -> done
WB->>DB: UPDATE job #2 -> done
ready job が 1 件しかない場合、後続ワーカーは次の候補を取れないので no ready jobs になります。この記事では、まず「別の ready job があるなら待たずに進む」形を先に確認します。
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"bob@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}' 2
ターミナルA:
docker compose exec app php bin/work-once.php worker-a 5
ターミナルB:
docker compose exec app php bin/work-once.php worker-b 0
期待する出力:
ターミナルA:
[worker-a] claimed job #1 (send-mail), attempt 1/3
[worker-a] processing for 5 second(s)
[worker-a] job #1 completed
ターミナルB:
[worker-b] claimed job #2 (send-mail), attempt 1/3
[worker-b] job #2 completed
worker-b は job #1 を再取得せず、次の ready job へ進みます。
claim の瞬間に row lock が残っていれば SKIP LOCKED で飛ばし、すでにコミット済みなら status = 'processing' のため候補から外れます。今回のデモでは後者として見えることもありますが、どちらにせよ同じ job #1 は取りません。
実行後の状態を確認します。
docker compose exec app php bin/show-queue.php
期待する結果:
[
{
"id": 1,
"job_type": "send-mail",
"status": "done",
"attempts": 1,
"max_attempts": 3,
"reserved_by": "worker-a",
"reserved_at": "...",
"available_at": "...",
"finished_at": "...",
"last_error": null,
"payload": {
"to": "alice@example.com"
}
},
{
"id": 2,
"job_type": "send-mail",
"status": "done",
"attempts": 1,
"max_attempts": 3,
"reserved_by": "worker-b",
"reserved_at": "...",
"available_at": "...",
"finished_at": "...",
"last_error": null,
"payload": {
"to": "bob@example.com"
}
},
{
"id": 3,
"job_type": "build-report",
"status": "queued",
"attempts": 0,
"max_attempts": 2,
"reserved_by": null,
"reserved_at": null,
"available_at": "...",
"finished_at": null,
"last_error": null,
"payload": {
"report_id": 42,
"fail_once": true
}
}
]
ORDER BY available_at, id を入れているので、ready な job が複数あるときの取り順も追いやすくなります。
6. 成功 / 失敗 / リトライの最小状態管理を確認する
次は job #3 を使って retry を見ます。
この job は 1 回目だけ失敗し、2 回目で成功するようにしてあります。
docker compose exec app php bin/work-once.php worker-c 0
docker compose exec app php bin/show-queue.php
期待する出力:
[worker-c] claimed job #3 (build-report), attempt 1/2
[worker-c] job #3 failed, re-queued after 5 second(s)
この時点の queue 状態:
[
{
"id": 1,
"job_type": "send-mail",
"status": "done",
"attempts": 1,
"max_attempts": 3,
"reserved_by": "worker-a",
"reserved_at": "...",
"available_at": "...",
"finished_at": "...",
"last_error": null,
"payload": {
"to": "alice@example.com"
}
},
{
"id": 2,
"job_type": "send-mail",
"status": "done",
"attempts": 1,
"max_attempts": 3,
"reserved_by": "worker-b",
"reserved_at": "...",
"available_at": "...",
"finished_at": "...",
"last_error": null,
"payload": {
"to": "bob@example.com"
}
},
{
"id": 3,
"job_type": "build-report",
"status": "queued",
"attempts": 1,
"max_attempts": 2,
"reserved_by": null,
"reserved_at": null,
"available_at": "... future ...",
"finished_at": null,
"last_error": "simulated failure on first attempt",
"payload": {
"report_id": 42,
"fail_once": true
}
}
]
status は queued に戻っていますが、available_at が未来時刻なので、すぐには再取得されません。
この 1 列があるだけで、遅延実行と retry 待機を同じ形で扱えます。
少し待ってからもう一度 worker を動かします。
sleep 6
docker compose exec app php bin/work-once.php worker-d 0
docker compose exec app php bin/show-queue.php
期待する出力:
[worker-d] claimed job #3 (build-report), attempt 2/2
[worker-d] job #3 completed
成功と retry の確認はここまでです。
次は failed を確認するために、max_attempts = 1 の job を別で 1 件だけ入れます。
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":99,"fail_once":true}' 1
docker compose exec app php bin/work-once.php worker-e 0
docker compose exec app php bin/show-queue.php
期待する結果:
[
{
"id": 1,
"job_type": "build-report",
"status": "failed",
"attempts": 1,
"max_attempts": 1,
"reserved_by": "worker-e",
"reserved_at": "...",
"available_at": "...",
"finished_at": "...",
"last_error": "simulated failure on first attempt",
"payload": {
"report_id": 99,
"fail_once": true
}
}
]
この最小構成では、次の 3 つで十分です。
- 成功したら
done - 再試行したいなら
queuedに戻し、available_atを未来へずらす - 上限到達なら
failedとlast_errorを残す
7. 実務へ戻すときの勘所と詰まりどころ
このデモを実務コードへ戻すときに、最低限外したくない点は 5 つです。
- claim トランザクションを長く持たない
今回は SELECT ... FOR UPDATE SKIP LOCKED と UPDATE ... SET status = 'processing' までを 1 トランザクションに閉じ、処理本体はコミット後に進める構成にしました。
メール送信や HTTP 呼び出しまで同じトランザクションへ入れると、別 worker の待ち時間が伸びる原因になります。
SKIP LOCKEDだけで安全になるわけではない
同じ job を再取得させないためには、claim 直後に processing へ更新する必要があります。
ロックはコミットで解放されるので、その後は状態列で ownership を表現します。
- retry は
available_atとattemptsで持つ
ジョブキューの最小要件は「今すぐ取れるか」と「何回やったか」です。
ここを先に持っておくと、あとから指数バックオフや cron 的な遅延実行にも広げやすくなります。
- 部分インデックスを先に入れる
今回は status = 'queued' の行だけを探すインデックスを入れました。
行数が増えるほど、queued 以外を毎回なめないで済みます。
- 次に足すべきは stuck job 回収
worker が claim 後に落ちると、その job は processing のまま残ります。
この記事では扱いませんが、実務では reserved_at を見て一定時間超過分を戻す仕組みが次の課題になります。
詰まりやすい点と対処は次の表のとおりです。
| 症状 | 主原因 | まず見る場所 | 戻る章 |
|---|---|---|---|
2 本目の worker が no ready jobs になる | 1 件しか投入していない、または available_at が未来になっている | docker compose exec app php bin/show-queue.php | 3章, 6章 |
| 2 本目の worker が待っているように見える | claim 後の処理を同じトランザクションで持っている | sleep() の位置、commit() の位置 | 4章 |
retry されずにすぐ failed になる | max_attempts = 1 で投入している | enqueue-job.php の第3引数 | 3章, 6章 |
Invalid JSON payload が出る | payload の JSON 文字列が壊れている | enqueue-job.php の引数 | 3章 |
could not find driver | pdo_pgsql が入っていない | `docker compose exec app php -m | grep pgsql` |
切り分けに使う最小コマンド:
docker compose exec app php bin/show-queue.php
docker compose exec app php bin/reset-demo.php
docker compose exec app php -m | grep pgsql
docker compose logs db
docker compose down -v
8. まとめと次の一歩
本記事では、次の 3 点を確認しました。
- PostgreSQL の
jobsテーブルだけでも、FOR UPDATE SKIP LOCKEDで最小のジョブキューを作れる - 同じ job を二重に取らないためには、短い claim トランザクションと
processingへの状態更新をセットで使う - retry は
available_at、失敗確定はfailedとlast_errorで最小管理できる
SKIP LOCKED を使うのは、「同じ行を待たせたい場面」ではなく、「別の ready job があるなら次へ進ませたい場面」です。
次の一歩としては、次の 2 つを候補にできます。
- この
work-once.phpを常駐ワーカー化し、docker compose上で回し続ける processingのまま取り残された job を回収する timeout / heartbeat を足す
今回の最小構成を自分の手で 1 回再現して、2 本目の worker が「待つ」のではなく「別の job を取る」瞬間を確認しておくと、その後にフレームワークのキューへ進んでも挙動を追いやすくなります。