chore: expose native hana lob stream on structured selects (#417)
Co-authored-by: Vitaly Kozyura <[email protected]>
BobdenOs and vkozyura authored Jan 22, 2024
1 parent 44c0a59 commit 5015a2c
Showing 7 changed files with 237 additions and 76 deletions.
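In consumer terms, a structured select that explicitly includes a LOB column now gets that column back as a native Readable stream on HANA instead of a fully buffered value. A hypothetical usage sketch, not part of the diff; the entity and element names are invented:

const cds = require('@sap/cds')
const { SELECT } = cds.ql
const fs = require('fs')

// assume a model like: entity Books { key ID : Integer; coverImage : LargeBinary }
// e.g. inside a CAP service handler, with a HANA database service connected
async function downloadCovers() {
  const rows = await SELECT.from('Books').columns('ID', 'coverImage')
  for (const row of rows) {
    // on HANA, coverImage now arrives as a lazily read native LOB stream
    row.coverImage?.pipe(fs.createWriteStream(`cover-${row.ID}.jpg`))
  }
}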
1 change: 1 addition & 0 deletions db-service/lib/SQLService.js
@@ -71,6 +71,7 @@ class SQLService extends DatabaseService {

_stream(val) {
if (val === null) return null
if (val instanceof Readable) return val
// Buffer.from only applies encoding when the input is a string
let raw = typeof val === 'string' ? Buffer.from(val.toString(), 'base64') : val
return new Readable({
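For orientation, a trimmed standalone restatement of the patched _stream helper above (not the verbatim method; the real one builds its own Readable). The new second line is the fast path that lets native HANA LOB streams pass through untouched:

const { Readable } = require('stream')

function _stream(val) {
  if (val === null) return null
  if (val instanceof Readable) return val // new: native LOB streams pass through
  // Buffer.from only applies base64 decoding when the input is a string
  const raw = typeof val === 'string' ? Buffer.from(val, 'base64') : val
  return Readable.from([raw], { objectMode: false })
}

_stream('aGVsbG8=').pipe(process.stdout) // prints: hello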
4 changes: 2 additions & 2 deletions db-service/lib/cqn2sql.js
@@ -430,7 +430,7 @@ class CQN2SQLRenderer {
INSERT.entries[0].type = 'json'
this.entries = [[...this.values, INSERT.entries[0]]]
} else {
const stream = Readable.from(this.INSERT_entries_stream(INSERT.entries))
const stream = Readable.from(this.INSERT_entries_stream(INSERT.entries), { objectMode: false })
stream.type = 'json'
this.entries = [[...this.values, stream]]
}
@@ -567,7 +567,7 @@ class CQN2SQLRenderer {
INSERT.rows[0].type = 'json'
this.entries = [[...this.values, INSERT.rows[0]]]
} else {
const stream = Readable.from(this.INSERT_rows_stream(INSERT.rows))
const stream = Readable.from(this.INSERT_rows_stream(INSERT.rows), { objectMode: false })
stream.type = 'json'
this.entries = [[...this.values, stream]]
}
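Both hunks above pass { objectMode: false } so the JSON produced by the entry/row generators travels as one byte stream rather than as a sequence of string "objects". A small standalone illustration:

const { Readable } = require('stream')

function* jsonChunks() {
  yield '[{"ID":1},'
  yield '{"ID":2}]'
}

// objectMode: false makes Readable.from() treat each yielded string as raw
// bytes, producing one contiguous JSON byte stream, which is what the
// drivers expect when binding the stream as a statement parameter.
Readable.from(jsonChunks(), { objectMode: false }).pipe(process.stdout)
// prints: [{"ID":1},{"ID":2}]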
24 changes: 14 additions & 10 deletions hana/lib/HANAService.js
@@ -105,7 +105,9 @@ class HANAService extends SQLService {
const { cqn, temporary, blobs, withclause, values } = this.cqn2sql(query, data)
// REVISIT: add prepare options when param:true is used
const sqlScript = this.wrapTemporary(temporary, withclause, blobs)
let rows = values?.length ? await (await this.prepare(sqlScript)).all(values) : await this.exec(sqlScript)
let rows = (values?.length || blobs.length > 0)
? await (await this.prepare(sqlScript, blobs.length)).all(values || [])
: await this.exec(sqlScript)
if (rows.length) {
rows = this.parseRows(rows)
}
@@ -124,7 +126,7 @@
const results = await (entries
? HANAVERSION <= 2
? entries.reduce((l, c) => l.then(() => ps.run(c)), Promise.resolve(0))
: ps.run(entries)
: ps.run(entries[0])
: ps.run())
return new this.class.InsertResults(cqn, results)
}
@@ -202,8 +204,8 @@ class HANAService extends SQLService {
}

// prepare and exec are both implemented inside the drivers
prepare(sql) {
return this.ensureDBC().prepare(sql)
prepare(sql, hasBlobs) {
return this.ensureDBC().prepare(sql, hasBlobs)
}

exec(sql) {
@@ -527,13 +529,15 @@ class HANAService extends SQLService {
// HANA Express does not process large JSON documents
// The limit is somewhere between 64KB and 128KB
if (HANAVERSION <= 2) {
this.entries = INSERT.entries.map(e => (e instanceof Readable ? [e] : [Readable.from(this.INSERT_entries_stream([e], 'hex'))]))
this.entries = INSERT.entries.map(e => (e instanceof Readable
? [e]
: [Readable.from(this.INSERT_entries_stream([e], 'hex'), { objectMode: false })]))
} else {
this.entries = [
this.entries = [[
INSERT.entries[0] instanceof Readable
? INSERT.entries[0]
: Readable.from(this.INSERT_entries_stream(INSERT.entries, 'hex'))
]
: Readable.from(this.INSERT_entries_stream(INSERT.entries, 'hex'), { objectMode: false })
]]
}

// WITH SRC is used to force HANA to interpret the ? as an NCLOB, allowing for streaming of the data
@@ -954,7 +958,7 @@ class HANAService extends SQLService {
this.dbc = con

const stmt = await this.dbc.prepare(createContainerDatabase)
const res = await stmt.all([creds.user, creds.password, creds.containerGroup, !clean])
const res = await stmt.run([creds.user, creds.password, creds.containerGroup, !clean])
DEBUG?.(res.map(r => r.MESSAGE).join('\n'))
} finally {
if (this.dbc) {
@@ -996,7 +1000,7 @@ class HANAService extends SQLService {
this.dbc = con

const stmt = await this.dbc.prepare(createContainerTenant.replaceAll('{{{GROUP}}}', creds.containerGroup))
const res = await stmt.all([creds.user, creds.password, creds.schema, !clean])
const res = await stmt.run([creds.user, creds.password, creds.schema, !clean])
res && DEBUG?.(res.map(r => r.MESSAGE).join('\n'))
} finally {
await this.dbc.disconnect()
150 changes: 125 additions & 25 deletions hana/lib/drivers/hana-client.js
@@ -45,13 +45,70 @@ class HANAClientDriver extends driver {
}

set(variables) {
for(const key in variables) {
for (const key in variables) {
this._native.setClientInfo(key, variables[key])
}
}

async prepare(sql) {
async prepare(sql, hasBlobs) {
const ret = await super.prepare(sql)
// The hana-client ResultSet API does not allow for deferred streaming of blobs.
// With the current design of the hana-client ResultSet it is only
// possible to read all LOBs into memory to do deferred streaming.
// The main reason is that the ResultSet only allows using getData() on the current row,
// and with the current next() implementation it is only possible to go forward in the ResultSet.
// Deferred streaming would require getData() to also work on previous rows.
if (hasBlobs) {
ret.all = async (values) => {
const stmt = await ret._prep
// Create result set
const reset = async function () {
if (this) await prom(this, 'close')()
const rs = await prom(stmt, 'executeQuery')(values)
rs.reset = reset
return rs
}
const rs = await reset()
const rsStreamsProm = {}
const rsStreams = new Promise((resolve, reject) => {
rsStreamsProm.resolve = resolve
rsStreamsProm.reject = reject
})

rs._rowPosition = -1
const _next = prom(rs, 'next')
const next = () => {
rs._rowPosition++
return _next()
}
const getValue = prom(rs, 'getValue')
const result = []
// Fetch the next row
while (await next()) {
const cols = stmt.getColumnInfo().map(b => b.columnName)
// columns 0-3 are metadata columns
const values = await Promise.all([getValue(0), getValue(1), getValue(2), getValue(3)])

const row = {}
for (let i = 0; i < cols.length; i++) {
const col = cols[i]
// columns >3 are all blob columns
row[col] = i > 3 ?
rs.isNull(i)
? null
: Readable.from(streamBlob(rsStreams, rs._rowPosition, i, 'binary'))
: values[i]
}

result.push(row)
}

rs.reset().then(rsStreamsProm.resolve, rsStreamsProm.reject)

return result
}
}

ret.stream = async (values, one) => {
const stmt = await ret._prep
values = Array.isArray(values) ? values : []
@@ -66,7 +123,7 @@ class HANAClientDriver extends driver {
// Which creates an inherent limitation to the maximum size of a result set (~0xfffffffb)
if (streamUnsafe && sql.startsWith('DO')) {
const rows = await prom(stmt, 'exec')(values, { rowsAsArray: true })
return Readable.from(rowsIterator(rows, stmt.getColumnInfo()))
return Readable.from(rowsIterator(rows, stmt.getColumnInfo()), { objectMode: false })
}
const rs = await prom(stmt, 'executeQuery')(values)
const cols = rs.getColumnInfo()
@@ -75,9 +132,9 @@
if (rs.getRowCount() === 0) return null
await prom(rs, 'next')()
if (rs.isNull(0)) return null
return Readable.from(streamBlob(rs, 0, 'binary'))
return Readable.from(streamBlob(rs, undefined, 0, 'binary'), { objectMode: false })
}
return Readable.from(rsIterator(rs, one))
return Readable.from(rsIterator(rs, one), { objectMode: false })
}
return ret
}
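A sketch of what the hasBlobs override above means for a caller of this wrapper; the SQL and the LOB column name DATA are invented for illustration. all() materializes the scalar columns row by row, while every LOB column becomes a lazy Readable that is only read, via a freshly reset result set, once it is consumed:

const { buffer } = require('stream/consumers')

async function readRowsWithLobs(driver, values) {
  const stmt = await driver.prepare('SELECT "ID", "A", "B", "C", "DATA" FROM "T"', true)
  const rows = await stmt.all(values)
  for (const row of rows) {
    // columns past the first four are null or lazy Readables over the LOB
    if (row.DATA) row.DATA = await buffer(row.DATA)
  }
  return rows
}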
@@ -187,7 +244,7 @@ async function* rsIterator(rs, one) {
yield buffer
buffer = ''

for await (const chunk of streamBlob(rs, columnIndex, 'base64', binaryBuffer)) {
for await (const chunk of streamBlob(rs, undefined, columnIndex, 'base64', binaryBuffer)) {
yield chunk
}
buffer += '"'
@@ -210,32 +267,75 @@
yield buffer
}

async function* streamBlob(rs, columnIndex, encoding, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
  const getData = prom(rs, 'getData')

  let decoder = new StringDecoder(encoding)

  let blobPosition = 0

  while (true) {
    // REVISIT: Ensure that the data read is divisible by 3 as that allows for base64 encoding
    let start = 0
    const read = await getData(columnIndex, blobPosition, binaryBuffer, 0, binaryBuffer.byteLength)
    if (blobPosition === 0 && binaryBuffer.slice(0, 7).toString() === 'base64,') {
      decoder = {
        write: encoding === 'base64' ? c => c : chunk => Buffer.from(chunk.toString(), 'base64'),
        end: () => Buffer.allocUnsafe(0),
      }
      start = 7
    }
    blobPosition += read
    if (read < binaryBuffer.byteLength) {
      yield decoder.write(binaryBuffer.slice(start, read))
      break
    }
    yield decoder.write(binaryBuffer.slice(start).toString('base64'))
  }
  yield decoder.end()
}
async function* streamBlob(rs, rowIndex = -1, columnIndex, encoding, binaryBuffer = Buffer.allocUnsafe(1 << 16)) {
  const promChain = {
    resolve: () => { },
    reject: () => { }
  }
  try {
    // Check if the resultset is a promise
    if (rs.then) {
      // Copy the current Promise
      const prom = new Promise((resolve, reject) => rs.then(resolve, reject))
      // Enqueue all following then calls till after the current call
      const next = new Promise((resolve, reject) => {
        promChain.resolve = resolve
        promChain.reject = reject
      })
      rs.then = (resolve, reject) => next.then(resolve, reject)
      rs = await prom
    }

    // Check if the provided resultset is on the correct row
    if (rowIndex >= 0) {
      rs._rowPosition ??= -1
      if (rowIndex - rs._rowPosition < 0) {
        rs = await rs.reset()
        rs._rowPosition ??= -1
      }

      const _next = prom(rs, 'next')
      const next = () => {
        rs._rowPosition++
        return _next()
      }

      // Move result set to the correct row
      while (rowIndex - rs._rowPosition > 0) {
        await next()
      }
    }

    const getData = prom(rs, 'getData')

    let decoder = new StringDecoder(encoding)

    let blobPosition = 0

    while (true) {
      // REVISIT: Ensure that the data read is divisible by 3 as that allows for base64 encoding
      let start = 0
      const read = await getData(columnIndex, blobPosition, binaryBuffer, 0, binaryBuffer.byteLength)
      if (blobPosition === 0 && binaryBuffer.slice(0, 7).toString() === 'base64,') {
        decoder = {
          write: encoding === 'base64' ? c => c : chunk => Buffer.from(chunk.toString(), 'base64'),
          end: () => Buffer.allocUnsafe(0),
        }
        start = 7
      }
      blobPosition += read
      if (read < binaryBuffer.byteLength) {
        yield decoder.write(binaryBuffer.slice(start, read))
        break
      }
      yield decoder.write(binaryBuffer.slice(start).toString('base64'))
    }
    yield decoder.end()
  } catch (e) {
    promChain.reject(e)
  } finally {
    promChain.resolve(rs)
  }
}
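The promChain / rs.then bookkeeping above serializes many lazy blob readers onto one shared result set: each reader awaits the set, swaps in a replacement then so that later readers queue behind it, and releases the set in finally. The same technique in a generic standalone form (all names here are illustrative):

// Async users of one shared resource run strictly one at a time, in arrival order
function makeSerialized(resource) {
  let tail = Promise.resolve(resource)
  return async function use(fn) {
    const prev = tail
    let release
    tail = new Promise(resolve => { release = resolve })
    const res = await prev
    try {
      return await fn(res)
    } finally {
      release(res) // hand the resource to the next queued user
    }
  }
}

// both callbacks run sequentially even though they are started concurrently
const use = makeSerialized({ reads: 0 })
Promise.all([use(r => r.reads++), use(r => r.reads++)])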

async function* rowsIterator(rows, cols) {
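Related context for rsIterator above: it splices each blob into the JSON result as a base64 string via streamBlob(rs, undefined, columnIndex, 'base64', ...). The REVISIT note in streamBlob exists because naive per-chunk base64 only concatenates correctly when every chunk length is a multiple of 3 (no '=' padding mid-stream). A standalone sketch of the embedding idea:

const { Readable } = require('stream')

// Embed a binary stream into a JSON document as one base64 string field
async function* jsonWithBase64(source) {
  yield '{"data":"'
  for await (const chunk of source) yield chunk.toString('base64')
  yield '"}'
}

const source = Readable.from([Buffer.from('foo'), Buffer.from('bar')]) // 3-byte chunks
Readable.from(jsonWithBase64(source), { objectMode: false }).pipe(process.stdout)
// prints: {"data":"Zm9vYmFy"}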
52 changes: 45 additions & 7 deletions hana/lib/drivers/hdb.js
@@ -1,5 +1,6 @@
const { Readable, PassThrough, Stream } = require('stream')
const { Readable, Stream } = require('stream')
const { StringDecoder } = require('string_decoder')
const { text } = require('stream/consumers')

const hdb = require('hdb')
const iconv = require('iconv-lite')
@@ -40,7 +41,7 @@ class HDBDriver extends driver {

set(variables) {
const clientInfo = this._native._connection.getClientInfo()
for(const key in variables) {
for (const key in variables) {
clientInfo.setProperty(key, variables[key])
}
}
@@ -69,8 +70,42 @@ class HDBDriver extends driver {
})
}

async prepare(sql) {
async prepare(sql, hasBlobs) {
const ret = await super.prepare(sql)

if (hasBlobs) {
ret.all = async (values) => {
const stmt = await ret._prep
// Create result set
const rs = await prom(stmt, 'execute')(values)
const cols = rs.metadata.map(b => b.columnName)
const stream = rs.createReadStream()

const result = []
for await (const row of stream) {
const obj = {}
for (let i = 0; i < cols.length; i++) {
const col = cols[i]
// hdb returns large strings as streams sometimes
if (col === '_json_' && typeof row[col] === 'object') {
obj[col] = await text(row[col].createReadStream())
continue
}
obj[col] = i > 3
? row[col] === null
? null
: (
row[col].createReadStream?.()
|| Readable.from(echoStream(row[col]), { objectMode: false })
)
: row[col]
}
result.push(obj)
}
return result
}
}

ret.stream = async (values, one) => {
const stmt = await ret._prep
const rs = await prom(stmt, 'execute')(values || [])
@@ -98,11 +133,10 @@
if (!Array.isArray(values)) return { values: [], streams: [] }
const streams = []
values = values.map((v, i) => {
if (v instanceof Stream && !(v instanceof PassThrough)) {
if (v instanceof Stream) {
streams[i] = v
const passThrough = new PassThrough()
v.pipe(passThrough)
return passThrough
const iterator = v[Symbol.asyncIterator]()
return Readable.from(iterator, { objectMode: false })
}
return v
})
@@ -113,6 +147,10 @@
}
}
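The parameter-extraction hunk above drops the PassThrough piping and instead re-hosts the stream's async iterator in a fresh byte-mode Readable, so hdb receives a plain stream it can drain. The conversion in isolation:

const { Readable } = require('stream')

// Re-host any async-iterable stream as a plain byte-mode Readable,
// detached from the original stream implementation
function toByteReadable(stream) {
  return Readable.from(stream[Symbol.asyncIterator](), { objectMode: false })
}

toByteReadable(Readable.from([Buffer.from('he'), Buffer.from('llo')])).pipe(process.stdout)
// prints: hello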

function* echoStream(ret) {
yield ret
}
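echoStream is a single-shot generator: it lets a plain, already buffered column value travel the same Readable code path as a genuine LOB stream, as used in the hasBlobs branch above:

const { Readable } = require('stream')
function* echoStream(value) { yield value } // as defined above

Readable.from(echoStream(Buffer.from('abc')), { objectMode: false }).pipe(process.stdout)
// prints: abc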

async function* rsIterator(rs, one) {
// Raw binary data stream unparsed
const raw = rs.createBinaryStream()[Symbol.asyncIterator]()