Skip to content

Commit 89d7c83

Browse files
authored
Merge pull request #26 from scramjetorg/feature/batch
Introduce "DataStream.batch()"
2 parents fd91d18 + 34b4dee commit 89d7c83

File tree

3 files changed

+105
-1
lines changed

3 files changed

+105
-1
lines changed

src/streams/base-stream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ export interface BaseStream<T extends any> {
33
map<U, W extends any[]>(callback: TransformFunction<T, U, W>, ...args: W): BaseStream<U>;
44
flatMap<U, W extends any[]>(callback: TransformFunction<T, AnyIterable<U>, W>, ...args: W): BaseStream<U>;
55
filter<W extends any[]>(callback: TransformFunction<T, Boolean, W>, ...args: W): BaseStream<T>;
6-
reduce<U = T>(callback: (previousValue: U, currentChunk: T) => MaybePromise<U>, initial?: U): Promise<U>
6+
batch<W extends any[] = []>(callback: TransformFunction<T, Boolean, W>, ...args: W): BaseStream<T[]>;
7+
reduce<U = T>(callback: (previousValue: U, currentChunk: T) => MaybePromise<U>, initial?: U): Promise<U>;
78
}

src/streams/data-stream.ts

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,45 @@ export class DataStream<T> implements BaseStream<T>, AsyncIterable<T> {
7474
return this.asNewFlattenedStream(this.map<AnyIterable<U>, W>(callback, ...args));
7575
}
7676

77+
batch<W extends any[] = []>(callback: TransformFunction<T, Boolean, W>, ...args: W): DataStream<T[]> {
78+
let currentBatch: T[] = [];
79+
let aggregator: TransformFunction<T, T[], W>;
80+
81+
if (isAsyncFunction(callback)) {
82+
aggregator = async (chunk: T, ...args1: W): Promise<T[]> => {
83+
currentBatch.push(chunk);
84+
85+
let result: T[] = [];
86+
87+
if (await callback(chunk, ...args1)) {
88+
result = [...currentBatch];
89+
currentBatch = [];
90+
}
91+
92+
return result;
93+
};
94+
} else {
95+
aggregator = (chunk: T, ...args1: W): T[] => {
96+
currentBatch.push(chunk);
97+
98+
let result: T[] = [];
99+
100+
if (callback(chunk, ...args1)) {
101+
result = [...currentBatch];
102+
currentBatch = [];
103+
}
104+
105+
return result;
106+
};
107+
}
108+
109+
const onEnd = () => {
110+
return { yield: currentBatch.length > 0, value: currentBatch };
111+
};
112+
113+
return this.asNewStream(this.map<T[], W>(aggregator, ...args).filter(chunk => chunk.length > 0), onEnd);
114+
}
115+
77116
async reduce<U = T>(callback: (previousValue: U, currentChunk: T) => MaybePromise<U>, initial?: U): Promise<U> {
78117
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/Reduce#parameters
79118
//
@@ -152,6 +191,25 @@ export class DataStream<T> implements BaseStream<T>, AsyncIterable<T> {
152191
return reducer as Reducer<T, U>;
153192
}
154193

194+
protected asNewStream<U, W extends DataStream<U>>(
195+
fromStream: W,
196+
onEndYield?: () => { yield: boolean, value?: U }
197+
): DataStream<U> {
198+
return DataStream.from((async function * (stream){
199+
for await (const chunk of stream) {
200+
yield chunk;
201+
}
202+
203+
if (onEndYield) {
204+
const yieldValue = onEndYield();
205+
206+
if (yieldValue.yield) {
207+
yield yieldValue.value as U;
208+
}
209+
}
210+
})(fromStream));
211+
}
212+
155213
protected asNewFlattenedStream<U, W extends DataStream<AnyIterable<U>>>(
156214
fromStream: W,
157215
onEndYield?: () => { yield: boolean, value?: U }

test/streams/data/batch.spec.ts

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import test from "ava";
2+
import { DataStream } from "../../../src/streams/data-stream";
3+
import { deferReturn } from "../../helpers/utils";
4+
5+
test("DataStream batch can make sentences from words", async (t) => {
6+
const result = await DataStream
7+
.from(["foo", "bar.", "baz", "bax", ".", "foo"])
8+
.batch(chunk => chunk.endsWith("."))
9+
.toArray();
10+
11+
t.deepEqual(result, [["foo", "bar."], ["baz", "bax", "."], ["foo"]]);
12+
});
13+
14+
test("DataStream batch can make sentences from words (async)", async (t) => {
15+
const result = await DataStream
16+
.from(["foo", "bar.", "baz", "bax", ".", "foo"])
17+
.batch(async (chunk) => deferReturn(5, chunk.endsWith(".")))
18+
.toArray();
19+
20+
t.deepEqual(result, [["foo", "bar."], ["baz", "bax", "."], ["foo"]]);
21+
});
22+
23+
test("DataStream batch can bu used to batch by amount (via variadic arg counter)", async (t) => {
24+
const result = await DataStream
25+
.from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
26+
.batch((chunk, counter) => { counter.i++; return counter.i % 3 === 0; }, { i: 0 })
27+
.toArray();
28+
29+
t.deepEqual(result, [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]);
30+
});
31+
32+
test("DataStream batch does not deep copy chunks", async (t) => {
33+
const input = [{ id: 0, data: "foo" }, { id: 1, data: "bar" }, { id: 2, data: "baz" }, { id: 3, data: "bax" }];
34+
const result = await DataStream
35+
.from(input)
36+
.batch((chunk) => chunk.id % 2 !== 0)
37+
.toArray();
38+
39+
t.deepEqual(result, [[{ id: 0, data: "foo" }, { id: 1, data: "bar" }], [{ id: 2, data: "baz" }, { id: 3, data: "bax" }]]);
40+
41+
input[0].data = "changed1";
42+
input[3].data = "changed2";
43+
44+
t.deepEqual(result, [[{ id: 0, data: "changed1" }, { id: 1, data: "bar" }], [{ id: 2, data: "baz" }, { id: 3, data: "changed2" }]]);
45+
});

0 commit comments

Comments
 (0)