Skip to content

Commit 21088c5

Browse files
authored
feat: allow overriding stream handlers (#2945)
Adds a `force` flag to `libp2p.handle` that means the method will not throw if a handler already exists for the protocol being handled. Fixes #2928
1 parent 96f14e4 commit 21088c5

File tree

8 files changed

+90
-62
lines changed

8 files changed

+90
-62
lines changed

packages/interface-internal/src/registrar/index.ts

+10-41
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,25 @@
1-
import type { Connection, Stream, Topology } from '@libp2p/interface'
1+
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, IncomingStreamData } from '@libp2p/interface'
22

3-
export interface IncomingStreamData {
3+
export type {
44
/**
5-
* The stream that has been opened
5+
* @deprecated This type should be imported from @libp2p/interface directly
66
*/
7-
stream: Stream
7+
IncomingStreamData,
88

99
/**
10-
* The connection that the stream was opened on
10+
* @deprecated This type should be imported from @libp2p/interface directly
1111
*/
12-
connection: Connection
13-
}
14-
15-
export interface StreamHandler {
16-
(data: IncomingStreamData): void
17-
}
18-
19-
export interface StreamHandlerOptions {
20-
/**
21-
* How many incoming streams can be open for this protocol at the same time on each connection
22-
*
23-
* @default 32
24-
*/
25-
maxInboundStreams?: number
26-
27-
/**
28-
* How many outgoing streams can be open for this protocol at the same time on each connection
29-
*
30-
* @default 64
31-
*/
32-
maxOutboundStreams?: number
33-
34-
/**
35-
* If true, allow this protocol to run on limited connections (e.g.
36-
* connections with data or duration limits such as circuit relay
37-
* connections)
38-
*
39-
* @default false
40-
*/
41-
runOnLimitedConnection?: boolean
42-
}
12+
StreamHandler,
4313

44-
export interface StreamHandlerRecord {
4514
/**
46-
* The handler that was registered to handle streams opened on the protocol
15+
* @deprecated This type should be imported from @libp2p/interface directly
4716
*/
48-
handler: StreamHandler
17+
StreamHandlerOptions,
4918

5019
/**
51-
* The options that were used to register the stream handler
20+
* @deprecated This type should be imported from @libp2p/interface directly
5221
*/
53-
options: StreamHandlerOptions
22+
StreamHandlerRecord
5423
}
5524

5625
export interface Registrar {

packages/interface/src/index.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -620,7 +620,8 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
620620
*
621621
* `libp2p.handle(protocols, handler, options)`
622622
*
623-
* In the event of a new handler for the same protocol being added, the first one is discarded.
623+
* In the event of a new handler for the same protocol being added and error
624+
* will be thrown. Pass `force: true` to override this.
624625
*
625626
* @example
626627
*

packages/interface/src/stream-handler/index.ts

+23
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,21 @@
11
import type { Connection, Stream } from '../connection/index.js'
22

33
export interface IncomingStreamData {
4+
/**
5+
* The newly opened stream
6+
*/
47
stream: Stream
8+
9+
/**
10+
* The connection the stream was opened on
11+
*/
512
connection: Connection
613
}
714

815
export interface StreamHandler {
16+
/**
17+
* A callback function that accepts the incoming stream data
18+
*/
919
(data: IncomingStreamData): void
1020
}
1121

@@ -29,9 +39,22 @@ export interface StreamHandlerOptions {
2939
* transferred or how long it can be open for.
3040
*/
3141
runOnLimitedConnection?: boolean
42+
43+
/**
44+
* If `true`, and a handler is already registered for the specified
45+
* protocol(s), the existing handler will be discarded.
46+
*/
47+
force?: true
3248
}
3349

3450
export interface StreamHandlerRecord {
51+
/**
52+
* The handler that was registered to handle streams opened on the protocol
53+
*/
3554
handler: StreamHandler
55+
56+
/**
57+
* The options that were used to register the stream handler
58+
*/
3659
options: StreamHandlerOptions
3760
}

packages/libp2p/src/libp2p.ts

+3-4
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,14 @@ import { ConnectionMonitor } from './connection-monitor.js'
1616
import { CompoundContentRouting } from './content-routing.js'
1717
import { DefaultPeerRouting } from './peer-routing.js'
1818
import { RandomWalk } from './random-walk.js'
19-
import { DefaultRegistrar } from './registrar.js'
19+
import { Registrar } from './registrar.js'
2020
import { DefaultTransportManager } from './transport-manager.js'
2121
import { DefaultUpgrader } from './upgrader.js'
2222
import { userAgent } from './user-agent.js'
2323
import * as pkg from './version.js'
2424
import type { Components } from './components.js'
2525
import type { Libp2p as Libp2pInterface, Libp2pInit } from './index.js'
26-
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey } from '@libp2p/interface'
27-
import type { StreamHandler, StreamHandlerOptions } from '@libp2p/interface-internal'
26+
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions } from '@libp2p/interface'
2827

