Skip to content

Commit a707c74

Browse files
committed
respect the state update counter if present
1 parent 4dbb53c commit a707c74

File tree

7 files changed

+206
-5
lines changed

7 files changed

+206
-5
lines changed

lib/travis/hub/handler.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
require 'travis/hub/helper/context'
2+
require 'travis/hub/helper/hash'
23
require 'travis/hub/helper/string'
34
require 'travis/hub/service'
45

56
module Travis
67
module Hub
78
class Handler
8-
include Helper::Context, Helper::String
9+
include Helper::Context, Helper::Hash, Helper::String
910

1011
attr_reader :context, :type, :event, :payload, :object
1112

@@ -31,6 +32,7 @@ def run
3132

3233
def handle
3334
const = Service.const_get("Update#{camelize(type)}")
35+
p [event, payload]
3436
const.new(context, event, payload).run
3537
end
3638

@@ -45,7 +47,7 @@ def normalize_event(event)
4547
end
4648

4749
def normalize_payload(payload)
48-
payload = payload.symbolize_keys
50+
payload = deep_symbolize_keys(payload)
4951
payload = normalize_state(payload)
5052
normalize_timestamps(payload)
5153
end

lib/travis/hub/helper/hash.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
module Travis
2+
module Hub
3+
module Helper
4+
module Hash
5+
def deep_symbolize_keys(hash)
6+
hash.map do |key, obj|
7+
obj = case obj
8+
when Array
9+
obj.map { |obj| deep_symbolize_keys(obj) }
10+
when ::Hash
11+
deep_symbolize_keys(obj)
12+
else
13+
obj
14+
end
15+
[key.to_sym, obj]
16+
end.to_h
17+
end
18+
end
19+
end
20+
end
21+
end
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
module Travis
2+
module Hub
3+
module Service
4+
class StateUpdate < Struct.new(:event, :data, :block)
5+
class Counter < Struct.new(:job_id, :redis)
6+
TTL = 3600 * 24
7+
8+
def count
9+
@count ||= redis.get(key).to_i
10+
end
11+
12+
def store(count)
13+
redis.set(key, count)
14+
redis.expire(key, TTL)
15+
end
16+
17+
private
18+
19+
def key
20+
"job:state_update_count:#{job_id}"
21+
end
22+
end
23+
24+
include Helper::Context
25+
26+
OUT_OF_BAND = [:cancel, :restart]
27+
28+
MSGS = {
29+
missing: 'Received state update (%p) with no count for job id=%p, last known count: %p.',
30+
ordered: 'Received state update %p (%p) for job id=%p, last known count: %p',
31+
unordered: 'Received state update %p (%p) for job id=%p, last known count: %p. Skipping the message.',
32+
}
33+
34+
def apply
35+
if !enabled? || out_of_band?
36+
call
37+
elsif missing?
38+
missing
39+
elsif ordered?
40+
ordered
41+
else
42+
unordered
43+
end
44+
end
45+
46+
private
47+
48+
def call
49+
block.call
50+
end
51+
52+
def enabled?
53+
!!ENV['UPDATE_COUNT']
54+
end
55+
56+
def out_of_band?
57+
OUT_OF_BAND.include?(event)
58+
end
59+
60+
def missing?
61+
count.nil?
62+
end
63+
64+
def missing
65+
warn :missing, event, job_id, counter.count
66+
call
67+
store
68+
end
69+
70+
def ordered
71+
info :ordered, count, event, job_id, counter.count
72+
call
73+
store
74+
end
75+
76+
def unordered
77+
warn :unordered, count, event, job_id, counter.count
78+
end
79+
80+
def ordered?
81+
count >= counter.count
82+
end
83+
84+
def store
85+
counter.store(count)
86+
end
87+
88+
def counter
89+
@counter ||= Counter.new(job_id, redis)
90+
end
91+
92+
def job_id
93+
data[:id]
94+
end
95+
96+
def count
97+
meta[:state_update_count]
98+
end
99+
100+
def meta
101+
data[:meta] || {}
102+
end
103+
end
104+
end
105+
end
106+
end

