Skip to content

Commit fd91d18

Browse files
authored
Merge pull request #24 from scramjetorg/feature/reduce
Introduce "DataStream.reduce()"
2 parents 6f55422 + 7b7b3e7 commit fd91d18

File tree

5 files changed

+247
-15
lines changed

5 files changed

+247
-15
lines changed

src/streams/base-stream.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1-
import { TransformFunction, AnyIterable } from "../types";
1+
import { TransformFunction, AnyIterable, MaybePromise } from "../types";
22
export interface BaseStream<T extends any> {
33
map<U, W extends any[]>(callback: TransformFunction<T, U, W>, ...args: W): BaseStream<U>;
44
flatMap<U, W extends any[]>(callback: TransformFunction<T, AnyIterable<U>, W>, ...args: W): BaseStream<U>;
55
filter<W extends any[]>(callback: TransformFunction<T, Boolean, W>, ...args: W): BaseStream<T>;
6+
reduce<U = T>(callback: (previousValue: U, currentChunk: T) => MaybePromise<U>, initial?: U): Promise<U>
67
}

src/streams/data-stream.ts

Lines changed: 140 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,16 @@ import { Readable } from "stream";
22
import { createReadStream, promises as fs } from "fs";
33
import { BaseStream } from "./base-stream";
44
import { IFCA } from "../ifca";
5-
import { AnyIterable, Constructor, DroppedChunk, ResolvablePromiseObject, TransformFunction } from "../types";
5+
import { AnyIterable, Constructor, DroppedChunk, ResolvablePromiseObject, TransformFunction, MaybePromise } from "../types";
66
import { createResolvablePromiseObject, isAsyncFunction } from "../utils";
77

8+
type Reducer<T, U> = {
9+
isAsync: boolean,
10+
value?: U,
11+
onFirstChunkCallback: Function,
12+
onChunkCallback: (chunk: T) => MaybePromise<void>
13+
};
14+
815
export class DataStream<T> implements BaseStream<T>, AsyncIterable<T> {
916
constructor() {
1017
this.ifca = new IFCA<T, T, any>(2, (chunk: T) => chunk);
@@ -67,10 +74,27 @@ export class DataStream<T> implements BaseStream<T>, AsyncIterable<T> {
6774
return this.asNewFlattenedStream(this.map<AnyIterable<U>, W>(callback, ...args));
6875
}
6976

77+
async reduce<U = T>(callback: (previousValue: U, currentChunk: T) => MaybePromise<U>, initial?: U): Promise<U> {
78+
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/Reduce#parameters
79+
//
80+
// initialValue (optional):
81+
// A value to which previousValue is initialized the first time the callback is called.
82+
// If initialValue is specified, that also causes currentValue to be initialized to the first
83+
// value in the array. If initialValue is not specified, previousValue is initialized to the first
84+
// value in the array, and currentValue is initialized to the second value in the array.
85+
86+
const reducer = this.getReducer<U>(callback, initial);
87+
const reader = reducer.isAsync
88+
? this.getReaderAsyncCallback(true, reducer)
89+
: this.getReader(true, reducer);
90+
91+
return reader().then(() => reducer.value as U);
92+
}
93+
7094
async toArray(): Promise<T[]> {
7195
const chunks: Array<T> = [];
7296

73-
await (this.getReader(true, chunk => { chunks.push(chunk); }))();
97+
await (this.getReader(true, { onChunkCallback: chunk => { chunks.push(chunk); } }))();
7498

7599
return chunks;
76100
}
@@ -97,6 +121,37 @@ export class DataStream<T> implements BaseStream<T>, AsyncIterable<T> {
97121
}
98122
}
99123

124+
protected getReducer<U>(
125+
callback: (previousValue: U, currentChunk: T) => MaybePromise<U>,
126+
initial?: U
127+
): Reducer<T, U> {
128+
const reducer: any = {
129+
isAsync: isAsyncFunction(callback),
130+
value: initial
131+
};
132+
133+
reducer.onFirstChunkCallback = async (chunk: T): Promise<void> => {
134+
if (initial === undefined) {
135+
// Here we should probably check if typeof chunk is U.
136+
reducer.value = chunk as unknown as U;
137+
} else {
138+
reducer.value = await callback(reducer.value as U, chunk);
139+
}
140+
};
141+
142+
if (reducer.isAsync) {
143+
reducer.onChunkCallback = async (chunk: T): Promise<void> => {
144+
reducer.value = await callback(reducer.value as U, chunk) as U;
145+
};
146+
} else {
147+
reducer.onChunkCallback = (chunk: T): void => {
148+
reducer.value = callback(reducer.value as U, chunk) as U;
149+
};
150+
}
151+
152+
return reducer as Reducer<T, U>;
153+
}
154+
100155
protected asNewFlattenedStream<U, W extends DataStream<AnyIterable<U>>>(
101156
fromStream: W,
102157
onEndYield?: () => { yield: boolean, value?: U }
@@ -116,34 +171,109 @@ export class DataStream<T> implements BaseStream<T>, AsyncIterable<T> {
116171
})(fromStream));
117172
}
118173

