@@ -251,3 +251,127 @@ async def wait(fut):
251
251
252
252
await reader0 .close ()
253
253
await reader1 .close ()
254
+
255
+
256
+ @pytest .mark .asyncio
257
+ class TestTopicNoConsumerReaderAsyncIO :
258
+ async def test_reader_with_sync_lambda (self , driver , topic_with_messages ):
259
+ def sync_lambda (partition_id : int ):
260
+ assert partition_id == 0
261
+ return 1
262
+
263
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], sync_lambda )
264
+ msg = await reader .receive_message ()
265
+
266
+ assert msg .seqno == 2
267
+
268
+ await reader .close ()
269
+
270
+ async def test_reader_with_async_lambda (self , driver , topic_with_messages ):
271
+ async def async_lambda (partition_id : int ) -> int :
272
+ assert partition_id == 0
273
+ return 1
274
+
275
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], async_lambda )
276
+ msg = await reader .receive_message ()
277
+
278
+ assert msg .seqno == 2
279
+
280
+ await reader .close ()
281
+
282
+ async def test_commit_not_allowed (self , driver , topic_with_messages ):
283
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], lambda x : None )
284
+ batch = await reader .receive_batch ()
285
+
286
+ with pytest .raises (ydb .Error ):
287
+ reader .commit (batch )
288
+
289
+ with pytest .raises (ydb .Error ):
290
+ await reader .commit_with_ack (batch )
291
+
292
+ await reader .close ()
293
+
294
+ async def test_offsets_updated_after_reconnect (self , driver , topic_with_messages ):
295
+ current_offset = 0
296
+
297
+ def get_start_offset_lambda (partition_id : int ) -> int :
298
+ nonlocal current_offset
299
+ return current_offset
300
+
301
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], get_start_offset_lambda )
302
+ msg = await reader .receive_message ()
303
+
304
+ assert msg .seqno == current_offset + 1
305
+
306
+ current_offset += 2
307
+ reader ._reconnector ._stream_reader ._set_first_error (ydb .Unavailable ("some retriable error" ))
308
+
309
+ await asyncio .sleep (0 )
310
+
311
+ msg = await reader .receive_message ()
312
+
313
+ assert msg .seqno == current_offset + 1
314
+
315
+ await reader .close ()
316
+
317
+
318
+ class TestTopicNoConsumerReader :
319
+ def test_reader_with_sync_lambda (self , driver_sync , topic_with_messages ):
320
+ def sync_lambda (partition_id : int ):
321
+ assert partition_id == 0
322
+ return 1
323
+
324
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], sync_lambda )
325
+ msg = reader .receive_message ()
326
+
327
+ assert msg .seqno == 2
328
+
329
+ reader .close ()
330
+
331
+ def test_reader_with_async_lambda (self , driver_sync , topic_with_messages ):
332
+ async def async_lambda (partition_id : int ) -> int :
333
+ assert partition_id == 0
334
+ return 1
335
+
336
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], async_lambda )
337
+ msg = reader .receive_message ()
338
+
339
+ assert msg .seqno == 2
340
+
341
+ reader .close ()
342
+
343
+ def test_commit_not_allowed (self , driver_sync , topic_with_messages ):
344
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], lambda x : None )
345
+ batch = reader .receive_batch ()
346
+
347
+ with pytest .raises (ydb .Error ):
348
+ reader .commit (batch )
349
+
350
+ with pytest .raises (ydb .Error ):
351
+ reader .commit_with_ack (batch )
352
+
353
+ with pytest .raises (ydb .Error ):
354
+ reader .async_commit_with_ack (batch )
355
+
356
+ reader .close ()
357
+
358
+ def test_offsets_updated_after_reconnect (self , driver_sync , topic_with_messages ):
359
+ current_offset = 0
360
+
361
+ def get_start_offset_lambda (partition_id : int ) -> int :
362
+ nonlocal current_offset
363
+ return current_offset
364
+
365
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], get_start_offset_lambda )
366
+ msg = reader .receive_message ()
367
+
368
+ assert msg .seqno == current_offset + 1
369
+
370
+ current_offset += 2
371
+ reader ._async_reader ._reconnector ._stream_reader ._set_first_error (ydb .Unavailable ("some retriable error" ))
372
+
373
+ msg = reader .receive_message ()
374
+
375
+ assert msg .seqno == current_offset + 1
376
+
377
+ reader .close ()
0 commit comments