公開日 2026-06-03

PHP + Docker で非同期ジョブを動かす最小構成(PostgreSQL キュー)

PHP と Docker で PostgreSQL キュー用の常駐ワーカーを作り、起動、停止、並列実行、取り残し確認の入口までを最小構成で整理する。

目次

  1. 前提環境
  2. 1. ゴールと非対象
  3. 2. 最小デモ環境を作成する
  4. コードのポイント
  5. 3. キュー初期化と投入スクリプトを用意する
  6. コードのポイント
  7. コードのポイント
  8. コードのポイント
  9. 4. 常駐ワーカー bin/worker.php を作る
  10. コードのポイント
  11. 5. 1 ワーカーで起動とログを確認する
  12. 6. 2 ワーカーで同時実行時の見え方を確認する
  13. 7. 停止方法と processing 取り残しの入口を押さえる
  14. 8. まとめと次の一歩

この記事の対象は、FOR UPDATE SKIP LOCKED で 1 件だけ取る最小 SQL は分かるが、それを Docker 上の常駐ワーカーへどうつなぐかで止まりやすい人です。 前段の PHP + PostgreSQLでジョブキューを作る(FOR UPDATE SKIP LOCKED 最小構成) では 1 回実行のデモまで確認しました。今回は常駐ワーカーへ広げます。

先に読んでおくと入りやすい記事:

前提環境

  • Windows 11
  • WSL2(Ubuntu)
  • VS Code(Remote - WSL)
  • Docker Desktop(WSL 連携有効)

以降のコマンドは、特記がない限り WSL 側ターミナルで実行します。

1. ゴールと非対象

到達する状態:

  • PostgreSQL の jobs テーブルを、Docker 上の常駐ワーカーから処理できる
  • worker サービス 1 本で 1 件取得 -> 処理 -> 完了更新 のループを再現できる
  • docker compose stop worker で止める、docker compose logs worker で見る、processing の取り残し候補を調べる、までを一続きで確認できる

扱わない内容:

  • HTTP からジョブを投入する API
  • stuck job の自動回収
  • Supervisor や systemd によるプロセス管理
  • dead letter queue、優先度制御、指数バックオフ
  • Redis / RabbitMQ / Kafka との比較

前段記事では claim を 1 回だけ実行しました。 本記事では、その claim を短いトランザクションのまま保ちつつ、PHP プロセス自体は止めるまで回し続けます。

2. 最小デモ環境を作成する

記事用のデモディレクトリを作ります。

mkdir -p ~/projects/php-docker-postgresql-job-worker-demo
cd ~/projects/php-docker-postgresql-job-worker-demo
mkdir -p bin docker/php docker/db
code .

compose.yml は次の内容で作成します。

services:
  app:
    build:
      context: .
      dockerfile: docker/php/Dockerfile
    working_dir: /workspace
    volumes:
      - ./:/workspace
    env_file:
      - .env
    command: ["sleep", "infinity"]
    depends_on:
      db:
        condition: service_healthy

  worker:
    build:
      context: .
      dockerfile: docker/php/Dockerfile
    working_dir: /workspace
    volumes:
      - ./:/workspace
    env_file:
      - .env
    command: ["php", "bin/worker.php"]
    restart: unless-stopped
    stop_grace_period: 10s
    depends_on:
      db:
        condition: service_healthy

  db:
    image: postgres:17
    environment:
      POSTGRES_DB: ${POSTGRES_DB}
      POSTGRES_USER: ${POSTGRES_USER}
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - db-data:/var/lib/postgresql/data
      - ./docker/db/init.sql:/docker-entrypoint-initdb.d/init.sql:ro
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"]
      interval: 5s
      timeout: 3s
      retries: 20

volumes:
  db-data:

app は投入と確認用、worker は常駐処理用です。 同じイメージを使いますが、起動コマンドだけを分けています。 workerrestart: unless-stopped を入れているので、想定外の例外で PHP プロセスが落ちても、コンテナは自動的に起動し直します。

