diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index 79a4814c1..9ee1853d1 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -12,6 +12,7 @@ import { HistoryNetwork } from '../networks/history/history.js' import { BeaconLightClientNetwork, NetworkId, + NetworkNames, StateNetwork, SyncStrategy, } from '../networks/index.js' @@ -26,6 +27,7 @@ import { TransportLayer } from './types.js' import type { IDiscv5CreateOptions, SignableENRInput } from '@chainsafe/discv5' import type { ITalkReqMessage, ITalkRespMessage } from '@chainsafe/discv5/message' import type { Debugger } from 'debug' +import type * as PromClient from 'prom-client' import type { BaseNetwork } from '../networks/network.js' import type { INodeAddress, @@ -273,8 +275,6 @@ export class PortalNetwork extends EventEmitter { }) if (opts.metrics) { this.metrics = opts.metrics - this.metrics.knownDiscv5Nodes.collect = () => - this.metrics?.knownDiscv5Nodes.set(this.discv5.kadValues().length) this.metrics.currentDBSize.collect = async () => { this.metrics?.currentDBSize.set(await this.db.currentSize()) } @@ -303,6 +303,12 @@ export class PortalNetwork extends EventEmitter { } this.shouldRefresh && network.startRefresh() await network.prune() + if (this.metrics) { + network.on('ContentAdded', async () => { + const metric = (NetworkNames[network.networkId] + '_dbSize') as keyof PortalNetworkMetrics + ;(this.metrics![metric]).set(await network.db.size()) + }) + } } void this.bootstrap() } @@ -329,6 +335,7 @@ export class PortalNetwork extends EventEmitter { await this.db.close() for (const network of this.networks.values()) { network.stopRefresh() + network.removeAllListeners() } } @@ -406,11 +413,12 @@ export class PortalNetwork extends EventEmitter { message: ITalkReqMessage, ) => { this.metrics?.totalBytesReceived.inc(message.request.length) + const network = this.networks.get(bytesToHex(message.protocol) as NetworkId) + if (bytesToHex(message.protocol) === NetworkId.UTPNetwork) { await this.handleUTP(nodeAddress, message, message.request) return } - const network = this.networks.get(bytesToHex(message.protocol) as NetworkId) if (!network) { this.logger(`Received TALKREQ message on unsupported network ${bytesToHex(message.protocol)}`) await this.sendPortalNetworkResponse(nodeAddress, message.id, new Uint8Array()) @@ -418,11 +426,18 @@ export class PortalNetwork extends EventEmitter { return } + if (this.metrics) { + const metric = (NetworkNames[bytesToHex(message.protocol) as NetworkId] + + '_talkReqReceived') as keyof PortalNetworkMetrics + this.metrics[metric].inc() + } await network.handle(message, nodeAddress) } private onTalkResp = (_: any, __: any, message: ITalkRespMessage) => { - this.metrics?.totalBytesReceived.inc(message.response.length) + if (this.metrics) { + this.metrics?.totalBytesReceived.inc(message.response.length) + } } /** diff --git a/packages/portalnetwork/src/client/types.ts b/packages/portalnetwork/src/client/types.ts index e8ef9a5d0..ed6c3bab0 100644 --- a/packages/portalnetwork/src/client/types.ts +++ b/packages/portalnetwork/src/client/types.ts @@ -71,8 +71,6 @@ interface Counter { } export interface PortalNetworkMetrics { totalContentLookups: Counter - knownHistoryNodes: Gauge - knownDiscv5Nodes: Gauge successfulContentLookups: Counter failedContentLookups: Counter offerMessagesSent: Counter diff --git a/packages/portalnetwork/src/networks/network.ts b/packages/portalnetwork/src/networks/network.ts index 4b558187c..d0d4f1e21 100644 --- a/packages/portalnetwork/src/networks/network.ts +++ b/packages/portalnetwork/src/networks/network.ts @@ -26,8 +26,9 @@ import type { OfferMessage, PingMessage, PongMessage, + PortalNetwork, -} from '../index.js' + PortalNetworkMetrics} from '../index.js' import { BasicRadius, ClientInfoAndCapabilities, @@ -37,6 +38,7 @@ import { MAX_PACKET_SIZE, MessageCodes, NetworkId, + NetworkNames, NodeLookup, PingPongErrorCodes, PingPongPayloadExtensions, @@ -49,8 +51,8 @@ import { encodeWithVariantPrefix, generateRandomNodeIdAtDistance, randUint16, - shortId, -} from '../index.js' + + shortId} from '../index.js' import { FoundContent } from '../wire/types.js' import { NetworkDB } from './networkDB.js' @@ -110,11 +112,6 @@ export abstract class BaseNetwork extends EventEmitter { db, logger: this.logger, }) - if (this.portal.metrics) { - this.portal.metrics.knownHistoryNodes.collect = () => { - this.portal.metrics?.knownHistoryNodes.set(this.routingTable.size) - } - } this.gossipManager = new GossipManager(this, gossipCount) } @@ -142,6 +139,10 @@ export abstract class BaseNetwork extends EventEmitter { networkId: NetworkId, utpMessage?: boolean, ): Promise { + if (this.portal.metrics) { + const metric = (this.networkName + '_talkReqSent') as keyof PortalNetworkMetrics + this.portal.metrics[metric].inc() + } try { const res = await this.portal.sendPortalNetworkMessage(enr, payload, networkId, utpMessage) return res @@ -938,6 +939,14 @@ export abstract class BaseNetwork extends EventEmitter { this.logger.extend('bucketRefresh')( `Finished bucket refresh with ${newSize} peers (${newSize - size} new peers)`, ) + if (this.portal.metrics !== undefined) { + const metric = (NetworkNames[this.networkId] + '_peers') as keyof PortalNetworkMetrics + try { + (this.portal.metrics[metric]).set(this.routingTable.size) + } catch (err) { + this.logger.extend('bucketRefresh')(`Error updating ${metric}: ${(err as any).message}`) + } + } } /** diff --git a/packages/portalnetwork/src/networks/types.ts b/packages/portalnetwork/src/networks/types.ts index 5fd430094..f715cd5f6 100644 --- a/packages/portalnetwork/src/networks/types.ts +++ b/packages/portalnetwork/src/networks/types.ts @@ -39,6 +39,10 @@ export enum NetworkId { UTPNetwork = '0x757470', } +export const NetworkNames = Object.fromEntries(Object.entries(NetworkId).map(([n, i]) => [i, n])) + +export type NetworkName = keyof typeof NetworkNames + export type SubNetwork = T extends '0x500a' ? HistoryNetwork : T extends '0x504a' diff --git a/packages/portalnetwork/src/util/config.ts b/packages/portalnetwork/src/util/config.ts index 0d9166b56..4ea03df22 100644 --- a/packages/portalnetwork/src/util/config.ts +++ b/packages/portalnetwork/src/util/config.ts @@ -106,7 +106,7 @@ export const cliConfig = async (args: PortalClientOpts) => { bootnodes.push(bootnode) } } - const metrics = setupMetrics() + const metrics = setupMetrics(networks.map((x) => x.networkId)) const clientConfig: Partial = { config, diff --git a/packages/portalnetwork/src/util/metrics.ts b/packages/portalnetwork/src/util/metrics.ts index d9d46e718..b4947b897 100644 --- a/packages/portalnetwork/src/util/metrics.ts +++ b/packages/portalnetwork/src/util/metrics.ts @@ -1,86 +1,186 @@ -import * as PromClient from 'prom-client' +import { Counter, Gauge, Histogram, Summary } from 'prom-client' -export const setupMetrics = () => { +import { NetworkId, NetworkNames } from '../networks/types.js' + +import type { PortalNetworkMetrics } from '../client/types.js' +import type { Metric } from 'prom-client' + +export enum MetricType { + Counter, + Gauge, + Histogram, + Summary, +} + +const metricTypes = { + [MetricType.Gauge]: Gauge, + [MetricType.Counter]: Counter, + [MetricType.Histogram]: Histogram, + [MetricType.Summary]: Summary, +} + +interface MetricParams { + metric: MetricType + name: string + help: string +} + +const createMetric = ({ metric, name, help }: MetricParams) => { + return (networks: NetworkId[]) => { + const metrics: Record> = {} + for (const network of networks) { + const metricName = NetworkNames[network] + '_' + name + metrics[metricName] = new metricTypes[metric]({ + name: 'ultralight_' + metricName, + help, + }) + } + return metrics + } +} + +const createMetrics = (metrics: MetricParams[], networks: NetworkId[]) => { + let m: Record> = {} + const metricsFunctions = metrics.map(createMetric) + for (const metricFunction of metricsFunctions) { + m = { ...m, ...metricFunction(networks) } + } + return m as Record> +} + +const ultralightMetrics = [ + { + name: 'peers', + metric: MetricType.Gauge, + help: 'how many peers are in the routing table', + }, + { + name: 'dbSize', + metric: MetricType.Gauge, + help: 'how many MBs are currently stored in the db', + }, + { + name: 'talkReqSent', + metric: MetricType.Counter, + help: 'how many talk requests have been sent', + }, + { + name: 'talkReqReceived', + metric: MetricType.Counter, + help: 'how many talk requests have been received', + }, + { + name: 'utpPacketsSent', + metric: MetricType.Counter, + help: 'how many UTP packets have been sent', + }, + { + name: 'utpPacketsReceived', + metric: MetricType.Counter, + help: 'how many UTP packets have been received', + }, + { + name: 'utpStreamsTotal', + metric: MetricType.Gauge, + help: 'how many total UTP streams were opened', + }, + { + name: 'utpWriteStreamsOpened', + metric: MetricType.Gauge, + help: 'how many UTP write streams were opened', + }, + { + name: 'utpWriteStreamsCompleted', + metric: MetricType.Gauge, + help: 'how many UTP write streams were completed', + }, + { + name: 'utpReadStreamsOpened', + metric: MetricType.Gauge, + help: 'how many UTP read streams were opened', + }, + { + name: 'utpReadStreamsCompleted', + metric: MetricType.Gauge, + help: 'how many UTP read streams were completed', + }, +] + +export const setupMetrics = ( + networks: NetworkId[] = [NetworkId.HistoryNetwork], +): PortalNetworkMetrics => { + const metrics = createMetrics(ultralightMetrics, [...networks]) return { - knownDiscv5Nodes: new PromClient.Gauge({ - name: 'ultralight_known_discv5_peers', - help: 'how many peers are in discv5 routing table', - async collect() {}, - }), - knownHistoryNodes: new PromClient.Gauge({ - name: 'ultralight_known_history_peers', - help: 'how many peers are in discv5 routing table', - async collect() {}, - }), - totalContentLookups: new PromClient.Gauge({ + ...metrics, + totalContentLookups: new Gauge({ name: 'ultralight_total_content_lookups', help: 'total number of content lookups initiated', }), - successfulContentLookups: new PromClient.Counter({ + successfulContentLookups: new Counter({ name: 'ultralight_successful_content_lookups', help: 'how many content lookups successfully returned content', }), - failedContentLookups: new PromClient.Counter({ + failedContentLookups: new Counter({ name: 'ultralight_failed_content_lookups', help: 'how many content lookups failed to return content', }), - offerMessagesSent: new PromClient.Counter({ + offerMessagesSent: new Counter({ name: 'ultralight_offer_messages_sent', help: 'how many offer messages have been sent', }), - offerMessagesReceived: new PromClient.Counter({ + offerMessagesReceived: new Counter({ name: 'ultralight_offer_messages_received', help: 'how many offer messages have been received', }), - acceptMessagesSent: new PromClient.Counter({ + acceptMessagesSent: new Counter({ name: 'ultralight_accept_messages_sent', help: 'how many accept messages have been sent', }), - acceptMessagesReceived: new PromClient.Counter({ + acceptMessagesReceived: new Counter({ name: 'ultralight_accept_messages_received', help: 'how many accept messages have been received', }), - findContentMessagesSent: new PromClient.Counter({ + findContentMessagesSent: new Counter({ name: 'ultralight_findContent_messages_sent', help: 'how many findContent messages have been sent', }), - findContentMessagesReceived: new PromClient.Counter({ + findContentMessagesReceived: new Counter({ name: 'ultralight_findContent_messages_received', help: 'how many findContent messages have been received', }), - contentMessagesSent: new PromClient.Counter({ + contentMessagesSent: new Counter({ name: 'ultralight_content_messages_sent', help: 'how many content messages have been sent', }), - contentMessagesReceived: new PromClient.Counter({ + contentMessagesReceived: new Counter({ name: 'ultralight_content_messages_received', help: 'how many content messages have been received', }), - findNodesMessagesSent: new PromClient.Counter({ + findNodesMessagesSent: new Counter({ name: 'ultralight_findNodes_messages_sent', help: 'how many findNodes messages have been sent', }), - findNodesMessagesReceived: new PromClient.Counter({ + findNodesMessagesReceived: new Counter({ name: 'ultralight_findNodes_messages_received', help: 'how many findNodes messages have been received', }), - nodesMessagesSent: new PromClient.Counter({ + nodesMessagesSent: new Counter({ name: 'ultralight_nodes_messages_sent', help: 'how many nodes messages have been sent', }), - nodesMessagesReceived: new PromClient.Counter({ + nodesMessagesReceived: new Counter({ name: 'ultralight_nodes_messages_received', help: 'how many nodes messages have been received', }), - totalBytesReceived: new PromClient.Counter({ + totalBytesReceived: new Counter({ name: 'ultralight_total_bytes_received', help: 'how many bytes have been received in Portal Network message payloads', }), - totalBytesSent: new PromClient.Counter({ + totalBytesSent: new Counter({ name: 'ultralight_total_bytes_sent', help: 'how many bytes have been sent in Portal Network message payloads', }), - currentDBSize: new PromClient.Gauge({ + currentDBSize: new Gauge({ name: 'ultralight_db_size', help: 'how many MBs are currently stored in the db', }), diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts index b9a56ba60..8255ff65c 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts @@ -3,9 +3,11 @@ import type { INewRequest, INodeAddress, PortalNetwork, + PortalNetworkMetrics, } from '../../../index.js' import { NetworkId, + NetworkNames, Packet, PacketType, RequestCode, @@ -89,6 +91,23 @@ export class PortalNetworkUTP { async handleNewRequest(params: INewRequest): Promise { const { contentKeys, enr, connectionId, requestCode } = params + if (this.client.metrics) { + const utpMetric = (NetworkNames[params.networkId] + + '_utpStreamsTotal') as keyof PortalNetworkMetrics + this.client.metrics[utpMetric].inc() + if ( + requestCode === RequestCode.FOUNDCONTENT_WRITE || + requestCode === RequestCode.OFFER_WRITE + ) { + const utpWriteMetric = (NetworkNames[params.networkId] + + '_utpWriteStreamsOpened') as keyof PortalNetworkMetrics + this.client.metrics[utpWriteMetric].inc() + } else { + const utpReadMetric = (NetworkNames[params.networkId] + + '_utpReadStreamsOpened') as keyof PortalNetworkMetrics + this.client.metrics[utpReadMetric].inc() + } + } if (this.requestManagers[enr.nodeId] === undefined) { this.requestManagers[enr.nodeId] = new RequestManager(enr.nodeId, this.logger) } @@ -162,6 +181,11 @@ export class PortalNetworkUTP { async send(enr: ENR | INodeAddress, msg: Uint8Array, networkId: NetworkId) { try { await this.client.sendPortalNetworkMessage(enr, msg, networkId, true) + if (this.client.metrics) { + const utpMetric = (NetworkNames[networkId] + + '_utpPacketsSent') as keyof PortalNetworkMetrics + this.client.metrics[utpMetric].inc() + } } catch (err) { this.logger.extend('error')(`Error sending message to ${enr.nodeId}: ${err}`) this.closeAllPeerRequests(enr.nodeId) diff --git a/packages/portalnetwork/src/wire/utp/Socket/ReadSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/ReadSocket.ts index 275f5e99b..3bd79afe5 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/ReadSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/ReadSocket.ts @@ -1,9 +1,11 @@ +import { NetworkNames } from '../../../networks/types.js' import { UtpSocketType } from '../Packets/index.js' import { ContentReader } from './ContentReader.js' import { UtpSocket } from './UtpSocket.js' import { ConnectionState } from './socketTyping.js' +import type { PortalNetworkMetrics } from '../../../client/types.js' import type { Packet, PacketType, UtpSocketOptions } from '../Packets/index.js' export class ReadSocket extends UtpSocket { @@ -122,5 +124,10 @@ export class ReadSocket extends UtpSocket { this.logger.extend('CLOSE')(`Running compiler.`) return this.compile() } + if (this.utp.client.metrics) { + const utpReadMetric = (NetworkNames[this.networkId] + + '_utpReadStreamsCompleted') as keyof PortalNetworkMetrics + this.utp.client.metrics[utpReadMetric].inc() + } } } diff --git a/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts index 161d36327..33efe281a 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts @@ -1,7 +1,9 @@ +import { NetworkNames } from '../../../networks/types.js' import { ConnectionState, ContentWriter, PacketType, UtpSocketType, randUint16 } from '../index.js' import { UtpSocket } from './UtpSocket.js' +import type { PortalNetworkMetrics } from '../../../client/types.js' import type { ICreateData, UtpSocketOptions } from '../index.js' export class WriteSocket extends UtpSocket { @@ -66,6 +68,11 @@ export class WriteSocket extends UtpSocket { return false } close(): void { + if (this.utp.client.metrics) { + const metric = (NetworkNames[this.networkId] + + '_utpWriteStreamsCompleted') as keyof PortalNetworkMetrics + this.utp.client.metrics[metric].inc() + } clearInterval(this.packetManager.congestionControl.timeoutCounter) this.packetManager.congestionControl.removeAllListeners() this._clearTimeout()