Skip to content

Commit

Permalink
fix: Tag mesh peers on graft, remove tags on prune (#380)
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Dec 14, 2022
1 parent 23d49be commit b82f169
Showing 1 changed file with 47 additions and 55 deletions.
102 changes: 47 additions & 55 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,40 @@
import { pipe } from 'it-pipe'
import type { Connection, Stream } from '@libp2p/interface-connection'
import { RecordEnvelope } from '@libp2p/peer-record'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { Logger, logger } from '@libp2p/logger'
import { createTopology } from '@libp2p/topology'
import type { PeerId } from '@libp2p/interface-peer-id'
import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import { Logger, logger } from '@libp2p/logger'
import { peerIdFromBytes, peerIdFromString } from '@libp2p/peer-id'
import { RecordEnvelope } from '@libp2p/peer-record'
import { createTopology } from '@libp2p/topology'
import { pipe } from 'it-pipe'

import { MessageCache } from './message-cache.js'
import { RPC, IRPC } from './message/rpc.js'
import * as constants from './constants.js'
import { shuffle, messageIdToString } from './utils/index.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { PeerMultiaddrsChangeData, PeerStore } from '@libp2p/interface-peer-store'
import {
PeerScore,
PeerScoreParams,
PeerScoreThresholds,
createPeerScoreParams,
createPeerScoreThresholds,
PeerScoreStatsDump
} from './score/index.js'
import { IWantTracer } from './tracer.js'
import { SimpleTimeCache } from './utils/time-cache.js'
Message,
PublishResult,
PubSub,
PubSubEvents,
PubSubInit,
StrictNoSign,
StrictSign,
SubscriptionChangeData,
TopicValidatorFn,
TopicValidatorResult
} from '@libp2p/interface-pubsub'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
import { Multiaddr } from '@multiformats/multiaddr'
import { pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import type { GossipsubOptsSpec } from './config.js'
import * as constants from './constants.js'
import {
ACCEPT_FROM_WHITELIST_DURATION_MS,
ACCEPT_FROM_WHITELIST_MAX_MESSAGES,
ACCEPT_FROM_WHITELIST_THRESHOLD_SCORE
} from './constants.js'
import { MessageCache } from './message-cache.js'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { IRPC, RPC } from './message/rpc.js'
import {
ChurnReason,
getMetrics,
Expand All @@ -38,49 +47,24 @@ import {
ToSendGroupCount
} from './metrics.js'
import {
MsgIdFn,
PublishConfig,
TopicStr,
MsgIdStr,
ValidateError,
PeerIdStr,
MessageStatus,
RejectReason,
RejectReasonObj,
FastMsgIdFn,
createPeerScoreParams,
createPeerScoreThresholds, PeerScore,
PeerScoreParams, PeerScoreStatsDump, PeerScoreThresholds
} from './score/index.js'
import { computeAllPeersScoreWeights } from './score/scoreMetrics.js'
import { InboundStream, OutboundStream } from './stream.js'
import { IWantTracer } from './tracer.js'
import {
AddrInfo,
DataTransform,
rejectReasonFromAcceptance,
MsgIdToStrFn,
MessageId
DataTransform, FastMsgIdFn, MessageId, MessageStatus, MsgIdFn, MsgIdStr, MsgIdToStrFn, PeerIdStr, PublishConfig, RejectReason, rejectReasonFromAcceptance, RejectReasonObj, TopicStr, ValidateError
} from './types.js'
import { buildRawMessage, validateToRawMessage } from './utils/buildRawMessage.js'
import { messageIdToString, shuffle } from './utils/index.js'
import { msgIdFnStrictNoSign, msgIdFnStrictSign } from './utils/msgIdFn.js'
import { computeAllPeersScoreWeights } from './score/scoreMetrics.js'
import { multiaddrToIPStr } from './utils/multiaddr.js'
import { getPublishConfigFromPeerId } from './utils/publishConfig.js'
import type { GossipsubOptsSpec } from './config.js'
import {
Message,
PublishResult,
PubSub,
PubSubEvents,
PubSubInit,
StrictNoSign,
StrictSign,
SubscriptionChangeData,
TopicValidatorFn,
TopicValidatorResult
} from '@libp2p/interface-pubsub'
import type { IncomingStreamData, Registrar } from '@libp2p/interface-registrar'
import { removeFirstNItemsFromSet, removeItemsFromSet } from './utils/set.js'
import { pushable } from 'it-pushable'
import { InboundStream, OutboundStream } from './stream.js'
import { Uint8ArrayList } from 'uint8arraylist'
import { decodeRpc, DecodeRPCLimits, defaultDecodeRpcLimits } from './message/decodeRpc.js'
import { ConnectionManager } from '@libp2p/interface-connection-manager'
import { PeerMultiaddrsChangeData, PeerStore } from '@libp2p/interface-peer-store'
import { Multiaddr } from '@multiformats/multiaddr'
import { multiaddrToIPStr } from './utils/multiaddr.js'
import { SimpleTimeCache } from './utils/time-cache.js'

type ConnectionDirection = 'inbound' | 'outbound'

Expand Down Expand Up @@ -2559,6 +2543,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
} else {
topics.push(topic)
}

// Untag peer upon pruning
this.components.peerStore.unTagPeer(id, 'gossipsub-mesh-peer')
}

const graftPeer = (id: PeerIdStr, reason: InclusionReason): void => {
Expand All @@ -2577,6 +2564,11 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
} else {
topics.push(topic)
}

// Tag the peer upon grafting
this.components.peerStore.tagPeer(id, 'gossipsub-mesh-peer', {
value: 100 // value should be 0-100
})
}

// drop all peers with negative score, without PX
Expand Down

0 comments on commit b82f169

Please sign in to comment.