Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: writableChunk and test #19

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions src/stream/writable/writableChunk.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { createGzip } from 'node:zlib'
import { pMap } from '@naturalcycles/js-lib'
import { transformMap, writablePushToArray, _pipeline, readableCreate, gunzipToString } from '../..'
import { writableChunk } from './writableChunk'

// jest.setTimeout(900_000)

test('writableChunk', async () => {
const allData: string[][] = []
const array1: string[] = [1, 2, 3, 4, 5].map(n => n + '\n')
const array2: string[] = [6, 7, 8, 9].map(n => n + '\n')

const readableInput = readableCreate<number>()

let i = 0
const interval = setInterval(() => {
readableInput.push(++i)
if (i >= 9) {
clearInterval(interval)
readableInput.push(null) // complete
}
}, 100)

await _pipeline([
readableInput,
transformMap<number, string>(
n => {
console.log(`mapper ${n}`)
return n + '\n'
},
{ concurrency: 1 },
),
writableChunk(
// Split every 5th row
(row: string) => Number.parseInt(row) % 5 === 0,
// no transforms
[],
(_: number) => {
const newArray: string[] = []
allData.push(newArray)
return writablePushToArray(newArray)
},
),
])
expect(allData).toEqual([array1, array2])
})

test('writableChunk with Gzip', async () => {
const allData: Buffer[][] = []
const file1: string = [1, 2, 3, 4, 5].map(n => n + '\n').join('')
const file2: string = [6, 7, 8, 9].map(n => n + '\n').join('')

const readableInput = readableCreate<number>()

let i = 0
const interval = setInterval(() => {
readableInput.push(++i)
if (i >= 9) {
clearInterval(interval)
readableInput.push(null) // complete
}
}, 100)

await _pipeline([
readableInput,
transformMap<number, string>(
n => {
console.log(`mapper ${n}`)
return n + '\n'
},
{ concurrency: 1 },
),
writableChunk(
// Split every 5th row
(row: string) => Number.parseInt(row) % 5 === 0,
// gzipped data
[createGzip],
(_: number) => {
const newArray: Buffer[] = []
allData.push(newArray)
return writablePushToArray(newArray)
},
),
])
const unzippedData = await pMap(allData, async buff => {
return await gunzipToString(Buffer.concat(buff))
})
expect(unzippedData).toEqual([file1, file2])
})
92 changes: 92 additions & 0 deletions src/stream/writable/writableChunk.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Writable } from 'node:stream'
import { promisify } from 'node:util'
import { _deepCopy, _last } from '@naturalcycles/js-lib'
import {
TransformOptions,
TransformTyped,
WritableTyped,
transformNoOp,
} from '@naturalcycles/nodejs-lib'

// This is a helper function to create a promise which resolves when the stream emits a 'finish'
// event.
// This is used to await all the writables in the final method of the writableChunk
async function awaitFinish(stream: Writable): Promise<unknown> {
return await promisify(((cb: any) => stream.on('finish', cb)).bind(stream))()
}
/**
* Generates an array of [arr[i], arr[i+1]] tuples from the input array.
* The resulting array will have a length of `arr.length - 1`.
* ```ts
* successiveElements([1, 2, 3, 4]) // [[1, 2], [2, 3], [3, 4]]
* ```
*/
function successiveElements<T>(arr: T[]): [T, T][] {
const tuples: [T, T][] = []
const arrCopy = _deepCopy(arr)
for (let i = 1; i < arrCopy.length; i++) {
tuples.push([arrCopy[i - 1]!, arrCopy[i]!])
}
return tuples
}
/**
* Allows to split the output to multiple files by splitting into chunks
* based on `shouldSplitFn`.
* `transformFactories` are used to create a chain of transforms for each chunk.
* It was meant to be used with createGzip, which needs a proper start and end for each chunk
* for the output file to be a valid gzip file.
*
* @experimental
*/
export function writableChunk<T>(
shouldSplitFn: (row: T) => boolean,
transformFactories: (() => TransformTyped<T, T>)[],
writableFactory: (index: number) => WritableTyped<T>,
opt?: TransformOptions,
): WritableTyped<T> {
let currentSplitIndex = 0
// We don't want to have an empty chain, so we add a no-op transform
if (transformFactories.length === 0) {
transformFactories.push(transformNoOp<T>)
}

// Create the transforms as well as the Writable, and pipe them together
let currentWritable = writableFactory(currentSplitIndex)
let transforms = transformFactories.map(f => f())
successiveElements(transforms).forEach(([t1, t2]) => t1.pipe(t2))
_last(transforms).pipe(currentWritable)

// We keep track of all the pending writables, so we can await them in the final method
const writablesFinish: Promise<unknown>[] = [awaitFinish(currentWritable)]

return new Writable({
objectMode: true,
...opt,
write(chunk: T, _, cb) {
// pipe will take care of piping the data through the different streams correctly
transforms[0]!.write(chunk, cb)

if (shouldSplitFn(chunk)) {
console.log(`writableChunk splitting at index: ${currentSplitIndex}`)
currentSplitIndex++
transforms[0]!.end()

currentWritable = writableFactory(currentSplitIndex)
transforms = transformFactories.map(f => f())
successiveElements(transforms).forEach(([t1, t2]) => t1.pipe(t2))
_last(transforms).pipe(currentWritable)

writablesFinish.push(awaitFinish(currentWritable))
}
},
async final(cb) {
try {
transforms[0]!.end()
await Promise.all(writablesFinish)
cb()
} catch (err) {
cb(err as Error)
}
},
})
}