119-
// For now this method assumes both callbacks are sync ones.
120174
protected getReader(
121175
uncork: boolean,
122-
onChunkCallback: (chunk: T) => void,
123-
onEndCallback?: Function
176+
callbacks: {
177+
onChunkCallback: (chunk: T) => void,
178+
onFirstChunkCallback?: Function,
179+
onEndCallback?: Function
180+
}
124181
): () => Promise<void> {
182+
/* eslint-disable complexity */
125183
return async () => {
126184
if (uncork && this.corked) {
127185
this._uncork();
128186
}
129187

188+
let chunk = this.ifca.read();
189+
190+
// A bit of code duplication but we don't want to have unnecessary if inside a while loop
191+
// which is called for every chunk or wrap the common code inside another function due to performance.
192+
if (callbacks.onFirstChunkCallback) {
193+
if (chunk instanceof Promise) {
194+
chunk = await chunk;
195+
}
196+
197+
if (chunk !== null) {
198+
await callbacks.onFirstChunkCallback(chunk);
199+
chunk = this.ifca.read();
200+
}
201+
}
202+
130203
// eslint-disable-next-line no-constant-condition
131204
while (true) {
132-
let chunk = this.ifca.read();
205+
if (chunk instanceof Promise) {
206+
chunk = await chunk;
207+
}
208+
209+
if (chunk === null) {
210+
break;
211+
}
212+
213+
callbacks.onChunkCallback(chunk);
214+
215+
chunk = this.ifca.read();
216+
}
217+
218+
if (callbacks.onEndCallback) {
219+
await callbacks.onEndCallback.call(this);
220+
}
221+
};
222+
/* eslint-enable complexity */
223+
}
224+
225+
// This is duplicated '.getReader()' method with the only difference that 'onChunkCallback'
226+
// is an async function so we have to 'await' on it for each chunk. Since it has significant effect
227+
// on processing time (and makes it asynchronous) I have extracted it as a separate method.
228+
protected getReaderAsyncCallback(
229+
uncork: boolean,
230+
callbacks: {
231+
onChunkCallback: (chunk: T) => MaybePromise<void>,
232+
onFirstChunkCallback?: Function,
233+
onEndCallback?: Function
234+
}
235+
): () => Promise<void> {
236+
/* eslint-disable complexity */
237+
return async () => {
238+
if (uncork && this.corked) {
239+
this._uncork();
240+
}
241+
242+
let chunk = this.ifca.read();
133243

244+
// A bit of code duplication but we don't want to have unnecessary if inside a while loop
245+
// which is called for every chunk or wrap the common code inside another function due to performance.
246+
if (callbacks.onFirstChunkCallback) {
134247
if (chunk instanceof Promise) {
135248
chunk = await chunk;
136249
}
250+
251+
if (chunk !== null) {
252+
await callbacks.onFirstChunkCallback(chunk);
253+
chunk = this.ifca.read();
254+
}
255+
}
256+
257+
// eslint-disable-next-line no-constant-condition
258+
while (true) {
259+
if (chunk instanceof Promise) {
260+
chunk = await chunk;
261+
}
262+
137263
if (chunk === null) {
138-
if (onEndCallback) {
139-
onEndCallback.call(this);
140-
}
141264
break;
142265
}
143266

144-
onChunkCallback(chunk);
267+
await callbacks.onChunkCallback(chunk);
268+
269+
chunk = this.ifca.read();
270+
}
271+
272+
if (callbacks.onEndCallback) {
273+
await callbacks.onEndCallback.call(this);
145274
}
146275
};
276+
/* eslint-enable complexity */
147277
}
148278

149279
// Native node readables also implement AsyncIterable interface.

src/utils.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,19 @@ function createResolvablePromiseObject<T>(): ResolvablePromiseObject<T> {
2727
return { promise, resolver: resolver as () => (T) };
2828
}
2929

