21
21
import io .opentelemetry .api .common .AttributeKey ;
22
22
import io .opentelemetry .api .common .Attributes ;
23
23
import io .opentelemetry .api .trace .Span ;
24
+ import io .opentelemetry .api .trace .StatusCode ;
24
25
import io .opentelemetry .semconv .ServerAttributes ;
25
26
26
27
import org .apache .logging .log4j .Logger ;
@@ -73,6 +74,7 @@ public static class FileReadContext
73
74
public int recordReadLimit = -1 ;
74
75
public boolean createPrefetchThread = true ;
75
76
public int readSizeKB = -1 ;
77
+ public int readRequestSpanBatchSize = -1 ; // The number of read requests before creating a new span
76
78
public Span parentSpan = null ;
77
79
};
78
80
@@ -266,21 +268,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
266
268
this .dataPartition = dp ;
267
269
this .recordBuilder = recBuilder ;
268
270
269
- String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition .getFileName () + "_" + dataPartition .getThisPart ();
270
- this .readSpan = Utils .createChildSpan (context .parentSpan , readSpanName );
271
-
272
- String primaryIP = dp .getCopyIP (0 );
273
- String secondaryIP = "" ;
274
- if (dp .getCopyCount () > 1 )
275
- {
276
- secondaryIP = dp .getCopyIP (1 );
277
- }
278
-
279
- Attributes attributes = Attributes .of ( AttributeKey .stringKey ("server.0.address" ), primaryIP ,
280
- AttributeKey .stringKey ("server.1.address" ), secondaryIP ,
281
- ServerAttributes .SERVER_PORT , Long .valueOf (dp .getPort ()),
282
- AttributeKey .longKey ("read.size" ), Long .valueOf (context .readSizeKB *1000 ));
283
- this .readSpan .setAllAttributes (attributes );
271
+ this .readSpan = createReadSpan (ctx , dp );
284
272
285
273
if (context .originalRD == null )
286
274
{
@@ -304,6 +292,7 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
304
292
this .inputStream = new RowServiceInputStream (this .dataPartition , context .originalRD , projectedRecordDefinition , context .connectTimeout ,
305
293
context .recordReadLimit , context .createPrefetchThread , context .readSizeKB , null ,
306
294
false , context .socketOpTimeoutMS , this .readSpan );
295
+ this .inputStream .setReadRequestSpanBatchSize (context .readRequestSpanBatchSize );
307
296
this .binaryRecordReader = new BinaryRecordReader (this .inputStream );
308
297
this .binaryRecordReader .initialize (this .recordBuilder );
309
298
@@ -321,13 +310,15 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
321
310
this .inputStream = new RowServiceInputStream (this .dataPartition , context .originalRD , projectedRecordDefinition , context .connectTimeout ,
322
311
context .recordReadLimit , context .createPrefetchThread , context .readSizeKB , restartInfo ,
323
312
false , context .socketOpTimeoutMS , this .readSpan );
313
+ this .inputStream .setReadRequestSpanBatchSize (context .readRequestSpanBatchSize );
324
314
325
315
long bytesToSkip = resumeInfo .recordReaderStreamPos - resumeInfo .inputStreamPos ;
326
316
if (bytesToSkip < 0 )
327
317
{
328
318
Exception e = new Exception ("Unable to restart read stream, unexpected stream position in record reader." );
329
319
this .readSpan .recordException (e );
330
320
this .readSpan .end ();
321
+ throw e ;
331
322
}
332
323
this .inputStream .skip (bytesToSkip );
333
324
@@ -344,6 +335,35 @@ public HpccRemoteFileReader(FileReadContext ctx, DataPartition dp, IRecordBuilde
344
335
openTimeMs = System .currentTimeMillis ();
345
336
}
346
337
338
+ private static Span createReadSpan (FileReadContext context , DataPartition dp )
339
+ {
340
+ String readSpanName = "HPCCRemoteFileReader/Read_" + dp .getFileName () + "_" + dp .getThisPart ();
341
+ Span readSpan = Utils .createChildSpan (context .parentSpan , readSpanName );
342
+ readSpan .setStatus (StatusCode .OK );
343
+
344
+ String primaryIP = dp .getCopyIP (0 );
345
+ String secondaryIP = "" ;
346
+ if (dp .getCopyCount () > 1 )
347
+ {
348
+ secondaryIP = dp .getCopyIP (1 );
349
+ }
350
+
351
+ long readSize = context .readSizeKB ;
352
+ if (readSize < 0 )
353
+ {
354
+ readSize = RowServiceInputStream .DEFAULT_MAX_READ_SIZE_KB ;
355
+ }
356
+ readSize *= 1000 ;
357
+
358
+ Attributes attributes = Attributes .of ( AttributeKey .stringKey ("server.0.address" ), primaryIP ,
359
+ AttributeKey .stringKey ("server.1.address" ), secondaryIP ,
360
+ ServerAttributes .SERVER_PORT , Long .valueOf (dp .getPort ()),
361
+ AttributeKey .longKey ("read.size" ), Long .valueOf (readSize ));
362
+ readSpan .setAllAttributes (attributes );
363
+
364
+ return readSpan ;
365
+ }
366
+
347
367
private boolean retryRead ()
348
368
{
349
369
if (retryCount < maxReadRetries )
@@ -364,20 +384,12 @@ private boolean retryRead()
364
384
365
385
try
366
386
{
367
- String readSpanName = "HPCCRemoteFileReader.RowService/Read_" + dataPartition .getFileName () + "_" + dataPartition .getThisPart ();
368
- if (context .parentSpan != null )
369
- {
370
- this .readSpan = Utils .createChildSpan (context .parentSpan , readSpanName );
371
- }
372
- else
373
- {
374
- this .readSpan = Utils .createSpan (readSpanName );
375
- }
387
+ this .readSpan = createReadSpan (context , dataPartition );
376
388
377
389
this .inputStream = new RowServiceInputStream (this .dataPartition , context .originalRD ,this .recordBuilder .getRecordDefinition (),
378
390
context .connectTimeout , context .recordReadLimit , context .createPrefetchThread ,
379
391
context .readSizeKB , restartInfo , false , context .socketOpTimeoutMS , this .readSpan );
380
-
392
+ this . inputStream . setReadRequestSpanBatchSize ( context . readRequestSpanBatchSize );
381
393
long bytesToSkip = resumeInfo .recordReaderStreamPos - resumeInfo .inputStreamPos ;
382
394
if (bytesToSkip < 0 )
383
395
{
@@ -391,6 +403,7 @@ private boolean retryRead()
391
403
catch (Exception e )
392
404
{
393
405
this .readSpan .recordException (e );
406
+ this .readSpan .setStatus (StatusCode .ERROR );
394
407
this .readSpan .end ();
395
408
log .error ("Failed to retry read for " + this .dataPartition .toString () + " " + e .getMessage (), e );
396
409
return false ;
@@ -529,6 +542,10 @@ public boolean hasNext()
529
542
}
530
543
catch (HpccFileException e )
531
544
{
545
+ this .readSpan .recordException (e );
546
+ this .readSpan .setStatus (StatusCode .ERROR );
547
+ this .readSpan .end ();
548
+
532
549
if (!retryRead ())
533
550
{
534
551
canReadNext = false ;
@@ -564,6 +581,10 @@ public T next()
564
581
}
565
582
catch (HpccFileException e )
566
583
{
584
+ this .readSpan .recordException (e );
585
+ this .readSpan .setStatus (StatusCode .ERROR );
586
+ this .readSpan .end ();
587
+
567
588
if (!retryRead ())
568
589
{
569
590
log .error ("Read failure for " + this .dataPartition .toString () + " " + e .getMessage (), e );
0 commit comments