--[[ Function to create a new job from stored dedup-next data when a deduplicated job with keepLastIfActive finishes. At most one next job is created per deduplication ID. Multiple triggers while active overwrite the dedup-next data, so only the latest data is used. ]] -- Includes --- @include "getOrSetMaxEvents" --- @include "setDeduplicationKey" --- @include "storeAndEnqueueJob" local function requeueDeduplicatedJob(prefix, deduplicationId, eventStreamKey, metaKey, activeKey, waitKey, pausedKey, markerKey, prioritizedKey, priorityCounterKey, delayedKey, timestamp) local deduplicationNextKey = prefix .. "dn:" .. deduplicationId if rcall("EXISTS", deduplicationNextKey) == 1 then local nextData = rcall("HMGET", deduplicationNextKey, "name", "data", "opts", "pk", "pd", "pdk", "rjk") local newJobId = rcall("INCR", prefix .. "id") .. "" local newJobIdKey = prefix .. newJobId local newOpts = cjson.decode(nextData[3]) local deduplicationKey = prefix .. "de:" .. deduplicationId local parentKey = nextData[4] or nil local parentData = nextData[5] or nil local parentDependenciesKey = nextData[6] or nil local repeatJobKey = nextData[7] or nil -- Set dedup key for the new job (without TTL when keepLastIfActive, -- so the key outlives the job's active duration) local deOpts = newOpts['de'] if deOpts and deOpts['keepLastIfActive'] then rcall('SET', deduplicationKey, newJobId) else setDeduplicationKey(deduplicationKey, newJobId, deOpts) end -- Store and enqueue using the shared helper (handles priority/lifo/delayed) local maxEvents = getOrSetMaxEvents(metaKey) storeAndEnqueueJob(eventStreamKey, newJobIdKey, newJobId, nextData[1], nextData[2], newOpts, timestamp, parentKey, parentData, repeatJobKey, maxEvents, waitKey, pausedKey, activeKey, metaKey, prioritizedKey, priorityCounterKey, delayedKey, markerKey) -- Register as child dependency if the job has a parent if parentDependenciesKey then rcall("SADD", parentDependenciesKey, newJobIdKey) end -- Only delete the dedup-next hash after the job is fully created, -- so that if any step above errors, the data is not permanently lost. rcall("DEL", deduplicationNextKey) end end