-- Complete(0, jid, worker, queue, now, data, [next, q, [(delay, d) | (depends, '["jid1","jid2",...]')])
-- -----------------------------------------------------------------------------------------------------
-- Complete a job and optionally put it in another queue, either scheduled or to
-- be considered waiting immediately. It can also optionally accept other jids
-- on which this job will depend before it becomes available to be popped.
--
-- Args:
-- 1) jid
-- 2) worker
-- 3) queue
-- 4) now
-- 5) data
-- *) [next, q, [delay, d]], [depends, '...']
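--
-- A hedged usage sketch (illustrative names and values, not part of the
-- script): with the script loaded, completing a job outright from redis-cli
-- might look like
--
--   EVAL "$(cat complete.lua)" 0 some-jid worker-1 default 1700000000 '{}'
--
-- and completing it into a hypothetical 'emails' queue with a 30-second delay:
--
--   EVAL "$(cat complete.lua)" 0 some-jid worker-1 default 1700000000 '{}' next emails delay 30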
if #KEYS > 0 then error('Complete(): No Keys should be provided') end
local jid = assert(ARGV[1] , 'Complete(): Arg "jid" missing.')
local worker = assert(ARGV[2] , 'Complete(): Arg "worker" missing.')
local queue = assert(ARGV[3] , 'Complete(): Arg "queue" missing.')
local now = assert(tonumber(ARGV[4]) , 'Complete(): Arg "now" not a number or missing: ' .. tostring(ARGV[4]))
local data = assert(cjson.decode(ARGV[5]) , 'Complete(): Arg "data" missing or not JSON: ' .. tostring(ARGV[5]))
-- Read in all the optional parameters
local options = {}
for i = 6, #ARGV, 2 do options[ARGV[i]] = ARGV[i + 1] end
-- Sanity check on optional args
local nextq = options['next']
local delay = assert(tonumber(options['delay'] or 0), 'Complete(): Arg "delay" not a number: ' .. tostring(options['delay']))
local depends = assert(cjson.decode(options['depends'] or '[]'), 'Complete(): Arg "depends" not JSON: ' .. tostring(options['depends']))
-- Delay and depends are not allowed together
if delay > 0 and #depends > 0 then
error('Complete(): "delay" and "depends" are not allowed to be used together')
end
-- Delay doesn't make sense without nextq
if options['delay'] and nextq == nil then
error('Complete(): "delay" cannot be used without a "next".')
end
-- Depends doesn't make sense without nextq
if options['depends'] and nextq == nil then
error('Complete(): "depends" cannot be used without a "next".')
end
-- The bin is midnight of the provided day
-- 24 * 60 * 60 = 86400
local bin = now - (now % 86400)
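-- For example, now = 1700000123 gives 1700000123 % 86400 = 80123, so
-- bin = 1699920000, which is 00:00 UTC of that day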
-- First things first, we should see if the worker still owns this job
local lastworker, history, state, priority, retries = unpack(redis.call('hmget', 'ql:j:' .. jid, 'worker', 'history', 'state', 'priority', 'retries'))
if (lastworker ~= worker) or (state ~= 'running') then
return false
end
-- Now we can assume that the worker does own the job. We need to
-- 1) Remove the job from the 'locks' from the old queue
-- 2) Enqueue it in the next stage if necessary
-- 3) Update the data
-- 4) Mark the job as completed, remove the worker, remove expires, and update history
-- Unpack the history, and update it
history = cjson.decode(history)
history[#history]['done'] = math.floor(now)
if data then
redis.call('hset', 'ql:j:' .. jid, 'data', cjson.encode(data))
end
-- Remove the job from the previous queue
redis.call('zrem', 'ql:q:' .. queue .. '-work', jid)
redis.call('zrem', 'ql:q:' .. queue .. '-locks', jid)
redis.call('zrem', 'ql:q:' .. queue .. '-scheduled', jid)
----------------------------------------------------------
-- This is the massive stats update that we have to do
----------------------------------------------------------
-- This is how long the job spent running, from when it was popped until now
local waiting = math.floor(now) - history[#history]['popped']
-- Now we'll go through the process of updating the running statistics
local count, mean, vk = unpack(redis.call('hmget', 'ql:s:run:' .. bin .. ':' .. queue, 'total', 'mean', 'vk'))
-- hmget returns strings (or false when the key is missing), so coerce
count = tonumber(count) or 0
mean  = tonumber(mean)  or 0
vk    = tonumber(vk)    or 0
if count == 0 then
mean = waiting
vk = 0
count = 1
else
count = count + 1
local oldmean = mean
mean = mean + (waiting - mean) / count
vk = vk + (waiting - mean) * (waiting - oldmean)
end
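-- This is Welford's online algorithm: 'mean' is the running mean of the
-- observed run times and 'vk' the running sum of squared deviations, so the
-- sample variance can later be recovered as vk / (count - 1) when count > 1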
-- Now, update the histogram
-- - `s1`, `s2`, ..., -- second-resolution histogram counts
-- - `m1`, `m2`, ..., -- minute-resolution
-- - `h1`, `h2`, ..., -- hour-resolution
-- - `d1`, `d2`, ..., -- day-resolution
waiting = math.floor(waiting)
if waiting < 60 then -- seconds
redis.call('hincrby', 'ql:s:run:' .. bin .. ':' .. queue, 's' .. waiting, 1)
elseif waiting < 3600 then -- minutes
redis.call('hincrby', 'ql:s:run:' .. bin .. ':' .. queue, 'm' .. math.floor(waiting / 60), 1)
elseif waiting < 86400 then -- hours
redis.call('hincrby', 'ql:s:run:' .. bin .. ':' .. queue, 'h' .. math.floor(waiting / 3600), 1)
else -- days
redis.call('hincrby', 'ql:s:run:' .. bin .. ':' .. queue, 'd' .. math.floor(waiting / 86400), 1)
end
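-- For example, waiting = 75 falls into the minutes branch and increments
-- the field 'm1', since math.floor(75 / 60) == 1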
redis.call('hmset', 'ql:s:run:' .. bin .. ':' .. queue, 'total', count, 'mean', mean, 'vk', vk)
----------------------------------------------------------
-- Remove this job from the jobs that the worker that was running it has
redis.call('zrem', 'ql:w:' .. worker .. ':jobs', jid)
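-- If this job is being tracked, let any listeners know that it completed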
if redis.call('zscore', 'ql:tracked', jid) ~= false then
redis.call('publish', 'completed', jid)
end
if nextq then
-- Enqueue the job
table.insert(history, {
q = nextq,
put = math.floor(now)
})
-- We're going to make sure that this queue is in the
-- set of known queues
if redis.call('zscore', 'ql:queues', nextq) == false then
redis.call('zadd', 'ql:queues', now, nextq)
end
redis.call('hmset', 'ql:j:' .. jid, 'state', 'waiting', 'worker', '', 'failure', '{}',
'queue', nextq, 'expires', 0, 'history', cjson.encode(history), 'remaining', tonumber(retries))
if delay > 0 then
redis.call('zadd', 'ql:q:' .. nextq .. '-scheduled', now + delay, jid)
return 'scheduled'
else
-- These are the jids we legitimately have to wait on
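-- Each dependency edge is recorded in both directions: the dependency's
-- '-dependents' set and this job's '-dependencies' set, so that when the
-- dependency completes it can find and release this job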
local count = 0
for i, j in ipairs(depends) do
-- Make sure it's something other than 'nil' or complete.
local state = redis.call('hget', 'ql:j:' .. j, 'state')
if (state and state ~= 'complete') then
count = count + 1
redis.call('sadd', 'ql:j:' .. j .. '-dependents' , jid)
redis.call('sadd', 'ql:j:' .. jid .. '-dependencies', j)
end
end
if count > 0 then
redis.call('zadd', 'ql:q:' .. nextq .. '-depends', now, jid)
redis.call('hset', 'ql:j:' .. jid, 'state', 'depends')
return 'depends'
else
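-- The score folds FIFO ordering into the priority: subtracting now / 1e10
-- gives earlier-put jobs of equal priority a slightly higher score, so
-- (assuming the work queue is popped highest-score-first, as in the rest of
-- qless-core) older jobs are popped first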
redis.call('zadd', 'ql:q:' .. nextq .. '-work', priority - (now / 10000000000), jid)
return 'waiting'
end
end
else
redis.call('hmset', 'ql:j:' .. jid, 'state', 'complete', 'worker', '', 'failure', '{}',
'queue', '', 'expires', 0, 'history', cjson.encode(history), 'remaining', tonumber(retries))
-- Do the completion dance
local count, time = unpack(redis.call('hmget', 'ql:config', 'jobs-history-count', 'jobs-history'))
-- These are the default values
count = tonumber(count or 50000)
time = tonumber(time or 7 * 24 * 60 * 60)
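-- i.e., unless configured otherwise, keep at most 50000 completed jobs and
-- none older than seven days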
-- Schedule this job for eventual destruction
redis.call('zadd', 'ql:completed', now, jid)
-- Now look at the expired job data. First, based on the current time
local jids = redis.call('zrangebyscore', 'ql:completed', 0, now - time)
-- Any jobs that need to be expired... delete
for index, jid in ipairs(jids) do
local tags = cjson.decode(redis.call('hget', 'ql:j:' .. jid, 'tags') or '{}')
for i, tag in ipairs(tags) do
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
end
redis.call('del', 'ql:j:' .. jid)
end
-- And now remove those from the queued-for-cleanup queue
redis.call('zremrangebyscore', 'ql:completed', 0, now - time)
-- Now, take everything but the most recent 'count' jids
jids = redis.call('zrange', 'ql:completed', 0, (-1-count))
for index, jid in ipairs(jids) do
local tags = cjson.decode(redis.call('hget', 'ql:j:' .. jid, 'tags') or '{}')
for i, tag in ipairs(tags) do
redis.call('zrem', 'ql:t:' .. tag, jid)
redis.call('zincrby', 'ql:tags', -1, tag)
end
redis.call('del', 'ql:j:' .. jid)
end
redis.call('zremrangebyrank', 'ql:completed', 0, (-1-count))
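-- (rank -1 - count is the boundary: ranks 0 through -1 - count cover every
-- entry except the 'count' highest-scored, i.e. most recent, completions)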
-- Alright, if this has any dependents, then we should go ahead
-- and unstick those guys.
for i, j in ipairs(redis.call('smembers', 'ql:j:' .. jid .. '-dependents')) do
redis.call('srem', 'ql:j:' .. j .. '-dependencies', jid)
if redis.call('scard', 'ql:j:' .. j .. '-dependencies') == 0 then
local q, p = unpack(redis.call('hmget', 'ql:j:' .. j, 'queue', 'priority'))
if q then
redis.call('zrem', 'ql:q:' .. q .. '-depends', j)
redis.call('zadd', 'ql:q:' .. q .. '-work', p, j)
redis.call('hset', 'ql:j:' .. j, 'state', 'waiting')
end
end
end
-- Delete our dependents key
redis.call('del', 'ql:j:' .. jid .. '-dependents')
return 'complete'
end