@@ -38,12 +38,20 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
38
38
private boolean handlePrefetch = true ;
39
39
private boolean isClosed = false ;
40
40
private boolean canReadNext = true ;
41
+ private boolean createPrefetchThread = true ;
42
+ private int retryCount = 0 ;
43
+ private int connectTimeout = 0 ;
44
+ private int readSizeKB = 0 ;
45
+ private int limit = -1 ;
46
+ private int maxReadRetries = DEFAULT_READ_RETRIES ;
47
+ private int socketOpTimeoutMs = 0 ;
41
48
private long openTimeMs = 0 ;
42
49
private long recordsRead = 0 ;
43
50
44
51
public static final int NO_RECORD_LIMIT = -1 ;
45
52
public static final int DEFAULT_READ_SIZE_OPTION = -1 ;
46
53
public static final int DEFAULT_CONNECT_TIMEOUT_OPTION = -1 ;
54
+ public static final int DEFAULT_READ_RETRIES = 3 ;
47
55
48
56
public static class FileReadResumeInfo
49
57
{
@@ -189,18 +197,23 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
189
197
{
190
198
this .handlePrefetch = createPrefetchThread ;
191
199
this .originalRecordDef = originalRD ;
192
- if (this .originalRecordDef == null )
193
- {
194
- throw new Exception ("HpccRemoteFileReader: Original record definition is null." );
195
- }
200
+ this .dataPartition = dp ;
201
+ this .recordBuilder = recBuilder ;
202
+ this .readSizeKB = readSizeKB ;
203
+ this .limit = limit ;
204
+ this .createPrefetchThread = createPrefetchThread ;
205
+ this .socketOpTimeoutMs = socketOpTimeoutMs ;
196
206
197
207
if (connectTimeout < 1 )
198
208
{
199
209
connectTimeout = RowServiceInputStream .DEFAULT_CONNECT_TIMEOUT_MILIS ;
200
210
}
211
+ this .connectTimeout = connectTimeout ;
201
212
202
- this .dataPartition = dp ;
203
- this .recordBuilder = recBuilder ;
213
+ if (this .originalRecordDef == null )
214
+ {
215
+ throw new Exception ("HpccRemoteFileReader: Provided original record definition is null, original record definition is required." );
216
+ }
204
217
205
218
FieldDef projectedRecordDefinition = recBuilder .getRecordDefinition ();
206
219
if (projectedRecordDefinition == null )
@@ -246,6 +259,61 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
246
259
openTimeMs = System .currentTimeMillis ();
247
260
}
248
261
262
+ private boolean retryRead ()
263
+ {
264
+ if (retryCount < maxReadRetries )
265
+ {
266
+ log .info ("Retrying read for " + this .dataPartition .toString () + " retry count: " + retryCount );
267
+ retryCount ++;
268
+
269
+ FileReadResumeInfo resumeInfo = getFileReadResumeInfo ();
270
+ RowServiceInputStream .RestartInformation restartInfo = new RowServiceInputStream .RestartInformation ();
271
+ restartInfo .streamPos = resumeInfo .inputStreamPos ;
272
+ restartInfo .tokenBin = resumeInfo .tokenBin ;
273
+
274
+ try
275
+ {
276
+ this .inputStream .close ();
277
+ }
278
+ catch (Exception e ) {}
279
+
280
+ try
281
+ {
282
+ this .inputStream = new RowServiceInputStream (this .dataPartition , this .originalRecordDef ,
283
+ this .recordBuilder .getRecordDefinition (), this .connectTimeout , this .limit , this .createPrefetchThread ,
284
+ this .readSizeKB , restartInfo , false , this .socketOpTimeoutMs );
285
+ long bytesToSkip = resumeInfo .recordReaderStreamPos - resumeInfo .inputStreamPos ;
286
+ if (bytesToSkip < 0 )
287
+ {
288
+ throw new Exception ("Unable to restart read stream, unexpected stream position in record reader." );
289
+ }
290
+ this .inputStream .skip (bytesToSkip );
291
+
292
+ this .binaryRecordReader = new BinaryRecordReader (this .inputStream , resumeInfo .recordReaderStreamPos );
293
+ this .binaryRecordReader .initialize (this .recordBuilder );
294
+ }
295
+ catch (Exception e )
296
+ {
297
+ log .error ("Failed to retry read for " + this .dataPartition .toString () + " " + e .getMessage (), e );
298
+ return false ;
299
+ }
300
+
301
+ return true ;
302
+ }
303
+
304
+ return false ;
305
+ }
306
+
307
+ /**
308
+ * Sets the maximum number of times to retry a read operation before failing.
309
+ *
310
+ * @param maxReadRetries maximum number of read retries
311
+ */
312
+ public void setMaxReadRetries (int maxReadRetries )
313
+ {
314
+ this .maxReadRetries = maxReadRetries ;
315
+ }
316
+
249
317
/**
250
318
* Returns the stream position within the file.
251
319
*
@@ -363,11 +431,16 @@ public boolean hasNext()
363
431
}
364
432
catch (HpccFileException e )
365
433
{
366
- canReadNext = false ;
367
- log .error ("Read failure for " + this .dataPartition .toString ());
368
- java .util .NoSuchElementException exception = new java .util .NoSuchElementException ("Fatal read error: " + e .getMessage ());
369
- exception .initCause (e );
370
- throw exception ;
434
+ if (!retryRead ())
435
+ {
436
+ canReadNext = false ;
437
+ log .error ("Read failure for " + this .dataPartition .toString (), e );
438
+ java .util .NoSuchElementException exception = new java .util .NoSuchElementException ("Fatal read error: " + e .getMessage ());
439
+ exception .initCause (e );
440
+ throw exception ;
441
+ }
442
+
443
+ return hasNext ();
371
444
}
372
445
373
446
return canReadNext ;
@@ -393,10 +466,15 @@ public T next()
393
466
}
394
467
catch (HpccFileException e )
395
468
{
396
- log .error ("Read failure for " + this .dataPartition .toString () + " " + e .getMessage ());
397
- java .util .NoSuchElementException exception = new java .util .NoSuchElementException ("Fatal read error: " + e .getMessage ());
398
- exception .initCause (e );
399
- throw exception ;
469
+ if (!retryRead ())
470
+ {
471
+ log .error ("Read failure for " + this .dataPartition .toString () + " " + e .getMessage (), e );
472
+ java .util .NoSuchElementException exception = new java .util .NoSuchElementException ("Fatal read error: " + e .getMessage ());
473
+ exception .initCause (e );
474
+ throw exception ;
475
+ }
476
+
477
+ return next ();
400
478
}
401
479
402
480
recordsRead ++;
0 commit comments