FOR UPDATE SKIP LOCKED で safe に claim できても、worker crash や deploy 中断が入ると processing のまま戻らない job は残ります。いつ reclaim してよいか、どの条件なら再実行しても問題が起きないかを決めるのが、運用上の難しい部分にある。
この記事では、PostgreSQL の job table を前提に、timeout 判定をどの列で持つか、reclaim の対象をどう絞るか、古い worker が復帰しても現在の実行を上書きできないようにするには何が必要かを整理します。
先に読んでおくとつながりやすい記事:
- PHP + PostgreSQLでジョブキューを作る(FOR UPDATE SKIP LOCKED 最小構成)
- PHP + Docker で非同期ジョブを動かす最小構成(PostgreSQL キュー)
1. 問題設定
stuck job は単に processing が長い job ではありません。いま実行権を持つ worker がもう heartbeat を更新できず、そのままでは次に進まない job です。ここを曖昧にすると、遅いだけの正常 job まで reclaim して二重実行が増える。
雑に戻す案が危ない理由は次のとおりです。
| 回収案 | 問題点 |
|---|---|
reserved_at < NOW() - interval '5 minutes' だけで queued に戻す | 長時間処理と停止 worker を区別できない |
運用者が UPDATE jobs SET status = 'queued' を手で打つ | 古い worker があとから done を書き戻せる余地が残る |
processing 行を消して新しい job を積み直す | 失敗理由や試行回数が消え、調査しにくい |
SKIP LOCKED は「同じ ready job を同時に取らない」ための仕組みです。今回の論点は別で、いったん processing に入った job を、どの時点なら安全に reclaim できるかにあります。
扱う範囲は次の 3 点です。
- timeout 判定の正本をどの列に置くか
- reclaim 対象を誤って広げない SQL 条件
- 再実行後に古い worker が結果を上書きしないための柵
言語別 worker 実装、監視通知、専用キューとの比較は扱いません。ここでは PostgreSQL 側の設計に絞ります。
2. 前提をそろえる
まず分けて考えたいのは、claim 時刻、観測時刻、timeout 判定時刻の 3 つです。
| 列 | 役割 | timeout 判定の正本にするか |
|---|---|---|
reserved_at | いつ claim したかを残す監査列 | しない |
heartbeat_at | worker が最後に生存を示した時刻 | 単独ではしない |
lease_expires_at | この時刻を過ぎたら reclaim 候補とみなす列 | する |
reserved_at だけで timeout を見ると、重い job と死んだ job が同じ見え方になります。heartbeat_at は観測には便利ですが、reclaimer が見る条件を 1 つに揃えるなら、lease_expires_at を更新していく形のほうが扱いやすい設計です。
既存の jobs テーブルにある attempts(試行回数)/ max_attempts(上限)/ available_at(実行解禁時刻)はそのまま使います。最小構成として、追加する列は次のとおりです。
ALTER TABLE jobs
ADD COLUMN attempt_token UUID,
ADD COLUMN lease_expires_at TIMESTAMPTZ,
ADD COLUMN heartbeat_at TIMESTAMPTZ,
ADD COLUMN recovery_policy TEXT NOT NULL DEFAULT 'auto'
CHECK (recovery_policy IN ('auto', 'manual')),
ADD COLUMN reclaim_count INTEGER NOT NULL DEFAULT 0 CHECK (reclaim_count >= 0);
- 2 行目
attempt_token UUIDは、いまの実行権を表す柵です。claim のたびに新しい値へ差し替えます。 - 3 行目
lease_expires_at TIMESTAMPTZを timeout 判定の正本にします。worker は heartbeat 時にこの値を延長します。 - 5 行目
recovery_policyは、自動回収してよい job と、人間確認を挟む job を DB 上で分けるための列です。
processing 行の追跡には、次の部分インデックスが効きます。
CREATE INDEX idx_jobs_processing_timeout
ON jobs (status, lease_expires_at, id)
WHERE status = 'processing';
この index は reclaim 専用です。dequeue 用の index が queued 行を探すためのものであるのに対し、こちらは timeout 候補を狭く取るために置きます。
3. 論点を整理する
回収の流れは、claim より後ろで次のように分かれます。
flowchart TD
A[worker が claim] --> B[status=processing<br/>attempt_token 発行<br/>lease_expires_at 設定]
B --> C{heartbeat が続くか}
C -->|はい| D[lease_expires_at を延長]
D --> E{処理完了したか}
E -->|はい| F[attempt_token 条件付きで done 更新]
E -->|いいえ| C
C -->|いいえ| G[lease_expires_at を超過]
G --> H[reclaimer が expired row を取得]
H --> I{recovery_policy と max_attempts}
I -->|auto かつ上限未満| J[queued に戻す]
I -->|manual または上限到達| K[failed か manual 対象へ送る]
J --> A
F --> L[終了]
K --> L
claim 時に実行権を作る
まず worker は ready job を 1 件取り、processing への更新と同時に attempt_token と lease_expires_at をセットします。
WITH target AS (
SELECT id
FROM jobs
WHERE status = 'queued'
AND available_at <= NOW()
ORDER BY available_at, id
FOR UPDATE SKIP LOCKED
LIMIT 1
)
UPDATE jobs j
SET status = 'processing',
reserved_by = $1,
reserved_at = NOW(),
lease_expires_at = NOW() + INTERVAL '30 seconds',
attempt_token = $2,
attempts = attempts + 1,
updated_at = NOW()
FROM target
WHERE j.id = target.id
RETURNING j.id, j.attempt_token, j.lease_expires_at;
- 7 行目
FOR UPDATE SKIP LOCKEDで、別 worker が先に claim 中の ready job を待たずに飛ばします。 - 14 行目
lease_expires_at = NOW() + INTERVAL '30 seconds'が timeout 判定の出発点です。 - 15 行目
attempt_token = $2により、今回の実行権を次の update でも識別できます。値はアプリ側で UUID を生成して渡す前提です。
heartbeat は lease を延長する
worker が生きている間は、一定間隔で lease_expires_at を延長します。
UPDATE jobs
SET heartbeat_at = NOW(),
lease_expires_at = NOW() + INTERVAL '30 seconds',
updated_at = NOW()
WHERE id = $1
AND status = 'processing'
AND reserved_by = $2
AND attempt_token = $3;
3 行目の更新で見るべき値は heartbeat_at ではなく lease_expires_at です。heartbeat_at はあとから「最後にいつ生きていたか」を見るために残します。
8 行目 attempt_token = $3 がないと、すでに reclaim されたあとに古い worker が heartbeat を送り直し、再取得済み job の lease を延ばしてしまう余地が生まれます。
reclaim は expired row を柵付きで戻す
reclaimer は lease_expires_at < NOW() を満たす processing 行だけを取り、recovery_policy と試行回数で戻し先を分けます。ここでは既存の 4 状態を前提に、manual 系はいったん failed へ寄せています。manual_review のような専用状態を持つ運用なら、CASE 式の ELSE 'failed' をその状態へ置き換えてください。
WITH expired AS (
SELECT id, attempt_token, recovery_policy
FROM jobs
WHERE status = 'processing'
AND lease_expires_at < NOW()
ORDER BY lease_expires_at, id
FOR UPDATE SKIP LOCKED
LIMIT 50
)
UPDATE jobs j
SET status = CASE
WHEN expired.recovery_policy = 'auto' AND j.attempts < j.max_attempts THEN 'queued'
ELSE 'failed'
END,
available_at = CASE
WHEN expired.recovery_policy = 'auto' AND j.attempts < j.max_attempts THEN NOW()
ELSE j.available_at
END,
reserved_by = NULL,
reserved_at = NULL,
heartbeat_at = NULL,
lease_expires_at = NULL,
attempt_token = NULL,
reclaim_count = j.reclaim_count + 1,
last_error = 'lease expired before completion',
finished_at = CASE
WHEN expired.recovery_policy = 'auto' AND j.attempts < j.max_attempts THEN NULL
ELSE NOW()
END,
updated_at = NOW()
FROM expired
WHERE j.id = expired.id
AND j.attempt_token = expired.attempt_token
RETURNING j.id, j.status, j.reclaim_count;
- 5 行目
lease_expires_at < NOW()が reclaim 候補の正本です。reserved_atの古さではなく、最後に延長された lease が切れたかで判断します。 - 12 行目の分岐で、自動回収してよい job だけを
queuedへ戻します。そうでない job はfailedか、別の manual review 状態へ送る設計にします。 - 33 行目
j.attempt_token = expired.attempt_tokenによって、選択した実行権と同じ試行だけを reclaim します。ここが stale worker を切り離す最後の柵です。
complete も attempt_token 条件付きで閉じる
処理が終わった worker は、結果更新でも同じ attempt_token を条件へ入れます。
UPDATE jobs
SET status = 'done',
reserved_by = NULL,
reserved_at = NULL,
heartbeat_at = NULL,
lease_expires_at = NULL,
attempt_token = NULL,
finished_at = NOW(),
updated_at = NOW()
WHERE id = $1
AND status = 'processing'
AND attempt_token = $2;
2 行目で done にできるのは、まだ自分が現在の実行権を持っているときに限ります。12 行目の条件で 0 行更新になったなら、その worker はすでに reclaim 済みだと判断してログへ落とし、結果を確定させないほうが安全です。
この柵で守れるのは DB 側の境界だけだ。メール送信や決済のような外部副作用の二重実行までは防げないので、そこは別に idempotency key か業務キーで重複排除を持たせる必要があります。
4. 判断基準を置く
自動回収と手動回収は、job の性質で分けたほうが運用しやすくなります。
| job の性質 | 自動回収 | 理由 |
|---|---|---|
| 同じ入力を再実行しても結果が上書きされるだけ | 向く | たとえば集計再計算やキャッシュ生成は idempotent に寄せやすい |
| 外部 API が idempotency key を受け付ける | 向く | reclaim 後の再送でも外部側で重複を吸収しやすい |
| 決済、メール送信、在庫引当のように副作用が強いが重複排除が弱い | 向かない | DB 側の柵では外部副作用の重複を止めきれない |
| 同じ job が繰り返し timeout し、原因が処理時間か障害か未判別 | 向かない | まず実行環境かロジックかを切り分けるべき |
役割分担は次のように切ると実務で混ざりにくくなります。
- 自動回収:
lease_expires_at超過、recovery_policy = 'auto'、attempts < max_attemptsの job を短いバッチで戻す - 手動回収: 外部副作用が強い job、同一 job の timeout 連発、deploy 直後にまとめて増えた timeout を調べる
- 運用者確認: ログ、外部システムの状態、直近 deploy、有無を見てから
queuedに戻すかfailedで止めるかを決める
運用開始前の確認項目は 4 つです。
- timeout 判定列が
lease_expires_atに一本化されているか - claim / heartbeat / reclaim / complete の全更新が
attempt_token条件付きか - 自動回収する job に idempotency key か業務上の重複排除策があるか
- 自動回収しない job をどこへ退避し、誰が見るかが決まっているか
ここまで決まっていれば、processing 行が増えたときにも「とりあえず戻す」ではなく、「lease が切れていて、かつ自動回収してよい job か」で判断できます。
5. まとめ
stuck job 回収で先に決めるべきなのは、timeout 判定の正本を何にするかです。reserved_at は監査に残しつつ、reclaim 条件は lease_expires_at に寄せると、heartbeat と回収 SQL の軸がぶれません。
そのうえで、claim / heartbeat / reclaim / complete のすべてに attempt_token を通すと、reclaim 後に古い worker が復帰しても現在の実行を上書きしにくくなります。DB 側で守れる境界と、外部副作用の重複排除を分けて考えることが、再実行を安全にする最短ルートです。
次に読み進めるなら、claim の最小構成は PHP + PostgreSQLでジョブキューを作る(FOR UPDATE SKIP LOCKED 最小構成)、常駐 worker へつなぐ流れは PHP + Docker で非同期ジョブを動かす最小構成(PostgreSQL キュー) がつながります。