docker/php/Dockerfile は次の内容で作成します。

FROM php:8.5-cli

RUN apt-get update \
    && apt-get install -y --no-install-recommends libpq-dev \
    && docker-php-ext-install pdo_pgsql pcntl \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /workspace

pcntl を入れている理由は、ワーカーが SIGTERM を受けたときに現在のループを終えて止まれるようにするためです。

.env.example は次の内容で作成します。

POSTGRES_DB=app
POSTGRES_USER=app
POSTGRES_PASSWORD=app

APP_DB_HOST=db
APP_DB_PORT=5432
APP_DB_NAME=app
APP_DB_USER=app
APP_DB_PASS=app

APP_POLL_INTERVAL_SECONDS=2
APP_PROCESSING_SECONDS=4
APP_RETRY_DELAY_SECONDS=5

docker/db/init.sql は次の内容で作成します。

CREATE TABLE IF NOT EXISTS jobs (
  id BIGSERIAL PRIMARY KEY,
  job_type TEXT NOT NULL,
  payload JSONB NOT NULL DEFAULT '{}'::jsonb,
  status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
  attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
  max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
  available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  reserved_by TEXT,
  reserved_at TIMESTAMPTZ,
  last_error TEXT,
  finished_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
  ON jobs (status, available_at, id)
  WHERE status = 'queued';

コードのポイント

① 状態遷移と再試行に必要な列を 1 テーブルへ集める

CREATE TABLE IF NOT EXISTS jobs (
  id BIGSERIAL PRIMARY KEY,
  job_type TEXT NOT NULL,
  payload JSONB NOT NULL DEFAULT '{}'::jsonb,
  status TEXT NOT NULL CHECK (status IN ('queued', 'processing', 'done', 'failed')),
  attempts INTEGER NOT NULL DEFAULT 0 CHECK (attempts >= 0),
  max_attempts INTEGER NOT NULL DEFAULT 3 CHECK (max_attempts > 0),
  available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  reserved_by TEXT,
  reserved_at TIMESTAMPTZ,
  last_error TEXT,
  finished_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

status でジョブの現在地、attemptsmax_attempts でリトライ余地を把握できます。reserved_byreserved_at を残しておくことで、processing のまま長時間残っているジョブの追跡を後から始められます。

② dequeue 条件に絞った部分インデックスを張る

CREATE INDEX IF NOT EXISTS idx_jobs_dequeue
  ON jobs (status, available_at, id)
  WHERE status = 'queued';

WHERE status = 'queued' を付けることで、取り出し対象外の行をインデックスから除外できます。available_atid の順序が claim クエリの ORDER BY と一致しているので、インデックスがそのまま使われます。

reserved_at は、ワーカーが取った時刻を残す列です。 後で processing のまま残ったジョブ候補を洗い出す入口になります。

起動して確認します。

cp .env.example .env
docker compose up -d --build db app
docker compose exec db pg_isready -U app -d app
docker compose exec db psql -U app -d app -c "\dt"
docker compose ps

起動直後の確認結果は次のようになります。

起動直後の確認結果

appdb が起動し、jobs テーブルが見えれば OK です。

起動で詰まったとき:

docker compose logs db
docker compose ps
docker compose down -v
docker compose up -d --build db app

3. キュー初期化と投入スクリプトを用意する

worker を起動する前に、毎回同じ状態から検証できるようにしておきます。

bin/bootstrap.php は次の内容で作成します。全スクリプトで共有する DB 接続、環境変数取得、ワーカー ID 生成をまとめます。環境変数の整数チェックと、ワーカー ID の hostname 取得が注目点です。

<?php
declare(strict_types=1);

function envString(string $name, string $default): string
{
    $value = getenv($name);

    return $value === false || $value === '' ? $default : $value;
}

function envInt(string $name, int $default): int
{
    $value = getenv($name);

    if ($value === false || $value === '') {
        return $default;
    }

    $parsed = filter_var($value, FILTER_VALIDATE_INT);

    if ($parsed === false) {
        throw new RuntimeException(sprintf('%s must be an integer.', $name));
    }

    return $parsed;
}

function createPdo(): PDO
{
    $dsn = sprintf(
        'pgsql:host=%s;port=%s;dbname=%s',
        envString('APP_DB_HOST', 'db'),
        envString('APP_DB_PORT', '5432'),
        envString('APP_DB_NAME', 'app')
    );

    return new PDO(
        $dsn,
        envString('APP_DB_USER', 'app'),
        envString('APP_DB_PASS', 'app'),
        [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
        ]
    );
}

function workerId(): string
{
    $fromEnv = getenv('APP_WORKER_ID');

    if ($fromEnv !== false && $fromEnv !== '') {
        return $fromEnv;
    }

    $hostName = gethostname();

    return $hostName === false || $hostName === '' ? 'worker-unknown' : $hostName;
}

コードのポイント

envInt で型と範囲を起動時に検証する

function envInt(string $name, int $default): int
{
    $value = getenv($name);

    if ($value === false || $value === '') {
        return $default;
    }

    $parsed = filter_var($value, FILTER_VALIDATE_INT);

    if ($parsed === false) {
        throw new RuntimeException(sprintf('%s must be an integer.', $name));
    }

    return $parsed;
}

空ならデフォルト値を使い、整数に変換できなければ起動直後に例外を投げます。ポーリング間隔やリトライ秒数は実行中に壊れるより、起動時に止めるほうが原因を追いやすいです。

② ワーカー ID を hostname から取る

function workerId(): string
{
    $fromEnv = getenv('APP_WORKER_ID');

    if ($fromEnv !== false && $fromEnv !== '') {
        return $fromEnv;
    }

    $hostName = gethostname();

    return $hostName === false || $hostName === '' ? 'worker-unknown' : $hostName;
}

環境変数で上書きできるようにしつつ、デフォルトは gethostname() でコンテナ ID を使います。--scale worker=2 で 2 本起動したとき、ログが別 ID で分かれるのはこの仕組みによります。

bin/reset-demo.php は次の内容で作成します。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$pdo = createPdo();
$result = $pdo->exec('TRUNCATE TABLE jobs RESTART IDENTITY');

if ($result === false) {
    fwrite(STDERR, "failed to reset queue" . PHP_EOL);
    exit(1);
}

echo "queue reset completed" . PHP_EOL;

bin/enqueue-job.php は次の内容で作成します。引数から job_type・payload・最大試行回数・遅延秒数を受け取り、available_at をずらして投入できる点が前段記事との差分です。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$jobType = $argv[1] ?? '';
$payloadJson = $argv[2] ?? '{}';
$maxAttempts = isset($argv[3]) ? (int) $argv[3] : 3;
$delaySeconds = isset($argv[4]) ? (int) $argv[4] : 0;

if ($jobType === '' || $maxAttempts < 1 || $delaySeconds < 0) {
    fwrite(
        STDERR,
        "Usage: php bin/enqueue-job.php JOB_TYPE PAYLOAD_JSON [MAX_ATTEMPTS] [DELAY_SECONDS]" . PHP_EOL
    );
    exit(1);
}

json_decode($payloadJson, true, 512, JSON_THROW_ON_ERROR);

$pdo = createPdo();
$statement = $pdo->prepare(<<<'SQL'
INSERT INTO jobs (
    job_type,
    payload,
    status,
    max_attempts,
    available_at,
    updated_at
) VALUES (
    :job_type,
    CAST(:payload AS jsonb),
    'queued',
    :max_attempts,
    NOW() + make_interval(secs => :delay_seconds),
    NOW()
)
RETURNING id
SQL);

$statement->bindValue(':job_type', $jobType, PDO::PARAM_STR);
$statement->bindValue(':payload', $payloadJson, PDO::PARAM_STR);
$statement->bindValue(':max_attempts', $maxAttempts, PDO::PARAM_INT);
$statement->bindValue(':delay_seconds', $delaySeconds, PDO::PARAM_INT);
$statement->execute();

$jobId = (int) $statement->fetchColumn();

printf("enqueued job #%d (%s)%s", $jobId, $jobType, PHP_EOL);

コードのポイント

① 引数バリデーションと JSON チェックを投入前に完了させる

$jobType = $argv[1] ?? '';
$payloadJson = $argv[2] ?? '{}';
$maxAttempts = isset($argv[3]) ? (int) $argv[3] : 3;
$delaySeconds = isset($argv[4]) ? (int) $argv[4] : 0;

if ($jobType === '' || $maxAttempts < 1 || $delaySeconds < 0) {
    fwrite(
        STDERR,
        "Usage: php bin/enqueue-job.php JOB_TYPE PAYLOAD_JSON [MAX_ATTEMPTS] [DELAY_SECONDS]" . PHP_EOL
    );
    exit(1);
}

json_decode($payloadJson, true, 512, JSON_THROW_ON_ERROR);

job_type の空チェックと数値範囲チェックを先に済ませ、JSON パースも投入前に検証しています。不正 payload がキューに入ると、取り出した側で初めて壊れていたと分かる状況を防げます。

available_at に遅延を組み込んで投入する

INSERT INTO jobs (
    job_type,
    payload,
    status,
    max_attempts,
    available_at,
    updated_at
) VALUES (
    :job_type,
    CAST(:payload AS jsonb),
    'queued',
    :max_attempts,
    NOW() + make_interval(secs => :delay_seconds),
    NOW()
)
RETURNING id

available_atNOW() + make_interval(secs => :delay_seconds) にすることで、即時投入と遅延投入を同じ INSERT で切り替えられます。CAST(:payload AS jsonb) によって PHP 文字列を DB 側で JSONB として格納します。

bin/show-queue.php は次の内容で作成します。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$pdo = createPdo();
$statement = $pdo->query(<<<'SQL'
SELECT
    id,
    job_type,
    payload,
    status,
    attempts,
    max_attempts,
    available_at,
    reserved_by,
    reserved_at,
    last_error,
    finished_at
FROM jobs
ORDER BY id
SQL);

$rows = $statement->fetchAll();

echo json_encode($rows, JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE) . PHP_EOL;

コードのポイント

① 監視に役立つ列だけ選んで取得する

SELECT
    id,
    job_type,
    payload,
    status,
    attempts,
    max_attempts,
    available_at,
    reserved_by,
    reserved_at,
    last_error,
    finished_at
FROM jobs
ORDER BY id

status でいま何をしているか、reserved_by で誰が取ったか、last_error でなぜ失敗したかを一覧で追えます。show-queue.php を毎回実行すれば、ワーカーの動きをログと DB 両側から照合できます。

ここまで作ったら、前準備を一度通します。

docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}'
docker compose exec app php bin/enqueue-job.php export-csv '{"rows":500}'
docker compose exec app php bin/show-queue.php

この時点では、3 件とも queued で並んでいれば十分です。 build-report にだけ fail_once を入れているので、後で retry の流れもそのまま使えます。

4. 常駐ワーカー bin/worker.php を作る

ワーカーの構成は単純です。

  1. 取れるジョブが 1 件あるか探す
  2. あれば processing にして処理する
  3. 成功なら done、失敗なら queuedfailed に更新する
  4. 取れるジョブがなければ少し待ってもう一度見る

流れを図で先に置くと、次の形です。

flowchart TD
    A[worker loop start] --> B{queued job exists?}
    B -- no --> C[sleep poll interval]
    C --> A
    B -- yes --> D[claim with FOR UPDATE SKIP LOCKED]
    D --> E[update to processing]
    E --> F[process job outside transaction]
    F --> G{success?}
    G -- yes --> H[update to done]
    G -- no, attempts left --> I[update to queued and delay available_at]
    G -- no, attempts exhausted --> J[update to failed]
    H --> A
    I --> A
    J --> A

bin/worker.php は次の内容で作成します。シグナルによる安全停止、FOR UPDATE SKIP LOCKED での claim、成功・再試行・失敗確定の分岐まで 1 ファイルにまとめます。長く動くのは PHP プロセスであり、DB トランザクションではないことが核心です。

<?php
declare(strict_types=1);

require __DIR__ . '/bootstrap.php';

$pdo = createPdo();
$workerId = workerId();
$pollIntervalSeconds = envInt('APP_POLL_INTERVAL_SECONDS', 2);
$processingSeconds = envInt('APP_PROCESSING_SECONDS', 4);
$retryDelaySeconds = envInt('APP_RETRY_DELAY_SECONDS', 5);

$shouldStop = false;

pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use (&$shouldStop, $workerId): void {
    $shouldStop = true;
    printf("[%s] received SIGTERM, stop after current loop%s", $workerId, PHP_EOL);
});
pcntl_signal(SIGINT, function () use (&$shouldStop, $workerId): void {
    $shouldStop = true;
    printf("[%s] received SIGINT, stop after current loop%s", $workerId, PHP_EOL);
});

