-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Expand file tree
/
Copy pathchange-streams.yml
More file actions
348 lines (336 loc) · 12.3 KB
/
change-streams.yml
File metadata and controls
348 lines (336 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
description: "timeoutMS behaves correctly for change streams"

schemaVersion: "1.9"

runOnRequirements:
  - minServerVersion: "4.4"
    topologies: ["replicaset", "sharded"]

createEntities:
  # Dedicated client for configuring fail points, kept separate from the
  # client under test so fail-point commands don't appear in its events.
  - client:
      id: &failPointClient failPointClient
      useMultipleMongoses: false
  - client:
      id: &client client
      useMultipleMongoses: false
      observeEvents:
        - commandStartedEvent
      # Drivers are not required to execute killCursors during resume attempts, so it should be ignored for command
      # monitoring assertions.
      ignoreCommandMonitoringEvents: ["killCursors"]
  - database:
      id: &database database
      client: *client
      databaseName: &databaseName test
  - collection:
      id: &collection collection
      database: *database
      collectionName: &collectionName coll

initialData:
  - collectionName: *collectionName
    databaseName: *databaseName
    documents: []
tests:
  # maxAwaitTimeMS larger than timeoutMS must be rejected with a client-side
  # error (expectError.isClientError) — no command is asserted for this test.
  - description: "error if maxAwaitTimeMS is greater than timeoutMS"
    operations:
      - name: createChangeStream
        object: *collection
        arguments:
          pipeline: []
          timeoutMS: 5
          maxAwaitTimeMS: 10
        expectError:
          isClientError: true
- description: "error if maxAwaitTimeMS is equal to timeoutMS"
operations:
- name: createChangeStream
object: *collection
arguments:
pipeline: []
timeoutMS: 5
maxAwaitTimeMS: 5
expectError:
isClientError: true
- description: "timeoutMS applied to initial aggregate"
operations:
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["aggregate"]
blockConnection: true
blockTimeMS: 250
- name: createChangeStream
object: *collection
arguments:
pipeline: []
timeoutMS: 200
expectError:
isTimeoutError: true
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
# If maxAwaitTimeMS is not set, timeoutMS should be refreshed for the getMore and the getMore should not have a
# maxTimeMS field. This test requires a high timeout because the server applies a default 1000ms maxAwaitTime. To
# ensure that the driver is refreshing the timeout between commands, the test blocks aggregate and getMore commands
# for 30ms each and creates/iterates a change stream with timeoutMS=1050. The initial aggregate will block for 30ms
# and the getMore will block for 1030ms.
- description: "timeoutMS is refreshed for getMore if maxAwaitTimeMS is not set"
operations:
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
data:
failCommands: ["aggregate", "getMore"]
blockConnection: true
blockTimeMS: 30
- name: createChangeStream
object: *collection
arguments:
pipeline: []
timeoutMS: 1050
saveResultAsEntity: &changeStream changeStream
- name: iterateOnce
object: *changeStream
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
- commandStartedEvent:
commandName: getMore
databaseName: *databaseName
command:
getMore: { $$type: ["int", "long"] }
collection: *collectionName
maxTimeMS: { $$exists: false }
# If maxAwaitTimeMS is set, timeoutMS should still be refreshed for the getMore and the getMore command should have a
# maxTimeMS field.
- description: "timeoutMS is refreshed for getMore if maxAwaitTimeMS is set"
operations:
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
data:
failCommands: ["aggregate", "getMore"]
blockConnection: true
blockTimeMS: 150
- name: createChangeStream
object: *collection
arguments:
pipeline: []
timeoutMS: 200
batchSize: 2
maxAwaitTimeMS: 1
saveResultAsEntity: &changeStream changeStream
- name: iterateOnce
object: *changeStream
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
- commandStartedEvent:
commandName: getMore
databaseName: *databaseName
command:
getMore: { $$type: ["int", "long"] }
collection: *collectionName
maxTimeMS: 1
# If a resume is required for a next call on a change stream, the timeout MUST apply to the entirety of the initial
# getMore and all commands sent as part of the resume attempt.
- description: "timeoutMS applies to full resume attempt in a next call"
operations:
- name: createChangeStream
object: *collection
arguments:
pipeline: []
timeoutMS: 200
saveResultAsEntity: &changeStream changeStream
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 2 }
data:
failCommands: ["getMore", "aggregate"]
blockConnection: true
blockTimeMS: 120
errorCode: 7 # HostNotFound - resumable but does not require an SDAM state change.
# failCommand doesn't correctly add the ResumableChangeStreamError by default. It needs to be specified
# manually here so the error is considered resumable. The failGetMoreAfterCursorCheckout fail point
# would add the label without this, but it does not support blockConnection functionality.
errorLabels: ["ResumableChangeStreamError"]
- name: iterateUntilDocumentOrError
object: *changeStream
expectError:
isTimeoutError: true
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
- commandStartedEvent:
commandName: getMore
databaseName: *databaseName
command:
getMore: { $$type: ["int", "long"] }
collection: *collectionName
maxTimeMS: { $$exists: false }
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
# [resumption] If a next call fails with a timeout error, drivers MUST NOT
# invalidate the change stream. The subsequent next call MUST perform a resume
# attempt to establish a new change stream on the server.
#
# [refresh] If a resume is required for a next call on a change stream, the
# timeout MUST apply to the entirety of the initial getMore and all commands
# sent as part of the resume attempt.
- description: "change stream iteration succeeds after a timeout error"
operations:
- name: createChangeStream
object: *collection
arguments:
pipeline: []
# Specify a short maxAwaitTimeMS because otherwise the getMore on the new cursor will wait for 1000ms and
# time out.
maxAwaitTimeMS: 1
timeoutMS: 200
saveResultAsEntity: &changeStream changeStream
# Block getMore for 250ms to force the next() call to time out.
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["getMore"]
blockConnection: true
blockTimeMS: 250
# Iterate until timeout.
- name: iterateUntilDocumentOrError
object: *changeStream
expectError:
isTimeoutError: true
# Block the resume aggregate for 150ms. If the timeout were exhausted from
# the previous call, this would fail immediately. With a fresh 200ms
# timeout, the 150ms delay should be acceptable.
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["aggregate"]
blockConnection: true
blockTimeMS: 150
# A final iteration should succeed because the timeout is refreshed and
# the change stream was not invalidated.
- name: iterateOnce
object: *changeStream
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
# The first iterateUntilDocumentOrError sends a getMore that times
# out.
- commandStartedEvent:
commandName: getMore
databaseName: *databaseName
command:
getMore: { $$type: ["int", "long"] }
collection: *collectionName
# The iterateOnce operation re-creates the cursor via an aggregate
# [resumption]. The aggregate is blocked for 150ms but succeeds
# because the timeout was refreshed to a fresh 200ms, proving the
# timeout is not shared with the previous timed-out call [refresh].
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
# The timeoutMS value should be refreshed for getMore's. This is a failure test. The createChangeStream operation
# sets timeoutMS=200 and the getMore blocks for 250ms, causing iteration to fail with a timeout error.
- description: "timeoutMS is refreshed for getMore - failure"
operations:
- name: failPoint
object: testRunner
arguments:
client: *failPointClient
failPoint:
configureFailPoint: failCommand
mode: { times: 1 }
data:
failCommands: ["getMore"]
blockConnection: true
blockTimeMS: 250
- name: createChangeStream
object: *collection
arguments:
pipeline: []
timeoutMS: 200
saveResultAsEntity: &changeStream changeStream
# The first iteration should do a getMore
- name: iterateUntilDocumentOrError
object: *changeStream
expectError:
isTimeoutError: true
expectEvents:
- client: *client
events:
- commandStartedEvent:
commandName: aggregate
databaseName: *databaseName
command:
aggregate: *collectionName
maxTimeMS: { $$type: ["int", "long"] }
# The iterateUntilDocumentOrError operation should send a getMore.
- commandStartedEvent:
commandName: getMore
databaseName: *databaseName
command:
getMore: { $$type: ["int", "long"] }
collection: *collectionName