lib/travis/hub/service/update_job.rb

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require 'travis/hub/helper/locking'
44
require 'travis/hub/model/job'
55
require 'travis/hub/service/error_job'
6+
require 'travis/hub/service/state_update'
67
require 'travis/hub/service/notify_workers'
78
require 'travis/hub/helper/limit'
89

@@ -16,14 +17,16 @@ class UpdateJob < Struct.new(:event, :data)
1617
EVENTS = [:receive, :reset, :start, :finish, :cancel, :restart]
1718

1819
MSGS = {
19-
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %s to %s data=%s',
20+
skipped: 'Skipped event job:%s for <Job id=%s> trying to update state from %p to %p data=%s',
2021
}
2122

2223
def run
2324
exclusive do
2425
validate
25-
update_job
26-
notify
26+
with_state_update do
27+
update_job
28+
notify
29+
end
2730
end
2831
end
2932
instrument :run
@@ -72,6 +75,10 @@ def skipped
7275
warn :skipped, event, job.id, job.state, data[:state], data
7376
end
7477

78+
def with_state_update(&block)
79+
StateUpdate.new(context, event, data, block).apply
80+
end
81+
7582
def resets
7683
@resets ||= Limit.new(redis, :resets, job.id, config.limit.resets)
7784
end

spec/spec_helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@
7575
Travis::Event.instance_variable_set(:@subscriptions, nil)
7676
# Travis::Addons.setup({ host: 'host.com', encryption: { key: 'secret' * 10 } }, logger)
7777
Time.stubs(:now).returns(NOW)
78+
context.redis.flushall
7879
end
7980
end
8081

spec/support/context.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module Context
66

77
included do
88
let(:stdout) { StringIO.new }
9+
let(:log) { stdout.string }
910
let(:logger) { Travis::Logger.new(stdout) }
1011
let(:context) { Travis::Hub::Context.new(logger: logger) }
1112
before { Travis::Hub.context = context }

spec/travis/hub/service/update_job_spec.rb

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,4 +210,67 @@ def recieve(msg)
210210
expect(job.reload.state).to eql :passed
211211
end
212212
end
213+
214+
describe 'state update count' do
215+
let(:job) { FactoryGirl.create(:job, state: :received) }
216+
let(:event) { :start }
217+
let(:data) { { id: job.id, state: :started, meta: meta } }
218+
219+
before { ENV['UPDATE_COUNT'] = 'true' }
220+
after { ENV['UPDATE_COUNT'] = nil }
221+
222+
describe 'with no count stored' do
223+
describe 'given no meta' do
224+
let(:meta) { nil }
225+
before { subject.run }
226+
227+
it { expect(job.reload.state).to eq :started }
228+
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 0" }
229+
end
230+
231+
describe 'given no count' do
232+
let(:meta) { {} }
233+
before { subject.run }
234+
235+
it { expect(job.reload.state).to eq :started }
236+
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 0" }
237+
end
238+
239+
describe 'given a count' do
240+
let(:meta) { { state_update_count: 2 } }
241+
before { subject.run }
242+
243+
it { expect(job.reload.state).to eq :started }
244+
it { expect(log).to include "I Received state update 2 (:start) for job id=#{job.id}, last known count: 0" }
245+
end
246+
end
247+
248+
describe 'with a count stored' do
249+
before { context.redis.set("job:state_update_count:#{job.id}", 3) }
250+
251+
describe 'given no meta it skips the message' do
252+
let(:meta) { nil }
253+
before { subject.run }
254+
255+
it { expect(job.reload.state).to eq :started }
256+
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 3" }
257+
end
258+
259+
describe 'given no count it skips the message' do
260+
let(:meta) { {} }
261+
before { subject.run }
262+
263+
it { expect(job.reload.state).to eq :started }
264+
it { expect(log).to include "W Received state update (:start) with no count for job id=#{job.id}, last known count: 3" }
265+
end
266+
267+
describe 'given a count it skips the message' do
268+
let(:meta) { { state_update_count: 2 } }
269+
before { subject.run }
270+
271+
it { expect(job.reload.state).to eq :received }
272+
it { expect(log).to include "W Received state update 2 (:start) for job id=#{job.id}, last known count: 3. Skipping the message." }
273+
end
274+
end
275+
end
213276
end

0 commit comments

Comments
 (0)