printf("[%s] worker started%s", $workerId, PHP_EOL);

while (!$shouldStop) {
    try {
        $job = claimJob($pdo, $workerId);

        if ($job === null) {
            printf(
                "[%s] no ready jobs, sleep %d second(s)%s",
                $workerId,
                $pollIntervalSeconds,
                PHP_EOL
            );
            sleep($pollIntervalSeconds);
            continue;
        }

        processJob($pdo, $job, $workerId, $processingSeconds, $retryDelaySeconds);
    } catch (Throwable $throwable) {
        fprintf(
            STDERR,
            "[%s] unexpected error: %s%s",
            $workerId,
            $throwable->getMessage(),
            PHP_EOL
        );
        sleep($pollIntervalSeconds);
    }
}

printf("[%s] worker stopped gracefully%s", $workerId, PHP_EOL);

function claimJob(PDO $pdo, string $workerId): ?array
{
    $pdo->beginTransaction();

    try {
        $statement = $pdo->prepare(<<<'SQL'
WITH picked AS (
    SELECT id
    FROM jobs
    WHERE status = 'queued'
      AND available_at <= NOW()
    ORDER BY available_at, id
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE jobs AS j
SET status = 'processing',
    attempts = j.attempts + 1,
    reserved_by = :worker_id,
    reserved_at = NOW(),
    updated_at = NOW()
FROM picked
WHERE j.id = picked.id
RETURNING
    j.id,
    j.job_type,
    j.payload::text AS payload_json,
    j.attempts,
    j.max_attempts
SQL);
        $statement->execute([':worker_id' => $workerId]);
        $row = $statement->fetch();
        $pdo->commit();

        if ($row === false) {
            return null;
        }

        $row['payload'] = json_decode((string) $row['payload_json'], true, 512, JSON_THROW_ON_ERROR);
        unset($row['payload_json']);

        return $row;
    } catch (Throwable $throwable) {
        if ($pdo->inTransaction()) {
            $pdo->rollBack();
        }

        throw $throwable;
    }
}

function processJob(
    PDO $pdo,
    array $job,
    string $workerId,
    int $processingSeconds,
    int $retryDelaySeconds
): void {
    $jobId = (int) $job['id'];
    $attempts = (int) $job['attempts'];
    $maxAttempts = (int) $job['max_attempts'];
    $payload = $job['payload'];

    printf(
        "[%s] claimed job #%d (%s), attempt %d/%d%s",
        $workerId,
        $jobId,
        $job['job_type'],
        $attempts,
        $maxAttempts,
        PHP_EOL
    );
    printf("[%s] processing for %d second(s)%s", $workerId, $processingSeconds, PHP_EOL);

    sleep($processingSeconds);

    $shouldFailOnce = (bool) ($payload['fail_once'] ?? false);

    if ($shouldFailOnce && $attempts === 1) {
        $message = 'simulated failure for retry demo';

        if ($attempts >= $maxAttempts) {
            markFailed($pdo, $jobId, $message);
            printf("[%s] job #%d failed permanently%s", $workerId, $jobId, PHP_EOL);
            return;
        }

        requeueJob($pdo, $jobId, $message, $retryDelaySeconds);
        printf(
            "[%s] job #%d failed, re-queued after %d second(s)%s",
            $workerId,
            $jobId,
            $retryDelaySeconds,
            PHP_EOL
        );
        return;
    }

    markDone($pdo, $jobId);
    printf("[%s] job #%d completed%s", $workerId, $jobId, PHP_EOL);
}

function markDone(PDO $pdo, int $jobId): void
{
    $statement = $pdo->prepare(<<<'SQL'
UPDATE jobs
SET status = 'done',
    last_error = NULL,
    finished_at = NOW(),
    updated_at = NOW()
WHERE id = :id
SQL);
    $statement->execute([':id' => $jobId]);
}

function requeueJob(PDO $pdo, int $jobId, string $message, int $retryDelaySeconds): void
{
    $statement = $pdo->prepare(<<<'SQL'
UPDATE jobs
SET status = 'queued',
    available_at = NOW() + make_interval(secs => :retry_delay_seconds),
    last_error = :message,
    reserved_by = NULL,
    reserved_at = NULL,
    updated_at = NOW()
WHERE id = :id
SQL);
    $statement->bindValue(':retry_delay_seconds', $retryDelaySeconds, PDO::PARAM_INT);
    $statement->bindValue(':message', $message, PDO::PARAM_STR);
    $statement->bindValue(':id', $jobId, PDO::PARAM_INT);
    $statement->execute();
}

function markFailed(PDO $pdo, int $jobId, string $message): void
{
    $statement = $pdo->prepare(<<<'SQL'
UPDATE jobs
SET status = 'failed',
    last_error = :message,
    finished_at = NOW(),
    updated_at = NOW()
WHERE id = :id
SQL);
    $statement->bindValue(':message', $message, PDO::PARAM_STR);
    $statement->bindValue(':id', $jobId, PDO::PARAM_INT);
    $statement->execute();
}

コードのポイント

① SIGTERM を受けたら現在のループを終えてから止まる

pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () use (&$shouldStop, $workerId): void {
    $shouldStop = true;
    printf("[%s] received SIGTERM, stop after current loop%s", $workerId, PHP_EOL);
});

