-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
102 lines (83 loc) · 2.46 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
const { Duplex, Readable } = require('stream');
/**
 * Builds a `_read` implementation that serves bytes out of a shared cache
 * entry while tracking one consumer's progress in `state`.
 *
 * The returned function must be invoked with a Readable-like `this` that
 * exposes `readableHighWaterMark` and `push`.
 *
 * @param {{available: number, buffer: number[], complete: boolean}} entry
 *   Shared entry: `buffer` holds the bytes, `available` how many are stored,
 *   `complete` whether the writer has finished.
 * @param {{alreadyRead: number, readAttempts: number}} state
 *   Per-consumer cursor; mutated on every call.
 * @returns {Function} Function suitable for assignment to `stream._read`.
 */
const createReadMethod = (entry, state) => function() {
  const pending = entry.available - state.alreadyRead;

  if (pending <= 0) {
    if (entry.complete) {
      // Everything was delivered and the writer is done: signal end-of-stream.
      this.push(null);
    } else {
      // Nothing to hand out yet; record that this consumer is waiting.
      state.readAttempts += 1;
    }
    return;
  }

  // Never hand out more than one high-water-mark's worth per call.
  const chunkSize = Math.min(this.readableHighWaterMark, pending);
  const start = state.alreadyRead;
  const bytes = entry.buffer.slice(start, start + chunkSize);

  this.push(Buffer.from(bytes));
  state.alreadyRead = start + chunkSize;
  if (state.readAttempts > 0) {
    state.readAttempts -= 1;
  }
};
/**
 * Mixin factory: extends a keyed store class so that values are written and
 * read as streams.
 *
 * The base class must provide `set(key, value)`, `get(key)` and `delete(key)`
 * (these are the only members called on it here).
 *
 * @param {Function} Cls - Base class to extend.
 * @returns {Function} Subclass of `Cls` with stream-based `set` / `get`.
 */
const streamBufferCache = Cls => class extends Cls {
  /**
   * Creates a cache entry for `key` and returns a Duplex stream. Bytes
   * written to the stream are appended to the entry's buffer; the readable
   * side replays those same bytes.
   *
   * If the stream emits 'error', the key is deleted from the cache.
   *
   * @param {*} key - Cache key, passed through to the base class.
   * @returns {import('stream').Duplex} Stream to write the cached content to.
   */
  set(key) {
    // Cursor for the Duplex's own readable side.
    const state = {
      alreadyRead: 0,
      readAttempts: 0,
    };
    const entry = {
      available: 0,
      // Bytes are stored one per array element.
      buffer: [],
      complete: false,
      // Wake-up callbacks registered by get() readers that attached before
      // the entry completed.
      waiters: new Set(),
      stream: new Duplex({
        write(chunk, encoding, cb) {
          const { byteLength } = chunk;
          for (const b of chunk) entry.buffer.push(b);
          entry.available += byteLength;
          if (state.readAttempts > 0) {
            // The readable side had caught up and is waiting: hand the new
            // bytes over directly.
            entry.stream.push(chunk);
            state.alreadyRead += byteLength;
            state.readAttempts--;
          }
          // Wake readers that were waiting for more bytes. Iterate over a
          // copy so a reader detaching itself during the loop is harmless.
          for (const wake of [...entry.waiters]) wake();
          return cb();
        },
      }),
    };
    entry.stream['_read'] = createReadMethod(entry, state);
    entry.stream.on('error', () => {
      this.delete(key);
      // Deferred so that other 'error' listeners still run for this emit.
      process.nextTick(() => {
        entry.stream.removeAllListeners();
        entry.waiters.clear();
      });
    });
    entry.stream.once('finish', () => {
      if (state.readAttempts > 0) entry.stream.push(null);
      entry.complete = true;
    });
    super.set(key, entry);
    return entry.stream;
  }

  /**
   * Returns a new Readable that replays the entry stored under `key` from
   * its first byte. If the entry is still being written, the Readable keeps
   * delivering bytes as they arrive and ends when the writer finishes.
   *
   * Each reader serves data from the shared buffer at its own offset. It
   * does not listen to the writer stream's 'data' events, so calling get()
   * never switches that stream into flowing mode and a reader cannot
   * receive bytes that do not match its position.
   *
   * @param {*} key - Cache key, passed through to the base class.
   * @returns {import('stream').Readable|undefined} Reader, or `undefined`
   *   when the base class has no entry for `key`.
   */
  get(key) {
    const entry = super.get(key);
    if (!entry) return undefined;
    const state = {
      alreadyRead: 0,
      readAttempts: 0,
    };
    const stream = new Readable;
    const read = createReadMethod(entry, state);
    stream['_read'] = read;
    if (!entry.complete) {
      // Re-run the read logic only if this reader previously found nothing
      // to deliver and is therefore waiting for a push.
      const wake = () => {
        if (state.readAttempts > 0) read.call(stream);
      };
      const onError = err => {
        stream.emit('error', err);
      };
      const detach = () => {
        entry.waiters.delete(wake);
        entry.stream.off('error', onError);
        entry.stream.off('finish', onFinish);
        stream.off('close', detach);
      };
      // The 'finish' listener registered in set() runs first and marks the
      // entry complete, so waking here delivers any remaining bytes or EOF.
      const onFinish = () => {
        wake();
        detach();
      };
      entry.waiters.add(wake);
      entry.stream.on('error', onError);
      entry.stream.once('finish', onFinish);
      // Drop the hooks if the reader goes away before the writer finishes.
      stream.once('close', detach);
    }
    return stream;
  }
};
// The mixin factory is this module's only export.
module.exports = streamBufferCache;