15
15
*/
16
16
import { PeerIdB58 } from '@fluencelabs/interfaces' ;
17
17
import { pipe } from 'it-pipe' ;
18
- import { encode , decode } from 'it-length-prefixed' ;
18
+ import { decode , encode } from 'it-length-prefixed' ;
19
19
import type { PeerId } from '@libp2p/interface/peer-id' ;
20
20
import { createLibp2p , Libp2p } from 'libp2p' ;
21
21
22
22
import { noise } from '@chainsafe/libp2p-noise' ;
23
23
import { yamux } from '@chainsafe/libp2p-yamux' ;
24
24
import { webSockets } from '@libp2p/websockets' ;
25
25
import { all } from '@libp2p/websockets/filters' ;
26
- import { multiaddr } from '@multiformats/multiaddr' ;
27
- import type { Multiaddr } from '@multiformats/multiaddr' ;
26
+ import { multiaddr , type Multiaddr } from '@multiformats/multiaddr' ;
28
27
29
28
import map from 'it-map' ;
30
29
import { fromString } from 'uint8arrays/from-string' ;
@@ -35,9 +34,13 @@ import { Subject } from 'rxjs';
35
34
import { throwIfHasNoPeerId } from '../util/libp2pUtils.js' ;
36
35
import { IConnection } from './interfaces.js' ;
37
36
import { IParticle } from '../particle/interfaces.js' ;
38
- import { Particle , serializeToString } from '../particle/Particle.js' ;
37
+ import { Particle , serializeToString , verifySignature } from '../particle/Particle.js' ;
39
38
import { identifyService } from 'libp2p/identify' ;
40
39
import { pingService } from 'libp2p/ping' ;
40
+ import { unmarshalPublicKey } from '@libp2p/crypto/keys' ;
41
+ import { peerIdFromString } from '@libp2p/peer-id' ;
42
+ import { Stream } from '@libp2p/interface/connection' ;
43
+ import { KeyPair } from '../keypair/index.js' ;
41
44
42
45
const log = logger ( 'connection' ) ;
43
46
@@ -170,6 +173,31 @@ export class RelayConnection implements IConnection {
170
173
) ;
171
174
log . trace ( 'data written to sink' ) ;
172
175
}
176
+
177
+ private async processIncomingMessage ( msg : string , stream : Stream ) {
178
+ let particle : Particle | undefined ;
179
+ try {
180
+ particle = Particle . fromString ( msg ) ;
181
+ log . trace ( 'got particle from stream with id %s and particle id %s' , stream . id , particle . id ) ;
182
+ const initPeerId = peerIdFromString ( particle . initPeerId ) ;
183
+
184
+ if ( initPeerId . publicKey === undefined ) {
185
+ log . error ( 'cannot retrieve public key from init_peer_id. particle id: %s. init_peer_id: %s' , particle . id , particle . initPeerId ) ;
186
+ return ;
187
+ }
188
+
189
+ const isVerified = await verifySignature ( particle , initPeerId . publicKey ) ;
190
+ if ( isVerified ) {
191
+ this . particleSource . next ( particle ) ;
192
+ } else {
193
+ log . trace ( 'particle signature is incorrect. rejecting particle with id: %s' , particle . id ) ;
194
+ }
195
+ } catch ( e ) {
196
+ const particleId = particle ?. id ;
197
+ const particleIdMessage = typeof particleId === 'string' ? `. particle id: ${ particleId } ` : '' ;
198
+ log . error ( `error on handling an incoming message: %O%s` , e , particleIdMessage ) ;
199
+ }
200
+ }
173
201
174
202
private async connect ( ) {
175
203
if ( this . lib2p2Peer === null ) {
@@ -178,30 +206,20 @@ export class RelayConnection implements IConnection {
178
206
179
207
await this . lib2p2Peer . handle (
180
208
[ PROTOCOL_NAME ] ,
181
- async ( { connection, stream } ) => {
182
- pipe (
183
- stream . source ,
184
- // @ts -ignore
185
- decode ( ) ,
186
- // @ts -ignore
187
- ( source ) => map ( source , ( buf ) => toString ( buf . subarray ( ) ) ) ,
188
- async ( source ) => {
189
- try {
190
- for await ( const msg of source ) {
191
- try {
192
- const particle = Particle . fromString ( msg ) ;
193
- log . trace ( 'got particle from stream with id %s and particle id %s' , stream . id , particle . id ) ;
194
- this . particleSource . next ( particle ) ;
195
- } catch ( e ) {
196
- log . error ( 'error on handling a new incoming message: %j' , e ) ;
197
- }
198
- }
199
- } catch ( e ) {
200
- log . error ( 'connection closed: %j' , e ) ;
209
+ async ( { connection, stream } ) => pipe (
210
+ stream . source ,
211
+ decode ( ) ,
212
+ ( source ) => map ( source , ( buf ) => toString ( buf . subarray ( ) ) ) ,
213
+ async ( source ) => {
214
+ try {
215
+ for await ( const msg of source ) {
216
+ await this . processIncomingMessage ( msg , stream ) ;
201
217
}
202
- } ,
203
- ) ;
204
- } ,
218
+ } catch ( e ) {
219
+ log . error ( 'connection closed: %j' , e ) ;
220
+ }
221
+ } ,
222
+ ) ,
205
223
{
206
224
maxInboundStreams : this . config . maxInboundStreams ,
207
225
maxOutboundStreams : this . config . maxOutboundStreams ,
0 commit comments