From ca2ab09ea8ab69b202a0ace7cf8f1ee210d7caf5 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 30 Jan 2024 14:28:15 +0100 Subject: [PATCH] chore: remove double-pipe --- package-lock.json | 21 +++++++++++++++------ package.json | 2 -- src/stream.ts | 22 ++++++---------------- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/package-lock.json b/package-lock.json index e943e580..63e39cca 100644 --- a/package-lock.json +++ b/package-lock.json @@ -18,8 +18,6 @@ "abortable-iterator": "^5.0.1", "denque": "^2.1.0", "it-length-prefixed": "^9.0.4", - "it-map": "^3.0.5", - "it-merge": "^3.0.3", "it-pipe": "^3.0.1", "it-pushable": "^3.2.3", "multiformats": "^13.0.1", @@ -4779,6 +4777,15 @@ "ajv": ">=5.0.0" } }, + "node_modules/ansi-colors": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz", + "integrity": "sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==", + "extraneous": true, + "engines": { + "node": ">=6" + } + }, "node_modules/ansi-escapes": { "version": "3.2.0", "resolved": "https://registry.npmjs.org/ansi-escapes/-/ansi-escapes-3.2.0.tgz", @@ -11028,6 +11035,7 @@ "version": "3.0.5", "resolved": "https://registry.npmjs.org/it-map/-/it-map-3.0.5.tgz", "integrity": "sha512-hB0TDXo/h4KSJJDSRLgAPmDroiXP6Fx1ck4Bzl3US9hHfZweTKsuiP0y4gXuTMcJlS6vj0bb+f70rhkD47ZA3w==", + "dev": true, "dependencies": { "it-peekable": "^3.0.0" } @@ -11063,7 +11071,8 @@ "node_modules/it-peekable": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/it-peekable/-/it-peekable-3.0.3.tgz", - "integrity": "sha512-Wx21JX/rMzTEl9flx3DGHuPV1KQFGOl8uoKfQtmZHgPQtGb89eQ6RyVd82h3HuP9Ghpt0WgBDlmmdWeHXqyx7w==" + "integrity": "sha512-Wx21JX/rMzTEl9flx3DGHuPV1KQFGOl8uoKfQtmZHgPQtGb89eQ6RyVd82h3HuP9Ghpt0WgBDlmmdWeHXqyx7w==", + "dev": true }, "node_modules/it-pipe": { "version": "3.0.1", @@ -21977,9 +21986,9 @@ } }, "node_modules/type-fest": { - "version": "4.10.1", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.10.1.tgz", - "integrity": "sha512-7ZnJYTp6uc04uYRISWtiX3DSKB/fxNQT0B5o1OUeCqiQiwF+JC9+rJiZIDrPrNCLLuTqyQmh4VdQqh/ZOkv9MQ==", + "version": "4.10.2", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.10.2.tgz", + "integrity": "sha512-anpAG63wSpdEbLwOqH8L84urkL6PiVIov3EMmgIhhThevh9aiMQov+6Btx0wldNcvm4wV+e2/Rt1QdDwKHFbHw==", "dev": true, "engines": { "node": ">=16" diff --git a/package.json b/package.json index bf7e28b3..46a41d84 100644 --- a/package.json +++ b/package.json @@ -81,8 +81,6 @@ "abortable-iterator": "^5.0.1", "denque": "^2.1.0", "it-length-prefixed": "^9.0.4", - "it-map": "^3.0.5", - "it-merge": "^3.0.3", "it-pipe": "^3.0.1", "it-pushable": "^3.2.3", "multiformats": "^13.0.1", diff --git a/src/stream.ts b/src/stream.ts index b0a2b81d..505c7897 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,7 +1,5 @@ import { abortableSource } from 'abortable-iterator' import { encode, decode } from 'it-length-prefixed' -import map from 'it-map' -import merge from 'it-merge' import { pipe } from 'it-pipe' import { pushable, type Pushable } from 'it-pushable' import type { Stream } from '@libp2p/interface' @@ -18,24 +16,17 @@ interface InboundStreamOpts { } export class OutboundStream { - private readonly pushable: Pushable - private readonly lpPushable: Pushable + private readonly pushable: Pushable private readonly closeController: AbortController private readonly maxBufferSize: number constructor (private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) { - this.pushable = pushable({ objectMode: false }) - this.lpPushable = pushable({ objectMode: false }) + this.pushable = pushable() this.closeController = new AbortController() this.maxBufferSize = opts.maxBufferSize ?? Infinity pipe( - abortableSource( - merge( - this.lpPushable, - map(this.pushable, buf => encode.single(buf)) - ), this.closeController.signal, { returnOnAbort: true } - ), + abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }), this.rawStream ).catch(errCallback) } @@ -51,24 +42,23 @@ export class OutboundStream { throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`) } - this.pushable.push(data) + this.pushable.push(encode.single(data)) } /** * Same to push() but this is prefixed data so no need to encode length prefixed again */ pushPrefixed (data: Uint8ArrayList): void { - if (this.lpPushable.readableLength > this.maxBufferSize) { + if (this.pushable.readableLength > this.maxBufferSize) { throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`) } - this.lpPushable.push(data) + this.pushable.push(data) } async close (): Promise { this.closeController.abort() // similar to pushable.end() but clear the internal buffer await this.pushable.return() - await this.lpPushable.return() await this.rawStream.close() } }