-
Notifications
You must be signed in to change notification settings - Fork 34
Expand file tree
/
Copy pathredis_test.rb
More file actions
317 lines (266 loc) · 7.77 KB
/
redis_test.rb
File metadata and controls
317 lines (266 loc) · 7.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
# frozen_string_literal: true
require 'test_helper'
class CI::Queue::RedisTest < Minitest::Test
include SharedQueueAssertions
def setup
@redis_url = ENV.fetch('REDIS_URL', 'redis://localhost:6379/0')
@redis = ::Redis.new(url: @redis_url)
@redis.flushdb
super
@config = @queue.send(:config) # hack
end
def test_from_uri
second_queue = populate(
CI::Queue.from_uri(@redis_url, config)
)
assert_instance_of CI::Queue::Redis::Worker, second_queue
assert_equal @queue.to_a, second_queue.to_a
end
def test_requeue # redefine the shared one
previous_offset = CI::Queue::Redis.requeue_offset
CI::Queue::Redis.requeue_offset = 2
failed_once = false
test_order = poll(@queue, ->(test) {
if test == shuffled_test_list.last && !failed_once
failed_once = true
false
else
true
end
})
expected_order = shuffled_test_list.dup
expected_order.insert(-CI::Queue::Redis.requeue_offset, shuffled_test_list.last)
assert_equal expected_order, test_order
ensure
CI::Queue::Redis.requeue_offset = previous_offset
end
def test_retry_queue_with_all_tests_passing
poll(@queue)
retry_queue = @queue.retry_queue
populate(retry_queue)
retry_test_order = poll(retry_queue)
assert_equal [], retry_test_order
end
def test_retry_queue_with_all_tests_passing_2
poll(@queue)
retry_queue = @queue.retry_queue
populate(retry_queue)
retry_test_order = poll(retry_queue) do |test|
@queue.build.record_error(test.id, 'Failed')
end
assert_equal retry_test_order, retry_test_order
end
def test_shutdown
poll(@queue) do
@queue.shutdown!
end
assert_equal TEST_LIST.size - 1, @queue.size
end
def test_master_election
assert_predicate @queue, :master?
refute_predicate worker(2), :master?
@redis.flushdb
assert_predicate worker(2), :master?
refute_predicate worker(1), :master?
end
def test_exhausted_while_not_populated
assert_predicate @queue, :populated?
second_worker = worker(2, populate: false)
refute_predicate second_worker, :populated?
refute_predicate second_worker, :exhausted?
poll(@queue)
refute_predicate second_worker, :populated?
assert_predicate second_worker, :exhausted?
end
def test_monitor_boot_and_shutdown
@queue.config.max_missed_heartbeat_seconds = 1
@queue.boot_heartbeat_process!
status = @queue.stop_heartbeat!
assert_predicate status, :success?
ensure
@queue.config.max_missed_heartbeat_seconds = nil
end
def test_timed_out_test_are_picked_up_by_other_workers
second_queue = worker(2)
acquired = false
done = false
monitor = Monitor.new
condition = monitor.new_cond
Thread.start do
monitor.synchronize do
condition.wait_until { acquired }
poll(second_queue)
done = true
condition.signal
end
end
poll(@queue) do
acquired = true
monitor.synchronize do
condition.signal
condition.wait_until { done }
end
end
assert_predicate @queue, :exhausted?
assert_equal [], populate(@queue.retry_queue).to_a
assert_equal [], populate(second_queue.retry_queue).to_a.sort
end
def test_release_immediately_timeout_the_lease
second_queue = worker(2)
reserved_test = nil
poll(@queue) do |test|
reserved_test = test
break
end
refute_nil reserved_test
worker(1).release! # Use a new instance to ensure we don't depend on in-memory state
poll(second_queue) do |test|
assert_equal reserved_test, test
break
end
end
def test_test_isnt_requeued_if_it_was_picked_up_by_another_worker
second_queue = worker(2)
acquired = false
done = false
monitor = Monitor.new
condition = monitor.new_cond
Thread.start do
monitor.synchronize do
condition.wait_until { acquired }
poll(second_queue)
done = true
condition.signal
end
end
poll(@queue, false) do
break if acquired
acquired = true
monitor.synchronize do
condition.signal
condition.wait_until { done }
end
end
assert_predicate @queue, :exhausted?
end
def test_acknowledge_returns_false_if_the_test_was_picked_up_by_another_worker
second_queue = worker(2)
acquired = false
done = false
monitor = Monitor.new
condition = monitor.new_cond
Thread.start do
monitor.synchronize do
condition.wait_until { acquired }
second_queue.poll do |test|
assert_equal true, second_queue.acknowledge(test.id)
end
done = true
condition.signal
end
end
@queue.poll do |test|
break if acquired
acquired = true
monitor.synchronize do
condition.signal
condition.wait_until { done }
assert_equal false, @queue.acknowledge(test.id)
end
end
assert_predicate @queue, :exhausted?
end
def test_workers_register
assert_equal 1, @redis.scard(('build:42:workers'))
worker(2)
assert_equal 2, @redis.scard(('build:42:workers'))
end
def test_timeout_warning
begin
threads = 2.times.map do |i|
Thread.new do
queue = worker(i, tests: [TEST_LIST.first], build_id: '24')
queue.poll do |test|
sleep 1 # timeout
queue.acknowledge(test.id)
end
end
end
threads.each { |t| t.join(3) }
threads.each { |t| refute_predicate t, :alive? }
queue = worker(12, build_id: '24')
assert_equal [[:RESERVED_LOST_TEST, {test: 'ATest#test_foo', timeout: 0.2}]], queue.build.pop_warnings
ensure
threads.each(&:kill)
end
end
def test_continuously_timing_out_tests
3.times do
@redis.flushdb
begin
threads = 2.times.map do |i|
Thread.new do
queue = worker(i, tests: [TEST_LIST.first], build_id: '24')
queue.poll do |test|
sleep 1 # timeout
queue.acknowledge(test.id)
end
end
end
threads.each { |t| t.join(3) }
threads.each { |t| refute_predicate t, :alive? }
queue = worker(12, build_id: '24')
assert_predicate queue, :queue_initialized?
assert_predicate queue, :exhausted?
ensure
threads.each(&:kill)
end
end
end
def test_initialise_from_redis_uri
queue = CI::Queue.from_uri('redis://localhost:6379/0', config)
assert_instance_of CI::Queue::Redis::Worker, queue
end
def test_initialise_from_rediss_uri
queue = CI::Queue.from_uri('rediss://localhost:6379/0', config)
assert_instance_of CI::Queue::Redis::Worker, queue
end
def test_custom_config_returns_empty_hash_by_default
queue = worker(1)
custom_config = queue.send(:custom_config)
assert_equal({}, custom_config)
end
def test_custom_config_returns_logger_when_debug_log_is_set
queue = worker(1, debug_log: '/dev/null')
custom_config = queue.send(:custom_config)
assert_kind_of Logger, custom_config[:debug_log]
end
private
def shuffled_test_list
CI::Queue.shuffle(TEST_LIST, Random.new(0)).freeze
end
def build_queue
worker(1, max_requeues: 1, requeue_tolerance: 0.1, populate: false, max_consecutive_failures: 10)
end
def populate(worker, tests: TEST_LIST.dup)
worker.populate(tests, random: Random.new(0))
end
def worker(id, **args)
tests = args.delete(:tests) || TEST_LIST.dup
skip_populate = args.delete(:populate) == false
queue = CI::Queue::Redis.new(
@redis_url,
CI::Queue::Configuration.new(
build_id: '42',
worker_id: id.to_s,
timeout: 0.2,
**args,
)
)
if skip_populate
return queue
else
populate(queue, tests: tests)
end
end
end