1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
module Gitlab
# The SidekiqStatus module and its child classes can be used for checking if a
# Sidekiq job has been processed or not.
#
# To check if a job has been completed, simply pass the job ID to the
# `completed?` method:
#
# job_id = SomeWorker.perform_async(...)
#
# if Gitlab::SidekiqStatus.completed?(job_id)
# ...
# end
#
# For each job ID registered a separate key is stored in Redis, making lookups
# much faster than using Sidekiq's built-in job finding/status API. These keys
# expire after a certain period of time to prevent storing too many keys in
# Redis.
module SidekiqStatus
STATUS_KEY = 'gitlab-sidekiq-status:%s'.freeze
# The default time (in seconds) after which a status key is expired
# automatically. The default of 30 minutes should be more than sufficient
# for most jobs.
DEFAULT_EXPIRATION = 30.minutes.to_i
# Starts tracking of the given job.
#
# jid - The Sidekiq job ID
# expire - The expiration time of the Redis key.
def self.set(jid, expire = DEFAULT_EXPIRATION)
Sidekiq.redis do |redis|
redis.set(key_for(jid), 1, ex: expire)
end
end
# Stops the tracking of the given job.
#
# jid - The Sidekiq job ID to remove.
def self.unset(jid)
Sidekiq.redis do |redis|
redis.del(key_for(jid))
end
end
# Returns true if all the given job have been completed.
#
# job_ids - The Sidekiq job IDs to check.
#
# Returns true or false.
def self.all_completed?(job_ids)
self.num_running(job_ids).zero?
end
# Returns the number of jobs that are running.
#
# job_ids - The Sidekiq job IDs to check.
def self.num_running(job_ids)
responses = self.job_status(job_ids)
responses.select(&:present?).count
end
# Returns the number of jobs that have completed.
#
# job_ids - The Sidekiq job IDs to check.
def self.num_completed(job_ids)
job_ids.size - self.num_running(job_ids)
end
# Returns the job status for each of the given job IDs.
#
# job_ids - The Sidekiq job IDs to check.
#
# Returns an array of true or false indicating job completion.
# true = job is still running
# false = job completed
def self.job_status(job_ids)
keys = job_ids.map { |jid| key_for(jid) }
Sidekiq.redis do |redis|
redis.pipelined do
keys.each { |key| redis.exists(key) }
end
end
end
# Returns the JIDs that are completed
#
# job_ids - The Sidekiq job IDs to check.
#
# Returns an array of completed JIDs
def self.completed_jids(job_ids)
Sidekiq.redis do |redis|
job_ids.reject { |jid| redis.exists(key_for(jid)) }
end
end
def self.key_for(jid)
STATUS_KEY % jid
end
end
end