Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics and revamp logic #206

Merged
merged 50 commits into from
Apr 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
5162d70
Add metrics and revamp logic
dapplion Mar 8, 2022
92ca5cb
Compile tests
dapplion Mar 15, 2022
515a55e
Fix tests
dapplion Mar 15, 2022
8b6378b
Customize await policy
dapplion Mar 15, 2022
a72e4ed
Run prettier on files
dapplion Mar 15, 2022
6b37825
Relax heartbeat test condition
dapplion Mar 15, 2022
1aeae56
Re-add util package for browser testing
dapplion Mar 15, 2022
665d049
Remove unused dependencies
dapplion Mar 16, 2022
13e510d
Add getMeshPeers
dapplion Mar 16, 2022
68e81e8
Fix rpc metric name
dapplion Mar 16, 2022
a40e9a9
Improve metrics
dapplion Mar 17, 2022
79e934b
More gossip promise metrics
dapplion Mar 17, 2022
109c5a1
Fix metrics typo
dapplion Mar 17, 2022
c5f60d7
Clarify metrics names
dapplion Mar 17, 2022
d35cc6e
Track metric of score deltas
dapplion Mar 17, 2022
ea7c285
Fix score metrics
dapplion Mar 17, 2022
6103695
Add buckets for gossipsub_score_cache_delta
dapplion Mar 17, 2022
475bfda
Review PeerScore logic
dapplion Mar 17, 2022
af59ed4
Dump peer stats
dapplion Mar 17, 2022
4f9e11c
Fix PeerStats constructor
dapplion Mar 17, 2022
130c16a
Enable strict typescript checks
dapplion Mar 17, 2022
d70e923
Fix compute-score logic
dapplion Mar 18, 2022
f9b587c
f
dapplion Mar 18, 2022
16568f0
Fix peer stats types
dapplion Mar 18, 2022
51917ce
Reenable go-gossipsub tests (#201)
twoeths Mar 18, 2022
6fd5811
Fix test types
dapplion Mar 18, 2022
8b4c0d1
Set as connected in addPeer()
dapplion Mar 18, 2022
7ef0e8f
Prune publishedMessageIds
dapplion Mar 19, 2022
cbbb790
Fix markFirstMessageDelivery typo
dapplion Mar 20, 2022
8419b56
Same logic in scoreMetrics
dapplion Mar 20, 2022
d469111
More metrics for p3 and p7 (#213)
twoeths Mar 22, 2022
2ca29fa
Forward messages to floodsub peers (#214)
twoeths Mar 23, 2022
2cc6be0
Add missing msgId in validateReceivedMessage (#215)
twoeths Mar 23, 2022
575199c
Fix 'test gossipsub opportunistic grafting'
twoeths Mar 23, 2022
8a75b41
Fix tests in browser
twoeths Mar 23, 2022
535ca58
Fix tests suspended issue in browsers
twoeths Mar 23, 2022
70c3528
Merge code from master
twoeths Mar 23, 2022
ca7733f
Fix 'test gossipsub fanout expiry' go-gossipsub test
twoeths Mar 23, 2022
92b40f9
GossipsubIWantFollowupTime as a param (#216)
twoeths Mar 24, 2022
6b99dbf
Increase resolution of delay metrics (#217)
dapplion Mar 24, 2022
61f64d1
Fix minMeshMessageDeliveriesWindow (#218)
twoeths Mar 25, 2022
14e9d6b
Use maxMeshMessageDeliveriesWindowSec for metric
twoeths Mar 25, 2022
0514022
Rename gossipsubIWantFollowupMs option, revert/correct tracer.prune()…
twoeths Mar 25, 2022
21a79e9
Reset behaviourPenalty histogram to track current count
dapplion Mar 25, 2022
2bcf74f
publish(): return number of sent peers
twoeths Mar 25, 2022
b4dbc07
Add getScore()
twoeths Mar 27, 2022
949a8f3
makePrune: update PeerStat so that we don't apply p3 penalty
twoeths Mar 29, 2022
ba0755e
Remove redundant this.score.prune() in heartbeat
twoeths Mar 31, 2022
9e93427
Track iasked cache size per heartbeat and remove TODOs
twoeths Mar 31, 2022
b8693c5
validateReceivedMessage: check duplicate message first (#223)
twoeths Apr 4, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ module.exports = {
}
],
'@typescript-eslint/indent': 'off', // This is the job of StandardJS, they are competing rules so we turn off the Typescript one.
'node/no-unsupported-features/es-syntax': 'off', // Allows us to use Import and Export keywords.
'@typescript-eslint/no-non-null-assertion': 'off',
'@typescript-eslint/member-delimiter-style': 'off',
'node/no-unsupported-features/es-syntax': 'off', // Allows us to use Import and Export keywords.
"@typescript-eslint/strict-boolean-expressions": [
"error",
{
Expand All @@ -42,6 +43,8 @@ module.exports = {
'no-mixed-operators': 'off',
'space-before-function-paren': 'off',
'comma-dangle': 'off',
// Allow to place comments before the else {} block
'brace-style': 'off',
indent: 'off'
}
}
13 changes: 8 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,11 @@
"debug": "^4.3.1",
"denque": "^1.5.0",
"err-code": "^3.0.1",
"iso-random-stream": "^2.0.2",
"it-pipe": "^1.1.0",
"libp2p-interfaces": "^4.0.4",
"libp2p-crypto": "^0.21.2",
"libp2p-interfaces": "4.0.4",
"multiformats": "^9.6.4",
"peer-id": "^0.16.0",
"protobufjs": "^6.11.2",
"uint8arrays": "^3.0.0"
Expand Down Expand Up @@ -75,10 +78,10 @@
"eslint-plugin-standard": "^4.0.1",
"it-pair": "^1.0.0",
"libp2p": "0.36.1",
"libp2p-floodsub": "^0.29.0",
"libp2p-interfaces-compliance-tests": "^4.0.6",
"libp2p-mplex": "^0.10.3",
"libp2p-websockets": "^0.16.1",
"libp2p-floodsub": "^0.29.1",
"libp2p-interfaces-compliance-tests": "^4.0.8",
"libp2p-mplex": "^0.10.7",
"libp2p-websockets": "^0.16.2",
"lodash": "^4.17.15",
"multiaddr": "^10.0.0",
"os": "^0.1.1",
Expand Down
128 changes: 79 additions & 49 deletions test/2-nodes.spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import chai from 'chai'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import delay from 'delay'
import Gossipsub, { multicodec } from '../ts'
import { createGossipsubs, createConnectedGossipsubs, expectSet, stopNode, first } from './utils'
import Gossipsub from '../ts'
import { createGossipsubs, createPubsubs, createConnectedGossipsubs, expectSet, stopNode, first } from './utils'
import { RPC } from '../ts/message/rpc'
import { InMessage, PeerId } from 'libp2p-interfaces/src/pubsub'
import PubsubBaseProtocol, { PeerId } from 'libp2p-interfaces/src/pubsub'
import { FloodsubID, GossipsubIDv11 } from '../ts/constants'
import { GossipsubMessage } from '../ts/types'

chai.use(require('dirty-chai'))
chai.use(require('chai-spies'))
Expand All @@ -13,6 +15,28 @@ const expect = chai.expect
const shouldNotHappen = () => expect.fail()

describe('2 nodes', () => {
describe('Pubsub dial', () => {
let nodes: PubsubBaseProtocol[]

// Create pubsub nodes
before(async () => {
nodes = await createPubsubs({ number: 2 })
})

after(() => Promise.all(nodes.map(stopNode)))

it('Dial from nodeA to nodeB happened with pubsub', async () => {
await nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, FloodsubID)

while (nodes[0]['peers'].size === 0 || nodes[1]['peers'].size === 0) {
await delay(10)
}

expect(nodes[0]['peers'].size).to.be.eql(1)
expect(nodes[1]['peers'].size).to.be.eql(1)
})
})

describe('basics', () => {
let nodes: Gossipsub[] = []

Expand All @@ -24,15 +48,14 @@ describe('2 nodes', () => {
after(() => Promise.all(nodes.map(stopNode)))

it('Dial from nodeA to nodeB happened with pubsub', async () => {
await nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, multicodec)
await delay(10)
await Promise.all([
new Promise((resolve) => nodes[0].once('gossipsub:heartbeat', resolve)),
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
])
await nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, GossipsubIDv11)

expect(nodes[0].peers.size).to.be.eql(1)
expect(nodes[1].peers.size).to.be.eql(1)
while (nodes[0]['peers'].size === 0 || nodes[1]['peers'].size === 0) {
await delay(10)
}

expect(nodes[0]['peers'].size).to.be.eql(1)
expect(nodes[1]['peers'].size).to.be.eql(1)
})
})

Expand All @@ -47,7 +70,14 @@ describe('2 nodes', () => {
after(() => Promise.all(nodes.map(stopNode)))

it('Subscribe to a topic', async () => {
const topic = 'Z'
const topic = 'test_topic'

// await subscription change, after calling subscribe
const subscriptionEventPromise = Promise.all([
new Promise((resolve) => nodes[0].once('pubsub:subscription-change', (...args) => resolve(args))),
new Promise((resolve) => nodes[1].once('pubsub:subscription-change', (...args) => resolve(args)))
])

nodes[0].subscribe(topic)
nodes[1].subscribe(topic)

Expand All @@ -61,14 +91,14 @@ describe('2 nodes', () => {

const [changedPeerId, changedSubs] = evt0 as [PeerId, RPC.ISubOpts[]]

expectSet(nodes[0].subscriptions, [topic])
expectSet(nodes[1].subscriptions, [topic])
expect(nodes[0].peers.size).to.equal(1)
expect(nodes[1].peers.size).to.equal(1)
expectSet(nodes[0].topics.get(topic), [nodes[1].peerId.toB58String()])
expectSet(nodes[1].topics.get(topic), [nodes[0].peerId.toB58String()])
expectSet(nodes[0]['subscriptions'], [topic])
expectSet(nodes[1]['subscriptions'], [topic])
expect(nodes[0]['peers'].size).to.equal(1)
expect(nodes[1]['peers'].size).to.equal(1)
expectSet(nodes[0]['topics'].get(topic), [nodes[1].peerId.toB58String()])
expectSet(nodes[1]['topics'].get(topic), [nodes[0].peerId.toB58String()])

expect(changedPeerId.toB58String()).to.equal(first(nodes[0].peers).id.toB58String())
expect(changedPeerId.toB58String()).to.equal(first(nodes[0]['peers']).id.toB58String())
expect(changedSubs).to.have.lengthOf(1)
expect(changedSubs[0].topicID).to.equal(topic)
expect(changedSubs[0].subscribe).to.equal(true)
Expand All @@ -79,8 +109,8 @@ describe('2 nodes', () => {
new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))
])

expect(first(nodes[0].mesh.get(topic))).to.equal(first(nodes[0].peers).id.toB58String())
expect(first(nodes[1].mesh.get(topic))).to.equal(first(nodes[1].peers).id.toB58String())
expect(first(nodes[0]['mesh'].get(topic))).to.equal(first(nodes[0]['peers']).id.toB58String())
expect(first(nodes[1]['mesh'].get(topic))).to.equal(first(nodes[1]['peers']).id.toB58String())
})
})

Expand Down Expand Up @@ -109,29 +139,29 @@ describe('2 nodes', () => {
afterEach(() => Promise.all(nodes.map(stopNode)))

it('Publish to a topic - nodeA', async () => {
const promise = new Promise<InMessage>((resolve) => nodes[1].once(topic, resolve))
const promise = new Promise<GossipsubMessage>((resolve) => nodes[1].once(topic, resolve))
nodes[0].once(topic, (m) => shouldNotHappen)

nodes[0].publish(topic, uint8ArrayFromString('hey'))

const msg = await promise

expect(msg.data.toString()).to.equal('hey')
expect(msg.from).to.be.eql(nodes[0].peerId.toB58String())
expect(msg.from).to.be.eql(nodes[0].peerId.toBytes())

nodes[0].removeListener(topic, shouldNotHappen)
})

it('Publish to a topic - nodeB', async () => {
const promise = new Promise<InMessage>((resolve) => nodes[0].once(topic, resolve))
const promise = new Promise<GossipsubMessage>((resolve) => nodes[0].once(topic, resolve))
nodes[1].once(topic, shouldNotHappen)

nodes[1].publish(topic, uint8ArrayFromString('banana'))

const msg = await promise

expect(msg.data.toString()).to.equal('banana')
expect(msg.from).to.be.eql(nodes[1].peerId.toB58String())
expect(msg.from).to.be.eql(nodes[1].peerId.toBytes())

nodes[1].removeListener(topic, shouldNotHappen)
})
Expand All @@ -143,11 +173,11 @@ describe('2 nodes', () => {

nodes[0].on(topic, receivedMsg)

function receivedMsg(msg: InMessage) {
expect(msg.data.toString().startsWith('banana')).to.be.true
expect(msg.from).to.be.eql(nodes[1].peerId.toB58String())
function receivedMsg(msg: RPC.IMessage) {
expect(msg.data!.toString().startsWith('banana')).to.be.true
expect(msg.from).to.be.eql(nodes[1].peerId.toBytes())
expect(msg.seqno).to.be.a('Uint8Array')
expect(msg.topicIDs).to.be.eql([topic])
expect(msg.topic).to.be.eql(topic)

if (++counter === 10) {
nodes[0].removeListener(topic, receivedMsg)
Expand All @@ -168,7 +198,7 @@ describe('2 nodes', () => {

// Create pubsub nodes
beforeEach(async () => {
nodes = await createConnectedGossipsubs({ number: 2 })
nodes = await createConnectedGossipsubs({ number: 2, options: {allowPublishToZeroPeers: true} })
})

// Create subscriptions
Expand All @@ -188,16 +218,16 @@ describe('2 nodes', () => {

it('Unsubscribe from a topic', async () => {
nodes[0].unsubscribe(topic)
expect(nodes[0].subscriptions.size).to.equal(0)
expect(nodes[0]['subscriptions'].size).to.equal(0)

const [changedPeerId, changedSubs] = await new Promise<[PeerId, RPC.ISubOpts[]]>((resolve) => {
nodes[1].once('pubsub:subscription-change', (...args: [PeerId, RPC.ISubOpts[]]) => resolve(args))
})
await new Promise((resolve) => nodes[1].once('gossipsub:heartbeat', resolve))

expect(nodes[1].peers.size).to.equal(1)
expectSet(nodes[1].topics.get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(nodes[1].peers).id.toB58String())
expect(nodes[1]['peers'].size).to.equal(1)
expectSet(nodes[1]['topics'].get(topic), [])
expect(changedPeerId.toB58String()).to.equal(first(nodes[1]['peers']).id.toB58String())
expect(changedSubs).to.have.lengthOf(1)
expect(changedSubs[0].topicID).to.equal(topic)
expect(changedSubs[0].subscribe).to.equal(false)
Expand Down Expand Up @@ -245,10 +275,10 @@ describe('2 nodes', () => {
nodes[0].subscribe('Za')
nodes[1].subscribe('Zb')

expect(nodes[0].peers.size).to.equal(0)
expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(0)
expectSet(nodes[1].subscriptions, ['Zb'])
expect(nodes[0]['peers'].size).to.equal(0)
expectSet(nodes[0]['subscriptions'], ['Za'])
expect(nodes[1]['peers'].size).to.equal(0)
expectSet(nodes[1]['subscriptions'], ['Zb'])
})

after(() => Promise.all(nodes.map(stopNode)))
Expand All @@ -257,20 +287,20 @@ describe('2 nodes', () => {
this.timeout(5000)

await Promise.all([
nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, multicodec),
nodes[0]._libp2p.dialProtocol(nodes[1]._libp2p.peerId, GossipsubIDv11),
new Promise((resolve) => nodes[0].once('pubsub:subscription-change', resolve)),
new Promise((resolve) => nodes[1].once('pubsub:subscription-change', resolve))
])
expect(nodes[0].peers.size).to.equal(1)
expect(nodes[1].peers.size).to.equal(1)
expect(nodes[0]['peers'].size).to.equal(1)
expect(nodes[1]['peers'].size).to.equal(1)

expectSet(nodes[0].subscriptions, ['Za'])
expect(nodes[1].peers.size).to.equal(1)
expectSet(nodes[1].topics.get('Za'), [nodes[0].peerId.toB58String()])
expectSet(nodes[0]['subscriptions'], ['Za'])
expect(nodes[1]['peers'].size).to.equal(1)
expectSet(nodes[1]['topics'].get('Za'), [nodes[0].peerId.toB58String()])

expectSet(nodes[1].subscriptions, ['Zb'])
expect(nodes[0].peers.size).to.equal(1)
expectSet(nodes[0].topics.get('Zb'), [nodes[1].peerId.toB58String()])
expectSet(nodes[1]['subscriptions'], ['Zb'])
expect(nodes[0]['peers'].size).to.equal(1)
expectSet(nodes[0]['topics'].get('Zb'), [nodes[1].peerId.toB58String()])
})
})

Expand All @@ -284,8 +314,8 @@ describe('2 nodes', () => {

it("nodes don't have peers after stopped", async () => {
await Promise.all(nodes.map(stopNode))
expect(nodes[0].peers.size).to.equal(0)
expect(nodes[1].peers.size).to.equal(0)
expect(nodes[0]['peers'].size).to.equal(0)
expect(nodes[1]['peers'].size).to.equal(0)
})
})
})
Loading