-
Notifications
You must be signed in to change notification settings - Fork 165
/
Copy pathclaimed_execution.rb
71 lines (59 loc) · 1.49 KB
/
claimed_execution.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# Execution record for a job that has been claimed by a process.
#
# Each record references the claiming process (+belongs_to :process+) and a
# job (the +job+ association is presumably declared on SolidQueue::Execution;
# it is not visible in this file).
class SolidQueue::ClaimedExecution < SolidQueue::Execution
  belongs_to :process

  # Outcome of running a job. +success+ is truthy when the job ran without
  # raising; +error+ holds the rescued exception otherwise.
  Result = Struct.new(:success, :error) do
    # @return [Object] the +success+ member, used as a boolean
    def success?
      success
    end
  end

  class << self
    # Inserts a claimed execution for every given job on behalf of
    # +process_id+, then yields the executions this process holds for those
    # jobs and returns them.
    #
    # @param job_ids [Object, Array] a single job id or a list of job ids
    # @param process_id [Object] id of the claiming process
    # @yieldparam claimed [ActiveRecord::Relation] the loaded claimed executions
    # @return [ActiveRecord::Relation] the same loaded relation that was yielded
    def claiming(job_ids, process_id, &block)
      job_data = Array(job_ids).map { |job_id| { job_id: job_id, process_id: process_id } }

      # Some Active Record versions raise ArgumentError when insert_all is
      # given an empty list, so skip the insert when there is nothing to claim.
      insert_all(job_data) unless job_data.empty?

      # insert_all skips rows that violate a unique index, so a job already
      # claimed by another process is not re-inserted here (assuming job_id is
      # uniquely indexed -- TODO confirm against the schema). Scoping the
      # reload by process_id keeps those foreign claims out of the result.
      where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
        block.call(claimed)
        SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
      end
    end

    # Releases every claimed execution in the current scope, preloading the
    # associated jobs to avoid one query per record.
    def release_all
      includes(:job).each(&:release)
    end
  end

  # Runs the job and records the outcome: marks it finished on success or
  # failed (with the rescued error) otherwise. Whatever happens,
  # +job.unblock_next_blocked_job+ is called afterwards.
  def perform
    result = execute

    if result.success?
      finished
    else
      failed_with(result.error)
    end
  ensure
    job.unblock_next_blocked_job
  end

  # Gives up the claim: calls +job.prepare_for_execution+ and destroys this
  # record, both inside a single transaction.
  def release
    transaction do
      job.prepare_for_execution
      destroy!
    end
  end

  private

  # Executes the job through Active Job.
  #
  # @return [Result] successful result, or a failed one carrying the exception
  def execute
    SolidQueue.logger.info("[SolidQueue] Performing job #{job.id} - #{job.active_job_id}")
    ActiveJob::Base.execute(job.arguments)
    Result.new(true, nil)
  rescue Exception => e
    # Rescues Exception rather than StandardError so that any error raised
    # while running the job is captured in the Result instead of propagating.
    # NOTE(review): this also captures SignalException/SystemExit raised
    # during the job -- confirm that is intended.
    Result.new(false, e)
  end

  # Marks the job as finished and removes this claim in one transaction,
  # then logs the completion.
  def finished
    transaction do
      job.finished!
      destroy!
    end

    SolidQueue.logger.info("[SolidQueue] Performed job #{job.id} - #{job.active_job_id}")
  end

  # Records the failure on the job and removes this claim in one transaction,
  # then logs the failure.
  #
  # @param error [Exception] the exception rescued while executing the job
  def failed_with(error)
    transaction do
      job.failed_with(error)
      destroy!
    end

    SolidQueue.logger.info("[SolidQueue] Failed job #{job.id} - #{job.active_job_id}")
  end
end