30-
function isIterable(iterable: any): Boolean {
30+
function isIterable(iterable: any): boolean {
3131
return iterable && iterable[Symbol.iterator] && typeof iterable[Symbol.iterator] === "function";
3232
}
3333

34-
function isAsyncIterable(iterable: any): Boolean {
34+
function isAsyncIterable(iterable: any): boolean {
3535
return iterable && iterable[Symbol.asyncIterator] && typeof iterable[Symbol.asyncIterator] === "function";
3636
}
3737

38-
function isAsyncFunction(func: any): Boolean {
38+
function isAsyncFunction(func: any): boolean {
3939
return func && func[Symbol.toStringTag] === "AsyncFunction";
4040
}
4141

42-
function isAsyncTransformHandler(func: TransformHandler<any, any>): Boolean {
42+
function isAsyncTransformHandler(func: TransformHandler<any, any>): boolean {
4343
return isAsyncFunction(func[0]) || isAsyncFunction(func[1]);
4444
}
4545

test/helpers/utils.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ async function defer<X extends any | undefined>(ts: number, out?: X): Promise<X
1212
return new Promise((res) => setTimeout(() => res(out), ts));
1313
}
1414

15+
async function deferReturn<X extends any>(ts: number, out: X): Promise<X> {
16+
return new Promise((res) => setTimeout(() => res(out), ts));
17+
}
18+
1519
function writeInput(ifca: IFCA<any, any, any>, input: any[]): void {
1620
for (const i of input) {
1721
ifca.write(i);
@@ -58,6 +62,7 @@ const transforms = {
5862

5963
export {
6064
defer,
65+
deferReturn,
6166
writeInput,
6267
readNTimes,
6368
readNTimesConcurrently,

test/streams/data/reduce.spec.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import test from "ava";
2+
import { DataStream } from "../../../src/streams/data-stream";
3+
import { deferReturn } from "../../helpers/utils";
4+
5+
test("DataStream reduce can be use to calculate sum", async (t) => {
6+
const result = await DataStream
7+
.from([1, 2, 3, 4, 0, 0, 20, 10, 2, 2])
8+
.reduce((a, b) => a + b);
9+
10+
t.deepEqual(result, 44);
11+
});
12+
13+
test("DataStream reduce can be use to calculate sum (initial provided)", async (t) => {
14+
const result = await DataStream
15+
.from([1, 2, 3, 4, 0, 0, 20, 10, 2, 2])
16+
.reduce((a, b) => a + b, 0);
17+
18+
t.deepEqual(result, 44);
19+
});
20+
21+
test("DataStream reduce can be use to concate numbers to string", async (t) => {
22+
const result = await DataStream
23+
.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 0])
24+
.reduce((a, b) => `${a}${b}`, "");
25+
26+
t.deepEqual(result, "1234567890");
27+
});
28+
29+
test("DataStream reduce can be use to calculate sum (async)", async (t) => {
30+
const result = await DataStream
31+
.from([1, 2, 3, 4, 0, 0, 20, 10, 2, 2])
32+
.reduce(async (a, b) => deferReturn(5, a + b));
33+
34+
t.deepEqual(result, 44);
35+
});
36+
37+
test("DataStream reduce can be use to calculate sum (initial provided, async)", async (t) => {
38+
const result = await DataStream
39+
.from([1, 2, 3, 4, 0, 0, 20, 10, 2, 2])
40+
.reduce(async (a, b) => deferReturn(5, a + b), 0);
41+
42+
t.deepEqual(result, 44);
43+
});
44+
45+
test("DataStream reduce can be use to concate numbers to string (async)", async (t) => {
46+
const result = await DataStream
47+
.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 0])
48+
.reduce(async (a, b) => deferReturn(5, `${a}${b}`), "");
49+
50+
t.deepEqual(result, "1234567890");
51+
});
52+
53+
// Tests below inspired by:
54+
// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/Reduce#examples
55+
56+
test("DataStream can sum values in an object stream", async (t) => {
57+
const result = await DataStream
58+
.from([{ x: 1 }, { x: 2 }, { x: 3 }])
59+
.reduce((a, b) => a + b.x, 0);
60+
61+
t.deepEqual(result, 6);
62+
});
63+
64+
test("DataStream can flatten a stream of arrays (explicit inital)", async (t) => {
65+
const initial: number[] = [];
66+
const result = await DataStream
67+
.from([[0, 1], [2, 3], [4, 5]])
68+
.reduce((a, b) => a.concat(b), initial);
69+
70+
t.deepEqual(result, [0, 1, 2, 3, 4, 5]);
71+
});
72+
73+
test("DataStream can flatten a stream of arrays (explicit type)", async (t) => {
74+
const result = await DataStream
75+
.from([[0, 1], [2, 3], [4, 5]])
76+
.reduce<number[]>((a, b) => a.concat(b), []);
77+
78+
t.deepEqual(result, [0, 1, 2, 3, 4, 5]);
79+
});
80+
81+
test("DataStream can count instances of values in an object stream", async (t) => {
82+
const initial: any = {};
83+
const result = await DataStream
84+
.from(["Alice", "Bob", "Tiff", "Bruce", "Alice"])
85+
.reduce((allNames, name) => {
86+
if (name in allNames) {
87+
allNames[name]++;
88+
} else {
89+
allNames[name] = 1;
90+
}
91+
92+
return allNames;
93+
}, initial);
94+
95+
t.deepEqual(result, { Alice: 2, Bob: 1, Tiff: 1, Bruce: 1 });
96+
});

0 commit comments

Comments
 (0)