From 5015a2cfd22d6aef0b7ea99f294e3708e855082a Mon Sep 17 00:00:00 2001 From: Bob den Os <108393871+BobdenOs@users.noreply.github.com> Date: Mon, 22 Jan 2024 14:13:55 +0100 Subject: [PATCH] chore: expose native hana lob stream on structured selects (#417) Co-authored-by: Vitaly Kozyura <58591662+vkozyura@users.noreply.github.com> --- db-service/lib/SQLService.js | 1 + db-service/lib/cqn2sql.js | 4 +- hana/lib/HANAService.js | 24 ++-- hana/lib/drivers/hana-client.js | 150 ++++++++++++++++++---- hana/lib/drivers/hdb.js | 52 +++++++- sqlite/test/general/stream.compat.test.js | 6 +- sqlite/test/general/stream.test.js | 76 ++++++----- 7 files changed, 237 insertions(+), 76 deletions(-) diff --git a/db-service/lib/SQLService.js b/db-service/lib/SQLService.js index 1889cace0..462dbc6bb 100644 --- a/db-service/lib/SQLService.js +++ b/db-service/lib/SQLService.js @@ -71,6 +71,7 @@ class SQLService extends DatabaseService { _stream(val) { if (val === null) return null + if (val instanceof Readable) return val // Buffer.from only applies encoding when the input is a string let raw = typeof val === 'string' ? Buffer.from(val.toString(), 'base64') : val return new Readable({ diff --git a/db-service/lib/cqn2sql.js b/db-service/lib/cqn2sql.js index 3df42a6ef..48da9b538 100644 --- a/db-service/lib/cqn2sql.js +++ b/db-service/lib/cqn2sql.js @@ -430,7 +430,7 @@ class CQN2SQLRenderer { INSERT.entries[0].type = 'json' this.entries = [[...this.values, INSERT.entries[0]]] } else { - const stream = Readable.from(this.INSERT_entries_stream(INSERT.entries)) + const stream = Readable.from(this.INSERT_entries_stream(INSERT.entries), { objectMode: false }) stream.type = 'json' this.entries = [[...this.values, stream]] } @@ -567,7 +567,7 @@ class CQN2SQLRenderer { INSERT.rows[0].type = 'json' this.entries = [[...this.values, INSERT.rows[0]]] } else { - const stream = Readable.from(this.INSERT_rows_stream(INSERT.rows)) + const stream = Readable.from(this.INSERT_rows_stream(INSERT.rows), { objectMode: false }) stream.type = 'json' this.entries = [[...this.values, stream]] } diff --git a/hana/lib/HANAService.js b/hana/lib/HANAService.js index acc686f03..a6a7d2888 100644 --- a/hana/lib/HANAService.js +++ b/hana/lib/HANAService.js @@ -105,7 +105,9 @@ class HANAService extends SQLService { const { cqn, temporary, blobs, withclause, values } = this.cqn2sql(query, data) // REVISIT: add prepare options when param:true is used const sqlScript = this.wrapTemporary(temporary, withclause, blobs) - let rows = values?.length ? await (await this.prepare(sqlScript)).all(values) : await this.exec(sqlScript) + let rows = (values?.length || blobs.length > 0) + ? await (await this.prepare(sqlScript, blobs.length)).all(values || []) + : await this.exec(sqlScript) if (rows.length) { rows = this.parseRows(rows) } @@ -124,7 +126,7 @@ class HANAService extends SQLService { const results = await (entries ? HANAVERSION <= 2 ? entries.reduce((l, c) => l.then(() => ps.run(c)), Promise.resolve(0)) - : ps.run(entries) + : ps.run(entries[0]) : ps.run()) return new this.class.InsertResults(cqn, results) } @@ -202,8 +204,8 @@ class HANAService extends SQLService { } // prepare and exec are both implemented inside the drivers - prepare(sql) { - return this.ensureDBC().prepare(sql) + prepare(sql, hasBlobs) { + return this.ensureDBC().prepare(sql, hasBlobs) } exec(sql) { @@ -527,13 +529,15 @@ class HANAService extends SQLService { // HANA Express does not process large JSON documents // The limit is somewhere between 64KB and 128KB if (HANAVERSION <= 2) { - this.entries = INSERT.entries.map(e => (e instanceof Readable ? [e] : [Readable.from(this.INSERT_entries_stream([e], 'hex'))])) + this.entries = INSERT.entries.map(e => (e instanceof Readable + ? [e] + : [Readable.from(this.INSERT_entries_stream([e], 'hex'), { objectMode: false })])) } else { - this.entries = [ + this.entries = [[ INSERT.entries[0] instanceof Readable ? INSERT.entries[0] - : Readable.from(this.INSERT_entries_stream(INSERT.entries, 'hex')) - ] + : Readable.from(this.INSERT_entries_stream(INSERT.entries, 'hex'), { objectMode: false }) + ]] } // WITH SRC is used to force HANA to interpret the ? as a NCLOB allowing for streaming of the data @@ -954,7 +958,7 @@ class HANAService extends SQLService { this.dbc = con const stmt = await this.dbc.prepare(createContainerDatabase) - const res = await stmt.all([creds.user, creds.password, creds.containerGroup, !clean]) + const res = await stmt.run([creds.user, creds.password, creds.containerGroup, !clean]) DEBUG?.(res.map(r => r.MESSAGE).join('\n')) } finally { if (this.dbc) { @@ -996,7 +1000,7 @@ class HANAService extends SQLService { this.dbc = con const stmt = await this.dbc.prepare(createContainerTenant.replaceAll('{{{GROUP}}}', creds.containerGroup)) - const res = await stmt.all([creds.user, creds.password, creds.schema, !clean]) + const res = await stmt.run([creds.user, creds.password, creds.schema, !clean]) res && DEBUG?.(res.map(r => r.MESSAGE).join('\n')) } finally { await this.dbc.disconnect() diff --git a/hana/lib/drivers/hana-client.js b/hana/lib/drivers/hana-client.js index b54381866..66ee09e6f 100644 --- a/hana/lib/drivers/hana-client.js +++ b/hana/lib/drivers/hana-client.js @@ -45,13 +45,70 @@ class HANAClientDriver extends driver { } set(variables) { - for(const key in variables) { + for (const key in variables) { this._native.setClientInfo(key, variables[key]) } } - async prepare(sql) { + async prepare(sql, hasBlobs) { const ret = await super.prepare(sql) + // hana-client ResultSet API does not allow for deferred streaming of blobs + // With the current design of the hana-client ResultSet it is only + // possible to read all LOBs into memory to do deferred streaming + // Main reason is that the ResultSet only allowes using getData() on the current row + // with the current next() implemenation it is only possible to go foward in the ResultSet + // It would be required to allow using getDate() on previous rows + if (hasBlobs) { + ret.all = async (values) => { + const stmt = await ret._prep + // Create result set + const reset = async function () { + if (this) await prom(this, 'close')() + const rs = await prom(stmt, 'executeQuery')(values) + rs.reset = reset + return rs + } + const rs = await reset() + const rsStreamsProm = {} + const rsStreams = new Promise((resolve, reject) => { + rsStreamsProm.resolve = resolve + rsStreamsProm.reject = reject + }) + + rs._rowPosition = -1 + const _next = prom(rs, 'next') + const next = () => { + rs._rowPosition++ + return _next() + } + const getValue = prom(rs, 'getValue') + const result = [] + // Fetch the next row + while (await next()) { + const cols = stmt.getColumnInfo().map(b => b.columnName) + // column 0-3 are metadata columns + const values = await Promise.all([getValue(0), getValue(1), getValue(2), getValue(3)]) + + const row = {} + for (let i = 0; i < cols.length; i++) { + const col = cols[i] + // column >3 are all blob columns + row[col] = i > 3 ? + rs.isNull(i) + ? null + : Readable.from(streamBlob(rsStreams, rs._rowPosition, i, 'binary')) + : values[i] + } + + result.push(row) + } + + rs.reset().then(rsStreamsProm.resolve, rsStreamsProm.reject) + + return result + } + } + ret.stream = async (values, one) => { const stmt = await ret._prep values = Array.isArray(values) ? values : [] @@ -66,7 +123,7 @@ class HANAClientDriver extends driver { // Which creates an inherent limitation to the maximum size of a result set (~0xfffffffb) if (streamUnsafe && sql.startsWith('DO')) { const rows = await prom(stmt, 'exec')(values, { rowsAsArray: true }) - return Readable.from(rowsIterator(rows, stmt.getColumnInfo())) + return Readable.from(rowsIterator(rows, stmt.getColumnInfo()), { objectMode: false }) } const rs = await prom(stmt, 'executeQuery')(values) const cols = rs.getColumnInfo() @@ -75,9 +132,9 @@ class HANAClientDriver extends driver { if (rs.getRowCount() === 0) return null await prom(rs, 'next')() if (rs.isNull(0)) return null - return Readable.from(streamBlob(rs, 0, 'binary')) + return Readable.from(streamBlob(rs, undefined, 0, 'binary'), { objectMode: false }) } - return Readable.from(rsIterator(rs, one)) + return Readable.from(rsIterator(rs, one), { objectMode: false }) } return ret } @@ -187,7 +244,7 @@ async function* rsIterator(rs, one) { yield buffer buffer = '' - for await (const chunk of streamBlob(rs, columnIndex, 'base64', binaryBuffer)) { + for await (const chunk of streamBlob(rs, undefined, columnIndex, 'base64', binaryBuffer)) { yield chunk } buffer += '"' @@ -210,32 +267,75 @@ async function* rsIterator(rs, one) { yield buffer } -async function* streamBlob(rs, columnIndex, encoding, binaryBuffer = Buffer.allocUnsafe(1 << 16)) { - const getData = prom(rs, 'getData') +async function* streamBlob(rs, rowIndex = -1, columnIndex, encoding, binaryBuffer = Buffer.allocUnsafe(1 << 16)) { + const promChain = { + resolve: () => { }, + reject: () => { } + } + try { + // Check if the resultset is a promise + if (rs.then) { + // Copy the current Promise + const prom = new Promise((resolve, reject) => rs.then(resolve, reject)) + // Enqueue all following then calls till after the current call + const next = new Promise((resolve, reject) => { + promChain.resolve = resolve + promChain.reject = reject + }) + rs.then = (resolve, reject) => next.then(resolve, reject) + rs = await prom + } - let decoder = new StringDecoder(encoding) + // Check if the provided resultset is on the correct row + if (rowIndex >= 0) { + rs._rowPosition ??= -1 + if (rowIndex - rs._rowPosition < 0) { + rs = await rs.reset() + rs._rowPosition ??= -1 + } - let blobPosition = 0 + const _next = prom(rs, 'next') + const next = () => { + rs._rowPosition++ + return _next() + } - while (true) { - // REVISIT: Ensure that the data read is divisible by 3 as that allows for base64 encoding - let start = 0 - const read = await getData(columnIndex, blobPosition, binaryBuffer, 0, binaryBuffer.byteLength) - if (blobPosition === 0 && binaryBuffer.slice(0, 7).toString() === 'base64,') { - decoder = { - write: encoding === 'base64' ? c => c : chunk => Buffer.from(chunk.toString(), 'base64'), - end: () => Buffer.allocUnsafe(0), + // Move result set to the correct row + while (rowIndex - rs._rowPosition > 0) { + await next() } - start = 7 } - blobPosition += read - if (read < binaryBuffer.byteLength) { - yield decoder.write(binaryBuffer.slice(start, read)) - break + + const getData = prom(rs, 'getData') + + let decoder = new StringDecoder(encoding) + + let blobPosition = 0 + + while (true) { + // REVISIT: Ensure that the data read is divisible by 3 as that allows for base64 encoding + let start = 0 + const read = await getData(columnIndex, blobPosition, binaryBuffer, 0, binaryBuffer.byteLength) + if (blobPosition === 0 && binaryBuffer.slice(0, 7).toString() === 'base64,') { + decoder = { + write: encoding === 'base64' ? c => c : chunk => Buffer.from(chunk.toString(), 'base64'), + end: () => Buffer.allocUnsafe(0), + } + start = 7 + } + blobPosition += read + if (read < binaryBuffer.byteLength) { + yield decoder.write(binaryBuffer.slice(start, read)) + break + } + yield decoder.write(binaryBuffer.slice(start).toString('base64')) } - yield decoder.write(binaryBuffer.slice(start).toString('base64')) + yield decoder.end() + } catch (e) { + promChain.reject(e) + } finally { + promChain.resolve(rs) } - yield decoder.end() } async function* rowsIterator(rows, cols) { diff --git a/hana/lib/drivers/hdb.js b/hana/lib/drivers/hdb.js index 9aead8e76..0b4e697f4 100644 --- a/hana/lib/drivers/hdb.js +++ b/hana/lib/drivers/hdb.js @@ -1,5 +1,6 @@ -const { Readable, PassThrough, Stream } = require('stream') +const { Readable, Stream } = require('stream') const { StringDecoder } = require('string_decoder') +const { text } = require('stream/consumers') const hdb = require('hdb') const iconv = require('iconv-lite') @@ -40,7 +41,7 @@ class HDBDriver extends driver { set(variables) { const clientInfo = this._native._connection.getClientInfo() - for(const key in variables) { + for (const key in variables) { clientInfo.setProperty(key, variables[key]) } } @@ -69,8 +70,42 @@ class HDBDriver extends driver { }) } - async prepare(sql) { + async prepare(sql, hasBlobs) { const ret = await super.prepare(sql) + + if (hasBlobs) { + ret.all = async (values) => { + const stmt = await ret._prep + // Create result set + const rs = await prom(stmt, 'execute')(values) + const cols = rs.metadata.map(b => b.columnName) + const stream = rs.createReadStream() + + const result = [] + for await (const row of stream) { + const obj = {} + for (let i = 0; i < cols.length; i++) { + const col = cols[i] + // hdb returns large strings as streams sometimes + if (col === '_json_' && typeof row[col] === 'object') { + obj[col] = await text(row[col].createReadStream()) + continue + } + obj[col] = i > 3 + ? row[col] === null + ? null + : ( + row[col].createReadStream?.() + || Readable.from(echoStream(row[col]), { objectMode: false }) + ) + : row[col] + } + result.push(obj) + } + return result + } + } + ret.stream = async (values, one) => { const stmt = await ret._prep const rs = await prom(stmt, 'execute')(values || []) @@ -98,11 +133,10 @@ class HDBDriver extends driver { if (!Array.isArray(values)) return { values: [], streams: [] } const streams = [] values = values.map((v, i) => { - if (v instanceof Stream && !(v instanceof PassThrough)) { + if (v instanceof Stream) { streams[i] = v - const passThrough = new PassThrough() - v.pipe(passThrough) - return passThrough + const iterator = v[Symbol.asyncIterator]() + return Readable.from(iterator, { objectMode: false }) } return v }) @@ -113,6 +147,10 @@ class HDBDriver extends driver { } } +function* echoStream(ret) { + yield ret +} + async function* rsIterator(rs, one) { // Raw binary data stream unparsed const raw = rs.createBinaryStream()[Symbol.asyncIterator]() diff --git a/sqlite/test/general/stream.compat.test.js b/sqlite/test/general/stream.compat.test.js index 47eaad1b3..35efd6d71 100644 --- a/sqlite/test/general/stream.compat.test.js +++ b/sqlite/test/general/stream.compat.test.js @@ -236,9 +236,9 @@ describe('streaming', () => { test('WRITE dataset from json file stream', async () => { const { Images } = cds.entities('test') - // to be discussed + // REVISIT: required proper BASE64_DECODE support from HANA // const stream = fs.createReadStream(path.join(__dirname, 'samples/data.json')) - // const changes = await STREAM.into(Images).data(stream) + // const changes = await INSERT(stream).into(Images) const json = JSON.parse(fs.readFileSync(path.join(__dirname, 'samples/data.json'))) const changes = await INSERT.into(Images).entries(json) @@ -288,7 +288,7 @@ describe('streaming', () => { } yield ']' } - const stream = Readable.from(generator()) + const stream = Readable.from(generator(), { objectMode: false }) const changes = await INSERT.into(Images).entries(stream) try { diff --git a/sqlite/test/general/stream.test.js b/sqlite/test/general/stream.test.js index edbcbfd14..ed2eceb17 100644 --- a/sqlite/test/general/stream.test.js +++ b/sqlite/test/general/stream.test.js @@ -81,7 +81,7 @@ describe('streaming', () => { test('READ stream property using SELECT CQN', async () => { const { Images } = cds.entities('test') - const cqn = SELECT('data').from(Images,1) + const cqn = SELECT('data').from(Images, 1) const stream = await cds.stream(cqn) await checkSize(stream) }) @@ -101,7 +101,7 @@ describe('streaming', () => { afterAll(async () => { const { Images } = cds.entities('test') await DELETE.from(Images) - }) + }) describe('READ', () => { test('READ stream property with .one .from, .column and .where', async () => { @@ -118,7 +118,11 @@ describe('streaming', () => { test('READ stream property with odata $mediaContentType', async () => { const { Images } = cds.entities('test') - const { data: stream, '$mediaContentType': val } = await SELECT.one.from(Images).columns('data', {val: 'image/jpeg', as: '$mediaContentType'}).where({ ID: 1 }) + const { + data: stream, '$mediaContentType': val + } = await SELECT.one.from(Images) + .columns('data', { val: 'image/jpeg', as: '$mediaContentType' }) + .where({ ID: 1 }) await checkSize(stream) expect(val).toEqual('image/jpeg') }) @@ -128,7 +132,7 @@ describe('streaming', () => { const [{ data: stream }] = await SELECT.from(Images).columns('data').where({ ID: 2 }) expect(stream).toBeNull() }) - + test('READ ID and stream property with .from, .column and .where', async () => { const { Images } = cds.entities('test') const [{ ID, data: stream }] = await SELECT.from(Images).columns(['ID', 'data']).where({ ID: 1 }) @@ -138,7 +142,11 @@ describe('streaming', () => { test('READ multiple stream properties with .from, .column and .where', async () => { const { Images } = cds.entities('test') - const [{ ID, data: stream1, data2: stream2 }] = await SELECT.from(Images).columns(['ID', 'data', 'data2']).where({ ID: 1 }) + const [{ + ID, data: stream1, data2: stream2 + }] = await SELECT.from(Images) + .columns(['ID', 'data', 'data2']) + .where({ ID: 1 }) await checkSize(stream1) await checkSize(stream2) expect(ID).toEqual(1) @@ -146,16 +154,20 @@ describe('streaming', () => { test('READ all entries with stream property with .from, .column ', async () => { const { Images } = cds.entities('test') - const [{ ID: ID1, data: stream1, data2: stream2 }, { ID: ID2, data: stream3, data2: stream4 }, { ID: ID3, data: stream5, data2: stream6 }] = await SELECT.from(Images).columns(['ID', 'data', 'data2']) + const [ + { ID: ID1, data: stream1, data2: stream2 }, + { ID: ID2, data: stream3, data2: stream4 }, + { ID: ID3, data: stream5, data2: stream6 } + ] = await SELECT.from(Images).columns(['ID', 'data', 'data2']) await checkSize(stream1) - await checkSize(stream2) + await checkSize(stream2) expect(stream3).toBeNull() await checkSize(stream4) - await checkSize(stream5) + await checkSize(stream5) expect(stream6).toBeNull() expect(ID1).toEqual(1) expect(ID2).toEqual(2) - expect(ID3).toEqual(3) + expect(ID3).toEqual(3) }) test('READ one ignore stream properties if columns = all', async () => { @@ -214,7 +226,7 @@ describe('streaming', () => { } catch (err) { expect(err.code).toEqual('ERR_INVALID_ARG_TYPE') } - }) + }) test('WRITE stream property', async () => { const { Images } = cds.entities('test') @@ -222,11 +234,11 @@ describe('streaming', () => { const changes = await UPDATE(Images).with({ data2: stream }).where({ ID: 3 }) expect(changes).toEqual(1) - + const [{ data2: stream_ }] = await SELECT.from(Images).columns('data2').where({ ID: 3 }) await checkSize(stream_) - }) - + }) + test('WRITE multiple stream properties', async () => { const { Images } = cds.entities('test') const stream1 = fs.createReadStream(path.join(__dirname, 'samples/test.jpg')) @@ -234,24 +246,33 @@ describe('streaming', () => { const changes = await UPDATE(Images).with({ data: stream1, data2: stream2 }).where({ ID: 4 }) expect(changes).toEqual(1) - - const [{ data: stream1_, data2: stream2_ }] = await SELECT.from(Images).columns(['data','data2']).where({ ID: 4 }) + + const [{ + data: stream1_, data2: stream2_ + }] = await SELECT.from(Images) + .columns(['data', 'data2']) + .where({ ID: 4 }) await checkSize(stream1_) await checkSize(stream2_) - }) + }) test('WRITE multiple blob properties', async () => { const { Images } = cds.entities('test') const blob1 = fs.readFileSync(path.join(__dirname, 'samples/test.jpg')) const blob2 = fs.readFileSync(path.join(__dirname, 'samples/test.jpg')) - + const changes = await UPDATE(Images).with({ data: blob1, data2: blob2 }).where({ ID: 4 }) expect(changes).toEqual(1) - - const [{ data: stream1_, data2: stream2_ }] = await SELECT.from(Images).columns(['data','data2']).where({ ID: 4 }) + + const [{ + data: stream1_, + data2: stream2_ + }] = await SELECT.from(Images) + .columns(['data', 'data2']) + .where({ ID: 4 }) await checkSize(stream1_) await checkSize(stream2_) - }) + }) test('WRITE stream property on view', async () => { const { ImagesView } = cds.entities('test') @@ -263,13 +284,13 @@ describe('streaming', () => { const [{ renamedData: stream_ }] = await SELECT.from(ImagesView).columns('renamedData').where({ ID: 1 }) await checkSize(stream_) }) - + test('WRITE dataset from json file stream', async () => { const { Images } = cds.entities('test') - // REVISIT: to be discussed + // REVISIT: required proper BASE64_DECODE support from HANA // const stream = fs.createReadStream(path.join(__dirname, 'samples/data.json')) - // const changes = await INSERT.into(Images).data(stream) + // const changes = await INSERT(stream).into(Images) const json = JSON.parse(fs.readFileSync(path.join(__dirname, 'samples/data.json'))) const changes = await INSERT.into(Images).entries(json) @@ -298,9 +319,6 @@ describe('streaming', () => { await Promise.all([wrap(out1000), wrap(out1001)]) }) - // TODO: breaks on Postgres, because INSERT tries to decode it as base64 string (InputConverters) - // Sqlite: (1) Stream is read to Buffer - not to JSON. (2) Bad JSON - multiple nesting of same quotations - // same x test in stream.compat.test.js xtest('WRITE dataset from json generator stream', async () => { const { Images } = cds.entities('test') @@ -317,9 +335,9 @@ describe('streaming', () => { } yield ']' } - const stream = Readable.from(generator()) - - const changes = await INSERT.into(Images).entries(stream) + const stream = Readable.from(generator(), { objectMode: false }) + + const changes = await INSERT(stream).into(Images) try { expect(changes | 0).toEqual(count) } catch (e) {