Skip to content

Commit 7b4059c

Browse files
authored
Merge pull request #39 from scramjetorg/feature/pipe
Introduce "DataStream.pipe()" method (Scramjet streams)
2 parents c52ed74 + 81dd483 commit 7b4059c

File tree

4 files changed

+312
-4
lines changed

4 files changed

+312
-4
lines changed

src/streams/base-stream.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,17 @@ export interface BaseStream<IN extends any, OUT extends any> {
88
end(): MaybePromise<void>;
99

1010
each<ARGS extends any[]>(callback: TransformFunction<OUT, void, ARGS>, ...args: ARGS): BaseStream<IN, OUT>;
11+
map<ARGS extends any[]>(
12+
callback: TransformFunction<OUT, OUT, ARGS>, ...args: ARGS): BaseStream<IN, OUT>;
1113
map<NEW_OUT, ARGS extends any[]>(
1214
callback: TransformFunction<OUT, NEW_OUT, ARGS>, ...args: ARGS): BaseStream<IN, NEW_OUT>;
1315
filter<ARGS extends any[]>(callback: TransformFunction<OUT, Boolean, ARGS>, ...args: ARGS): BaseStream<IN, OUT>;
1416
batch<ARGS extends any[]>(callback: TransformFunction<OUT, Boolean, ARGS>, ...args: ARGS): BaseStream<IN, OUT[]>;
17+
flatMap<ARGS extends any[]>(
18+
callback: TransformFunction<OUT, AnyIterable<OUT>, ARGS>, ...args: ARGS): BaseStream<IN, OUT>;
1519
flatMap<NEW_OUT, ARGS extends any[]>(
1620
callback: TransformFunction<OUT, AnyIterable<NEW_OUT>, ARGS>, ...args: ARGS): BaseStream<IN, NEW_OUT>;
21+
pipe<DEST extends BaseStream<OUT, any>>(destination: DEST, options: { end: boolean }): DEST;
1722
reduce<NEW_OUT>(
1823
callback: (previous: NEW_OUT, current: OUT) => MaybePromise<NEW_OUT>, initial?: NEW_OUT): Promise<NEW_OUT>;
1924
toArray(): Promise<OUT[]>;

src/streams/data-stream.ts

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,47 @@ type Reducer<IN, OUT> = {
1414
onChunkCallback: (chunk: IN) => MaybePromise<void>
1515
};
1616

17+
type Pipe<IN> = {
18+
destination: BaseStream<IN, any>, // TODO BaseStream<IN, any> | Writable
19+
options: { end: boolean }
20+
};
21+
1722
export class DataStream<IN, OUT = IN> implements BaseStream<IN, OUT>, AsyncIterable<OUT> {
1823
constructor(
1924
protected options: StreamOptions = { maxParallel: 4 },
2025
protected parentStream?: DataStream<IN, any>
2126
) {
22-
this.corked = createResolvablePromiseObject<void>();
23-
2427
if (!this.parentStream) {
2528
this.ifcaChain = new IFCAChain<IN>();
2629
this.ifca = this.ifcaChain.create<IN | OUT, OUT>(options);
30+
this.pipes = [];
2731
} else {
2832
this.ifcaChain = this.parentStream.ifcaChain;
2933
this.ifca = this.ifcaChain.get<IN | OUT, OUT>();
34+
this.pipes = this.parentStream.pipes;
3035
}
3136
}
3237

33-
protected corked: ResolvablePromiseObject<void> | null;
38+
protected corked: ResolvablePromiseObject<void> | null = createResolvablePromiseObject<void>();
3439
protected ifcaChain: IFCAChain<IN>;
3540
protected ifca: IFCA<IN | OUT, OUT, any>;
41+
protected pipes: Array<Pipe<OUT>>;
42+
43+
// All streams in chain are writable.
44+
// Only the last stream created through transforms (the one with no children streams)
45+
// is readable, transformable and pipeable.
46+
// Piped source stream (one on which pipe() was called) is writable, readable, pipeable but not transformable.
3647

3748
// Whether we can write to, end, pause and resume this stream instance.
3849
protected writable: boolean = true;
3950
// Whether we can read from this stream instance.
4051
protected readable: boolean = true;
4152
// Whether we can add transforms to this stream instance.
4253
protected transformable: boolean = true;
54+
// Whether we can pipe from this stream.
55+
protected pipeable: boolean = true;
56+
// Whether this stream has been piped from.
57+
protected isPiped: boolean = false;
4358

4459
static from<IN extends any, STREAM extends DataStream<IN>>(
4560
this: StreamConstructor<STREAM>,
@@ -217,6 +232,41 @@ export class DataStream<IN, OUT = IN> implements BaseStream<IN, OUT>, AsyncItera
217232
return newStream;
218233
}
219234

235+
pipe<DEST extends BaseStream<OUT, any>>(destination: DEST, options: { end: boolean } = { end: true }): DEST {
236+
// pipe<DEST extends Writable>(destination: DEST, options?: { end: boolean }): DEST;
237+
// pipe<DEST extends BaseStream<OUT, any> | Writable>(
238+
// destination: DEST,
239+
// options?: { end: boolean }
240+
// ): DEST
241+
// {
242+
if (!this.pipeable) {
243+
throw new Error("Stream is not pipeable.");
244+
}
245+
246+
this.transformable = false;
247+
248+
this.pipes.push({ destination, options: options });
249+
250+
if (!this.isPiped) {
251+
this.isPiped = true;
252+
253+
const onChunkCallback = async (chunk: OUT) => {
254+
// This is the simplest approach - wait until all pipe destinations are ready
255+
// again to accept incoming chunk. This also means that chunks reading speed
256+
// in all piped streams (both source and all destinations) will be as fast as
257+
// the slowest stream can accept new chunks.
258+
return Promise.all(this.pipes.map(pipe => pipe.destination.write(chunk))) as Promise<any>;
259+
};
260+
const onEndCallback = async () => {
261+
this.pipes.filter(pipe => pipe.options.end).forEach(pipe => pipe.destination.end());
262+
};
263+
264+
(this.getReaderAsyncCallback(true, { onChunkCallback, onEndCallback }))();
265+
}
266+
267+
return destination;
268+
}
269+
220270
@checkTransformability
221271
async reduce<NEW_OUT = OUT>(
222272
callback: (previousValue: NEW_OUT, currentChunk: OUT) => MaybePromise<NEW_OUT>,
@@ -273,6 +323,7 @@ export class DataStream<IN, OUT = IN> implements BaseStream<IN, OUT>, AsyncItera
273323
protected createChildStream<NEW_OUT>(): DataStream<IN, NEW_OUT> {
274324
this.readable = false;
275325
this.transformable = false;
326+
this.pipeable = false;
276327

277328
return new DataStream<IN, NEW_OUT>(this.options, this);
278329
}
@@ -287,6 +338,7 @@ export class DataStream<IN, OUT = IN> implements BaseStream<IN, OUT>, AsyncItera
287338
protected createChildStreamSuperType<NEW_OUT>(): DataStream<IN, NEW_OUT> {
288339
this.readable = false;
289340
this.transformable = false;
341+
this.pipeable = false;
290342

291343
return new DataStream<IN, NEW_OUT>(this.options, this);
292344
}

src/utils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ function isAsyncTransformHandler(func: TransformHandler<any, any>): boolean {
4444
}
4545

4646
function getId(prefix: string): string {
47-
return `${ prefix }-${ Date.now() }`;
47+
return `${ prefix }-${ Date.now() }${ (Math.random() * 100).toPrecision(2) }`;
4848
}
4949

5050
export {

test/unit/streams/data/pipe.spec.ts

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
import test from "ava";
2+
import { DataStream } from "../../../../src/streams/data-stream";
3+
import { StringStream } from "../../../../src/streams/string-stream";
4+
import { deferReturn } from "../../../_helpers/utils";
5+
6+
// Run tests for different sets of "maxParallel" values for each stream.
7+
const maxParallels = [
8+
// Constant
9+
[1, 1, 1, 1, 1],
10+
[2, 2, 2, 2, 2],
11+
[4, 4, 4, 4, 4],
12+
[8, 8, 8, 8, 8],
13+
// Mixed
14+
[2, 1, 5, 2, 9],
15+
[32, 1, 1, 10, 5],
16+
// Increasing
17+
[2, 4, 6, 8, 10],
18+
[1, 4, 16, 32, 64],
19+
// Decreasing
20+
[10, 8, 6, 4, 2],
21+
[64, 32, 16, 4, 1]
22+
];
23+
24+
for (const maxParallel of maxParallels) {
25+
test(`DataStream can be piped to another DataStream, ${ maxParallel.slice(0, 2) }`, async (t) => {
26+
const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7], { maxParallel: maxParallel[0] });
27+
const destStream = new DataStream<number, number>({ maxParallel: maxParallel[1] });
28+
29+
sourceStream.pipe(destStream);
30+
31+
t.deepEqual(await destStream.toArray(), [1, 2, 3, 4, 5, 6, 7]);
32+
});
33+
34+
test(`DataStream with transforms can be piped to another DataStream, ${ maxParallel.slice(0, 2) }`, async (t) => {
35+
const sourceStream = DataStream.from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] });
36+
const destStream = new DataStream<number, number>({ maxParallel: maxParallel[1] });
37+
38+
sourceStream.map((x) => x * 2).pipe(destStream);
39+
40+
t.deepEqual(await destStream.toArray(), [2, 4, 6, 8, 10]);
41+
});
42+
43+
test(`DataStream can be piped to another DataStream with transforms, ${ maxParallel.slice(0, 2) }`, async (t) => {
44+
const sourceStream = DataStream.from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] });
45+
const destStream = sourceStream.pipe(
46+
new DataStream<number, number>({ maxParallel: maxParallel[1] }).map((x) => x * 2));
47+
48+
t.deepEqual(await destStream.toArray(), [2, 4, 6, 8, 10]);
49+
});
50+
51+
test(`DataStream with IFCA breaking transforms can be piped to another DataStream, ${ maxParallel.slice(0, 2) }`, async (t) => {
52+
const sourceStream = DataStream
53+
.from([1, 2, 3, 4, 5], { maxParallel: maxParallel[0] })
54+
.flatMap(x => [x, x + 10, x + 100]);
55+
const destStream = sourceStream.pipe(
56+
new DataStream<number, number>({ maxParallel: maxParallel[1] }));
57+
58+
t.deepEqual(await destStream.toArray(), [1, 11, 101, 2, 12, 102, 3, 13, 103, 4, 14, 104, 5, 15, 105]);
59+
});
60+
61+
test(`DataStream using write can be piped to another DataStream (toArray), ${ maxParallel.slice(0, 2) }`, async (t) => {
62+
const sourceStream = new DataStream<number, number>({ maxParallel: maxParallel[0] })
63+
.filter(x => x % 2 === 0)
64+
.map(x => x * 2);
65+
const destStream = new DataStream<number, string>({ maxParallel: maxParallel[1] }).map((x) => `${ x }`);
66+
67+
sourceStream.pipe(destStream);
68+
69+
// We need to use a sinking method here without awaiting so it can consume sourceStream
70+
// chunks as they come, otherwise the sourceStream will fill up maxParallel and block.
71+
const result = destStream.toArray();
72+
73+
for (const i of [1, 2, 3, 4, 5, 6, 7, 8]) {
74+
await sourceStream.write(i);
75+
}
76+
77+
sourceStream.end();
78+
79+
t.deepEqual(await result, ["4", "8", "12", "16"]);
80+
});
81+
82+
test(`DataStream can be piped to multiple streams ${ maxParallel }`, async (t) => {
83+
const stream1 = DataStream
84+
.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], { maxParallel: maxParallel[0] })
85+
.map(x => x * 2);
86+
const stream2 = new DataStream<number, number>({ maxParallel: maxParallel[1] })
87+
.filter(x => x % 4 === 0)
88+
.map(x => `foo-${ x }-`);
89+
const stream3 = new DataStream<number>({ maxParallel: maxParallel[2] })
90+
.map(x => ({ value: x }));
91+
const stream4 = new DataStream<string>({ maxParallel: maxParallel[3] });
92+
const stringStream = new StringStream({ maxParallel: maxParallel[4] });
93+
94+
stream1.pipe(stream2);
95+
stream1.pipe(stream3);
96+
97+
stream2.pipe(stream4);
98+
stream2.pipe(stringStream);
99+
100+
const [result3, result4, resultString] = await Promise.all([
101+
stream3.toArray(), // Result of: stream1 | stream3
102+
stream4.toArray(), // Result of: stream1 | stream2 | stream4
103+
stringStream.split("-").toArray() // Result of: stream1 | stream2 | stringStream
104+
]);
105+
106+
t.deepEqual(result3, [{ value: 2 }, { value: 4 }, { value: 6 }, { value: 8 },
107+
{ value: 10 }, { value: 12 }, { value: 14 }, { value: 16 }, { value: 18 }, { value: 20 }]);
108+
t.deepEqual(result4, ["foo-4-", "foo-8-", "foo-12-", "foo-16-", "foo-20-"]);
109+
t.deepEqual(resultString, ["foo", "4", "foo", "8", "foo", "12", "foo", "16", "foo", "20", ""]);
110+
});
111+
112+
test(`DataStream using write can be piped to another DataStream (read) #1, ${ maxParallel.slice(0, 2) }`, async (t) => {
113+
const sourceStream = new DataStream<number, number>({ maxParallel: maxParallel[0] })
114+
.map(x => x * 2);
115+
const destStream = new DataStream<number, string>({ maxParallel: maxParallel[1] }).map((x) => `${ x }`);
116+
117+
sourceStream.pipe(destStream);
118+
119+
const result = [];
120+
121+
for (const i of [1, 2, 3, 4, 5, 6, 7, 8]) {
122+
const [, out] = await Promise.all([sourceStream.write(i), destStream.read()]);
123+
124+
result.push(out);
125+
}
126+
127+
t.deepEqual(result, ["2", "4", "6", "8", "10", "12", "14", "16"]);
128+
});
129+
130+
test(`DataStream using write can be piped to another DataStream (read) #2, ${ maxParallel.slice(0, 2) }`, async (t) => {
131+
const sourceStream = new DataStream<number, number>({ maxParallel: maxParallel[0] })
132+
.filter(x => x % 2 === 0)
133+
.map(x => x * 2);
134+
const destStream = new DataStream<number, string>({ maxParallel: maxParallel[1] }).map((x) => `${ x }`);
135+
136+
sourceStream.pipe(destStream);
137+
138+
const result = [];
139+
140+
for (const i of [1, 2, 3, 4, 5, 6, 7, 8]) {
141+
await sourceStream.write(i);
142+
143+
// Since we filter out odd chunks, we only need to read half of the initial number of chunks.
144+
if (i % 2 === 0) {
145+
result.push(await destStream.read());
146+
}
147+
}
148+
149+
t.deepEqual(result, ["4", "8", "12", "16"]);
150+
});
151+
}
152+
153+
test("DataStream pipe ends destination stream", async (t) => {
154+
const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
155+
const destStream = new DataStream<number, number>();
156+
157+
sourceStream.pipe(destStream);
158+
159+
for (let i = 0; i < 7; i++) {
160+
await destStream.read();
161+
}
162+
163+
t.throws(() => destStream.write(8), { message: "Write after end" }, "Throws if stream is ended.");
164+
});
165+
166+
test("DataStream pipe does not end destination stream if end:false passed", async (t) => {
167+
const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
168+
const destStream = new DataStream<number, number>();
169+
170+
sourceStream.pipe(destStream, { end: false });
171+
172+
for (let i = 0; i < 7; i++) {
173+
await destStream.read();
174+
}
175+
176+
t.notThrows(() => destStream.write(8), "Should not throw if stream is not ended.");
177+
});
178+
179+
test("Pipe source can be read from", async (t) => {
180+
const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
181+
const destStream = new DataStream<number, number>();
182+
183+
sourceStream.pipe(destStream);
184+
185+
const read = await sourceStream.read();
186+
187+
// First chunk (the value 1) will be sent to the piped stream,
188+
// so the result of the read will be second chunk.
189+
t.deepEqual(read, 2);
190+
});
191+
192+
test("Pipe source can be piped from again", async (t) => {
193+
const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
194+
const destStream1 = new DataStream<number, number>();
195+
const destStream2 = new DataStream<number, number>();
196+
197+
sourceStream.pipe(destStream1);
198+
199+
t.notThrows(() => sourceStream.pipe(destStream2), "Should not throw.");
200+
});
201+
202+
test("Pipe source cannot be transformed further", async (t) => {
203+
const sourceStream = DataStream.from([1, 2, 3, 4, 5, 6, 7]);
204+
const destStream = new DataStream<number, number>();
205+
206+
sourceStream.pipe(destStream);
207+
208+
t.throws(() => sourceStream.map(x => x * 2), { message: "Stream is not transformable." }, "Should throw.");
209+
});
210+
211+
test("Pipe keeps correct backpressure (1 destination)", async (t) => {
212+
const assert = (stream: any) => {
213+
t.true(stream.ifca.state.all <= stream.ifca.state.maxParallel,
214+
`Backpressure is not exceeded (${ stream.ifca.state }).`);
215+
};
216+
const stream1 = DataStream.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], { maxParallel: 5 });
217+
const stream2 = new DataStream<number>({ maxParallel: 3 })
218+
.map(x => { assert(stream2); deferReturn(10, x); });
219+
220+
stream1.pipe(stream2);
221+
222+
await stream2.toArray();
223+
});
224+
225+
test("Pipe keeps correct backpressure (2 destinations)", async (t) => {
226+
const state = {
227+
stream1: [0],
228+
stream2: [0],
229+
stream3: [0]
230+
};
231+
const assert = (name: string, stream: any) => {
232+
t.true(stream.ifca.state.all <= stream.ifca.state.maxParallel,
233+
`Backpressure is not exceeded (${ name }, ${ stream.ifca.state }).`);
234+
235+
if (name === "stream3") {
236+
t.true(state.stream3.length === state.stream2.length, "Stream3 has same number of chunks done or in progress.");
237+
}
238+
};
239+
const stream1 = DataStream
240+
.from([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20], { maxParallel: 7 })
241+
.map(x => { state.stream1.push(x); assert("stream1", stream1); return x; });
242+
const stream2 = new DataStream<number>({ maxParallel: 3 })
243+
.map(x => { state.stream2.push(x); assert("stream2", stream2); return deferReturn(10, x); });
244+
const stream3 = new DataStream<number>({ maxParallel: 5 })
245+
.map(x => { state.stream3.push(x); assert("stream3", stream3); return deferReturn(5, x); });
246+
247+
stream1.pipe(stream2);
248+
stream1.pipe(stream3);
249+
250+
await Promise.all([stream2.toArray(), stream3.toArray()]);
251+
});

0 commit comments

Comments
 (0)