@@ -251,3 +251,143 @@ async def wait(fut):
251
251
252
252
await reader0 .close ()
253
253
await reader1 .close ()
254
+
255
+
256
+ @pytest .mark .asyncio
257
+ class TestTopicNoConsumerReaderAsyncIO :
258
+ async def test_reader_with_default_lambda (self , driver , topic_with_messages ):
259
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ])
260
+ msg = await reader .receive_message ()
261
+
262
+ assert msg .seqno == 1
263
+
264
+ await reader .close ()
265
+
266
+ async def test_reader_with_sync_lambda (self , driver , topic_with_messages ):
267
+ def sync_lambda (partition_id : int ):
268
+ assert partition_id == 0
269
+ return 1
270
+
271
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], sync_lambda )
272
+ msg = await reader .receive_message ()
273
+
274
+ assert msg .seqno == 2
275
+
276
+ await reader .close ()
277
+
278
+ async def test_reader_with_async_lambda (self , driver , topic_with_messages ):
279
+ async def async_lambda (partition_id : int ) -> int :
280
+ assert partition_id == 0
281
+ return 1
282
+
283
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], async_lambda )
284
+ msg = await reader .receive_message ()
285
+
286
+ assert msg .seqno == 2
287
+
288
+ await reader .close ()
289
+
290
+ async def test_commit_not_allowed (self , driver , topic_with_messages ):
291
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], lambda x : None )
292
+ batch = await reader .receive_batch ()
293
+
294
+ with pytest .raises (ydb .Error ):
295
+ reader .commit (batch )
296
+
297
+ with pytest .raises (ydb .Error ):
298
+ await reader .commit_with_ack (batch )
299
+
300
+ await reader .close ()
301
+
302
+ async def test_offsets_updated_after_reconnect (self , driver , topic_with_messages ):
303
+ current_offset = 0
304
+
305
+ def get_start_offset_lambda (partition_id : int ) -> int :
306
+ nonlocal current_offset
307
+ return current_offset
308
+
309
+ reader = driver .topic_client .no_consumer_reader (topic_with_messages , [0 ], get_start_offset_lambda )
310
+ msg = await reader .receive_message ()
311
+
312
+ assert msg .seqno == current_offset + 1
313
+
314
+ current_offset += 2
315
+ reader ._reconnector ._stream_reader ._set_first_error (ydb .Unavailable ("some retriable error" ))
316
+
317
+ await asyncio .sleep (0 )
318
+
319
+ msg = await reader .receive_message ()
320
+
321
+ assert msg .seqno == current_offset + 1
322
+
323
+ await reader .close ()
324
+
325
+
326
+ class TestTopicNoConsumerReader :
327
+ def test_reader_with_default_lambda (self , driver_sync , topic_with_messages ):
328
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ])
329
+ msg = reader .receive_message ()
330
+
331
+ assert msg .seqno == 1
332
+
333
+ reader .close ()
334
+
335
+ def test_reader_with_sync_lambda (self , driver_sync , topic_with_messages ):
336
+ def sync_lambda (partition_id : int ):
337
+ assert partition_id == 0
338
+ return 1
339
+
340
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], sync_lambda )
341
+ msg = reader .receive_message ()
342
+
343
+ assert msg .seqno == 2
344
+
345
+ reader .close ()
346
+
347
+ def test_reader_with_async_lambda (self , driver_sync , topic_with_messages ):
348
+ async def async_lambda (partition_id : int ) -> int :
349
+ assert partition_id == 0
350
+ return 1
351
+
352
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], async_lambda )
353
+ msg = reader .receive_message ()
354
+
355
+ assert msg .seqno == 2
356
+
357
+ reader .close ()
358
+
359
+ def test_commit_not_allowed (self , driver_sync , topic_with_messages ):
360
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], lambda x : None )
361
+ batch = reader .receive_batch ()
362
+
363
+ with pytest .raises (ydb .Error ):
364
+ reader .commit (batch )
365
+
366
+ with pytest .raises (ydb .Error ):
367
+ reader .commit_with_ack (batch )
368
+
369
+ with pytest .raises (ydb .Error ):
370
+ reader .async_commit_with_ack (batch )
371
+
372
+ reader .close ()
373
+
374
+ def test_offsets_updated_after_reconnect (self , driver_sync , topic_with_messages ):
375
+ current_offset = 0
376
+
377
+ def get_start_offset_lambda (partition_id : int ) -> int :
378
+ nonlocal current_offset
379
+ return current_offset
380
+
381
+ reader = driver_sync .topic_client .no_consumer_reader (topic_with_messages , [0 ], get_start_offset_lambda )
382
+ msg = reader .receive_message ()
383
+
384
+ assert msg .seqno == current_offset + 1
385
+
386
+ current_offset += 2
387
+ reader ._async_reader ._reconnector ._stream_reader ._set_first_error (ydb .Unavailable ("some retriable error" ))
388
+
389
+ msg = reader .receive_message ()
390
+
391
+ assert msg .seqno == current_offset + 1
392
+
393
+ reader .close ()
0 commit comments