|
| 1 | +import { PassThrough, Transform } from "stream"; |
| 2 | +import duplexify from "duplexify"; |
| 3 | + |
| 4 | +function isObject(data: Buffer) { |
| 5 | + return !Buffer.isBuffer(data) && typeof data !== 'string'; |
| 6 | +} |
| 7 | + |
| 8 | +export interface options { |
| 9 | + maxBuffer?: number; |
| 10 | + newLine?: boolean; |
| 11 | + strict?: any |
| 12 | +} |
| 13 | + |
| 14 | +export type onpeek = (data: Buffer, swap: (err?: any, str?: Transform) => void) => void |
| 15 | + |
| 16 | +export function peek(opts: number): duplexify.Duplexify; |
| 17 | +export function peek(onpeek: onpeek): duplexify.Duplexify; |
| 18 | +export function peek(): duplexify.Duplexify; |
| 19 | +export function peek(opts: options, onpeek: onpeek): duplexify.Duplexify; |
| 20 | +export function peek(opts?: options|number|onpeek, onpeek?: onpeek): duplexify.Duplexify { |
| 21 | + if (typeof opts === "number") opts = {maxBuffer: opts}; |
| 22 | + if (typeof opts === "function") return peek(null, opts) |
| 23 | + if (!opts) opts = {}; |
| 24 | + const maxBuffer = typeof opts.maxBuffer === "number" ? opts.maxBuffer : 65535; |
| 25 | + const newline = opts.newLine !== false; |
| 26 | + const strict = opts.strict; |
| 27 | + const dup = duplexify.obj(); |
| 28 | + let buffer = [], bufferSize = 0; |
| 29 | + |
| 30 | + function onpreend() { |
| 31 | + if (strict) return dup.destroy(new Error('No newline found')); |
| 32 | + dup.cork(); |
| 33 | + return ready(Buffer.concat(buffer), null, (err) => err ? dup.destroy(err) : dup.uncork()); |
| 34 | + } |
| 35 | + |
| 36 | + function ready(data: Buffer, overflow, cb) { |
| 37 | + dup.removeListener("preend", onpreend) |
| 38 | + onpeek(data, function(err, parser) { |
| 39 | + if (err) return cb(err) |
| 40 | + dup.setWritable(parser); |
| 41 | + dup.setReadable(parser); |
| 42 | + if (data) parser.write(data); |
| 43 | + if (overflow) parser.write(overflow); |
| 44 | + overflow = buffer = peeker = null; // free the data |
| 45 | + return cb(); |
| 46 | + }); |
| 47 | + }; |
| 48 | + |
| 49 | + var peeker = new PassThrough({ |
| 50 | + highWaterMark: 1, |
| 51 | + transform(chunk, encoding, callback) { |
| 52 | + if (isObject(chunk)) return ready(chunk, null, callback) |
| 53 | + if (!Buffer.isBuffer(chunk)) chunk = Buffer.from(chunk, encoding); |
| 54 | + |
| 55 | + if (newline) { |
| 56 | + var nl = Array.prototype.indexOf.call(chunk, 10); |
| 57 | + if (nl > 0 && chunk[nl-1] === 13) nl--; |
| 58 | + if (nl > -1) { |
| 59 | + buffer.push(chunk.slice(0, nl)) |
| 60 | + return ready(Buffer.concat(buffer), chunk.slice(nl), callback) |
| 61 | + } |
| 62 | + } |
| 63 | + |
| 64 | + buffer.push(chunk) |
| 65 | + bufferSize += chunk.length |
| 66 | + |
| 67 | + if (bufferSize < maxBuffer) return callback(); |
| 68 | + if (strict) return callback(new Error("No newline found")); |
| 69 | + ready(Buffer.concat(buffer), null, callback) |
| 70 | + }, |
| 71 | + }); |
| 72 | + |
| 73 | + dup.on("preend", onpreend); |
| 74 | + dup.setWritable(peeker); |
| 75 | + return dup; |
| 76 | +} |
0 commit comments