Skip to content

Commit

Permalink
fix: tag mesh peers on graft, remove tag on pruning (ChainSafe#383)
Browse files Browse the repository at this point in the history
  • Loading branch information
maschad committed Jan 3, 2023
1 parent b7e832b commit e1abad6
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 103 deletions.
3 changes: 0 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
* chore: update cd action by @mpetrunic in https://github.com/ChainSafe/js-libp2p-gossipsub/pull/245


<<<<<<< HEAD
## [5.4.1](https://github.com/ChainSafe/js-libp2p-gossipsub/compare/v5.4.0...v5.4.1) (2022-12-23)


Expand All @@ -35,8 +34,6 @@

* tracer to track delivered message if duplicate ([#385](https://github.com/ChainSafe/js-libp2p-gossipsub/issues/385)) ([0c8ddee](https://github.com/ChainSafe/js-libp2p-gossipsub/commit/0c8ddee13a94b44f182ea685cdddc6b7cee43ec4))

=======
>>>>>>> 84e30bc (chore(master): release 5.3.0 (#379))
## [5.3.0](https://github.com/ChainSafe/js-libp2p-gossipsub/compare/v5.2.1...v5.3.0) (2022-12-01)


Expand Down
8 changes: 0 additions & 8 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"@libp2p/interface-connection-manager": "^1.3.0",
"@libp2p/interface-keys": "^1.0.3",
"@libp2p/interface-peer-id": "^1.0.4",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-peer-store": "^1.2.6",
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interfaces": "^3.0.3",
Expand Down
11 changes: 2 additions & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1499,7 +1499,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
return []
}

this.components.peerStore.tagPeer(peerIdFromString(id), 'gossipsub-mesh-peer', {
await this.components.peerStore.tagPeer(peerIdFromString(id), 'gossipsub-mesh-peer', {
value: 100 // value should be 0-100
})

Expand Down Expand Up @@ -1552,7 +1552,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
}
}

this.components.peerStore.unTagPeer(peerIdFromString(id), 'gossipsub-mesh-peer')
await this.components.peerStore.unTagPeer(peerIdFromString(id), 'gossipsub-mesh-peer')
}

/**
Expand Down Expand Up @@ -2535,8 +2535,6 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
} else {
topics.push(topic)
}
// Untag peer upon pruning
this.components.peerStore.unTagPeer(peerIdFromString(id), 'gossipsub-mesh-peer')
}

const graftPeer = (id: PeerIdStr, reason: InclusionReason): void => {
Expand All @@ -2555,11 +2553,6 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements PubSub<G
} else {
topics.push(topic)
}

// Tag the peer upon grafting
this.components.peerStore.tagPeer(peerIdFromString(id), 'gossipsub-mesh-peer', {
value: 100 // value should be 0-100
})
}

// drop all peers with negative score, without PX
Expand Down
113 changes: 112 additions & 1 deletion test/e2e/go-gossipsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import { mockNetwork } from '@libp2p/interface-mocks'
import { stop } from '@libp2p/interfaces/startable'
import { TopicScoreParams } from '../../src/score/peer-score-params.js'
import { awaitEvents, checkReceivedSubscription, checkReceivedSubscriptions } from '../utils/events.js'
import sinon, { SinonStubbedInstance } from 'sinon'
import { Tag } from '@libp2p/interface-peer-store'

/**
* These tests were translated from:
Expand Down Expand Up @@ -76,6 +78,115 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
mockNetwork.reset()
})

it('should add the tag gossipsub-mesh-peer upon grafting', async function () {
// Create 20 gossipsub nodes
// Sparsely connect nodes
// Subscribe to the topic, all nodes, waiting for each subscription to propagate first
// Publish 100 messages, each from a random node
// Assert that the subscribed nodes receive every message
psubs = await createComponentsArray({
number: 20,
init: {
scoreParams: {
IPColocationFactorThreshold: 20,
behaviourPenaltyWeight: 0
}
}
})
const topic = 'foobar'

await sparseConnect(psubs)

const nodeAGraftSpy = psubs[0].pubsub as Partial<GossipSub> as SinonStubbedInstance<{
handleGraft: GossipSub['handleGraft']
}>
sinon.spy(nodeAGraftSpy, 'handleGraft')

// add subscriptions to each node
for (const ps of psubs) {
ps.pubsub.subscribe(topic)
// wait for announce to propagate
await delay(100)
}
// await mesh rebalancing
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

const allTags: Tag[][] = []

psubs.forEach(async (subscribedPeer) => {
const tagArray = await subscribedPeer.components.peerStore.getTags(subscribedPeer.components.peerId)
allTags.push(tagArray)
})

expect(nodeAGraftSpy.handleGraft.callCount).to.be.greaterThan(0)
expect(
allTags.map((tags) => {
tags.map((tag) => {
tag.name.toString()
})
})
).to.include('gossipsub-mesh-peer')
})
it('should remove the tag gossipsub-mesh-peer upon pruning', async function () {
// Create 20 gossipsub nodes
// Sparsely connect nodes
// Subscribe to the topic, all nodes, waiting for each subscription to propagate first
// Publish 100 messages, each from a random node
// Assert that the subscribed nodes receive every message
psubs = await createComponentsArray({
number: 20,
init: {
scoreParams: {
IPColocationFactorThreshold: 20,
behaviourPenaltyWeight: 0
}
}
})
const topic = 'foobar'
psubs.forEach((ps) => ps.pubsub.subscribe(topic))

const unsubscribedPeers: GossipSubAndComponents[] = []

// every node connected to every other
await denseConnect(psubs)
// wait a bit to take effect
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

// disconnect some peers from the mesh to get some PRUNEs
psubs.slice(0, 5).forEach((ps) => {
unsubscribedPeers.push(ps)
ps.pubsub.unsubscribe(topic)
})

// wait a bit to take effect
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

const nodeAPruneSpy = unsubscribedPeers[0].pubsub as Partial<GossipSub> as SinonStubbedInstance<{
handlePrune: GossipSub['handlePrune']
}>
sinon.spy(nodeAPruneSpy, 'handlePrune')

// await mesh rebalancing
await Promise.all(psubs.map(async (ps) => await awaitEvents(ps.pubsub, 'gossipsub:heartbeat', 2)))

const allTags: Tag[][] = []

unsubscribedPeers.forEach(async (unsubscribedPeer) => {
const tagArray = await unsubscribedPeer.components.peerStore.getTags(unsubscribedPeer.components.peerId)
console.log('pushing tag array: ', tagArray)
allTags.push(tagArray)
})

expect(nodeAPruneSpy.handlePrune.callCount).to.be.greaterThan(0)
expect(
allTags.map((tags) => {
tags.map((tag) => {
tag.name.toString()
})
})
).not.to.include('gossipsub-mesh-peer')
})

it('test sparse gossipsub', async function () {
// Create 20 gossipsub nodes
// Subscribe to the topic, all nodes
Expand Down Expand Up @@ -924,7 +1035,7 @@ describe('go-libp2p-pubsub gossipsub tests', function () {
// Assert that the nodes are connected
// Subscribe to the topic, all nodes
// Publish a message from each node
// Assert that all nodes receive the messages
// Assert that all nodes receive t he messages
// Disconnect peers
// Assert peers reconnect
// Publish a message from each node
Expand Down
82 changes: 1 addition & 81 deletions test/gossip.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { mockNetwork } from '@libp2p/interface-mocks'
import { stubInterface } from 'ts-sinon'
import { Registrar } from '@libp2p/interface-registrar'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import { PeerStore, Tag } from '@libp2p/interface-peer-store'
import { PeerStore } from '@libp2p/interface-peer-store'
import { ConnectionManager } from '@libp2p/interface-connection-manager'

describe('gossip', () => {
Expand Down Expand Up @@ -80,86 +80,6 @@ describe('gossip', () => {
nodeASpy.pushGossip.restore()
})

it('should add the tag gossipsub-mesh-peer upon grafting', async function () {
this.timeout(10e4)
const topic = 'Z'
// add subscriptions to each node
nodes.forEach((n) => n.pubsub.subscribe(topic))

// every node connected to every other
await connectAllPubSubNodes(nodes)

// wait for subscriptions to be transmitted
await Promise.all(nodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change')))

// await mesh rebalancing
await Promise.all(nodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat')))

// const allTags: Tag[][] = []

// nodes.forEach(async (subscribedPeer) => {
// allTags.push(await subscribedPeer.components.peerStore.getTags(subscribedPeer.components.peerId))
// })

// // gossip happens during the heartbeat
// await pEvent(nodes[nodes.length - 1].pubsub, 'gossipsub:heartbeat')

// expect(
// allTags.map((tags) => {
// tags.map((tag) => {
// tag.name.toString()
// })
// })
// ).to.include('gossipsub-mesh-peer')

await pEvent(nodes[nodes.length - 1].pubsub, 'gossipsub:heartbeat')

expect(
(await nodes[0].components.peerStore.getTags(nodes[0].components.peerId)).map((tag) => tag.name.toString())
).to.include('gossipsub-mesh-peer')
})

it('should remove the tag gossipsub-mesh-peer upon pruning', async function () {
this.timeout(10e4)
const topic = 'Z'
// add subscriptions to each node
nodes.forEach((n) => n.pubsub.subscribe(topic))

// every node connected to every other
await connectAllPubSubNodes(nodes)

// wait for subscriptions to be transmitted
await Promise.all(nodes.map(async (n) => await pEvent(n.pubsub, 'subscription-change')))

// disconnect some peers from the mesh to get some PRUNEs and the store the disconnected peers
const unsubscribedPeers: GossipSubAndComponents[] = []

nodes.slice(0, 5).forEach((ps) => {
ps.pubsub.unsubscribe(topic)
unsubscribedPeers.push(ps)
})

// await mesh rebalancing
await Promise.all(nodes.map(async (n) => await pEvent(n.pubsub, 'gossipsub:heartbeat')))

const allTags: Tag[][] = []

unsubscribedPeers.forEach(async (unsubscribedPeer) => {
allTags.push(await unsubscribedPeer.components.peerStore.getTags(unsubscribedPeer.components.peerId))
})

// gossip happens during the heartbeat
await pEvent(unsubscribedPeers[4].pubsub, 'gossipsub:heartbeat')

expect(
allTags.map((tags) => {
tags.map((tag) => {
tag.name.toString()
})
})
).to.not.include('gossipsub-mesh-peer')
})

it('should reject incoming messages bigger than maxInboundDataLength limit', async function () {
this.timeout(10e4)
const nodeA = nodes[0]
Expand Down

0 comments on commit e1abad6

Please sign in to comment.