From d61d898d0dbcde20335c4657137feb7014f9b2ef Mon Sep 17 00:00:00 2001 From: Lion - dapplion <35266934+dapplion@users.noreply.github.com> Date: Tue, 5 Apr 2022 12:32:11 +0200 Subject: [PATCH] Bump to new gossipsub (#3858) * Bump to new gossipsub * Fix test type errors * Add getMeshPeers method * Cleanup test nodes on success and error * Add allowPublishToZeroPeers option * Add allowPublishToZeroPeers to dev options * Add scripts to run dev servers * Bump gossipsub * Bump gossipsub * Expose gossip peer score stats * bump gossipsub * Dump gossipsub score params * Bump lodestar * Add behaviourPenaltyThreshold to gossip scoring params * Switch to latest gossipsub update * Fix cli unit test * Fix unknownBlockSync e2e test * Update latest gossipsub * Fix e2e - network addr in use * Use latest gossipsub 3efae56c9d39ff3a047a7e6c65fa32e5756bebde * Use latest gossipsub 039af54c7bf47396c1be414d9fa33aa2e8cc2697 * Increase meshMessageDeliveriesWindow and gossipsubIWantFollowupMs * Set gossipsubIWantFollowupMs and meshMessageDeliveriesWindow 12s * Reset gossipsub metric behaviourPenalty per scrape * Compute score using lodestar score and gossipsub score (#3875) * Refactor PeerRpcScoreStore: add PeerScore class * Aggregate lodestarScore, gossipsubScore to compute final score * updateGossipsubScores util and unit test * Populate PeerScore on updateGossipsubScore * Fix peerManager e2e test * Fix test/sim/multiNodeSingleThread.test.ts * Update latest gossipsub * lodestar_gossip_mesh_peers_by_client_count metric * Log sent peers in gossipsub.publishObject() * Fix lodestar_gossip_mesh_peers_by_client_count metric * Fix lodestar_gossip_mesh_peers_by_client_count metric * Add asyncValidation=true option to gossipsub * Fix sha256 usage * Go with libp2p-gossipsub 0.14.0 Co-authored-by: Tuyen Nguyen Co-authored-by: Cayman --- packages/api/src/routes/lodestar.ts | 9 +- packages/cli/src/cmds/dev/options.ts | 8 +- .../src/options/beaconNodeOptions/network.ts | 9 + .../unit/options/beaconNodeOptions.test.ts | 2 + packages/lodestar/package.json | 5 +- .../lodestar/src/api/impl/lodestar/index.ts | 9 +- packages/lodestar/src/metrics/metrics.ts | 28 +- .../lodestar/src/metrics/metrics/lodestar.ts | 5 + .../lodestar/src/network/gossip/encoding.ts | 146 +++---- .../lodestar/src/network/gossip/gossipsub.ts | 409 +++++++----------- .../lodestar/src/network/gossip/interface.ts | 29 +- .../src/network/gossip/scoringParameters.ts | 8 +- .../src/network/gossip/validation/index.ts | 25 +- .../src/network/gossip/validation/queue.ts | 7 +- packages/lodestar/src/network/network.ts | 4 +- packages/lodestar/src/network/options.ts | 3 +- .../lodestar/src/network/peers/peerManager.ts | 25 +- packages/lodestar/src/network/peers/score.ts | 167 +++++-- .../test/e2e/chain/lightclient.test.ts | 4 + .../test/e2e/network/gossipsub.test.ts | 10 +- .../lodestar/test/e2e/network/network.test.ts | 6 +- .../e2e/network/peers/peerManager.test.ts | 3 +- .../test/e2e/sync/finalizedSync.test.ts | 8 +- .../test/e2e/sync/unknownBlockSync.test.ts | 12 +- packages/lodestar/test/e2e/sync/wss.test.ts | 17 +- .../test/scripts/blsPubkeyBytesFrequency.ts | 4 +- .../lodestar/test/sim/merge-interop.test.ts | 2 +- .../test/sim/multiNodeSingleThread.test.ts | 15 +- .../test/sim/singleNodeSingleThread.test.ts | 1 + .../network/gossip/scoringParameters.test.ts | 6 +- .../unit/network/gossip/gossipsub.test.ts | 106 ----- .../test/unit/network/peers/score.test.ts | 46 +- scripts/dev/node1.sh | 19 + scripts/dev/node2.sh | 22 + yarn.lock | 48 +- 35 files changed, 643 insertions(+), 584 deletions(-) delete mode 100644 packages/lodestar/test/unit/network/gossip/gossipsub.test.ts create mode 100755 scripts/dev/node1.sh create mode 100755 scripts/dev/node2.sh diff --git a/packages/api/src/routes/lodestar.ts b/packages/api/src/routes/lodestar.ts index 79063e337b5..7dc3e249ddf 100644 --- a/packages/api/src/routes/lodestar.ts +++ b/packages/api/src/routes/lodestar.ts @@ -17,9 +17,10 @@ export type SyncChainDebugState = { export type GossipQueueItem = { topic: unknown; - receivedFrom: string; + propagationSource: string; data: Uint8Array; addedTimeMs: number; + seenTimestampSec: number; }; export type RegenQueueItem = { @@ -66,6 +67,8 @@ export type Api = { getStateCacheItems(): Promise; /** Dump a summary of the states in the CheckpointStateCache */ getCheckpointStateCacheItems(): Promise; + /** Dump peer gossip stats by peer */ + getGossipPeerScoreStats(): Promise>; /** Run GC with `global.gc()` */ runGC(): Promise; /** Drop all states in the state cache */ @@ -95,6 +98,7 @@ export const routesData: RoutesData = { getBlockProcessorQueueItems: {url: "/eth/v1/lodestar/block-processor-queue-items", method: "GET"}, getStateCacheItems: {url: "/eth/v1/lodestar/state-cache-items", method: "GET"}, getCheckpointStateCacheItems: {url: "/eth/v1/lodestar/checkpoint-state-cache-items", method: "GET"}, + getGossipPeerScoreStats: {url: "/eth/v1/lodestar/gossip-peer-score-stats", method: "GET"}, runGC: {url: "/eth/v1/lodestar/gc", method: "POST"}, dropStateCache: {url: "/eth/v1/lodestar/drop-state-cache", method: "POST"}, connectPeer: {url: "/eth/v1/lodestar/connect_peer", method: "POST"}, @@ -113,6 +117,7 @@ export type ReqTypes = { getBlockProcessorQueueItems: ReqEmpty; getStateCacheItems: ReqEmpty; getCheckpointStateCacheItems: ReqEmpty; + getGossipPeerScoreStats: ReqEmpty; runGC: ReqEmpty; dropStateCache: ReqEmpty; connectPeer: {query: {peerId: string; multiaddr: string[]}}; @@ -140,6 +145,7 @@ export function getReqSerializers(): ReqSerializers { getBlockProcessorQueueItems: reqEmpty, getStateCacheItems: reqEmpty, getCheckpointStateCacheItems: reqEmpty, + getGossipPeerScoreStats: reqEmpty, runGC: reqEmpty, dropStateCache: reqEmpty, connectPeer: { @@ -173,6 +179,7 @@ export function getReturnTypes(): ReturnTypes { getBlockProcessorQueueItems: jsonType("camel"), getStateCacheItems: jsonType("camel"), getCheckpointStateCacheItems: jsonType("camel"), + getGossipPeerScoreStats: jsonType("camel"), getPeers: jsonType("camel"), discv5GetKadValues: jsonType("camel"), }; diff --git a/packages/cli/src/cmds/dev/options.ts b/packages/cli/src/cmds/dev/options.ts index 20dfcea7ccc..b1e756b1ec1 100644 --- a/packages/cli/src/cmds/dev/options.ts +++ b/packages/cli/src/cmds/dev/options.ts @@ -35,8 +35,7 @@ const devOwnOptions: ICliCommandOptions = { }, startValidators: { - description: "Start interop validators in given range", - default: "0:7", + description: "Start interop validators in inclusive range with notation '0:7'", type: "string", group: "dev", }, @@ -74,6 +73,11 @@ const externalOptionsOverrides: {[k: string]: Options} = { defaultDescription: undefined, default: true, }, + "network.allowPublishToZeroPeers": { + ...beaconNodeOptions["network.allowPublishToZeroPeers"], + defaultDescription: undefined, + default: true, + }, "network.maxPeers": { ...beaconNodeOptions["network.maxPeers"], defaultDescription: undefined, diff --git a/packages/cli/src/options/beaconNodeOptions/network.ts b/packages/cli/src/options/beaconNodeOptions/network.ts index 08de9114413..f5e79ff0de6 100644 --- a/packages/cli/src/options/beaconNodeOptions/network.ts +++ b/packages/cli/src/options/beaconNodeOptions/network.ts @@ -17,6 +17,7 @@ export interface INetworkArgs { "network.blockCountPeerLimit": number; "network.rateTrackerTimeoutMs": number; "network.dontSendGossipAttestationsToForkchoice": boolean; + "network.allowPublishToZeroPeers": boolean; } export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] { @@ -40,6 +41,7 @@ export function parseArgs(args: INetworkArgs): IBeaconNodeOptions["network"] { blockCountPeerLimit: args["network.blockCountPeerLimit"], rateTrackerTimeoutMs: args["network.rateTrackerTimeoutMs"], dontSendGossipAttestationsToForkchoice: args["network.dontSendGossipAttestationsToForkchoice"], + allowPublishToZeroPeers: args["network.allowPublishToZeroPeers"], }; } @@ -153,4 +155,11 @@ export const options: ICliCommandOptions = { description: "Pass gossip attestations to forkchoice or not", group: "network", }, + + "network.allowPublishToZeroPeers": { + hidden: true, + type: "boolean", + description: "Don't error when publishing to zero peers", + group: "network", + }, }; diff --git a/packages/cli/test/unit/options/beaconNodeOptions.test.ts b/packages/cli/test/unit/options/beaconNodeOptions.test.ts index 3a9e9f36ec9..8c6d211865b 100644 --- a/packages/cli/test/unit/options/beaconNodeOptions.test.ts +++ b/packages/cli/test/unit/options/beaconNodeOptions.test.ts @@ -55,6 +55,7 @@ describe("options / beaconNodeOptions", () => { "network.blockCountPeerLimit": 500, "network.rateTrackerTimeoutMs": 60000, "network.dontSendGossipAttestationsToForkchoice": true, + "network.allowPublishToZeroPeers": true, "sync.isSingleNode": true, "sync.disableProcessAsChainSegment": true, "sync.backfillBatchSize": 64, @@ -120,6 +121,7 @@ describe("options / beaconNodeOptions", () => { blockCountPeerLimit: 500, rateTrackerTimeoutMs: 60000, dontSendGossipAttestationsToForkchoice: true, + allowPublishToZeroPeers: true, }, sync: { isSingleNode: true, diff --git a/packages/lodestar/package.json b/packages/lodestar/package.json index d549430bde7..6521952e667 100644 --- a/packages/lodestar/package.json +++ b/packages/lodestar/package.json @@ -80,10 +80,10 @@ "@chainsafe/ssz": "^0.9.0", "@ethersproject/abi": "^5.0.0", "@types/datastore-level": "^3.0.0", - "datastore-core": "^7.0.1", "bl": "^5.0.0", "buffer-xor": "^2.0.2", "cross-fetch": "^3.1.4", + "datastore-core": "^7.0.1", "datastore-level": "^6.0.2", "deepmerge": "^3.2.0", "fastify": "3.15.1", @@ -96,8 +96,7 @@ "jwt-simple": "0.5.6", "libp2p": "^0.36.2", "libp2p-bootstrap": "^0.14.0", - "libp2p-gossipsub": "^0.13.2", - "libp2p-interfaces": "^4.0.4", + "libp2p-gossipsub": "^0.14.0", "libp2p-mdns": "^0.18.0", "libp2p-mplex": "^0.10.5", "libp2p-tcp": "^0.17.2", diff --git a/packages/lodestar/src/api/impl/lodestar/index.ts b/packages/lodestar/src/api/impl/lodestar/index.ts index 1b699a18730..9f401c448f3 100644 --- a/packages/lodestar/src/api/impl/lodestar/index.ts +++ b/packages/lodestar/src/api/impl/lodestar/index.ts @@ -88,12 +88,13 @@ export function getLodestarApi({ } return jobQueue.getItems().map((item) => { - const [topic, message] = item.args; + const [topic, message, propagationSource, seenTimestampSec] = item.args; return { topic: topic, - receivedFrom: message.receivedFrom, + propagationSource, data: message.data, addedTimeMs: item.addedTimeMs, + seenTimestampSec, }; }); }, @@ -130,6 +131,10 @@ export function getLodestarApi({ return (chain as BeaconChain)["checkpointStateCache"].dumpSummary(); }, + async getGossipPeerScoreStats() { + return network.gossip.dumpPeerScoreStats(); + }, + async runGC() { // eslint-disable-next-line @typescript-eslint/strict-boolean-expressions if (!global.gc) throw Error("You must expose GC running the Node.js process with 'node --expose_gc'"); diff --git a/packages/lodestar/src/metrics/metrics.ts b/packages/lodestar/src/metrics/metrics.ts index 6fbef215de8..f4ec0e2ce2d 100644 --- a/packages/lodestar/src/metrics/metrics.ts +++ b/packages/lodestar/src/metrics/metrics.ts @@ -3,7 +3,7 @@ */ import {BeaconStateAllForks, getCurrentSlot} from "@chainsafe/lodestar-beacon-state-transition"; import {IChainForkConfig} from "@chainsafe/lodestar-config"; -import {collectDefaultMetrics, Counter, Registry} from "prom-client"; +import {collectDefaultMetrics, Counter, Metric, Registry} from "prom-client"; import gcStats from "prometheus-gc-stats"; import {DbMetricLabels, IDbMetrics} from "@chainsafe/lodestar-db"; import {createBeaconMetrics, IBeaconMetrics} from "./metrics/beacon"; @@ -12,13 +12,13 @@ import {IMetricsOptions} from "./options"; import {RegistryMetricCreator} from "./utils/registryMetricCreator"; import {createValidatorMonitor, IValidatorMonitor} from "./validatorMonitor"; -export type IMetrics = IBeaconMetrics & ILodestarMetrics & IValidatorMonitor & {register: Registry}; +export type IMetrics = IBeaconMetrics & ILodestarMetrics & IValidatorMonitor & {register: RegistryMetricCreator}; export function createMetrics( opts: IMetricsOptions, config: IChainForkConfig, anchorState: BeaconStateAllForks, - registries: Registry[] = [] + externalRegistries: Registry[] = [] ): IMetrics { const register = new RegistryMetricCreator(); const beacon = createBeaconMetrics(register); @@ -47,7 +47,24 @@ export function createMetrics( // - nodejs_gc_reclaimed_bytes_total: The number of bytes GC has freed gcStats(register)(); - return {...beacon, ...lodestar, ...validatorMonitor, register: Registry.merge([register, ...registries])}; + // Merge external registries + register; + for (const externalRegister of externalRegistries) { + // Wrong types, does not return a promise + const metrics = (externalRegister.getMetricsAsArray() as unknown) as Resolves< + typeof externalRegister.getMetricsAsArray + >; + for (const metric of metrics) { + register.registerMetric((metric as unknown) as Metric); + } + } + + return { + ...beacon, + ...lodestar, + ...validatorMonitor, + register, + }; } export function createDbMetrics(): {metrics: IDbMetrics; registry: Registry} { @@ -68,3 +85,6 @@ export function createDbMetrics(): {metrics: IDbMetrics; registry: Registry} { registry.registerMetric(metrics.dbWrites); return {metrics, registry}; } + +// eslint-disable-next-line @typescript-eslint/no-explicit-any +type Resolves Promise> = F extends (...args: any[]) => Promise ? T : never; diff --git a/packages/lodestar/src/metrics/metrics/lodestar.ts b/packages/lodestar/src/metrics/metrics/lodestar.ts index dbe4d253a0f..a1b31c20308 100644 --- a/packages/lodestar/src/metrics/metrics/lodestar.ts +++ b/packages/lodestar/src/metrics/metrics/lodestar.ts @@ -189,6 +189,11 @@ export function createLodestarMetrics( help: "Gossip peer score by threashold", labelNames: ["threshold"], }), + meshPeersByClient: register.gauge<"client">({ + name: "lodestar_gossip_mesh_peers_by_client_count", + help: "number of mesh peers, labeled by client", + labelNames: ["client"], + }), score: register.avgMinMax({ name: "lodestar_gossip_score_avg_min_max", help: "Avg min max of all gossip peer scores", diff --git a/packages/lodestar/src/network/gossip/encoding.ts b/packages/lodestar/src/network/gossip/encoding.ts index 45d249b5a81..19901a033bb 100644 --- a/packages/lodestar/src/network/gossip/encoding.ts +++ b/packages/lodestar/src/network/gossip/encoding.ts @@ -1,106 +1,80 @@ +import {digest} from "@chainsafe/as-sha256"; import {compress, uncompress} from "snappyjs"; import {intToBytes} from "@chainsafe/lodestar-utils"; -import {digest} from "@chainsafe/as-sha256"; import {ForkName} from "@chainsafe/lodestar-params"; -import { - DEFAULT_ENCODING, - GOSSIP_MSGID_LENGTH, - MESSAGE_DOMAIN_INVALID_SNAPPY, - MESSAGE_DOMAIN_VALID_SNAPPY, -} from "./constants"; -import {Eth2InMessage, GossipEncoding, GossipTopic} from "./interface"; +import {MESSAGE_DOMAIN_VALID_SNAPPY} from "./constants"; +import {GossipTopicCache} from "./topic"; +import {RPC} from "libp2p-gossipsub/src/message/rpc"; +import {GossipsubMessage} from "libp2p-gossipsub/src/types"; /** - * Uncompressed data is used to - * - compute message id - * - if message is not seen then we use it to deserialize to gossip object - * - * We cache uncompressed data in InMessage to prevent uncompressing multiple times. + * The function used to generate a gossipsub message id + * We use the first 8 bytes of SHA256(data) for content addressing */ -export function getUncompressedData(msg: Eth2InMessage): Uint8Array { - if (!msg.uncompressedData) { - msg.uncompressedData = uncompress(msg.data); - } - - return msg.uncompressedData; -} - -export function encodeMessageData(encoding: GossipEncoding, msgData: Uint8Array): Uint8Array { - switch (encoding) { - case GossipEncoding.ssz_snappy: - return compress(msgData); - - default: - throw new Error(`Unsupported encoding ${encoding}`); +export function fastMsgIdFn(rpcMsg: RPC.IMessage): string { + if (rpcMsg.data) { + return Buffer.from(digest(rpcMsg.data)).slice(0, 8).toString("hex"); + } else { + return "0000000000000000"; } } /** - * Function to compute message id for all forks. + * Only valid msgId. Messages that fail to snappy_decompress() are not tracked */ -export function computeMsgId(topic: GossipTopic, topicStr: string, msg: Eth2InMessage): Uint8Array { +export function msgIdFn(gossipTopicCache: GossipTopicCache, msg: GossipsubMessage): Uint8Array { + const topic = gossipTopicCache.getTopic(msg.topic); + + let vec: Uint8Array[]; + switch (topic.fork) { + // message id for phase0. + // ``` + // SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20] + // ``` case ForkName.phase0: - return computeMsgIdPhase0(topic, msg); + vec = [MESSAGE_DOMAIN_VALID_SNAPPY, msg.data]; + break; + + // message id for altair. + // ``` + // SHA256( + // MESSAGE_DOMAIN_VALID_SNAPPY + + // uint_to_bytes(uint64(len(message.topic))) + + // message.topic + + // snappy_decompress(message.data) + // )[:20] + // ``` + // https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#topics-and-messages case ForkName.altair: - case ForkName.bellatrix: - return computeMsgIdAltair(topic, topicStr, msg); + case ForkName.bellatrix: { + vec = [MESSAGE_DOMAIN_VALID_SNAPPY, intToBytes(msg.topic.length, 8), Buffer.from(msg.topic), msg.data]; + break; + } } -} -/** - * Function to compute message id for phase0. - * ``` - * SHA256(MESSAGE_DOMAIN_VALID_SNAPPY + snappy_decompress(message.data))[:20] - * ``` - */ -export function computeMsgIdPhase0(topic: GossipTopic, msg: Eth2InMessage): Uint8Array { - switch (topic.encoding ?? DEFAULT_ENCODING) { - case GossipEncoding.ssz_snappy: - try { - const uncompressed = getUncompressedData(msg); - return hashGossipMsgData(MESSAGE_DOMAIN_VALID_SNAPPY, uncompressed); - } catch (e) { - return hashGossipMsgData(MESSAGE_DOMAIN_INVALID_SNAPPY, msg.data); - } - } + return digest(Buffer.concat(vec)).slice(0, 20); } -/** - * Function to compute message id for altair. - * - * ``` - * SHA256( - * MESSAGE_DOMAIN_VALID_SNAPPY + - * uint_to_bytes(uint64(len(message.topic))) + - * message.topic + - * snappy_decompress(message.data) - * )[:20] - * ``` - * https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/p2p-interface.md#topics-and-messages - */ -export function computeMsgIdAltair(topic: GossipTopic, topicStr: string, msg: Eth2InMessage): Uint8Array { - switch (topic.encoding ?? DEFAULT_ENCODING) { - case GossipEncoding.ssz_snappy: - try { - const uncompressed = getUncompressedData(msg); - return hashGossipMsgData( - MESSAGE_DOMAIN_VALID_SNAPPY, - intToBytes(topicStr.length, 8), - Buffer.from(topicStr), - uncompressed - ); - } catch (e) { - return hashGossipMsgData( - MESSAGE_DOMAIN_INVALID_SNAPPY, - intToBytes(topicStr.length, 8), - Buffer.from(topicStr), - msg.data - ); - } - } -} +export class DataTransformSnappy { + constructor(private readonly gossipTopicCache: GossipTopicCache) {} -function hashGossipMsgData(...dataArrToHash: Uint8Array[]): Uint8Array { - return digest(Buffer.concat(dataArrToHash)).slice(0, GOSSIP_MSGID_LENGTH); + /** + * Takes the data published by peers on a topic and transforms the data. + * Should be the reverse of outboundTransform(). Example: + * - `inboundTransform()`: decompress snappy payload + * - `outboundTransform()`: compress snappy payload + */ + inboundTransform(topicStr: string, data: Uint8Array): Uint8Array { + // No need to parse topic, everything is snappy compressed + return uncompress(data); + } + /** + * Takes the data to be published (a topic and associated data) transforms the data. The + * transformed data will then be used to create a `RawGossipsubMessage` to be sent to peers. + */ + outboundTransform(topicStr: string, data: Uint8Array): Uint8Array { + // No need to parse topic, everything is snappy compressed + return compress(data); + } } diff --git a/packages/lodestar/src/network/gossip/gossipsub.ts b/packages/lodestar/src/network/gossip/gossipsub.ts index bab213dcecc..111d94925ee 100644 --- a/packages/lodestar/src/network/gossip/gossipsub.ts +++ b/packages/lodestar/src/network/gossip/gossipsub.ts @@ -1,10 +1,9 @@ /* eslint-disable @typescript-eslint/naming-convention */ -import Gossipsub from "libp2p-gossipsub"; -import {messageIdToString} from "libp2p-gossipsub/src/utils/messageIdToString"; -import {digest} from "@chainsafe/as-sha256"; -import {ERR_TOPIC_VALIDATOR_IGNORE, ERR_TOPIC_VALIDATOR_REJECT} from "libp2p-gossipsub/src/constants"; -import {InMessage, utils} from "libp2p-interfaces/src/pubsub"; import Libp2p from "libp2p"; +import Gossipsub from "libp2p-gossipsub"; +import {GossipsubMessage, SignaturePolicy, TopicStr} from "libp2p-gossipsub/src/types"; +import {PeerScore, PeerScoreParams} from "libp2p-gossipsub/src/score"; +import PeerId from "peer-id"; import {AbortSignal} from "@chainsafe/abort-controller"; import {IBeaconConfig} from "@chainsafe/lodestar-config"; import {ATTESTATION_SUBNET_COUNT, ForkName, SYNC_COMMITTEE_SUBNET_COUNT} from "@chainsafe/lodestar-params"; @@ -21,18 +20,11 @@ import { GossipTypeMap, ValidatorFnsByType, GossipHandlers, - Eth2InMessage, } from "./interface"; import {getGossipSSZType, GossipTopicCache, stringifyGossipTopic} from "./topic"; -import {computeMsgId, encodeMessageData} from "./encoding"; -import {DEFAULT_ENCODING} from "./constants"; -import {GossipValidationError} from "./errors"; -import {GOSSIP_MAX_SIZE} from "../../constants"; +import {DataTransformSnappy, fastMsgIdFn, msgIdFn} from "./encoding"; import {createValidatorFnsByType} from "./validation"; import {Map2d, Map2dArr} from "../../util/map"; -import PeerStreams from "libp2p-interfaces/src/pubsub/peer-streams"; -import {RPC} from "libp2p-gossipsub/src/message/rpc"; -import {normalizeInRpcMessage} from "libp2p-interfaces/src/pubsub/utils"; import { computeGossipPeerScoreParams, @@ -43,8 +35,22 @@ import { } from "./scoringParameters"; import {Eth2Context} from "../../chain"; import {computeAllPeersScoreWeights} from "./scoreMetrics"; +import {MetricsRegister, TopicLabel, TopicStrToLabel} from "libp2p-gossipsub/src/metrics"; +import {PeersData} from "../peers/peersData"; +import {ClientKind} from "../peers/client"; + +/* eslint-disable @typescript-eslint/naming-convention */ + +// TODO: Export this type +type GossipsubEvents = { + "gossipsub:message": { + propagationSource: PeerId; + msgId: string; + msg: GossipsubMessage; + }; +}; -export interface IGossipsubModules { +export type Eth2GossipsubModules = { config: IBeaconConfig; libp2p: Libp2p; logger: ILogger; @@ -52,7 +58,12 @@ export interface IGossipsubModules { signal: AbortSignal; eth2Context: Eth2Context; gossipHandlers: GossipHandlers; -} + peersData: PeersData; +}; + +export type Eth2GossipsubOpts = { + allowPublishToZeroPeers?: boolean; +}; /** * Wrapper around js-libp2p-gossipsub with the following extensions: @@ -69,32 +80,48 @@ export interface IGossipsubModules { */ export class Eth2Gossipsub extends Gossipsub { readonly jobQueues: GossipJobQueues; + readonly scoreParams: Partial; private readonly config: IBeaconConfig; private readonly logger: ILogger; + private readonly peersData: PeersData; // Internal caches private readonly gossipTopicCache: GossipTopicCache; private readonly validatorFnsByType: ValidatorFnsByType; - constructor(modules: IGossipsubModules) { + constructor(opts: Eth2GossipsubOpts, modules: Eth2GossipsubModules) { + const gossipTopicCache = new GossipTopicCache(modules.config); + + const scoreParams = computeGossipPeerScoreParams(modules); + // Gossipsub parameters defined here: // https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md#the-gossip-domain-gossipsub super(modules.libp2p, { gossipIncoming: true, - globalSignaturePolicy: "StrictNoSign" as const, + globalSignaturePolicy: SignaturePolicy.StrictNoSign, + allowPublishToZeroPeers: opts.allowPublishToZeroPeers, D: GOSSIP_D, Dlo: GOSSIP_D_LOW, Dhi: GOSSIP_D_HIGH, Dlazy: 6, - scoreParams: computeGossipPeerScoreParams(modules), + scoreParams, scoreThresholds: gossipScoreThresholds, - fastMsgIdFn: (msg: InMessage) => Buffer.from(digest(msg.data)).toString("hex"), + // the default in gossipsub is 3s is not enough since lodestar suffers from I/O lag + gossipsubIWantFollowupMs: 12 * 1000, // 12s + fastMsgIdFn: fastMsgIdFn, + msgIdFn: msgIdFn.bind(msgIdFn, gossipTopicCache), + dataTransform: new DataTransformSnappy(gossipTopicCache), + metricsRegister: modules.metrics ? ((modules.metrics.register as unknown) as MetricsRegister) : null, + metricsTopicStrToLabel: modules.metrics ? getMetricsTopicStrToLabel(modules.config) : undefined, + asyncValidation: true, }); - const {config, logger, metrics, signal, gossipHandlers} = modules; + this.scoreParams = scoreParams; + const {config, logger, metrics, signal, gossipHandlers, peersData} = modules; this.config = config; this.logger = logger; - this.gossipTopicCache = new GossipTopicCache(config); + this.peersData = peersData; + this.gossipTopicCache = gossipTopicCache; // Note: We use the validator functions as handlers. No handler will be registered to gossipsub. // libp2p-js layer will emit the message to an EventEmitter that won't be listened by anyone. @@ -109,231 +136,14 @@ export class Eth2Gossipsub extends Gossipsub { this.jobQueues = jobQueues; if (metrics) { - metrics.gossipMesh.peersByType.addCollect(() => this.onScrapeMetrics(metrics)); - } - } - - async start(): Promise { - return super.start(); - } - - async stop(): Promise { - try { - await super.stop(); - } catch (error) { - if ((error as GossipValidationError).code !== "ERR_HEARTBEAT_NO_RUNNING") { - throw error; - } - } - } - - /** - * @override Use eth2 msg id and cache results to the msg - * The cached msgId inside the message will be ignored when we send messages to other peers - * since we don't have this field in protobuf. - */ - getMsgId(msg: Eth2InMessage): Uint8Array { - let msgId = msg.msgId; - if (!msgId) { - const topicStr = msg.topicIDs[0]; - const topic = this.gossipTopicCache.getTopic(topicStr); - msgId = computeMsgId(topic, topicStr, msg); - msg.msgId = msgId; - } - return msgId; - } - - /** - * Get cached message id string if we have it. - */ - getCachedMsgIdStr(msg: Eth2InMessage): string | undefined { - const cachedMsgId = msg.msgId; - return cachedMsgId ? messageIdToString(cachedMsgId) : undefined; - } - - // Temporaly reverts https://github.com/libp2p/js-libp2p-interfaces/pull/103 while a proper fixed is done upstream - // Lodestar wants to use our own queue instead of gossipsub queue introduced in https://github.com/libp2p/js-libp2p-interfaces/pull/103 - async _processRpc(idB58Str: string, peerStreams: PeerStreams, rpc: RPC): Promise { - this.log("rpc from", idB58Str); - const subs = rpc.subscriptions; - const msgs = rpc.msgs; - - if (subs.length) { - // update peer subscriptions - subs.forEach((subOpt) => { - this._processRpcSubOpt(idB58Str, subOpt); - }); - this.emit("pubsub:subscription-change", peerStreams.id, subs); - } - - if (!this._acceptFrom(idB58Str)) { - this.log("received message from unacceptable peer %s", idB58Str); - return false; - } - - if (msgs.length) { - await Promise.all( - msgs.map(async (message) => { - if ( - !( - this.canRelayMessage || - (message.topicIDs && message.topicIDs.some((topic) => this.subscriptions.has(topic))) - ) - ) { - this.log("received message we didn't subscribe to. Dropping."); - return; - } - const msg = normalizeInRpcMessage(message, idB58Str); - await this._processRpcMessage(msg); - }) - ); - } - // not a direct implementation of js-libp2p-gossipsub, this is from gossipsub - // https://github.com/ChainSafe/js-libp2p-gossipsub/blob/751ea73e9b7dc2287ca56786857d32ec2ce796b9/ts/index.ts#L366 - if (rpc.control) { - await super._processRpcControlMessage(idB58Str, rpc.control); - } - return true; - } - - /** - * Similar to gossipsub 0.13.0 except that no await - * TODO: override getMsgIdIfNotSeen and add metric - * See https://github.com/ChainSafe/js-libp2p-gossipsub/pull/187/files - */ - async _processRpcMessage(msg: InMessage): Promise { - let canonicalMsgIdStr; - if (this.getFastMsgIdStr && this.fastMsgIdCache) { - // check duplicate - // change: no await needed - const fastMsgIdStr = this.getFastMsgIdStr(msg); - canonicalMsgIdStr = this.fastMsgIdCache.get(fastMsgIdStr); - if (canonicalMsgIdStr !== undefined) { - void this.score.duplicateMessage(msg, canonicalMsgIdStr); - return; - } - // change: no await needed - canonicalMsgIdStr = messageIdToString(this.getMsgId(msg)); - - this.fastMsgIdCache.put(fastMsgIdStr, canonicalMsgIdStr); - } else { - // check duplicate - // change: no await needed - canonicalMsgIdStr = messageIdToString(this.getMsgId(msg)); - if (this.seenCache.has(canonicalMsgIdStr)) { - void this.score.duplicateMessage(msg, canonicalMsgIdStr); - return; - } - } - - // put in cache - this.seenCache.put(canonicalMsgIdStr); - - await this.score.validateMessage(canonicalMsgIdStr); - - // await super._processRpcMessage(msg); - // this is from libp2p-interface 4.0.4 - // https://github.com/libp2p/js-libp2p-interfaces/blob/libp2p-interfaces%404.0.4/packages/interfaces/src/pubsub/index.js#L461 - if (this.peerId.toB58String() === msg.from && !this.emitSelf) { - return; - } - - // Ensure the message is valid before processing it - try { - await this.validate(msg); - } catch (/** @type {any} */ err) { - this.log("Message is invalid, dropping it. %O", err); - return; - } - - // Emit to self: no need as we don't do that in this child class - // this._emitMessage(msg); - - return this._publish((utils.normalizeOutRpcMessage(msg) as unknown) as InMessage); - } - - // // Snippet of _processRpcMessage from https://github.com/libp2p/js-libp2p-interfaces/blob/92245d66b0073f0a72fed9f7abcf4b533102f1fd/packages/interfaces/src/pubsub/index.js#L442 - // async _processRpcMessage(msg: InMessage): Promise { - // try { - // await this.validate(msg); - // } catch (err) { - // this.log("Message is invalid, dropping it. %O", err); - // return; - // } - // } - - /** - * @override https://github.com/ChainSafe/js-libp2p-gossipsub/blob/3c3c46595f65823fcd7900ed716f43f76c6b355c/ts/index.ts#L436 - * @override https://github.com/libp2p/js-libp2p-interfaces/blob/ff3bd10704a4c166ce63135747e3736915b0be8d/src/pubsub/index.js#L513 - * Note: this does not call super. All logic is re-implemented below - */ - async validate(message: Eth2InMessage): Promise { - try { - // messages must have a single topicID - const topicStr = Array.isArray(message.topicIDs) ? message.topicIDs[0] : undefined; - - // message sanity check - if (!topicStr || message.topicIDs.length > 1) { - throw new GossipValidationError(ERR_TOPIC_VALIDATOR_REJECT, "Not exactly one topicID"); - } - if (message.data === undefined) { - throw new GossipValidationError(ERR_TOPIC_VALIDATOR_REJECT, "No message.data"); - } - if (message.data.length > GOSSIP_MAX_SIZE) { - throw new GossipValidationError(ERR_TOPIC_VALIDATOR_REJECT, "message.data too big"); - } - - if (message.from || message.signature || message.key || message.seqno) { - throw new GossipValidationError(ERR_TOPIC_VALIDATOR_REJECT, "StrictNoSigning invalid"); - } - - // We use 'StrictNoSign' policy, no need to validate message signature - - // Also validates that the topicStr is known - const topic = this.gossipTopicCache.getTopic(topicStr); - - // Get seenTimestamp before adding the message to the queue or add async delays - const seenTimestampSec = Date.now() / 1000; - - // No error here means that the incoming object is valid - await this.validatorFnsByType[topic.type](topic, message, seenTimestampSec); - } catch (e) { - // JobQueue may throw non-typed errors - const code = e instanceof GossipValidationError ? e.code : ERR_TOPIC_VALIDATOR_IGNORE; - // async to compute msgId with sha256 from multiformats/hashes/sha2 - const messageId = await this.getCanonicalMsgIdStr(message); - await this.score.rejectMessage(message, messageId, code); - await this.gossipTracer.rejectMessage(messageId, code); - throw e; + metrics.gossipMesh.peersByType.addCollect(() => this.onScrapeLodestarMetrics(metrics)); } - } - - /** - * @override - * See https://github.com/libp2p/js-libp2p-interfaces/blob/v0.5.2/src/pubsub/index.js#L428 - * - * Our handlers are attached on the validator functions, so no need to emit the objects internally. - */ - _emitMessage(): void { - // Objects are handled in the validator functions, no need to do anything here - } - /** - * @override - * Differs from upstream `unsubscribe` by _always_ unsubscribing, - * instead of unsubsribing only when no handlers are attached to the topic - * - * See https://github.com/libp2p/js-libp2p-interfaces/blob/v0.8.3/src/pubsub/index.js#L720 - */ - unsubscribe(topicStr: string): void { - if (!this.started) { - throw new Error("Pubsub is not started"); - } + this.on("gossipsub:message", this.onGossipsubMessage.bind(this)); - if (this.subscriptions.has(topicStr)) { - this.subscriptions.delete(topicStr); - this.peers.forEach((_, id) => this._sendSubscriptions(id, [topicStr], false)); - } + // Having access to this data is CRUCIAL for debugging. While this is a massive log, it must not be deleted. + // Scoring issues require this dump + current peer score stats to re-calculate scores. + this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)}); } /** @@ -341,10 +151,10 @@ export class Eth2Gossipsub extends Gossipsub { */ async publishObject(topic: GossipTopicMap[K], object: GossipTypeMap[K]): Promise { const topicStr = this.getGossipTopicString(topic); - this.logger.verbose("Publish to topic", {topic: topicStr}); const sszType = getGossipSSZType(topic); const messageData = (sszType.serialize as (object: GossipTypeMap[GossipType]) => Uint8Array)(object); - await this.publish(topicStr, encodeMessageData(topic.encoding ?? DEFAULT_ENCODING, messageData)); + const sentPeers = await this.publish(topicStr, messageData); + this.logger.verbose("Publish to topic", {topic: topicStr, sentPeers}); } /** @@ -427,10 +237,17 @@ export class Eth2Gossipsub extends Gossipsub { return stringifyGossipTopic(this.config, topic); } - private onScrapeMetrics(metrics: IMetrics): void { - for (const {peersMap, metricsGossip} of [ - {peersMap: this.mesh, metricsGossip: metrics.gossipMesh}, - {peersMap: this.topics, metricsGossip: metrics.gossipTopic}, + private onScrapeLodestarMetrics(metrics: IMetrics): void { + const mesh = this["mesh"] as Map>; + const topics = this["topics"] as Map>; + const peers = this["peers"] as Map; + const score = this["score"] as PeerScore; + const meshPeersByClient = new Map(); + const meshPeerIdStrs = new Set(); + + for (const {peersMap, metricsGossip, type} of [ + {peersMap: mesh, metricsGossip: metrics.gossipMesh, type: "mesh"}, + {peersMap: topics, metricsGossip: metrics.gossipTopic, type: "topics"}, ]) { // Pre-aggregate results by fork so we can fill the remaining metrics with 0 const peersByTypeByFork = new Map2d(); @@ -454,6 +271,17 @@ export class Eth2Gossipsub extends Gossipsub { peersByTypeByFork.set(topic.fork, topic.type, peers.size); } } + + if (type === "mesh") { + for (const peer of peers) { + if (!meshPeerIdStrs.has(peer)) { + meshPeerIdStrs.add(peer); + const client = + this.peersData.connectedPeers.get(peer)?.agentClient?.toString() ?? ClientKind.Unknown.toString(); + meshPeersByClient.set(client, (meshPeersByClient.get(client) ?? 0) + 1); + } + } + } } // beacon attestation mesh gets counted separately so we can track mesh peers by subnet @@ -479,21 +307,25 @@ export class Eth2Gossipsub extends Gossipsub { } } + for (const [client, peers] of meshPeersByClient.entries()) { + metrics.gossipPeer.meshPeersByClient.set({client}, peers); + } + // track gossip peer score let peerCountScoreGraylist = 0; let peerCountScorePublish = 0; let peerCountScoreGossip = 0; let peerCountScoreMesh = 0; const {graylistThreshold, publishThreshold, gossipThreshold} = gossipScoreThresholds; - const gossipScores = []; - - for (const peerIdStr of this.peers.keys()) { - const score = this.score.score(peerIdStr); - if (score >= graylistThreshold) peerCountScoreGraylist++; - if (score >= publishThreshold) peerCountScorePublish++; - if (score >= gossipThreshold) peerCountScoreGossip++; - if (score >= 0) peerCountScoreMesh++; - gossipScores.push(score); + const gossipScores: number[] = []; + + for (const peerIdStr of peers.keys()) { + const s = score.score(peerIdStr); + if (s >= graylistThreshold) peerCountScoreGraylist++; + if (s >= publishThreshold) peerCountScorePublish++; + if (s >= gossipThreshold) peerCountScoreGossip++; + if (s >= 0) peerCountScoreMesh++; + gossipScores.push(s); } // Access once for all calls below @@ -504,11 +336,12 @@ export class Eth2Gossipsub extends Gossipsub { scoreByThreshold.set({threshold: "mesh"}, peerCountScoreMesh); // Breakdown on each score weight + // TODO: consider removing as it's duplicate to new gossipsub const sw = computeAllPeersScoreWeights( - this.peers.keys(), - this.score.peerStats, - this.score.params, - this.score.peerIPs, + peers.keys(), + score.peerStats, + score.params, + score.peerIPs, this.gossipTopicCache ); @@ -527,6 +360,30 @@ export class Eth2Gossipsub extends Gossipsub { // Register full score too metrics.gossipPeer.score.set(sw.score); } + + private onGossipsubMessage(event: GossipsubEvents["gossipsub:message"]): void { + const {propagationSource, msgId, msg} = event; + + // TODO: validation GOSSIP_MAX_SIZE + // - Should be done here after inserting the message in the mcache? + // - Should be done in the inboundtransform? + // - Should be a parameter in gossipsub: maxMsgDataSize? + + // Also validates that the topicStr is known + const topic = this.gossipTopicCache.getTopic(msg.topic); + + // Get seenTimestamp before adding the message to the queue or add async delays + const seenTimestampSec = Date.now() / 1000; + + // Puts object in queue, validates, then processes + this.validatorFnsByType[topic.type](topic, msg, propagationSource.toString(), seenTimestampSec) + .then((acceptance) => { + this.reportMessageValidationResult(msgId, propagationSource, acceptance); + }) + .catch((e) => { + this.logger.error("Error onGossipsubMessage", {}, e); + }); + } } /** @@ -537,3 +394,31 @@ function attSubnetLabel(subnet: number): string { if (subnet > 9) return String(subnet); else return `0${subnet}`; } + +function getMetricsTopicStrToLabel(config: IBeaconConfig): TopicStrToLabel { + const metricsTopicStrToLabel = new Map(); + const topics: GossipTopic[] = []; + + for (const {name: fork} of config.forksAscendingEpochOrder) { + for (let subnet = 0; subnet < ATTESTATION_SUBNET_COUNT; subnet++) { + topics.push({fork, type: GossipType.beacon_attestation, subnet}); + } + + for (let subnet = 0; subnet < SYNC_COMMITTEE_SUBNET_COUNT; subnet++) { + topics.push({fork, type: GossipType.sync_committee, subnet}); + } + + topics.push({fork, type: GossipType.beacon_block}); + topics.push({fork, type: GossipType.beacon_aggregate_and_proof}); + topics.push({fork, type: GossipType.voluntary_exit}); + topics.push({fork, type: GossipType.proposer_slashing}); + topics.push({fork, type: GossipType.attester_slashing}); + topics.push({fork, type: GossipType.sync_committee_contribution_and_proof}); + } + + for (const topic of topics) { + metricsTopicStrToLabel.set(stringifyGossipTopic(config, topic), topic.type); + } + + return metricsTopicStrToLabel; +} diff --git a/packages/lodestar/src/network/gossip/interface.ts b/packages/lodestar/src/network/gossip/interface.ts index d1621ff819b..9b1495b08c5 100644 --- a/packages/lodestar/src/network/gossip/interface.ts +++ b/packages/lodestar/src/network/gossip/interface.ts @@ -8,8 +8,8 @@ import StrictEventEmitter from "strict-event-emitter-types"; import {EventEmitter} from "events"; import {IBeaconConfig} from "@chainsafe/lodestar-config"; import LibP2p from "libp2p"; +import {GossipsubMessage, MessageAcceptance, PeerIdStr} from "libp2p-gossipsub/src/types"; import {ILogger} from "@chainsafe/lodestar-utils"; -import {InMessage} from "libp2p-interfaces/src/pubsub"; import {IBeaconChain} from "../../chain"; import {NetworkEvent} from "../events"; import {JobItemQueue} from "../../util/queue"; @@ -103,15 +103,6 @@ export interface IGossipModules { chain: IBeaconChain; } -/** - * Extend the standard InMessage with additional fields so that we don't have to compute them twice. - * When we send messages to other peers, protobuf will just ignore these fields. - */ -export type Eth2InMessage = InMessage & { - msgId?: Uint8Array; - uncompressedData?: Uint8Array; -}; - /** * Contains various methods for validation of incoming gossip topic data. * The conditions for valid gossip topics and how they are handled are specified here: @@ -123,11 +114,18 @@ export type Eth2InMessage = InMessage & { * * js-libp2p-gossipsub expects validation functions that look like this */ -export type GossipValidatorFn = (topic: GossipTopic, message: Eth2InMessage, seenTimestampSec: number) => Promise; +export type GossipValidatorFn = ( + topic: GossipTopic, + msg: GossipsubMessage, + propagationSource: PeerIdStr, + seenTimestampSec: number +) => Promise; export type ValidatorFnsByType = {[K in GossipType]: GossipValidatorFn}; -export type GossipJobQueues = {[K in GossipType]: JobItemQueue<[GossipTopic, InMessage, number], void>}; +export type GossipJobQueues = { + [K in GossipType]: JobItemQueue, ResolvedType>; +}; export type GossipHandlerFn = ( object: GossipTypeMap[GossipType], @@ -144,6 +142,7 @@ export type GossipHandlers = { ) => Promise; }; -export type InMessageTimestamp = InMessage & { - seenTimestampMs: number; -}; +// eslint-disable-next-line @typescript-eslint/no-explicit-any +export type ResolvedType Promise> = F extends (...args: any) => Promise + ? T + : never; diff --git a/packages/lodestar/src/network/gossip/scoringParameters.ts b/packages/lodestar/src/network/gossip/scoringParameters.ts index d01d6fa790f..b7d8257056d 100644 --- a/packages/lodestar/src/network/gossip/scoringParameters.ts +++ b/packages/lodestar/src/network/gossip/scoringParameters.ts @@ -5,7 +5,7 @@ import {PeerScoreThresholds} from "libp2p-gossipsub/src/score"; import {defaultTopicScoreParams, PeerScoreParams, TopicScoreParams} from "libp2p-gossipsub/src/score/peer-score-params"; import {Eth2Context} from "../../chain"; import {getActiveForks} from "../forks"; -import {IGossipsubModules} from "./gossipsub"; +import {Eth2GossipsubModules} from "./gossipsub"; import {GossipType} from "./interface"; import {stringifyGossipTopic} from "./topic"; @@ -71,7 +71,7 @@ type TopicScoreInput = { export function computeGossipPeerScoreParams({ config, eth2Context, -}: Pick): Partial { +}: Pick): Partial { const decayIntervalMs = config.SECONDS_PER_SLOT * 1000; const decayToZero = 0.01; const epochDurationMs = config.SECONDS_PER_SLOT * SLOTS_PER_EPOCH * 1000; @@ -99,6 +99,7 @@ export function computeGossipPeerScoreParams({ // js-gossipsub doesn't have behaviourPenaltiesThreshold behaviourPenaltyDecay, behaviourPenaltyWeight: gossipScoreThresholds.gossipThreshold / (targetValue * targetValue), + behaviourPenaltyThreshold, topicScoreCap, IPColocationFactorWeight: -1 * topicScoreCap, }; @@ -245,7 +246,8 @@ function getTopicScoreParams( params.meshMessageDeliveriesThreshold = threshold(params.meshMessageDeliveriesDecay, expectedMessageRate / 50); params.meshMessageDeliveriesCap = Math.max(capFactor * params.meshMessageDeliveriesThreshold, 2); params.meshMessageDeliveriesActivation = activationWindow; - params.meshMessageDeliveriesWindow = 2 * 1000; // 2s + // the default in gossipsub is 2s is not enough since lodestar suffers from I/O lag + params.meshMessageDeliveriesWindow = 12 * 1000; // 12s params.meshFailurePenaltyDecay = params.meshMessageDeliveriesDecay; params.meshMessageDeliveriesWeight = (-1 * maxPositiveScore) / (params.topicWeight * Math.pow(params.meshMessageDeliveriesThreshold, 2)); diff --git a/packages/lodestar/src/network/gossip/validation/index.ts b/packages/lodestar/src/network/gossip/validation/index.ts index ce917ae0f32..41622b78e65 100644 --- a/packages/lodestar/src/network/gossip/validation/index.ts +++ b/packages/lodestar/src/network/gossip/validation/index.ts @@ -1,4 +1,4 @@ -import {ERR_TOPIC_VALIDATOR_IGNORE, ERR_TOPIC_VALIDATOR_REJECT} from "libp2p-gossipsub/src/constants"; +import {MessageAcceptance} from "libp2p-gossipsub/src/types"; import {AbortSignal} from "@chainsafe/abort-controller"; import {IChainForkConfig} from "@chainsafe/lodestar-config"; import {ILogger, mapValues} from "@chainsafe/lodestar-utils"; @@ -12,11 +12,9 @@ import { GossipHandlers, GossipHandlerFn, } from "../interface"; -import {GossipValidationError} from "../errors"; import {GossipActionError, GossipAction} from "../../../chain/errors"; import {createValidationQueues} from "./queue"; import {getGossipAcceptMetadataByType, GetGossipAcceptMetadataFn} from "./onAccept"; -import {getUncompressedData} from "../encoding"; type ValidatorFnModules = { config: IChainForkConfig; @@ -42,8 +40,8 @@ export function createValidatorFnsByType( const validatorFnsByType = mapValues( jobQueues, (jobQueue): GossipValidatorFn => { - return async function gossipValidatorFnWithQueue(topic, gossipMsg, seenTimestampsMs) { - await jobQueue.push(topic, gossipMsg, seenTimestampsMs); + return async function gossipValidatorFnWithQueue(topic, gossipMsg, propagationSource, seenTimestampSec) { + return await jobQueue.push(topic, gossipMsg, propagationSource, seenTimestampSec); }; } ); @@ -73,29 +71,30 @@ function getGossipValidatorFn( const {config, logger, metrics} = modules; const getGossipObjectAcceptMetadata = getGossipAcceptMetadataByType[type] as GetGossipAcceptMetadataFn; - return async function gossipValidatorFn(topic, gossipMsg, seenTimestampSec) { + return async function gossipValidatorFn(topic, msg, propagationSource, seenTimestampSec) { // Define in scope above try {} to be used in catch {} if object was parsed let gossipObject; try { // Deserialize object from bytes ONLY after being picked up from the validation queue try { const sszType = getGossipSSZType(topic); - const messageData = getUncompressedData(gossipMsg); - gossipObject = sszType.deserialize(messageData); + gossipObject = sszType.deserialize(msg.data); } catch (e) { // TODO: Log the error or do something better with it - throw new GossipActionError(GossipAction.REJECT, {code: (e as Error).message}); + return MessageAcceptance.Reject; } - await (gossipHandler as GossipHandlerFn)(gossipObject, topic, gossipMsg.receivedFrom, seenTimestampSec); + await (gossipHandler as GossipHandlerFn)(gossipObject, topic, propagationSource, seenTimestampSec); const metadata = getGossipObjectAcceptMetadata(config, gossipObject, topic); logger.debug(`gossip - ${type} - accept`, metadata); metrics?.gossipValidationAccept.inc({topic: type}); + + return MessageAcceptance.Accept; } catch (e) { if (!(e instanceof GossipActionError)) { logger.error(`Gossip validation ${type} threw a non-GossipActionError`, {}, e as Error); - throw new GossipValidationError(ERR_TOPIC_VALIDATOR_IGNORE, (e as Error).message); + return MessageAcceptance.Ignore; } // If the gossipObject was deserialized include its short metadata with the error data @@ -106,12 +105,12 @@ function getGossipValidatorFn( case GossipAction.IGNORE: logger.debug(`gossip - ${type} - ignore`, errorData); metrics?.gossipValidationIgnore.inc({topic: type}); - throw new GossipValidationError(ERR_TOPIC_VALIDATOR_IGNORE, e.message); + return MessageAcceptance.Ignore; case GossipAction.REJECT: logger.debug(`gossip - ${type} - reject`, errorData); metrics?.gossipValidationReject.inc({topic: type}); - throw new GossipValidationError(ERR_TOPIC_VALIDATOR_REJECT, e.message); + return MessageAcceptance.Reject; } } }; diff --git a/packages/lodestar/src/network/gossip/validation/queue.ts b/packages/lodestar/src/network/gossip/validation/queue.ts index 1b128290f33..8471e072329 100644 --- a/packages/lodestar/src/network/gossip/validation/queue.ts +++ b/packages/lodestar/src/network/gossip/validation/queue.ts @@ -1,9 +1,8 @@ import {AbortSignal} from "@chainsafe/abort-controller"; -import {InMessage} from "libp2p-interfaces/src/pubsub"; import {mapValues} from "@chainsafe/lodestar-utils"; import {IMetrics} from "../../../metrics"; import {JobItemQueue, JobQueueOpts, QueueType} from "../../../util/queue"; -import {GossipJobQueues, GossipTopic, GossipType, ValidatorFnsByType} from "../interface"; +import {GossipJobQueues, GossipType, GossipValidatorFn, ResolvedType, ValidatorFnsByType} from "../interface"; /** * Numbers from https://github.com/sigp/lighthouse/blob/b34a79dc0b02e04441ba01fd0f304d1e203d877d/beacon_node/network/src/beacon_processor/mod.rs#L69 @@ -44,8 +43,8 @@ export function createValidationQueues( ): GossipJobQueues { return mapValues(gossipQueueOpts, (opts, type) => { const gossipValidatorFn = gossipValidatorFns[type]; - return new JobItemQueue<[GossipTopic, InMessage, number], void>( - (topic, message, seenTimestampsMs) => gossipValidatorFn(topic, message, seenTimestampsMs), + return new JobItemQueue, ResolvedType>( + gossipValidatorFn, {signal, ...opts}, metrics ? { diff --git a/packages/lodestar/src/network/network.ts b/packages/lodestar/src/network/network.ts index 880fcd61cd5..fb4ee605817 100644 --- a/packages/lodestar/src/network/network.ts +++ b/packages/lodestar/src/network/network.ts @@ -86,7 +86,7 @@ export class Network implements INetwork { opts ); - this.gossip = new Eth2Gossipsub({ + this.gossip = new Eth2Gossipsub(opts, { config, libp2p, logger, @@ -98,6 +98,7 @@ export class Network implements INetwork { currentSlot: this.clock.currentSlot, currentEpoch: this.clock.currentEpoch, }, + peersData: this.peersData, }); this.attnetsService = new AttnetsService(config, chain, this.gossip, metadata, logger, opts); @@ -107,6 +108,7 @@ export class Network implements INetwork { { libp2p, reqResp: this.reqResp, + gossip: this.gossip, attnetsService: this.attnetsService, syncnetsService: this.syncnetsService, logger, diff --git a/packages/lodestar/src/network/options.ts b/packages/lodestar/src/network/options.ts index 843fd937e1f..4468da55584 100644 --- a/packages/lodestar/src/network/options.ts +++ b/packages/lodestar/src/network/options.ts @@ -1,9 +1,10 @@ import {ENR, IDiscv5DiscoveryInputOptions} from "@chainsafe/discv5"; +import {Eth2GossipsubOpts} from "./gossip/gossipsub"; import {defaultGossipHandlerOpts, GossipHandlerOpts} from "./gossip/handlers"; import {PeerManagerOpts} from "./peers"; import {defaultRateLimiterOpts, RateLimiterOpts} from "./reqresp/response/rateLimiter"; -export interface INetworkOptions extends PeerManagerOpts, RateLimiterOpts, GossipHandlerOpts { +export interface INetworkOptions extends PeerManagerOpts, RateLimiterOpts, GossipHandlerOpts, Eth2GossipsubOpts { localMultiaddrs: string[]; bootMultiaddrs?: string[]; subscribeAllSubnets?: boolean; diff --git a/packages/lodestar/src/network/peers/peerManager.ts b/packages/lodestar/src/network/peers/peerManager.ts index cbe80c6f29f..b8010f94c9b 100644 --- a/packages/lodestar/src/network/peers/peerManager.ts +++ b/packages/lodestar/src/network/peers/peerManager.ts @@ -15,7 +15,7 @@ import {prettyPrintPeerId} from "../util"; import {PeersData, PeerData} from "./peersData"; import {ISubnetsService} from "../subnets"; import {PeerDiscovery, SubnetDiscvQueryMs} from "./discover"; -import {IPeerRpcScoreStore, ScoreState} from "./score"; +import {IPeerRpcScoreStore, ScoreState, updateGossipsubScores} from "./score"; import {clientFromAgentVersion, ClientKind} from "./client"; import { getConnectedPeerIds, @@ -25,6 +25,7 @@ import { renderIrrelevantPeerType, } from "./utils"; import {SubnetType} from "../metadata"; +import {Eth2Gossipsub} from "../gossip/gossipsub"; /** heartbeat performs regular updates such as updating reputations and performing discovery requests */ const HEARTBEAT_INTERVAL_MS = 30 * 1000; @@ -40,6 +41,11 @@ const CHECK_PING_STATUS_INTERVAL = 10 * 1000; /** A peer is considered long connection if it's >= 1 day */ const LONG_PEER_CONNECTION_MS = 24 * 60 * 60 * 1000; +/** + * Relative factor of peers that are allowed to have a negative gossipsub score without penalizing them in lodestar. + */ +const ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR = 0.1; + // TODO: // maxPeers and targetPeers should be dynamic on the num of validators connected // The Node should compute a recomended value every interval and log a warning @@ -70,6 +76,7 @@ export type PeerManagerModules = { logger: ILogger; metrics: IMetrics | null; reqResp: IReqResp; + gossip: Eth2Gossipsub; attnetsService: ISubnetsService; syncnetsService: ISubnetsService; chain: IBeaconChain; @@ -100,6 +107,7 @@ export class PeerManager { private logger: ILogger; private metrics: IMetrics | null; private reqResp: IReqResp; + private gossipsub: Eth2Gossipsub; private attnetsService: ISubnetsService; private syncnetsService: ISubnetsService; private chain: IBeaconChain; @@ -120,6 +128,7 @@ export class PeerManager { this.logger = modules.logger; this.metrics = modules.metrics; this.reqResp = modules.reqResp; + this.gossipsub = modules.gossip; this.attnetsService = modules.attnetsService; this.syncnetsService = modules.syncnetsService; this.chain = modules.chain; @@ -156,6 +165,10 @@ export class PeerManager { this.intervals = [ setInterval(this.pingAndStatusTimeouts.bind(this), CHECK_PING_STATUS_INTERVAL), setInterval(this.heartbeat.bind(this), HEARTBEAT_INTERVAL_MS), + setInterval( + this.updateGossipsubScores.bind(this), + this.gossipsub.scoreParams.decayInterval ?? HEARTBEAT_INTERVAL_MS + ), ]; } @@ -451,6 +464,16 @@ export class PeerManager { } } + private updateGossipsubScores(): void { + const gossipsubScores = new Map(); + for (const peerIdStr of this.connectedPeers.keys()) { + gossipsubScores.set(peerIdStr, this.gossipsub.getScore(peerIdStr)); + } + + const toIgnoreNegativePeers = Math.ceil(this.opts.targetPeers * ALLOWED_NEGATIVE_GOSSIPSUB_FACTOR); + updateGossipsubScores(this.peerRpcScores, gossipsubScores, toIgnoreNegativePeers); + } + private pingAndStatusTimeouts(): void { const now = Date.now(); const peersToStatus: PeerId[] = []; diff --git a/packages/lodestar/src/network/peers/score.ts b/packages/lodestar/src/network/peers/score.ts index 1b01e7edf3c..67cb41300cb 100644 --- a/packages/lodestar/src/network/peers/score.ts +++ b/packages/lodestar/src/network/peers/score.ts @@ -1,6 +1,7 @@ import PeerId from "peer-id"; +import {MapDef, pruneSetToMax} from "../../util/map"; +import {gossipScoreThresholds} from "../gossip/scoringParameters"; import {IMetrics} from "../../metrics"; -import {pruneSetToMax} from "../../util/map"; /** The default score for new peers */ const DEFAULT_SCORE = 0; @@ -8,6 +9,9 @@ const DEFAULT_SCORE = 0; const MIN_SCORE_BEFORE_DISCONNECT = -20; /** The minimum reputation before a peer is banned */ const MIN_SCORE_BEFORE_BAN = -50; +// If a peer has a lodestar score below this constant all other score parts will get ignored and +// the peer will get banned regardless of the other parts. +const MIN_LODESTAR_SCORE_BEFORE_BAN = -60.0; /** The maximum score a peer can obtain */ const MAX_SCORE = 100; /** The minimum score a peer can obtain */ @@ -21,6 +25,12 @@ const HALFLIFE_DECAY_MS = -Math.log(2) / SCORE_HALFLIFE_MS; const BANNED_BEFORE_DECAY_MS = 30 * 60 * 1000; /** Limit of entries in the scores map */ const MAX_ENTRIES = 1000; +/** + * We weight negative gossipsub scores in such a way that they never result in a disconnect by + * themselves. This "solves" the problem of non-decaying gossipsub scores for disconnected peers. + */ +const GOSSIPSUB_NEGATIVE_SCORE_WEIGHT = (MIN_SCORE_BEFORE_DISCONNECT + 1) / gossipScoreThresholds.graylistThreshold; +const GOSSIPSUB_POSITIVE_SCORE_WEIGHT = GOSSIPSUB_NEGATIVE_SCORE_WEIGHT; export enum PeerAction { /** Immediately ban peer */ @@ -71,6 +81,7 @@ export interface IPeerRpcScoreStore { getScoreState(peer: PeerId): ScoreState; applyAction(peer: PeerId, action: PeerAction, actionName?: string): void; update(): void; + updateGossipsubScore(peerId: PeerIdStr, newScore: number, ignore: boolean): void; } export interface IPeerRpcScoreStoreModules { @@ -83,8 +94,8 @@ export interface IPeerRpcScoreStoreModules { * The decay rate applies equally to positive and negative scores. */ export class PeerRpcScoreStore implements IPeerRpcScoreStore { + private readonly scores = new MapDef(() => new PeerScore()); private readonly metrics: IMetrics | null; - private readonly scores = new Map(); private readonly lastUpdate = new Map(); // TODO: Persist scores, at least BANNED status to disk @@ -94,7 +105,7 @@ export class PeerRpcScoreStore implements IPeerRpcScoreStore { } getScore(peer: PeerId): number { - return this.scores.get(peer.toB58String()) ?? DEFAULT_SCORE; + return this.scores.get(peer.toB58String())?.getScore() ?? DEFAULT_SCORE; } getScoreState(peer: PeerId): ScoreState { @@ -102,7 +113,8 @@ export class PeerRpcScoreStore implements IPeerRpcScoreStore { } applyAction(peer: PeerId, action: PeerAction, actionName?: string): void { - this.add(peer, peerActionScore[action]); + const peerScore = this.scores.getOrDefault(peer.toB58String()); + peerScore.add(peerActionScore[action]); this.metrics?.peersReportPeerCount.inc({reason: actionName}); } @@ -110,56 +122,151 @@ export class PeerRpcScoreStore implements IPeerRpcScoreStore { update(): void { // Bound size of data structures pruneSetToMax(this.scores, MAX_ENTRIES); - pruneSetToMax(this.lastUpdate, MAX_ENTRIES); - for (const [peerIdStr, prevScore] of this.scores) { - const newScore = this.decayScore(peerIdStr, prevScore); + for (const [peerIdStr, peerScore] of this.scores) { + const newScore = peerScore.update(); // Prune scores below threshold if (Math.abs(newScore) < SCORE_THRESHOLD) { this.scores.delete(peerIdStr); - this.lastUpdate.delete(peerIdStr); - } - - // If above threshold, persist decayed value - else { - this.scores.set(peerIdStr, newScore); } } } - private decayScore(peer: PeerIdStr, prevScore: number): number { + updateGossipsubScore(peerId: PeerIdStr, newScore: number, ignore: boolean): void { + const peerScore = this.scores.getOrDefault(peerId); + peerScore.updateGossipsubScore(newScore, ignore); + } +} + +/** + * Manage score of a peer. + */ +export class PeerScore { + private lodestarScore: number; + private gossipScore: number; + private ignoreNegativeGossipScore: boolean; + /** The final score, computed from the above */ + private score: number; + private lastUpdate: number; + + constructor() { + this.lodestarScore = DEFAULT_SCORE; + this.gossipScore = DEFAULT_SCORE; + this.score = DEFAULT_SCORE; + this.ignoreNegativeGossipScore = false; + this.lastUpdate = Date.now(); + } + + getScore(): number { + return this.score; + } + + add(scoreDelta: number): void { + let newScore = this.lodestarScore + scoreDelta; + if (newScore > MAX_SCORE) newScore = MAX_SCORE; + if (newScore < MIN_SCORE) newScore = MIN_SCORE; + + this.setLodestarScore(newScore); + } + + /** + * Applies time-based logic such as decay rates to the score. + * This function should be called periodically. + * + * Return the new score. + */ + update(): number { const nowMs = Date.now(); - const lastUpdate = this.lastUpdate.get(peer) ?? nowMs; // Decay the current score // Using exponential decay based on a constant half life. - const sinceLastUpdateMs = nowMs - lastUpdate; + const sinceLastUpdateMs = nowMs - this.lastUpdate; // If peer was banned, lastUpdate will be in the future - if (sinceLastUpdateMs > 0 && prevScore !== 0) { - this.lastUpdate.set(peer, nowMs); + if (sinceLastUpdateMs > 0 && this.lodestarScore !== 0) { + this.lastUpdate = nowMs; // e^(-ln(2)/HL*t) const decayFactor = Math.exp(HALFLIFE_DECAY_MS * sinceLastUpdateMs); - return prevScore * decayFactor; - } else { - return prevScore; + this.setLodestarScore(this.lodestarScore * decayFactor); + } + + return this.lodestarScore; + } + + updateGossipsubScore(newScore: number, ignore: boolean): void { + // we only update gossipsub if last_updated is in the past which means either the peer is + // not banned or the BANNED_BEFORE_DECAY time is over. + if (this.lastUpdate <= Date.now()) { + this.gossipScore = newScore; + this.ignoreNegativeGossipScore = ignore; } } - private add(peer: PeerId, scoreDelta: number): void { - const prevScore = this.getScore(peer); + /** + * Updating lodestarScore should always go through this method, + * so that we update this.score accordingly. + */ + private setLodestarScore(newScore: number): void { + this.lodestarScore = newScore; + this.updateState(); + } - let newScore = this.decayScore(peer.toB58String(), prevScore) + scoreDelta; - if (newScore > MAX_SCORE) newScore = MAX_SCORE; - if (newScore < MIN_SCORE) newScore = MIN_SCORE; + /** + * Compute the final score, ban peer if needed + */ + private updateState(): void { + const prevState = scoreToState(this.score); + this.recomputeScore(); + const newState = scoreToState(this.score); - const prevState = scoreToState(prevScore); - const newState = scoreToState(newScore); if (prevState !== ScoreState.Banned && newState === ScoreState.Banned) { // ban this peer for at least BANNED_BEFORE_DECAY_MS seconds - this.lastUpdate.set(peer.toB58String(), Date.now() + BANNED_BEFORE_DECAY_MS); + this.lastUpdate = Date.now() + BANNED_BEFORE_DECAY_MS; } + } - this.scores.set(peer.toB58String(), newScore); + /** + * Compute the final score + */ + private recomputeScore(): void { + this.score = this.lodestarScore; + if (this.score <= MIN_LODESTAR_SCORE_BEFORE_BAN) { + // ignore all other scores, i.e. do nothing here + return; + } + + if (this.gossipScore >= 0) { + this.score += this.gossipScore * GOSSIPSUB_POSITIVE_SCORE_WEIGHT; + } else if (!this.ignoreNegativeGossipScore) { + this.score += this.gossipScore * GOSSIPSUB_NEGATIVE_SCORE_WEIGHT; + } + } +} + +/** + * Utility to update gossipsub score of connected peers + */ +export function updateGossipsubScores( + peerRpcScores: IPeerRpcScoreStore, + gossipsubScores: Map, + toIgnoreNegativePeers: number +): void { + // sort by gossipsub score desc + const sortedPeerIds = Array.from(gossipsubScores.keys()).sort( + (a, b) => (gossipsubScores.get(b) ?? 0) - (gossipsubScores.get(a) ?? 0) + ); + for (const peerId of sortedPeerIds) { + const gossipsubScore = gossipsubScores.get(peerId); + if (gossipsubScore !== undefined) { + let ignore = false; + if (gossipsubScore < 0 && toIgnoreNegativePeers > 0) { + // We ignore the negative score for the best negative peers so that their + // gossipsub score can recover without getting disconnected. + ignore = true; + toIgnoreNegativePeers -= 1; + } + + peerRpcScores.updateGossipsubScore(peerId, gossipsubScore, ignore); + } } } diff --git a/packages/lodestar/test/e2e/chain/lightclient.test.ts b/packages/lodestar/test/e2e/chain/lightclient.test.ts index cbc82c8844a..f29ec5f4c92 100644 --- a/packages/lodestar/test/e2e/chain/lightclient.test.ts +++ b/packages/lodestar/test/e2e/chain/lightclient.test.ts @@ -69,6 +69,7 @@ describe("chain / lightclient", function () { params: testParams, options: { sync: {isSingleNode: true}, + network: {allowPublishToZeroPeers: true}, api: {rest: {enabled: true, api: ["lightclient"], port: restPort}}, }, validatorCount, @@ -94,6 +95,9 @@ describe("chain / lightclient", function () { }); await Promise.all(validators.map((validator) => validator.start())); + afterEachCallbacks.push(async () => { + await Promise.all(validators.map((v) => v.stop())); + }); // This promise chain does: // 1. Wait for the beacon node to emit one head that has a snapshot associated to it diff --git a/packages/lodestar/test/e2e/network/gossipsub.test.ts b/packages/lodestar/test/e2e/network/gossipsub.test.ts index a3e17057524..33e085f8410 100644 --- a/packages/lodestar/test/e2e/network/gossipsub.test.ts +++ b/packages/lodestar/test/e2e/network/gossipsub.test.ts @@ -110,9 +110,8 @@ describe("gossipsub", function () { // Wait to have a peer connected to a topic while (!controller.signal.aborted) { await sleep(500); - const topicStr = Array.from(netA.gossip.mesh.keys())[0]; - const peersOnTopic = netA.gossip.mesh.get(topicStr); - if (peersOnTopic && peersOnTopic?.size > 0) { + const topicStr = netA.gossip.getTopics()[0]; + if (topicStr && netA.gossip.getMeshPeers(topicStr).length > 0) { break; } } @@ -143,9 +142,8 @@ describe("gossipsub", function () { // Wait to have a peer connected to a topic while (!controller.signal.aborted) { await sleep(500); - const topicStr = Array.from(netA.gossip.mesh.keys())[0]; - const peersOnTopic = netA.gossip.mesh.get(topicStr); - if (peersOnTopic && peersOnTopic?.size > 0) { + const topicStr = netA.gossip.getTopics()[0]; + if (topicStr && netA.gossip.getMeshPeers(topicStr).length > 0) { break; } } diff --git a/packages/lodestar/test/e2e/network/network.test.ts b/packages/lodestar/test/e2e/network/network.test.ts index e12b75a03b3..ed03c7fecfe 100644 --- a/packages/lodestar/test/e2e/network/network.test.ts +++ b/packages/lodestar/test/e2e/network/network.test.ts @@ -245,11 +245,11 @@ describe("network", function () { it("Should subscribe to gossip core topics on demand", async () => { const {network: netA} = await createTestNode("A"); - expect(netA.gossip.subscriptions.size).to.equal(0); + expect(netA.gossip.getTopics().length).to.equal(0); netA.subscribeGossipCoreTopics(); - expect(netA.gossip.subscriptions.size).to.equal(5); + expect(netA.gossip.getTopics().length).to.equal(5); netA.unsubscribeGossipCoreTopics(); - expect(netA.gossip.subscriptions.size).to.equal(0); + expect(netA.gossip.getTopics().length).to.equal(0); netA.close(); }); }); diff --git a/packages/lodestar/test/e2e/network/peers/peerManager.test.ts b/packages/lodestar/test/e2e/network/peers/peerManager.test.ts index 98187223d5f..7e2c5281ebb 100644 --- a/packages/lodestar/test/e2e/network/peers/peerManager.test.ts +++ b/packages/lodestar/test/e2e/network/peers/peerManager.test.ts @@ -6,7 +6,7 @@ import {config} from "@chainsafe/lodestar-config/default"; import {BitArray} from "@chainsafe/ssz"; import {IReqResp, ReqRespMethod} from "../../../../src/network/reqresp"; import {PeerRpcScoreStore, PeerManager} from "../../../../src/network/peers"; -import {NetworkEvent, NetworkEventBus} from "../../../../src/network"; +import {Eth2Gossipsub, NetworkEvent, NetworkEventBus} from "../../../../src/network"; import {PeersData} from "../../../../src/network/peers/peersData"; import {createNode, getAttnets, getSyncnets} from "../../../utils/network"; import {MockBeaconChain} from "../../../utils/mocks/chain/chain"; @@ -84,6 +84,7 @@ describe("network / peers / PeerManager", function () { networkEventBus, attnetsService: mockSubnetsService, syncnetsService: mockSubnetsService, + gossip: ({getScore: () => 0, scoreParams: {decayInterval: 1000}} as unknown) as Eth2Gossipsub, peersData: new PeersData(), }, { diff --git a/packages/lodestar/test/e2e/sync/finalizedSync.test.ts b/packages/lodestar/test/e2e/sync/finalizedSync.test.ts index a76e8d1ddff..ee38649269a 100644 --- a/packages/lodestar/test/e2e/sync/finalizedSync.test.ts +++ b/packages/lodestar/test/e2e/sync/finalizedSync.test.ts @@ -1,8 +1,8 @@ import {IChainConfig} from "@chainsafe/lodestar-config"; +import {assert} from "chai"; import {getDevBeaconNode} from "../../utils/node/beacon"; import {waitForEvent} from "../../utils/events/resolver"; import {phase0, ssz} from "@chainsafe/lodestar-types"; -import assert from "assert"; import {getAndInitDevValidators} from "../../utils/node/validator"; import {ChainEvent} from "../../../src/chain"; import {Network} from "../../../src/network"; @@ -34,7 +34,7 @@ describe("sync / finalized sync", function () { const bn = await getDevBeaconNode({ params: beaconParams, - options: {sync: {isSingleNode: true}}, + options: {sync: {isSingleNode: true}, network: {allowPublishToZeroPeers: true}}, validatorCount, logger: loggerNodeA, }); @@ -53,6 +53,9 @@ describe("sync / finalized sync", function () { afterEachCallbacks.push(() => Promise.all(validators.map((validator) => validator.stop()))); await Promise.all(validators.map((validator) => validator.start())); + afterEachCallbacks.push(() => Promise.all(validators.map((v) => v.stop()))); + // stop beacon node after validators + afterEachCallbacks.push(() => bn.close()); await waitForEvent(bn.chain.emitter, ChainEvent.finalized, 240000); loggerNodeA.important("Node A emitted finalized checkpoint event"); @@ -64,6 +67,7 @@ describe("sync / finalized sync", function () { genesisTime: bn.chain.getHeadState().genesisTime, logger: loggerNodeB, }); + afterEachCallbacks.push(() => bn2.close()); afterEachCallbacks.push(() => bn2.close()); diff --git a/packages/lodestar/test/e2e/sync/unknownBlockSync.test.ts b/packages/lodestar/test/e2e/sync/unknownBlockSync.test.ts index 24f427143bc..983c0d19ae4 100644 --- a/packages/lodestar/test/e2e/sync/unknownBlockSync.test.ts +++ b/packages/lodestar/test/e2e/sync/unknownBlockSync.test.ts @@ -4,7 +4,7 @@ import {waitForEvent} from "../../utils/events/resolver"; import {phase0, ssz} from "@chainsafe/lodestar-types"; import {getAndInitDevValidators} from "../../utils/node/validator"; import {ChainEvent} from "../../../src/chain"; -import {Network} from "../../../src/network"; +import {Network, NetworkEvent} from "../../../src/network"; import {connect} from "../../utils/network"; import {testLogger, LogLevel, TestLoggerOpts} from "../../utils/logger"; import {fromHexString} from "@chainsafe/ssz"; @@ -46,7 +46,7 @@ describe("sync / unknown block sync", function () { const bn = await getDevBeaconNode({ params: testParams, - options: {sync: {isSingleNode: true}}, + options: {sync: {isSingleNode: true}, network: {allowPublishToZeroPeers: true}}, validatorCount, logger: loggerNodeA, }); @@ -65,6 +65,9 @@ describe("sync / unknown block sync", function () { afterEachCallbacks.push(() => Promise.all(validators.map((v) => v.stop()))); await Promise.all(validators.map((validator) => validator.start())); + afterEachCallbacks.push(() => Promise.all(validators.map((v) => v.stop()))); + // stop bn after validators + afterEachCallbacks.push(() => bn.close()); await waitForEvent(bn.chain.emitter, ChainEvent.checkpoint, 240000); loggerNodeA.important("Node A emitted checkpoint event"); @@ -76,6 +79,7 @@ describe("sync / unknown block sync", function () { genesisTime: bn.chain.getHeadState().genesisTime, logger: loggerNodeB, }); + afterEachCallbacks.push(() => bn2.close()); afterEachCallbacks.push(() => bn2.close()); @@ -87,10 +91,10 @@ describe("sync / unknown block sync", function () { ); await connect(bn2.network as Network, bn.network.peerId, bn.network.localMultiaddrs); - - await bn2.api.beacon.publishBlock(head).catch((e) => { + await bn2.chain.processBlock(head).catch((e) => { if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) { // Expected + bn2.network.events.emit(NetworkEvent.unknownBlockParent, head, bn2.network.peerId.toB58String()); } else { throw e; } diff --git a/packages/lodestar/test/e2e/sync/wss.test.ts b/packages/lodestar/test/e2e/sync/wss.test.ts index 80bed2a8fb3..08990dca11e 100644 --- a/packages/lodestar/test/e2e/sync/wss.test.ts +++ b/packages/lodestar/test/e2e/sync/wss.test.ts @@ -1,4 +1,3 @@ -import {assert} from "chai"; import {GENESIS_SLOT, SLOTS_PER_EPOCH} from "@chainsafe/lodestar-params"; import {phase0, Slot} from "@chainsafe/lodestar-types"; import {initBLS} from "@chainsafe/lodestar-cli/src/util"; @@ -26,13 +25,8 @@ describe("Start from WSS", function () { await initBLS(); }); - const afterEachCallbacks: (() => Promise | void)[] = []; - afterEach(async () => { - while (afterEachCallbacks.length > 0) { - const callback = afterEachCallbacks.pop(); - if (callback) await callback(); - } - }); + const afterEachCallbacks: (() => Promise | unknown)[] = []; + afterEach(async () => Promise.all(afterEachCallbacks.splice(0, afterEachCallbacks.length))); it("using another node", async function () { // Should reach justification in 3 epochs max, and finalization in 4 epochs max @@ -71,6 +65,7 @@ describe("Start from WSS", function () { rest: {enabled: true, api: ["debug"]} as RestApiOptions, }, sync: {isSingleNode: true}, + network: {allowPublishToZeroPeers: true}, }, validatorCount: 32, logger: loggerNodeA, @@ -133,10 +128,6 @@ describe("Start from WSS", function () { await connect(bnStartingFromWSS.network as Network, bn.network.peerId, bn.network.localMultiaddrs); - try { - await waitForSynced; - } catch (e) { - assert.fail("Failed to backfill sync to other node in time"); - } + await waitForSynced; }); }); diff --git a/packages/lodestar/test/scripts/blsPubkeyBytesFrequency.ts b/packages/lodestar/test/scripts/blsPubkeyBytesFrequency.ts index eb31676bba1..e72f2371e0a 100644 --- a/packages/lodestar/test/scripts/blsPubkeyBytesFrequency.ts +++ b/packages/lodestar/test/scripts/blsPubkeyBytesFrequency.ts @@ -2,7 +2,7 @@ import fs from "node:fs"; import {getClient} from "@chainsafe/lodestar-api"; import {config} from "@chainsafe/lodestar-config/default"; import {newZeroedArray} from "@chainsafe/lodestar-beacon-state-transition"; -import SHA256 from "@chainsafe/as-sha256"; +import {digest} from "@chainsafe/as-sha256"; // Script to analyze if a raw BLS pubkey bytes are sufficiently even distributed. // If so, a shorter slice of the pubkey bytes can be used as key for the pubkey to index map. @@ -82,7 +82,7 @@ function analyzeBytesCollisions(pubkeys: string[]): void { let key: string; if (useHash) { const pubkey = Buffer.from(pubkeyStr, "hex"); - const pubkeyHash = SHA256.digest(pubkey); + const pubkeyHash = digest(pubkey); key = Buffer.from(pubkeyHash.slice(offset, offset + i)).toString("hex"); } else { key = pubkeyStr.slice(offset * 2, (offset + i) * 2); diff --git a/packages/lodestar/test/sim/merge-interop.test.ts b/packages/lodestar/test/sim/merge-interop.test.ts index dd9d211d335..a2da498a105 100644 --- a/packages/lodestar/test/sim/merge-interop.test.ts +++ b/packages/lodestar/test/sim/merge-interop.test.ts @@ -318,7 +318,7 @@ describe("executionEngine / ExecutionEngineHttp", function () { options: { api: {rest: {enabled: true} as RestApiOptions}, sync: {isSingleNode: true}, - network: {discv5: null}, + network: {allowPublishToZeroPeers: true, discv5: null}, eth1: {enabled: true, providerUrls: [jsonRpcUrl]}, executionEngine: {urls: [engineApiUrl], jwtSecretHex}, }, diff --git a/packages/lodestar/test/sim/multiNodeSingleThread.test.ts b/packages/lodestar/test/sim/multiNodeSingleThread.test.ts index d0675990794..5a7bee201aa 100644 --- a/packages/lodestar/test/sim/multiNodeSingleThread.test.ts +++ b/packages/lodestar/test/sim/multiNodeSingleThread.test.ts @@ -70,7 +70,7 @@ describe("Run multi node single thread interop validators (no eth1) until checkp }; const logger = testLogger(`Node ${i}`, testLoggerOpts); - const node = await getDevBeaconNode({ + const bn = await getDevBeaconNode({ params: {...testParams, ALTAIR_FORK_EPOCH: altairForkEpoch}, options: {api: {rest: {port: 10000 + i}}}, validatorCount: nodeCount * validatorsPerNode, @@ -79,12 +79,13 @@ describe("Run multi node single thread interop validators (no eth1) until checkp }); const {validators: nodeValidators} = await getAndInitDevValidators({ - node, + node: bn, validatorsPerClient: validatorsPerNode, validatorClientCount: 1, startIndex: i * validatorsPerNode, testLoggerOpts, }); + afterEachCallbacks.push(async () => await Promise.all(validators.map((validator) => validator.stop()))); afterEachCallbacks.push(async () => { await Promise.all(validators.map((validator) => validator.stop())); @@ -100,12 +101,20 @@ describe("Run multi node single thread interop validators (no eth1) until checkp }); loggers.push(logger); - nodes.push(node); + nodes.push(bn); validators.push(...nodeValidators); } const stopInfoTracker = simTestInfoTracker(nodes[0], loggers[0]); + afterEachCallbacks.push(async () => { + stopInfoTracker(); + await Promise.all(nodes.map((node) => node.close())); + console.log("--- Stopped all nodes ---"); + // Wait a bit for nodes to shutdown + await sleep(3000); + }); + // Connect all nodes with each other for (let i = 0; i < nodeCount; i++) { for (let j = 0; j < nodeCount; j++) { diff --git a/packages/lodestar/test/sim/singleNodeSingleThread.test.ts b/packages/lodestar/test/sim/singleNodeSingleThread.test.ts index a835187da7e..f3b2b6eaa88 100644 --- a/packages/lodestar/test/sim/singleNodeSingleThread.test.ts +++ b/packages/lodestar/test/sim/singleNodeSingleThread.test.ts @@ -103,6 +103,7 @@ describe("Run single node single thread interop validators (no eth1) until check options: { api: {rest: {enabled: true} as RestApiOptions}, sync: {isSingleNode: true}, + network: {allowPublishToZeroPeers: true}, executionEngine: {mode: "mock", genesisBlockHash: toHexString(INTEROP_BLOCK_HASH)}, }, validatorCount: validatorClientCount * validatorsPerClient, diff --git a/packages/lodestar/test/unit-mainnet/network/gossip/scoringParameters.test.ts b/packages/lodestar/test/unit-mainnet/network/gossip/scoringParameters.test.ts index 6561fb66183..9810629b014 100644 --- a/packages/lodestar/test/unit-mainnet/network/gossip/scoringParameters.test.ts +++ b/packages/lodestar/test/unit-mainnet/network/gossip/scoringParameters.test.ts @@ -131,7 +131,7 @@ describe("computeGossipPeerScoreParams", function () { expect(params.meshMessageDeliveriesDecay).closeTo(0.930572, TOLERANCE); expect(params.meshMessageDeliveriesCap).closeTo(68.6255, TOLERANCE); expect(params.meshMessageDeliveriesActivation).to.be.equal(384 * 1000); - expect(params.meshMessageDeliveriesWindow).to.be.equal(2 * 1000); + expect(params.meshMessageDeliveriesWindow).to.be.equal(12 * 1000); expect(params.meshFailurePenaltyWeight).closeTo(-0.73044, TOLERANCE); expect(params.meshFailurePenaltyDecay).closeTo(0.93057, TOLERANCE); @@ -165,7 +165,7 @@ describe("computeGossipPeerScoreParams", function () { expect(params.meshMessageDeliveriesDecay).closeTo(0.97163, TOLERANCE); expect(params.meshMessageDeliveriesCap).closeTo(2.0547574, TOLERANCE); expect(params.meshMessageDeliveriesActivation).to.be.equal(384 * 1000); - expect(params.meshMessageDeliveriesWindow).to.be.equal(2 * 1000); + expect(params.meshMessageDeliveriesWindow).to.be.equal(12 * 1000); expect(params.meshFailurePenaltyWeight).closeTo(-458.31055, TOLERANCE); expect(params.meshFailurePenaltyDecay).closeTo(0.97163, TOLERANCE); @@ -207,7 +207,7 @@ describe("computeGossipPeerScoreParams", function () { expect(params.meshMessageDeliveriesDecay).closeTo(0.96466, TOLERANCE); expect(params.meshMessageDeliveriesCap).closeTo(69.88248, TOLERANCE); expect(params.meshMessageDeliveriesActivation).to.be.equal(204 * 1000); - expect(params.meshMessageDeliveriesWindow).to.be.equal(2 * 1000); + expect(params.meshMessageDeliveriesWindow).to.be.equal(12 * 1000); expect(params.meshFailurePenaltyWeight).closeTo(-360.6548, TOLERANCE); expect(params.meshFailurePenaltyDecay).closeTo(0.96466, TOLERANCE); diff --git a/packages/lodestar/test/unit/network/gossip/gossipsub.test.ts b/packages/lodestar/test/unit/network/gossip/gossipsub.test.ts deleted file mode 100644 index 07370089d70..00000000000 --- a/packages/lodestar/test/unit/network/gossip/gossipsub.test.ts +++ /dev/null @@ -1,106 +0,0 @@ -import {expect, assert} from "chai"; -import Libp2p from "libp2p"; -import {InMessage} from "libp2p-interfaces/src/pubsub"; -import {ERR_TOPIC_VALIDATOR_REJECT} from "libp2p-gossipsub/src/constants"; -import {AbortController} from "@chainsafe/abort-controller"; -import {ForkName, SLOTS_PER_EPOCH} from "@chainsafe/lodestar-params"; -import {ssz} from "@chainsafe/lodestar-types"; - -import {Eth2Gossipsub, GossipHandlers, GossipType, GossipEncoding} from "../../../../src/network/gossip"; -import {stringifyGossipTopic} from "../../../../src/network/gossip/topic"; -import {encodeMessageData} from "../../../../src/network/gossip/encoding"; -import {GossipValidationError} from "../../../../src/network/gossip/errors"; - -import {config} from "../../../utils/config"; -import {generateEmptySignedBlock} from "../../../utils/block"; -import {createNode} from "../../../utils/network"; -import {testLogger} from "../../../utils/logger"; -import {GossipAction, GossipActionError} from "../../../../src/chain/errors"; -import {Eth2Context} from "../../../../src/chain"; - -describe("network / gossip / validation", function () { - const logger = testLogger(); - const metrics = null; - const gossipType = GossipType.beacon_block; - - let message: InMessage; - let topicString: string; - let libp2p: Libp2p; - let eth2Context: Eth2Context; - - let controller: AbortController; - beforeEach(() => { - controller = new AbortController(); - eth2Context = { - activeValidatorCount: 16, - currentEpoch: 1000, - currentSlot: 1000 * SLOTS_PER_EPOCH, - }; - }); - afterEach(() => controller.abort()); - - beforeEach(async function () { - const signedBlock = generateEmptySignedBlock(); - topicString = stringifyGossipTopic(config, {type: gossipType, fork: ForkName.phase0}); - message = { - data: encodeMessageData(GossipEncoding.ssz_snappy, ssz.phase0.SignedBeaconBlock.serialize(signedBlock)), - receivedFrom: "0", - topicIDs: [topicString], - }; - - const multiaddr = "/ip4/127.0.0.1/tcp/0"; - libp2p = await createNode(multiaddr); - }); - - it("should throw on failed validation", async () => { - const gossipHandlersPartial: Partial = { - [gossipType]: async () => { - throw new GossipActionError(GossipAction.REJECT, {code: "TEST_ERROR"}); - }, - }; - - const gossipSub = new Eth2Gossipsub({ - config, - gossipHandlers: gossipHandlersPartial as GossipHandlers, - logger, - libp2p, - metrics, - signal: controller.signal, - eth2Context, - }); - - try { - await gossipSub.validate(message); - assert.fail("Expect error here"); - } catch (e) { - expect({ - message: (e as Error).message, - code: (e as GossipValidationError).code, - }).to.deep.equal({ - message: "TEST_ERROR", - code: ERR_TOPIC_VALIDATOR_REJECT, - }); - } - }); - - it("should not throw on successful validation", async () => { - const gossipHandlersPartial: Partial = { - [gossipType]: async () => { - // - }, - }; - - const gossipSub = new Eth2Gossipsub({ - config, - gossipHandlers: gossipHandlersPartial as GossipHandlers, - logger, - libp2p, - metrics, - signal: controller.signal, - eth2Context, - }); - - await gossipSub.validate(message); - // no error means pass validation - }); -}); diff --git a/packages/lodestar/test/unit/network/peers/score.test.ts b/packages/lodestar/test/unit/network/peers/score.test.ts index 04cb0a35de5..8b6cc55e10f 100644 --- a/packages/lodestar/test/unit/network/peers/score.test.ts +++ b/packages/lodestar/test/unit/network/peers/score.test.ts @@ -1,6 +1,7 @@ import {expect} from "chai"; import PeerId from "peer-id"; -import {PeerAction, ScoreState, PeerRpcScoreStore} from "../../../../src/network/peers/score"; +import sinon from "sinon"; +import {PeerAction, ScoreState, PeerRpcScoreStore, updateGossipsubScores} from "../../../../src/network/peers/score"; describe("simple block provider score tracking", function () { const peer = PeerId.createFromB58String("Qma9T5YraSnpRDZqRR4krcSJabThc8nwZuJV3LercPHufi"); @@ -8,7 +9,9 @@ describe("simple block provider score tracking", function () { // eslint-disable-next-line @typescript-eslint/explicit-function-return-type function mockStore() { - return {scoreStore: new PeerRpcScoreStore()}; + const scoreStore = new PeerRpcScoreStore(); + const peerScores = scoreStore["scores"]; + return {scoreStore, peerScores}; } it("Should return default score, without any previous action", function () { @@ -40,17 +43,48 @@ describe("simple block provider score tracking", function () { ]; for (const [minScore, timeToDecay] of decayTimes) it(`Should decay MIN_SCORE to ${minScore} after ${timeToDecay} ms`, () => { - const {scoreStore} = mockStore(); - scoreStore["scores"].set(peer.toB58String(), MIN_SCORE); - scoreStore["lastUpdate"].set(peer.toB58String(), Date.now() - timeToDecay * factorForJsBadMath); + const {scoreStore, peerScores} = mockStore(); + const peerScore = peerScores.get(peer.toB58String()); + if (peerScore) { + peerScore["lastUpdate"] = Date.now() - timeToDecay * factorForJsBadMath; + peerScore["lodestarScore"] = MIN_SCORE; + } scoreStore.update(); expect(scoreStore.getScore(peer)).to.be.greaterThan(minScore); }); - it("should not go belove min score", function () { + it("should not go below min score", function () { const {scoreStore} = mockStore(); scoreStore.applyAction(peer, PeerAction.Fatal); scoreStore.applyAction(peer, PeerAction.Fatal); expect(scoreStore.getScore(peer)).to.be.gte(MIN_SCORE); }); }); + +describe("updateGossipsubScores", function () { + const sandbox = sinon.createSandbox(); + const peerRpcScoresStub = sandbox.createStubInstance(PeerRpcScoreStore); + + this.afterEach(() => { + sandbox.restore(); + }); + + it("should update gossipsub peer scores", () => { + updateGossipsubScores( + peerRpcScoresStub, + new Map([ + ["a", 10], + ["b", -10], + ["c", -20], + ["d", -5], + ]), + 2 + ); + expect(peerRpcScoresStub.updateGossipsubScore.calledWith("a", 10, false)).to.be.true; + // should ignore b d since they are 2 biggest negative scores + expect(peerRpcScoresStub.updateGossipsubScore.calledWith("b", -10, true)).to.be.true; + expect(peerRpcScoresStub.updateGossipsubScore.calledWith("d", -5, true)).to.be.true; + // should not ignore c as it's lowest negative scores + expect(peerRpcScoresStub.updateGossipsubScore.calledWith("c", -20, false)).to.be.true; + }); +}); diff --git a/scripts/dev/node1.sh b/scripts/dev/node1.sh new file mode 100755 index 00000000000..d4bc3c9c262 --- /dev/null +++ b/scripts/dev/node1.sh @@ -0,0 +1,19 @@ +#!/usr/bin/env bash + +GENESIS_TIME=$(date +%s) + +packages/cli/bin/lodestar dev \ + --genesisValidators 8 \ + --startValidators 0:7 \ + --genesisTime $GENESIS_TIME \ + --enr.ip 127.0.0.1 \ + --rootDir .lodestar/node1 \ + --reset \ + --api.rest.enabled \ + --api.rest.api '*' \ + --metrics.enabled \ + --logLevel debug \ + --eth1.enabled false \ + --network.requestCountPeerLimit 1000000 \ + --network.blockCountTotalLimit 1000000 \ + --network.blockCountPeerLimit 1000000 diff --git a/scripts/dev/node2.sh b/scripts/dev/node2.sh new file mode 100755 index 00000000000..d19e623834f --- /dev/null +++ b/scripts/dev/node2.sh @@ -0,0 +1,22 @@ +#!/usr/bin/env bash + +# Fetch node1 data +ENR=$(curl -s http://localhost:9596/eth/v1/node/identity | jq .data.enr) +GENESIS_TIME=$(curl -s http://localhost:9596/eth/v1/beacon/genesis | jq .data.genesis_time) + +packages/cli/bin/lodestar dev \ + --genesisValidators 8 \ + --genesisTime $GENESIS_TIME \ + --enr.ip 127.0.0.1 \ + --rootDir .lodestar/node2 \ + --reset \ + --api.rest.enabled \ + --api.rest.api '*' \ + --metrics.enabled \ + --metrics.serverPort 8009 \ + --logLevel debug \ + --eth1.enabled false \ + --port 9001 \ + --api.rest.port 9597 \ + --network.connectToDiscv5Bootnodes true \ + --network.discv5.bootEnrs $ENR \ No newline at end of file diff --git a/yarn.lock b/yarn.lock index 61c5508739a..55985594abc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -6555,6 +6555,14 @@ iso-random-stream@^2.0.0: events "^3.3.0" readable-stream "^3.4.0" +iso-random-stream@^2.0.2: + version "2.0.2" + resolved "https://registry.yarnpkg.com/iso-random-stream/-/iso-random-stream-2.0.2.tgz#a24f77c34cfdad9d398707d522a6a0cc640ff27d" + integrity sha512-yJvs+Nnelic1L2vH2JzWvvPQFA4r7kSTnpST/+LkAQjSz0hos2oqLD+qIVi9Qk38Hoe7mNDt3j0S27R58MVjLQ== + dependencies: + events "^3.3.0" + readable-stream "^3.4.0" + iso-url@^1.1.5: version "1.1.5" resolved "https://registry.yarnpkg.com/iso-url/-/iso-url-1.1.5.tgz#875a0f2bf33fa1fc200f8d89e3f49eee57a8f0d9" @@ -7163,22 +7171,45 @@ libp2p-crypto@^0.21.2: protobufjs "^6.11.2" uint8arrays "^3.0.0" -libp2p-gossipsub@^0.13.2: - version "0.13.2" - resolved "https://registry.yarnpkg.com/libp2p-gossipsub/-/libp2p-gossipsub-0.13.2.tgz#3a21d3556f8fb832d8659fdfee118b43da2ba3e7" - integrity sha512-r2i+GM5m58IDSIMP/9fv3L0/3V2Bci3NsVnlWEyxptcpUcUt7Dif5FN1lsiFPIkCtjks+OmhWPQmMM/7J/a6Rg== +libp2p-gossipsub@^0.14.0: + version "0.14.0" + resolved "https://registry.yarnpkg.com/libp2p-gossipsub/-/libp2p-gossipsub-0.14.0.tgz#a05ce2a41b3aa6640f723f0d0363ad2808d56946" + integrity sha512-70YnK/zwYbAx2xx7CqPROje37AVLAu8OUgWJ3cSMN6AmCVvUMepebXbQvfnDg2+uLkNnnr1huaqHsceN2BYnkQ== dependencies: "@types/debug" "^4.1.7" debug "^4.3.1" denque "^1.5.0" err-code "^3.0.1" + iso-random-stream "^2.0.2" it-pipe "^1.1.0" - libp2p-interfaces "^4.0.4" + libp2p-crypto "^0.21.2" + libp2p-interfaces "4.0.4" + multiformats "^9.6.4" peer-id "^0.16.0" protobufjs "^6.11.2" uint8arrays "^3.0.0" -libp2p-interfaces@^4.0.0, libp2p-interfaces@^4.0.4: +libp2p-interfaces@4.0.4: + version "4.0.4" + resolved "https://registry.yarnpkg.com/libp2p-interfaces/-/libp2p-interfaces-4.0.4.tgz#aa88ce344cb73bdf3972c7ef4a79566c27a41228" + integrity sha512-pD46BpdZBM4OEs/LdYJ74BT582OyL+MLOawZr+YRdQTt3s0UZBInc0SF/AhhL2je9bcfkMe4Z1ko+/nninTNvw== + dependencies: + abort-controller "^3.0.0" + abortable-iterator "^3.0.0" + debug "^4.3.1" + err-code "^3.0.1" + it-length-prefixed "^5.0.2" + it-pipe "^1.1.0" + it-pushable "^1.4.2" + libp2p-crypto "^0.21.0" + multiaddr "^10.0.0" + multiformats "^9.1.2" + p-queue "^6.6.2" + peer-id "^0.16.0" + protobufjs "^6.10.2" + uint8arrays "^3.0.0" + +libp2p-interfaces@^4.0.0: version "4.0.6" resolved "https://registry.yarnpkg.com/libp2p-interfaces/-/libp2p-interfaces-4.0.6.tgz#462062e04a680703bca18eb1d7a9c963a39344d1" integrity sha512-3KjzNEIWhi+VoOamLvgKKUE/xqwxSw/JYqsBnfMhAWVRvRtosROtVT03wci2XbuuowCYw+/hEX1xKJIR1w5n0A== @@ -7971,6 +8002,11 @@ multiformats@^9.4.2, multiformats@^9.4.5: resolved "https://registry.yarnpkg.com/multiformats/-/multiformats-9.4.5.tgz#9ac47bbc87aadb09d4bd05e9cd3da6f4436414f6" integrity sha512-zQxukxsHM34EJi3yT3MkUlycY9wEouyrAz0PSN+CyCj6cYchJZ4LrTH74YtlsxVyAK6waz/gnVLmJwi3P0knKg== +multiformats@^9.6.4: + version "9.6.4" + resolved "https://registry.yarnpkg.com/multiformats/-/multiformats-9.6.4.tgz#5dce1f11a407dbb69aa612cb7e5076069bb759ca" + integrity sha512-fCCB6XMrr6CqJiHNjfFNGT0v//dxOBMrOMqUIzpPc/mmITweLEyhvMpY9bF+jZ9z3vaMAau5E8B68DW77QMXkg== + multihashes@^4.0.3: version "4.0.3" resolved "https://registry.yarnpkg.com/multihashes/-/multihashes-4.0.3.tgz#426610539cd2551edbf533adeac4c06b3b90fb05"