2928
export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter<Libp2pEvents> implements Libp2pInterface<T> {
3029
public peerId: PeerId
@@ -132,7 +131,7 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
132131
}
133132

134133
// Create the Registrar
135-
this.configureComponent('registrar', new DefaultRegistrar(this.components))
134+
this.configureComponent('registrar', new Registrar(this.components))
136135

137136
// Addresses {listen, announce, noAnnounce}
138137
this.configureComponent('addressManager', new AddressManager(this.components, init.addresses))

packages/libp2p/src/registrar.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { InvalidParametersError } from '@libp2p/interface'
22
import merge from 'merge-options'
33
import * as errorsJs from './errors.js'
4-
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, TypedEventTarget, PeerId, PeerStore, Topology } from '@libp2p/interface'
5-
import type { StreamHandlerOptions, StreamHandlerRecord, Registrar, StreamHandler } from '@libp2p/interface-internal'
4+
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, TypedEventTarget, PeerId, PeerStore, Topology, StreamHandlerRecord, StreamHandlerOptions } from '@libp2p/interface'
5+
import type { Registrar as RegistrarInterface, StreamHandler } from '@libp2p/interface-internal'
66
import type { ComponentLogger } from '@libp2p/logger'
77

88
export const DEFAULT_MAX_INBOUND_STREAMS = 32
@@ -18,7 +18,7 @@ export interface RegistrarComponents {
1818
/**
1919
* Responsible for notifying registered protocols of events in the network.
2020
*/
21-
export class DefaultRegistrar implements Registrar {
21+
export class Registrar implements RegistrarInterface {
2222
private readonly log: Logger
2323
private readonly topologies: Map<string, Map<string, Topology>>
2424
private readonly handlers: Map<string, StreamHandlerRecord>
@@ -73,7 +73,7 @@ export class DefaultRegistrar implements Registrar {
7373
* Registers the `handler` for each protocol
7474
*/
7575
async handle (protocol: string, handler: StreamHandler, opts?: StreamHandlerOptions): Promise<void> {
76-
if (this.handlers.has(protocol)) {
76+
if (this.handlers.has(protocol) && opts?.force !== true) {
7777
throw new errorsJs.DuplicateProtocolHandlerError(`Handler already registered for protocol ${protocol}`)
7878
}
7979

packages/libp2p/test/registrar/errors.spec.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import { MemoryDatastore } from 'datastore-core/memory'
99
import { stubInterface } from 'sinon-ts'
1010
import { defaultComponents } from '../../src/components.js'
1111
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
12-
import { DefaultRegistrar } from '../../src/registrar.js'
12+
import { Registrar } from '../../src/registrar.js'
1313
import type { Components } from '../../src/components.js'
1414
import type { Upgrader, ConnectionGater, PeerId } from '@libp2p/interface'
15-
import type { Registrar, TransportManager } from '@libp2p/interface-internal'
15+
import type { TransportManager } from '@libp2p/interface-internal'
1616

1717
describe('registrar errors', () => {
1818
let components: Components
@@ -35,7 +35,7 @@ describe('registrar errors', () => {
3535
maxConnections: 1000,
3636
inboundUpgradeTimeout: 1000
3737
})
38-
registrar = new DefaultRegistrar(components)
38+
registrar = new Registrar(components)
3939
})
4040

4141
it('should fail to register a protocol if no multicodec is provided', () => {

packages/libp2p/test/registrar/protocols.spec.ts

+43-6
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,13 @@ import pDefer from 'p-defer'
55
import { createLibp2p } from '../../src/index.js'
66
import type { Components } from '../../src/components.js'
77
import type { Libp2p } from '@libp2p/interface'
8+
import type { Registrar } from '@libp2p/interface-internal'
89

910
describe('registrar protocols', () => {
1011
let libp2p: Libp2p
12+
let registrar: Registrar
1113

12-
afterEach(async () => {
13-
await libp2p?.stop()
14-
})
15-
16-
it('should be able to register and unregister a handler', async () => {
14+
beforeEach(async () => {
1715
const deferred = pDefer<Components>()
1816

1917
libp2p = await createLibp2p({
@@ -25,9 +23,14 @@ describe('registrar protocols', () => {
2523
})
2624

2725
const components = await deferred.promise
26+
registrar = components.registrar
27+
})
2828

29-
const registrar = components.registrar
29+
afterEach(async () => {
30+
await libp2p?.stop()
31+
})
3032

33+
it('should be able to register and unregister a handler', async () => {
3134
expect(registrar.getProtocols()).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1'])
3235

3336
const echoHandler = (): void => {}
@@ -43,4 +46,38 @@ describe('registrar protocols', () => {
4346
'/echo/1.0.1'
4447
])
4548
})
49+
50+
it('should error if registering two handlers for the same protocol', async () => {
51+
const echoHandler = (): void => {}
52+
await libp2p.handle('/echo/1.0.0', echoHandler)
53+
54+
await expect(libp2p.handle('/echo/1.0.0', echoHandler)).to.eventually.be.rejected
55+
.with.property('name', 'DuplicateProtocolHandlerError')
56+
})
57+
58+
it('should error if registering two handlers for the same protocols', async () => {
59+
const echoHandler = (): void => {}
60+
await libp2p.handle('/echo/1.0.0', echoHandler)
61+
62+
await expect(libp2p.handle(['/echo/2.0.0', '/echo/1.0.0'], echoHandler)).to.eventually.be.rejected
63+
.with.property('name', 'DuplicateProtocolHandlerError')
64+
})
65+
66+
it('should not error if force-registering two handlers for the same protocol', async () => {
67+
const echoHandler = (): void => {}
68+
await libp2p.handle('/echo/1.0.0', echoHandler)
69+
70+
await expect(libp2p.handle('/echo/1.0.0', echoHandler, {
71+
force: true
72+
})).to.eventually.be.ok
73+
})
74+
75+
it('should not error if force-registering two handlers for the same protocols', async () => {
76+
const echoHandler = (): void => {}
77+
await libp2p.handle('/echo/1.0.0', echoHandler)
78+
79+
await expect(libp2p.handle(['/echo/2.0.0', '/echo/1.0.0'], echoHandler, {
80+
force: true
81+
})).to.eventually.be.ok
82+
})
4683
})

packages/libp2p/test/registrar/registrar.spec.ts

+2-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ import { peerIdFromPrivateKey } from '@libp2p/peer-id'
88
import { expect } from 'aegir/chai'
99
import pDefer from 'p-defer'
1010
import { stubInterface } from 'sinon-ts'
11-
import { DefaultRegistrar } from '../../src/registrar.js'
11+
import { Registrar } from '../../src/registrar.js'
1212
import type { TypedEventTarget, Libp2pEvents, PeerId, PeerStore, Topology, Peer, Connection } from '@libp2p/interface'
13-
import type { Registrar } from '@libp2p/interface-internal'
1413
import type { StubbedInstance } from 'sinon-ts'
1514

1615
const protocol = '/test/1.0.0'
@@ -31,7 +30,7 @@ describe('registrar topologies', () => {
3130
peerStore = stubInterface<PeerStore>()
3231
events = new TypedEventEmitter<Libp2pEvents>()
3332

34-
registrar = new DefaultRegistrar({
33+
registrar = new Registrar({
3534
peerId,
3635
peerStore,
3736
events,

0 commit comments

Comments
 (0)