この記事の対象は、FOR UPDATE SKIP LOCKED で 1 件だけ取る最小 SQL は分かるが、それを Docker 上の常駐ワーカーへどうつなぐかで止まりやすい人です。
前段の PHP + PostgreSQLでジョブキューを作る(FOR UPDATE SKIP LOCKED 最小構成) では 1 回実行のデモまで確認しました。今回は常駐ワーカーへ広げます。
先に読んでおくと入りやすい記事:
前提環境
- Windows 11
- WSL2(Ubuntu)
- VS Code(Remote - WSL)
- Docker Desktop(WSL 連携有効)
以降のコマンドは、特記がない限り WSL 側ターミナルで実行します。
1. ゴールと非対象
到達する状態:
- PostgreSQL の
jobsテーブルを、Docker 上の常駐ワーカーから処理できる workerサービス 1 本で1 件取得 -> 処理 -> 完了更新のループを再現できるdocker compose stop workerで止める、docker compose logs workerで見る、processingの取り残し候補を調べる、までを一続きで確認できる
扱わない内容:
- HTTP からジョブを投入する API
- stuck job の自動回収
- Supervisor や systemd によるプロセス管理
- dead letter queue、優先度制御、指数バックオフ
- Redis / RabbitMQ / Kafka との比較
前段記事では claim を 1 回だけ実行しました。
本記事では、その claim を短いトランザクションのまま保ちつつ、PHP プロセス自体は止めるまで回し続けます。
2. 最小デモ環境を作成する
記事用のデモディレクトリを作ります。
mkdir -p ~/projects/php-docker-postgresql-job-worker-demo
cd ~/projects/php-docker-postgresql-job-worker-demo
mkdir -p bin docker/php docker/db
code .
compose.yml は次の内容で作成します。
services:
app:
build:
context: .
dockerfile: docker/php/Dockerfile
working_dir: /workspace
volumes:
- ./:/workspace
env_file:
- .env
command: ["sleep", "infinity"]
depends_on:
db:
condition: service_healthy
worker:
build:
context: .
dockerfile: docker/php/Dockerfile
working_dir: /workspace
volumes:
- ./:/workspace
env_file:
- .env
command: ["php", "bin/worker.php"]
restart: unless-stopped
stop_grace_period: 10s
depends_on:
db:
condition: service_healthy
db:
image: postgres:17
environment:
POSTGRES_DB: ${POSTGRES_DB}
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
ports:
- "5432:5432"
volumes:
- db-data:/var/lib/postgresql/data
- ./docker/db/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
healthcheck:
test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
interval: 5s
timeout: 3s
retries: 20
volumes:
db-data:
app は投入と確認用、worker は常駐処理用です。
同じイメージを使いますが、起動コマンドだけを分けています。
worker に restart: unless-stopped を入れているので、想定外の例外で PHP プロセスが落ちても、コンテナは自動的に起動し直します。
docker/php/Dockerfile は次の内容で作成します。
FROM php:8.5-cli
RUN apt-get update \
&& apt-get install -y --no-install-recommends libpq-dev \
&& docker-php-ext-install pdo_pgsql pcntl \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /workspace
pcntl を入れている理由は、ワーカーが SIGTERM を受けたときに現在のループを終えて止まれるようにするためです。
.env.example は次の内容で作成します。
POSTGRES_DB=app
POSTGRES_USER=app
POSTGRES_PASSWORD=app
APP_DB_HOST=db
APP_DB_PORT=5432
APP_DB_NAME=app
APP_DB_USER=app
APP_DB_PASS=app
APP_POLL_INTERVAL_SECONDS=2
APP_PROCESSING_SECONDS=4
APP_RETRY_DELAY_SECONDS=5
docker/db/init.sql は次の内容で作成します。
CREATE TABLE IF NOT EXISTS jobs (
id BIGSERIAL PRIMARY KEY,
job_type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reserved_by TEXT,
reserved_at TIMESTAMPTZ,
last_error TEXT,
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
ON jobs (status, available_at, id)
WHERE status = 'queued';
コードのポイント
① 状態遷移と再試行に必要な列を 1 テーブルへ集める
CREATE TABLE IF NOT EXISTS jobs (
id BIGSERIAL PRIMARY KEY,
job_type TEXT NOT NULL,
payload JSONB NOT NULL DEFAULT '{}'::jsonb,
status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reserved_by TEXT,
reserved_at TIMESTAMPTZ,
last_error TEXT,
finished_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
status でジョブの現在地、attempts と max_attempts でリトライ余地を把握できます。reserved_by と reserved_at を残しておくことで、processing のまま長時間残っているジョブの追跡を後から始められます。
② dequeue 条件に絞った部分インデックスを張る
CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
ON jobs (status, available_at, id)
WHERE status = 'queued';
WHERE status = 'queued' を付けることで、取り出し対象外の行をインデックスから除外できます。available_at と id の順序が claim クエリの ORDER BY と一致しているので、インデックスがそのまま使われます。
reserved_at は、ワーカーが取った時刻を残す列です。
後で processing のまま残ったジョブ候補を洗い出す入口になります。
起動して確認します。
cp .env.example .env
docker compose up -d --build db app
docker compose exec db pg_isready -U app -d app
docker compose exec db psql -U app -d app -c "\dt"
docker compose ps
起動直後の確認結果は次のようになります。
app と db が起動し、jobs テーブルが見えれば OK です。
起動で詰まったとき:
docker compose logs db
docker compose ps
docker compose down -v
docker compose up -d --build db app
3. キュー初期化と投入スクリプトを用意する
worker を起動する前に、毎回同じ状態から検証できるようにしておきます。
bin/bootstrap.php は次の内容で作成します。全スクリプトで共有する DB 接続、環境変数取得、ワーカー ID 生成をまとめます。環境変数の整数チェックと、ワーカー ID の hostname 取得が注目点です。
<?php
declare(strict_types=1);
function envString(string $name, string $default): string
{
$value = getenv($name);
return $value === false || $value === '' ? $default : $value;
}
function envInt(string $name, int $default): int
{
$value = getenv($name);
if ($value === false || $value === '') {
return $default;
}
$parsed = filter_var($value, FILTER_VALIDATE_INT);
if ($parsed === false) {
throw new RuntimeException(sprintf('%s must be an integer.', $name));
}
return $parsed;
}
function createPdo(): PDO
{
$dsn = sprintf(
'pgsql:host=%s;port=%s;dbname=%s',
envString('APP_DB_HOST', 'db'),
envString('APP_DB_PORT', '5432'),
envString('APP_DB_NAME', 'app')
);
return new PDO(
$dsn,
envString('APP_DB_USER', 'app'),
envString('APP_DB_PASS', 'app'),
[
PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
]
);
}
function workerId(): string
{
$fromEnv = getenv('APP_WORKER_ID');
if ($fromEnv !== false && $fromEnv !== '') {
return $fromEnv;
}
$hostName = gethostname();
return $hostName === false || $hostName === '' ? 'worker-unknown' : $hostName;
}
コードのポイント
① envInt で型と範囲を起動時に検証する
function envInt(string $name, int $default): int
{
$value = getenv($name);
if ($value === false || $value === '') {
return $default;
}
$parsed = filter_var($value, FILTER_VALIDATE_INT);
if ($parsed === false) {
throw new RuntimeException(sprintf('%s must be an integer.', $name));
}
return $parsed;
}
空ならデフォルト値を使い、整数に変換できなければ起動直後に例外を投げます。ポーリング間隔やリトライ秒数は実行中に壊れるより、起動時に止めるほうが原因を追いやすいです。
② ワーカー ID を hostname から取る
function workerId(): string
{
$fromEnv = getenv('APP_WORKER_ID');
if ($fromEnv !== false && $fromEnv !== '') {
return $fromEnv;
}
$hostName = gethostname();
return $hostName === false || $hostName === '' ? 'worker-unknown' : $hostName;
}
環境変数で上書きできるようにしつつ、デフォルトは gethostname() でコンテナ ID を使います。--scale worker=2 で 2 本起動したとき、ログが別 ID で分かれるのはこの仕組みによります。
bin/reset-demo.php は次の内容で作成します。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$pdo = createPdo();
$result = $pdo->exec('TRUNCATE TABLE jobs RESTART IDENTITY');
if ($result === false) {
fwrite(STDERR, "failed to reset queue" . PHP_EOL);
exit(1);
}
echo "queue reset completed" . PHP_EOL;
bin/enqueue-job.php は次の内容で作成します。引数から job_type・payload・最大試行回数・遅延秒数を受け取り、available_at をずらして投入できる点が前段記事との差分です。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$jobType = $argv[1] ?? '';
$payloadJson = $argv[2] ?? '{}';
$maxAttempts = isset($argv[3]) ? (int) $argv[3] : 3;
$delaySeconds = isset($argv[4]) ? (int) $argv[4] : 0;
if ($jobType === '' || $maxAttempts < 1 || $delaySeconds < 0) {
fwrite(
STDERR,
"Usage: php bin/enqueue-job.php JOB_TYPE PAYLOAD_JSON [MAX_ATTEMPTS] [DELAY_SECONDS]" . PHP_EOL
);
exit(1);
}
json_decode($payloadJson, true, 512, JSON_THROW_ON_ERROR);
$pdo = createPdo();
$statement = $pdo->prepare(<<<'SQL'
INSERT INTO jobs (
job_type,
payload,
status,
max_attempts,
available_at,
updated_at
) VALUES (
:job_type,
CAST(:payload AS jsonb),
'queued',
:max_attempts,
NOW() + make_interval(secs => :delay_seconds),
NOW()
)
RETURNING id
SQL);
$statement->bindValue(':job_type', $jobType, PDO::PARAM_STR);
$statement->bindValue(':payload', $payloadJson, PDO::PARAM_STR);
$statement->bindValue(':max_attempts', $maxAttempts, PDO::PARAM_INT);
$statement->bindValue(':delay_seconds', $delaySeconds, PDO::PARAM_INT);
$statement->execute();
$jobId = (int) $statement->fetchColumn();
printf("enqueued job #%d (%s)%s", $jobId, $jobType, PHP_EOL);
コードのポイント
① 引数バリデーションと JSON チェックを投入前に完了させる
$jobType = $argv[1] ?? '';
$payloadJson = $argv[2] ?? '{}';
$maxAttempts = isset($argv[3]) ? (int) $argv[3] : 3;
$delaySeconds = isset($argv[4]) ? (int) $argv[4] : 0;
if ($jobType === '' || $maxAttempts < 1 || $delaySeconds < 0) {
fwrite(
STDERR,
"Usage: php bin/enqueue-job.php JOB_TYPE PAYLOAD_JSON [MAX_ATTEMPTS] [DELAY_SECONDS]" . PHP_EOL
);
exit(1);
}
json_decode($payloadJson, true, 512, JSON_THROW_ON_ERROR);
job_type の空チェックと数値範囲チェックを先に済ませ、JSON パースも投入前に検証しています。不正 payload がキューに入ると、取り出した側で初めて壊れていたと分かる状況を防げます。
② available_at に遅延を組み込んで投入する
INSERT INTO jobs (
job_type,
payload,
status,
max_attempts,
available_at,
updated_at
) VALUES (
:job_type,
CAST(:payload AS jsonb),
'queued',
:max_attempts,
NOW() + make_interval(secs => :delay_seconds),
NOW()
)
RETURNING id
available_at を NOW() + make_interval(secs => :delay_seconds) にすることで、即時投入と遅延投入を同じ INSERT で切り替えられます。CAST(:payload AS jsonb) によって PHP 文字列を DB 側で JSONB として格納します。
bin/show-queue.php は次の内容で作成します。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$pdo = createPdo();
$statement = $pdo->query(<<<'SQL'
SELECT
id,
job_type,
payload,
status,
attempts,
max_attempts,
available_at,
reserved_by,
reserved_at,
last_error,
finished_at
FROM jobs
ORDER BY id
SQL);
$rows = $statement->fetchAll();
echo json_encode($rows, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . PHP_EOL;
コードのポイント
① 監視に役立つ列だけ選んで取得する
SELECT
id,
job_type,
payload,
status,
attempts,
max_attempts,
available_at,
reserved_by,
reserved_at,
last_error,
finished_at
FROM jobs
ORDER BY id
status でいま何をしているか、reserved_by で誰が取ったか、last_error でなぜ失敗したかを一覧で追えます。show-queue.php を毎回実行すれば、ワーカーの動きをログと DB 両側から照合できます。
ここまで作ったら、前準備を一度通します。
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}'
docker compose exec app php bin/enqueue-job.php export-csv '{"rows":500}'
docker compose exec app php bin/show-queue.php
この時点では、3 件とも queued で並んでいれば十分です。
build-report にだけ fail_once を入れているので、後で retry の流れもそのまま使えます。
4. 常駐ワーカー bin/worker.php を作る
ワーカーの構成は単純です。
- 取れるジョブが 1 件あるか探す
- あれば
processingにして処理する - 成功なら
done、失敗ならqueuedかfailedに更新する - 取れるジョブがなければ少し待ってもう一度見る
流れを図で先に置くと、次の形です。
flowchart TD
A[worker loop start] --> B{queued job exists?}
B -- no --> C[sleep poll interval]
C --> A
B -- yes --> D[claim with FOR UPDATE SKIP LOCKED]
D --> E[update to processing]
E --> F[process job outside transaction]
F --> G{success?}
G -- yes --> H[update to done]
G -- no, attempts left --> I[update to queued and delay available_at]
G -- no, attempts exhausted --> J[update to failed]
H --> A
I --> A
J --> A
bin/worker.php は次の内容で作成します。シグナルによる安全停止、FOR UPDATE SKIP LOCKED での claim、成功・再試行・失敗確定の分岐まで 1 ファイルにまとめます。長く動くのは PHP プロセスであり、DB トランザクションではないことが核心です。
<?php
declare(strict_types=1);
require __DIR__ . '/bootstrap.php';
$pdo = createPdo();
$workerId = workerId();
$pollIntervalSeconds = envInt('APP_POLL_INTERVAL_SECONDS', 2);
$processingSeconds = envInt('APP_PROCESSING_SECONDS', 4);
$retryDelaySeconds = envInt('APP_RETRY_DELAY_SECONDS', 5);
$shouldStop = false;
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use (&$shouldStop, $workerId): void {
$shouldStop = true;
printf("[%s] received SIGTERM, stop after current loop%s", $workerId, PHP_EOL);
});
pcntl_signal(SIGINT, function () use (&$shouldStop, $workerId): void {
$shouldStop = true;
printf("[%s] received SIGINT, stop after current loop%s", $workerId, PHP_EOL);
});
printf("[%s] worker started%s", $workerId, PHP_EOL);
while (!$shouldStop) {
try {
$job = claimJob($pdo, $workerId);
if ($job === null) {
printf(
"[%s] no ready jobs, sleep %d second(s)%s",
$workerId,
$pollIntervalSeconds,
PHP_EOL
);
sleep($pollIntervalSeconds);
continue;
}
processJob($pdo, $job, $workerId, $processingSeconds, $retryDelaySeconds);
} catch (Throwable $throwable) {
fprintf(
STDERR,
"[%s] unexpected error: %s%s",
$workerId,
$throwable->getMessage(),
PHP_EOL
);
sleep($pollIntervalSeconds);
}
}
printf("[%s] worker stopped gracefully%s", $workerId, PHP_EOL);
function claimJob(PDO $pdo, string $workerId): ?array
{
$pdo->beginTransaction();
try {
$statement = $pdo->prepare(<<<'SQL'
WITH picked AS (
SELECT id
FROM jobs
WHERE status = 'queued'
AND available_at <= NOW()
ORDER BY available_at, id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs AS j
SET status = 'processing',
attempts = j.attempts + 1,
reserved_by = :worker_id,
reserved_at = NOW(),
updated_at = NOW()
FROM picked
WHERE j.id = picked.id
RETURNING
j.id,
j.job_type,
j.payload::text AS payload_json,
j.attempts,
j.max_attempts
SQL);
$statement->execute([':worker_id' => $workerId]);
$row = $statement->fetch();
$pdo->commit();
if ($row === false) {
return null;
}
$row['payload'] = json_decode((string) $row['payload_json'], true, 512, JSON_THROW_ON_ERROR);
unset($row['payload_json']);
return $row;
} catch (Throwable $throwable) {
if ($pdo->inTransaction()) {
$pdo->rollBack();
}
throw $throwable;
}
}
function processJob(
PDO $pdo,
array $job,
string $workerId,
int $processingSeconds,
int $retryDelaySeconds
): void {
$jobId = (int) $job['id'];
$attempts = (int) $job['attempts'];
$maxAttempts = (int) $job['max_attempts'];
$payload = $job['payload'];
printf(
"[%s] claimed job #%d (%s), attempt %d/%d%s",
$workerId,
$jobId,
$job['job_type'],
$attempts,
$maxAttempts,
PHP_EOL
);
printf("[%s] processing for %d second(s)%s", $workerId, $processingSeconds, PHP_EOL);
sleep($processingSeconds);
$shouldFailOnce = (bool) ($payload['fail_once'] ?? false);
if ($shouldFailOnce && $attempts === 1) {
$message = 'simulated failure for retry demo';
if ($attempts >= $maxAttempts) {
markFailed($pdo, $jobId, $message);
printf("[%s] job #%d failed permanently%s", $workerId, $jobId, PHP_EOL);
return;
}
requeueJob($pdo, $jobId, $message, $retryDelaySeconds);
printf(
"[%s] job #%d failed, re-queued after %d second(s)%s",
$workerId,
$jobId,
$retryDelaySeconds,
PHP_EOL
);
return;
}
markDone($pdo, $jobId);
printf("[%s] job #%d completed%s", $workerId, $jobId, PHP_EOL);
}
function markDone(PDO $pdo, int $jobId): void
{
$statement = $pdo->prepare(<<<'SQL'
UPDATE jobs
SET status = 'done',
last_error = NULL,
finished_at = NOW(),
updated_at = NOW()
WHERE id = :id
SQL);
$statement->execute([':id' => $jobId]);
}
function requeueJob(PDO $pdo, int $jobId, string $message, int $retryDelaySeconds): void
{
$statement = $pdo->prepare(<<<'SQL'
UPDATE jobs
SET status = 'queued',
available_at = NOW() + make_interval(secs => :retry_delay_seconds),
last_error = :message,
reserved_by = NULL,
reserved_at = NULL,
updated_at = NOW()
WHERE id = :id
SQL);
$statement->bindValue(':retry_delay_seconds', $retryDelaySeconds, PDO::PARAM_INT);
$statement->bindValue(':message', $message, PDO::PARAM_STR);
$statement->bindValue(':id', $jobId, PDO::PARAM_INT);
$statement->execute();
}
function markFailed(PDO $pdo, int $jobId, string $message): void
{
$statement = $pdo->prepare(<<<'SQL'
UPDATE jobs
SET status = 'failed',
last_error = :message,
finished_at = NOW(),
updated_at = NOW()
WHERE id = :id
SQL);
$statement->bindValue(':message', $message, PDO::PARAM_STR);
$statement->bindValue(':id', $jobId, PDO::PARAM_INT);
$statement->execute();
}
コードのポイント
① SIGTERM を受けたら現在のループを終えてから止まる
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use (&$shouldStop, $workerId): void {
$shouldStop = true;
printf("[%s] received SIGTERM, stop after current loop%s", $workerId, PHP_EOL);
});
docker compose stop worker を打つと、処理中のジョブを完了させてから停止します。
② claim は FOR UPDATE SKIP LOCKED と processing 更新を 1 トランザクションで閉じる
WITH picked AS (
SELECT id
FROM jobs
WHERE status = 'queued'
AND available_at <= NOW()
ORDER BY available_at, id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs AS j
SET status = 'processing',
attempts = j.attempts + 1,
reserved_by = :worker_id,
FOR UPDATE SKIP LOCKED で他ワーカーが取得中の行を飛ばし、同時に status = 'processing' へ更新します。このトランザクションは取得だけに使い、実処理はトランザクション外で進めることで、ロックを長時間持ち続けません。
③ 失敗時はリトライ余地に応じて queued 戻しか failed 確定に分岐する
if ($shouldFailOnce && $attempts === 1) {
$message = 'simulated failure for retry demo';
if ($attempts >= $maxAttempts) {
markFailed($pdo, $jobId, $message);
printf("[%s] job #%d failed permanently%s", $workerId, $jobId, PHP_EOL);
return;
}
requeueJob($pdo, $jobId, $message, $retryDelaySeconds);
上限に達した場合は failed で止め、余地があれば available_at をずらして queued に戻します。どちらの経路でも processing のまま放置しないのが重要で、stuck job を防ぐ基本になります。
workerId() が HOSTNAME を拾うので、worker を 2 本に増やしてもコンテナごとに別 ID でログを見分けられます。
メインループを try/catch で包んでいるため、接続の瞬断や payload の不整合で 1 回例外が出ても、その場でコンテナが止まりません。
5. 1 ワーカーで起動とログを確認する
最初は 1 本だけで十分です。
キューを空にしてからジョブを入れ、worker サービスを起動します。
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}'
docker compose exec app php bin/enqueue-job.php export-csv '{"rows":500}'
docker compose up -d worker
docker compose logs -f worker
ログの見え方は次のようになります。
worker-1 | [4b2a7f5c6c19] worker started
worker-1 | [4b2a7f5c6c19] claimed job #1 (send-mail), attempt 1/3
worker-1 | [4b2a7f5c6c19] processing for 4 second(s)
worker-1 | [4b2a7f5c6c19] job #1 completed
worker-1 | [4b2a7f5c6c19] claimed job #2 (build-report), attempt 1/3
worker-1 | [4b2a7f5c6c19] processing for 4 second(s)
worker-1 | [4b2a7f5c6c19] job #2 failed, re-queued after 5 second(s)
docker compose logs -f worker を開きっぱなしにしておくと、ワーカーがいま何をしているかを追えます。
状態を DB でも照合しておきます。
docker compose exec app php bin/show-queue.php
send-mail が done、build-report が一度 queued に戻っていれば正常です。
もし worker コンテナが落ちた場合でも、docker compose logs worker で直前の例外を確認し、その後に docker compose up -d worker で再起動すれば同じ手順へ戻れます。
worker ログとキュー状態を並べると、処理結果を照合しやすくなります。
6. 2 ワーカーで同時実行時の見え方を確認する
次は worker サービスを 2 本に増やします。
先に状態を戻して、少なくとも 3 件は ready job を入れてから確認します。
docker compose stop worker
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42}'
docker compose exec app php bin/enqueue-job.php export-csv '{"rows":500}'
docker compose up -d --scale worker=2 worker
docker compose logs --tail=50 worker
流れを図で置くと、2 本のワーカーはこう進みます。
sequenceDiagram
participant W1 as worker-1
participant DB as PostgreSQL
participant W2 as worker-2
W1->>DB: claim queued job with FOR UPDATE SKIP LOCKED
DB-->>W1: job #1, status=processing
W2->>DB: claim queued job with FOR UPDATE SKIP LOCKED
DB-->>W2: job #2, status=processing
W1->>DB: update job #1 to done
W2->>DB: update job #2 to done
実際のログは次のようになります。
worker-1 | [77184a0a0af1] worker started
worker-1 | [77184a0a0af1] claimed job #1 (send-mail), attempt 1/3
worker-1 | [77184a0a0af1] processing for 4 second(s)
worker-2 | [8c45b9fc7118] worker started
worker-2 | [8c45b9fc7118] claimed job #2 (build-report), attempt 1/3
worker-2 | [8c45b9fc7118] processing for 4 second(s)
worker-1 | [77184a0a0af1] job #1 completed
worker-2 | [8c45b9fc7118] job #2 completed
worker-1 | [77184a0a0af1] claimed job #3 (export-csv), attempt 1/3
worker-1 | [77184a0a0af1] processing for 4 second(s)
worker-1 | [77184a0a0af1] job #3 completed
claim を担当する worker とログの行順は毎回固定ではありません。docker compose logs --tail=50 worker には、直前の 1 ワーカー実行分や docker compose stop worker で出た SIGTERM ログが混ざることもあります。
ここで見たいのは、片方の worker が 1 件処理している間に、もう片方が別の job へ進めること、そして同じ job ID を 2 本の worker が二重取得していないことです。
FOR UPDATE SKIP LOCKED がその分離を担い、status = 'processing' への更新が再取得を防いでいます。
キュー状態も確認します。
docker compose exec app php bin/show-queue.php
docker compose ps worker
attempts と reserved_by を見ると、どの worker が処理したかをあとから追えます。
retry も常駐ワーカー側でそのまま動きます。
たとえば fail_once を混ぜると、1 回目は queued に戻り、次のループか別ワーカーのループで再取得されます。
docker compose stop worker
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":99,"fail_once":true}' 3 0
docker compose up -d --scale worker=2 worker
docker compose logs --tail=50 worker
docker compose exec app php bin/show-queue.php
1 回目は queued に戻り、available_at が 5 秒先へずれます。
少し待つと、次のループで再取得されて done になります。
max_attempts = 1 の失敗確定も同じです。
docker compose stop worker
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":100,"fail_once":true}' 1 0
docker compose up -d worker
docker compose logs --tail=20 worker
docker compose exec app php bin/show-queue.php
このときは再投入されず、failed で止まります。
7. 停止方法と processing 取り残しの入口を押さえる
ワーカーを止めるときは、コンテナをいきなり消すのではなく stop を使います。
docker compose stop worker
docker compose logs --tail=20 worker
docker compose ps worker
worker.php は SIGTERM を受けたらフラグを立て、現在のループを終えてから止まるようにしています。
そのため、ログには次のような行が出ます。
worker-1 | [77184a0a0af1] received SIGTERM, stop after current loop
worker-1 | [77184a0a0af1] worker stopped gracefully
この形にしておくと、「いま処理中のジョブを途中で切り捨てたのか」「ループは正常に閉じたのか」をログで判断できます。
それでも、コンテナ障害やホスト再起動では processing が残ることがあります。
本記事では自動回収までは書きませんが、入口として次の SQL はすぐ使えます。
docker compose exec db psql -U app -d app -c "
SELECT id, job_type, reserved_by, reserved_at, attempts, max_attempts
FROM jobs
WHERE status = 'processing'
AND reserved_at < NOW() - INTERVAL '5 minutes'
ORDER BY reserved_at;
"
ここで古い processing が見つかったら、最初にやるのは強制的な queued 戻しではありません。
まずは次の 3 点を照合します。
docker compose logs workerに、その job ID の完了または失敗ログがないかreserved_byの worker が停止済みかどうか- 実処理が外部 API やメール送信だった場合、外側では完了していないか
そのうえで、後続記事や実務実装では reserved_at にタイムアウトを設け、一定時間を超えた processing を回収対象として扱います。
入口として必要なのは、回収ロジックより先に「候補を安全に見つける列を持っておく」ことです。
8. まとめと次の一歩
今回の最小構成で持ち帰りたい点は 3 つです。
workerサービスはphp bin/worker.phpを起動コマンドにするだけでも十分に始められる- claim は
FOR UPDATE SKIP LOCKEDとprocessing更新までを短く閉じ、処理本体はトランザクション外で進める - 停止は
docker compose stop worker、確認はdocker compose logs worker、取り残し確認はreserved_atを起点に進める
ワーカー本数を増やしても、重くしないほうがよいのは claim のトランザクションです。 長く持つのは PHP プロセスであり、ロックではありません。
次の一歩は 2 つあります。
processingのまま残ったジョブを timeout 付きで回収する- HTTP リクエストや CLI から enqueue する入口をアプリ側へつなぐ
FOR UPDATE SKIP LOCKED の最小 claim をもう一度整理したい場合は、PHP + PostgreSQLでジョブキューを作る(FOR UPDATE SKIP LOCKED 最小構成) から戻るとつながりやすくなります。