Skip to content

Commit

Permalink
seek in file
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonwells committed Jan 23, 2024
1 parent 29e0c26 commit ab273a7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 12 deletions.
6 changes: 5 additions & 1 deletion lib/reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ export class ParquetReader {
*/
constructor(metadata: FileMetaDataExt, envelopeReader: ParquetEnvelopeReader, opts?: BufferReaderOptions) {
opts = opts || {};

console.log({metadata})
if (!PARQUET_VERSIONS.includes(metadata.version)) {
throw 'invalid parquet version';
}
Expand Down Expand Up @@ -483,6 +485,7 @@ export class ParquetEnvelopeReader {
}
const Range = `bytes=${offset}-${offset+length-1}`;
const input = { ...{ Range }, ...params };
console.log({input})
const response = await client.send(new GetObjectCommand(input));

const body = response.Body;
Expand Down Expand Up @@ -764,7 +767,8 @@ export class ParquetEnvelopeReader {
let offset = (this.fileSize as number) - trailerLen;
let trailerBuf = await this.read(offset, trailerLen);

if (trailerBuf.slice(4).toString() != PARQUET_MAGIC) {
console.log(trailerBuf.toString())
if (trailerBuf.subarray(4).toString() != PARQUET_MAGIC) {
throw 'not a valid parquet file';
}

Expand Down
23 changes: 12 additions & 11 deletions test/s3Client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ describe('ParquetReader with S3', () => {
const s3Mock = mockClient(S3Client);

it('mocks get object', async () => {
let srcFile = 'test/test-files/nation.dict.parquet';

const headStream = new Readable();
headStream.push('PAR1');
headStream.push(null);
const headSdkStream = sdkStreamMixin(headStream)

const footStream = new Readable();
footStream.push(Uint8Array.from([234,0,0,0])); // metadata length is 234
footStream.push('PAR1');
footStream.push(null);
const footSdkStream = sdkStreamMixin(footStream)
const footStream = createReadStream(srcFile, {start: 2842, end: 2849})
const footSdkStream= sdkStreamMixin(footStream);

const metadataStream = createReadStream(srcFile, {start: 2608, end: 2841});
const metaDataSdkStream = sdkStreamMixin(metadataStream)

const stream = createReadStream('test/test-files/nation.dict.parquet');
const stream = createReadStream(srcFile);

// wrap the Stream with SDK mixin
const sdkStream = sdkStreamMixin(stream);
Expand All @@ -40,13 +42,12 @@ describe('ParquetReader with S3', () => {
s3Mock.on(GetObjectCommand, {Range: 'bytes=2841-2848', Key: 'foo', Bucket: 'bar'})
.resolves({Body: footSdkStream});

s3Mock.on(GetObjectCommand, {Range: 'bytes=2607-2840', Key: 'foo', Bucket: 'bar'})
.resolves({Body: metaDataSdkStream});

const s3 = new S3Client({});
try {
await ParquetReader.openS3(s3, {Key: 'foo', Bucket: 'bar'});
} catch (e: any) {
assert(e.toString().includes('invalid parquet version'))
}
let res = await ParquetReader.openS3(s3, {Key: 'foo', Bucket: 'bar'});
assert(res.envelopeReader);
});
})
})

0 comments on commit ab273a7

Please sign in to comment.