docker compose stop worker を打つと、処理中のジョブを完了させてから停止します。

② claim は FOR UPDATE SKIP LOCKEDprocessing 更新を 1 トランザクションで閉じる

WITH picked AS (
    SELECT id
    FROM jobs
    WHERE status = 'queued'
      AND available_at <= NOW()
    ORDER BY available_at, id
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE jobs AS j
SET status = 'processing',
    attempts = j.attempts + 1,
    reserved_by = :worker_id,

FOR UPDATE SKIP LOCKED で他ワーカーが取得中の行を飛ばし、同時に status = 'processing' へ更新します。このトランザクションは取得だけに使い、実処理はトランザクション外で進めることで、ロックを長時間持ち続けません。

③ 失敗時はリトライ余地に応じて queued 戻しか failed 確定に分岐する

if ($shouldFailOnce && $attempts === 1) {
    $message = 'simulated failure for retry demo';

    if ($attempts >= $maxAttempts) {
        markFailed($pdo, $jobId, $message);
        printf("[%s] job #%d failed permanently%s", $workerId, $jobId, PHP_EOL);
        return;
    }

    requeueJob($pdo, $jobId, $message, $retryDelaySeconds);

上限に達した場合は failed で止め、余地があれば available_at をずらして queued に戻します。どちらの経路でも processing のまま放置しないのが重要で、stuck job を防ぐ基本になります。

workerId()HOSTNAME を拾うので、worker を 2 本に増やしてもコンテナごとに別 ID でログを見分けられます。 メインループを try/catch で包んでいるため、接続の瞬断や payload の不整合で 1 回例外が出ても、その場でコンテナが止まりません。

5. 1 ワーカーで起動とログを確認する

最初は 1 本だけで十分です。 キューを空にしてからジョブを入れ、worker サービスを起動します。

docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42,"fail_once":true}'
docker compose exec app php bin/enqueue-job.php export-csv '{"rows":500}'
docker compose up -d worker
docker compose logs -f worker

ログの見え方は次のようになります。

worker-1  | [4b2a7f5c6c19] worker started
worker-1  | [4b2a7f5c6c19] claimed job #1 (send-mail), attempt 1/3
worker-1  | [4b2a7f5c6c19] processing for 4 second(s)
worker-1  | [4b2a7f5c6c19] job #1 completed
worker-1  | [4b2a7f5c6c19] claimed job #2 (build-report), attempt 1/3
worker-1  | [4b2a7f5c6c19] processing for 4 second(s)
worker-1  | [4b2a7f5c6c19] job #2 failed, re-queued after 5 second(s)

docker compose logs -f worker を開きっぱなしにしておくと、ワーカーがいま何をしているかを追えます。 状態を DB でも照合しておきます。

docker compose exec app php bin/show-queue.php

send-maildonebuild-report が一度 queued に戻っていれば正常です。 もし worker コンテナが落ちた場合でも、docker compose logs worker で直前の例外を確認し、その後に docker compose up -d worker で再起動すれば同じ手順へ戻れます。

worker ログとキュー状態を並べると、処理結果を照合しやすくなります。

1 ワーカー実行時の worker ログ 1 ワーカー実行後のキュー状態

6. 2 ワーカーで同時実行時の見え方を確認する

次は worker サービスを 2 本に増やします。 先に状態を戻して、少なくとも 3 件は ready job を入れてから確認します。

docker compose stop worker
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php send-mail '{"to":"alice@example.com"}'
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":42}'
docker compose exec app php bin/enqueue-job.php export-csv '{"rows":500}'
docker compose up -d --scale worker=2 worker
docker compose logs --tail=50 worker

流れを図で置くと、2 本のワーカーはこう進みます。

sequenceDiagram
    participant W1 as worker-1
    participant DB as PostgreSQL
    participant W2 as worker-2

    W1->>DB: claim queued job with FOR UPDATE SKIP LOCKED
    DB-->>W1: job #1, status=processing
    W2->>DB: claim queued job with FOR UPDATE SKIP LOCKED
    DB-->>W2: job #2, status=processing
    W1->>DB: update job #1 to done
    W2->>DB: update job #2 to done

実際のログは次のようになります。

worker-1  | [77184a0a0af1] worker started
worker-1  | [77184a0a0af1] claimed job #1 (send-mail), attempt 1/3
worker-1  | [77184a0a0af1] processing for 4 second(s)
worker-2  | [8c45b9fc7118] worker started
worker-2  | [8c45b9fc7118] claimed job #2 (build-report), attempt 1/3
worker-2  | [8c45b9fc7118] processing for 4 second(s)
worker-1  | [77184a0a0af1] job #1 completed
worker-2  | [8c45b9fc7118] job #2 completed
worker-1  | [77184a0a0af1] claimed job #3 (export-csv), attempt 1/3
worker-1  | [77184a0a0af1] processing for 4 second(s)
worker-1  | [77184a0a0af1] job #3 completed

claim を担当する worker とログの行順は毎回固定ではありません。docker compose logs --tail=50 worker には、直前の 1 ワーカー実行分や docker compose stop worker で出た SIGTERM ログが混ざることもあります。

ここで見たいのは、片方の worker が 1 件処理している間に、もう片方が別の job へ進めること、そして同じ job ID を 2 本の worker が二重取得していないことです。 FOR UPDATE SKIP LOCKED がその分離を担い、status = 'processing' への更新が再取得を防いでいます。

キュー状態も確認します。

docker compose exec app php bin/show-queue.php
docker compose ps worker

attemptsreserved_by を見ると、どの worker が処理したかをあとから追えます。

retry も常駐ワーカー側でそのまま動きます。 たとえば fail_once を混ぜると、1 回目は queued に戻り、次のループか別ワーカーのループで再取得されます。

docker compose stop worker
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":99,"fail_once":true}' 3 0
docker compose up -d --scale worker=2 worker
docker compose logs --tail=50 worker
docker compose exec app php bin/show-queue.php

1 回目は queued に戻り、available_at が 5 秒先へずれます。 少し待つと、次のループで再取得されて done になります。

max_attempts = 1 の失敗確定も同じです。

docker compose stop worker
docker compose exec app php bin/reset-demo.php
docker compose exec app php bin/enqueue-job.php build-report '{"report_id":100,"fail_once":true}' 1 0
docker compose up -d worker
docker compose logs --tail=20 worker
docker compose exec app php bin/show-queue.php

このときは再投入されず、failed で止まります。

7. 停止方法と processing 取り残しの入口を押さえる

ワーカーを止めるときは、コンテナをいきなり消すのではなく stop を使います。

docker compose stop worker
docker compose logs --tail=20 worker
docker compose ps worker

worker.phpSIGTERM を受けたらフラグを立て、現在のループを終えてから止まるようにしています。 そのため、ログには次のような行が出ます。

worker-1  | [77184a0a0af1] received SIGTERM, stop after current loop
worker-1  | [77184a0a0af1] worker stopped gracefully

この形にしておくと、「いま処理中のジョブを途中で切り捨てたのか」「ループは正常に閉じたのか」をログで判断できます。

それでも、コンテナ障害やホスト再起動では processing が残ることがあります。 本記事では自動回収までは書きませんが、入口として次の SQL はすぐ使えます。

docker compose exec db psql -U app -d app -c "
SELECT id, job_type, reserved_by, reserved_at, attempts, max_attempts
FROM jobs
WHERE status = 'processing'
  AND reserved_at < NOW() - INTERVAL '5 minutes'
ORDER BY reserved_at;
"

ここで古い processing が見つかったら、最初にやるのは強制的な queued 戻しではありません。 まずは次の 3 点を照合します。

  • docker compose logs worker に、その job ID の完了または失敗ログがないか
  • reserved_by の worker が停止済みかどうか
  • 実処理が外部 API やメール送信だった場合、外側では完了していないか

そのうえで、後続記事や実務実装では reserved_at にタイムアウトを設け、一定時間を超えた processing を回収対象として扱います。 入口として必要なのは、回収ロジックより先に「候補を安全に見つける列を持っておく」ことです。

8. まとめと次の一歩

今回の最小構成で持ち帰りたい点は 3 つです。

  • worker サービスは php bin/worker.php を起動コマンドにするだけでも十分に始められる
  • claim は FOR UPDATE SKIP LOCKEDprocessing 更新までを短く閉じ、処理本体はトランザクション外で進める
  • 停止は docker compose stop worker、確認は docker compose logs worker、取り残し確認は reserved_at を起点に進める

ワーカー本数を増やしても、重くしないほうがよいのは claim のトランザクションです。 長く持つのは PHP プロセスであり、ロックではありません。

次の一歩は 2 つあります。

  1. processing のまま残ったジョブを timeout 付きで回収する
  2. HTTP リクエストや CLI から enqueue する入口をアプリ側へつなぐ

FOR UPDATE SKIP LOCKED の最小 claim をもう一度整理したい場合は、PHP + PostgreSQLでジョブキューを作る(FOR UPDATE SKIP LOCKED 最小構成) から戻るとつながりやすくなります。