33
33
34
34
/**
35
35
* Serializes records into the provided OutputStream utilizing the provided IRecordAccessor to access record data.
36
- *
36
+ *
37
37
* The IRecordAccessor must match the type of records that are provided to {@link #writeRecord(Object) writeRecord}.
38
38
* The data written to the OutputStream will be in the HPCC Systems binary record format.
39
39
*/
@@ -52,6 +52,8 @@ public class BinaryRecordWriter implements IRecordWriter
52
52
private static final int QSTR_COMPRESSED_CHUNK_LEN = 3 ;
53
53
private static final int QSTR_EXPANDED_CHUNK_LEN = 4 ;
54
54
55
+ private static final byte NULL_TERMINATOR = '\0' ;
56
+
55
57
private byte [] scratchBuffer = new byte [SCRATCH_BUFFER_SIZE ];
56
58
57
59
private OutputStream outputStream = null ;
@@ -104,7 +106,7 @@ public BinaryRecordWriter(OutputStream output, ByteOrder byteOrder) throws Excep
104
106
105
107
/*
106
108
* (non-Javadoc)
107
- *
109
+ *
108
110
* @see org.hpccsystems.dfs.client.IRecordWriter#initialize(org.hpccsystems.dfs.client.IRecordAccessor)
109
111
*/
110
112
public void initialize (IRecordAccessor recordAccessor )
@@ -307,7 +309,7 @@ private void writeField(FieldDef fd, Object fieldValue) throws Exception
307
309
fillLength = SCRATCH_BUFFER_SIZE ;
308
310
}
309
311
310
- Arrays .fill (scratchBuffer , 0 , fillLength , ( byte ) '\0' );
312
+ Arrays .fill (scratchBuffer , 0 , fillLength , NULL_TERMINATOR );
311
313
writeByteArray (scratchBuffer , 0 , fillLength );
312
314
numFillBytes -= fillLength ;
313
315
}
@@ -335,7 +337,7 @@ private void writeField(FieldDef fd, Object fieldValue) throws Exception
335
337
case FILEPOS :
336
338
{
337
339
Long value = null ;
338
- if (fieldValue ==null )
340
+ if (fieldValue ==null )
339
341
{
340
342
value =Long .valueOf (0 );
341
343
}
@@ -391,7 +393,7 @@ else if (fd.getDataLen() < 8 && fd.getDataLen() > 0)
391
393
{
392
394
this .buffer .put ((byte ) ((value >> (i *8 )) & 0xFF ));
393
395
}
394
-
396
+
395
397
long signBit = value < 0 ? 0x80L : 0 ;
396
398
this .buffer .put ((byte ) (((value >> (lastByteIdx *8 )) & 0xFF ) | signBit ));
397
399
}
@@ -436,8 +438,8 @@ else if (fd.getDataLen() == 8)
436
438
}
437
439
case CHAR :
438
440
{
439
- byte c = '\0' ;
440
- if (fieldValue !=null )
441
+ byte c = NULL_TERMINATOR ;
442
+ if (fieldValue !=null )
441
443
{
442
444
String value = (String ) fieldValue ;
443
445
c = (byte ) value .charAt (0 );
@@ -449,6 +451,15 @@ else if (fd.getDataLen() == 8)
449
451
case STRING :
450
452
{
451
453
String value = fieldValue != null ? (String ) fieldValue : "" ;
454
+ if (fd .getFieldType () == FieldType .VAR_STRING )
455
+ {
456
+ int eosIdx = value .indexOf (NULL_TERMINATOR );
457
+ if (eosIdx > -1 )
458
+ {
459
+ value = value .substring (0 ,eosIdx );
460
+ }
461
+ }
462
+
452
463
byte [] data = new byte [0 ];
453
464
if (fd .getSourceType () == HpccSrcType .UTF16LE )
454
465
{
@@ -475,7 +486,7 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
475
486
int compressedDataLen = tempData .length * QSTR_COMPRESSED_CHUNK_LEN + (QSTR_EXPANDED_CHUNK_LEN -1 );
476
487
compressedDataLen /= QSTR_EXPANDED_CHUNK_LEN ;
477
488
data = new byte [compressedDataLen ];
478
-
489
+
479
490
int bitOffset = 0 ;
480
491
for (int i = 0 ; i < tempData .length ; i ++)
481
492
{
@@ -491,7 +502,7 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
491
502
case 2 :
492
503
// The top 4 bits of Char 2 are in the bottom 4 bits of byte1
493
504
data [byteIdx ] |= (byte ) ((qstrByteValue & 0x3C ) >> 2 );
494
-
505
+
495
506
// The bottom 2 bits of Char 2 are in the top 2 bits of byte2
496
507
data [byteIdx +1 ] = (byte ) ((qstrByteValue & 0x3 ) << 6 );
497
508
break ;
@@ -541,10 +552,10 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
541
552
{
542
553
if (fd .getFieldType () == FieldType .VAR_STRING && bytesToWrite > 0 )
543
554
{
544
- data [bytesToWrite - 1 ] = '\0' ;
555
+ data [bytesToWrite - 1 ] = NULL_TERMINATOR ;
545
556
if (fd .getSourceType ().isUTF16 () && bytesToWrite > 1 )
546
557
{
547
- data [bytesToWrite - 2 ] = '\0' ;
558
+ data [bytesToWrite - 2 ] = NULL_TERMINATOR ;
548
559
}
549
560
}
550
561
@@ -562,7 +573,7 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
562
573
fillLength = SCRATCH_BUFFER_SIZE ;
563
574
}
564
575
565
- Arrays .fill (scratchBuffer , 0 , fillLength , ( byte ) '\0' );
576
+ Arrays .fill (scratchBuffer , 0 , fillLength , NULL_TERMINATOR );
566
577
writeByteArray (scratchBuffer , 0 , fillLength );
567
578
numFillBytes -= fillLength ;
568
579
}
@@ -576,11 +587,25 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
576
587
577
588
if (fd .getFieldType () == FieldType .VAR_STRING )
578
589
{
579
- byte nullByte = '\0' ;
580
- this .buffer .put (nullByte );
581
590
if (fd .getSourceType ().isUTF16 ())
582
591
{
583
- this .buffer .put (nullByte );
592
+ boolean needsNullAdded = data .length < 2
593
+ || data [data .length - 1 ] != NULL_TERMINATOR
594
+ || data [data .length - 2 ] != NULL_TERMINATOR ;
595
+ if (needsNullAdded )
596
+ {
597
+ this .buffer .put (NULL_TERMINATOR );
598
+ this .buffer .put (NULL_TERMINATOR );
599
+ }
600
+ }
601
+ else
602
+ {
603
+ boolean needsNullAdded = data .length < 1
604
+ || data [data .length - 1 ] != NULL_TERMINATOR ;
605
+ if (needsNullAdded )
606
+ {
607
+ this .buffer .put (NULL_TERMINATOR );
608
+ }
584
609
}
585
610
}
586
611
}
@@ -885,7 +910,7 @@ private void writeDecimal(FieldDef fd, BigDecimal decimalValue)
885
910
886
911
// 1e18
887
912
BigInteger divisor = BigInteger .valueOf (1000000000000000000L );
888
- for (int currentDigit = 0 ; currentDigit < desiredPrecision ;)
913
+ for (int currentDigit = 0 ; currentDigit < desiredPrecision ;)
889
914
{
890
915
// Consume 18 digits at a time
891
916
BigInteger [] quotientRemainder = unscaledInt .divideAndRemainder (divisor );
0 commit comments