Skip to content

Commit

Permalink
chore: remove double-pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Jan 30, 2024
1 parent e5fdd3e commit ca2ab09
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 24 deletions.
21 changes: 15 additions & 6 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,6 @@
"abortable-iterator": "^5.0.1",
"denque": "^2.1.0",
"it-length-prefixed": "^9.0.4",
"it-map": "^3.0.5",
"it-merge": "^3.0.3",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.3",
"multiformats": "^13.0.1",
Expand Down
22 changes: 6 additions & 16 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { abortableSource } from 'abortable-iterator'
import { encode, decode } from 'it-length-prefixed'
import map from 'it-map'
import merge from 'it-merge'
import { pipe } from 'it-pipe'
import { pushable, type Pushable } from 'it-pushable'
import type { Stream } from '@libp2p/interface'
Expand All @@ -18,24 +16,17 @@ interface InboundStreamOpts {
}

export class OutboundStream {
private readonly pushable: Pushable<Uint8Array>
private readonly lpPushable: Pushable<Uint8ArrayList>
private readonly pushable: Pushable<Uint8Array | Uint8ArrayList>
private readonly closeController: AbortController
private readonly maxBufferSize: number

constructor (private readonly rawStream: Stream, errCallback: (e: Error) => void, opts: OutboundStreamOpts) {
this.pushable = pushable({ objectMode: false })
this.lpPushable = pushable({ objectMode: false })
this.pushable = pushable()
this.closeController = new AbortController()
this.maxBufferSize = opts.maxBufferSize ?? Infinity

pipe(
abortableSource(
merge(
this.lpPushable,
map(this.pushable, buf => encode.single(buf))
), this.closeController.signal, { returnOnAbort: true }
),
abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
this.rawStream
).catch(errCallback)
}
Expand All @@ -51,24 +42,23 @@ export class OutboundStream {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}

this.pushable.push(data)
this.pushable.push(encode.single(data))
}

/**
* Same to push() but this is prefixed data so no need to encode length prefixed again
*/
pushPrefixed (data: Uint8ArrayList): void {
if (this.lpPushable.readableLength > this.maxBufferSize) {
if (this.pushable.readableLength > this.maxBufferSize) {
throw Error(`OutboundStream buffer full, size > ${this.maxBufferSize}`)
}
this.lpPushable.push(data)
this.pushable.push(data)
}

async close (): Promise<void> {
this.closeController.abort()
// similar to pushable.end() but clear the internal buffer
await this.pushable.return()
await this.lpPushable.return()
await this.rawStream.close()
}
}
Expand Down

0 comments on commit ca2ab09

Please sign in to comment.