From 7ba54fce914e0969e701f9dfd692fdc3ffae07dc Mon Sep 17 00:00:00 2001 From: Scotty <66335769+ScottyPoi@users.noreply.github.com> Date: Mon, 15 Jul 2024 16:33:40 -0600 Subject: [PATCH] State-interop fixes (#602) * state: implement findNodes RPC method * RPC: remove 33byte expectation for contentKeys * RPC: fix listener in state_findContent * portalnetwork: hacky fix for network differences in contentAdded event * state: fixes * gossip: parallel gossipContent * fix integration test --- packages/cli/src/rpc/modules/portal.ts | 26 ++++++++- packages/cli/src/rpc/validators.ts | 6 -- .../src/networks/contentLookup.ts | 16 ++---- .../portalnetwork/src/networks/network.ts | 56 ++++++++++++------- .../portalnetwork/src/networks/state/state.ts | 2 +- .../src/networks/state/statedb.ts | 7 +-- .../portalnetwork/src/networks/state/util.ts | 14 ++++- .../test/integration/state.spec.ts | 10 ++-- 8 files changed, 87 insertions(+), 50 deletions(-) diff --git a/packages/cli/src/rpc/modules/portal.ts b/packages/cli/src/rpc/modules/portal.ts index d99f95cea..afcd620d1 100644 --- a/packages/cli/src/rpc/modules/portal.ts +++ b/packages/cli/src/rpc/modules/portal.ts @@ -143,6 +143,10 @@ export class portal { [validators.enr], [validators.array(validators.distance)], ]) + this.stateFindNodes = middleware(this.stateFindNodes.bind(this), 2, [ + [validators.enr], + [validators.array(validators.distance)], + ]) this.historySendFindNodes = middleware(this.historySendFindNodes.bind(this), 2, [ [validators.dstId], [validators.array(validators.distance)], @@ -532,6 +536,26 @@ export class portal { this.logger(enrs) return res?.enrs.map((v) => ENR.decode(v).encodeTxt()) } + async stateFindNodes(params: [string, number[]]) { + const [enr, distances] = params + const dstId = ENR.decodeTxt(enr).nodeId + this.logger(`stateFindNodes request received with these distances [${distances.toString()}]`) + this.logger(`sending stateFindNodes request to ${shortId(dstId)}`) + if (!isValidId(dstId)) { + return { + code: INVALID_PARAMS, + message: 'invalid node id', + } + } + const res = await this._state.sendFindNodes(enr, distances) + if (!res) { + return [] + } + const enrs = res?.enrs.map((v) => ENR.decode(v).encodeTxt()) + this.logger(`stateFindNodes request returned ${res?.total} enrs:`) + this.logger(enrs) + return res?.enrs.map((v) => ENR.decode(v).encodeTxt()) + } async historySendFindNodes(params: [string, number[]]) { const [dstId, distances] = params this.logger(`portal_historySendFindNodes`) @@ -691,7 +715,7 @@ export class portal { this._state.on( 'ContentAdded', (hash: string, _contentType: StateNetworkContentType, value: Uint8Array) => { - if (hash.slice(2) === contentKey.slice(4)) { + if (hash === contentKey) { clearTimeout(timeout) resolve(value) } diff --git a/packages/cli/src/rpc/validators.ts b/packages/cli/src/rpc/validators.ts index f6f275666..41562b738 100644 --- a/packages/cli/src/rpc/validators.ts +++ b/packages/cli/src/rpc/validators.ts @@ -186,12 +186,6 @@ export const validators = { message: `invalid argument ${index}: hex string without 0x prefix`, } } - if (params[index].length !== 68) { - return { - code: INVALID_PARAMS, - message: `invalid argument ${index}: content key must be 33 bytes`, - } - } } }, diff --git a/packages/portalnetwork/src/networks/contentLookup.ts b/packages/portalnetwork/src/networks/contentLookup.ts index ff08a7b87..f0080dabc 100644 --- a/packages/portalnetwork/src/networks/contentLookup.ts +++ b/packages/portalnetwork/src/networks/contentLookup.ts @@ -5,7 +5,6 @@ import { hexToBytes, short } from '@ethereumjs/util' import { serializedContentKeyToContentId, shortId } from '../util/index.js' -import { HistoryNetworkContentType } from './history/types.js' import { getContentKey } from './history/util.js' import type { BaseNetwork } from './network.js' @@ -118,16 +117,13 @@ export class ContentLookup { peer.hasContent = true return new Promise((resolve) => { let timeout: any = undefined - const utpDecoder = ( - contentKey: string, - contentType: HistoryNetworkContentType, - content: Uint8Array, - ) => { + const utpDecoder = (contentKey: string, contentType: number, content: Uint8Array) => { const _contentKey = getContentKey(contentType, fromHexString(contentKey)) - if (_contentKey === toHexString(this.contentKey)) { - this.logger( - `Received content for this contentType: ${HistoryNetworkContentType[contentType]} + contentKey: ${toHexString(this.contentKey)}`, - ) + if ( + contentKey === toHexString(this.contentKey) || + _contentKey === toHexString(this.contentKey) + ) { + this.logger(`Received content for this contentKey: ${toHexString(this.contentKey)}`) this.network.removeListener('ContentAdded', utpDecoder) clearTimeout(timeout) this.content = { content, utp: true } diff --git a/packages/portalnetwork/src/networks/network.ts b/packages/portalnetwork/src/networks/network.ts index a61c7bc28..1f0a877a5 100644 --- a/packages/portalnetwork/src/networks/network.ts +++ b/packages/portalnetwork/src/networks/network.ts @@ -828,32 +828,48 @@ export abstract class BaseNetwork extends EventEmitter { selector: MessageCodes.OFFER, value: offerMsg, }) + const offered = await Promise.allSettled( + peers.map(async (peer) => { + this.logger.extend(`gossipContent`)( + `Offering ${toHexString(contentKey)} to ${shortId(peer.nodeId)}`, + ) + const res = await this.sendMessage(peer, payload, this.networkId) + return [peer, res] + }), + ) let accepted = 0 - for (const peer of peers) { - const res = await this.sendMessage(peer, payload, this.networkId) - if (res.length > 0) { - try { - const decoded = PortalWireMessageType.deserialize(res) - if (decoded.selector === MessageCodes.ACCEPT) { - const msg = decoded.value as AcceptMessage - if (msg.contentKeys.get(0) === true) { - accepted++ - const id = new DataView(msg.connectionId.buffer).getUint16(0, false) - await this.handleNewRequest({ - networkId: this.networkId, - contentKeys: [contentKey], - peerId: peer.nodeId, - connectionId: id, - requestCode: RequestCode.OFFER_WRITE, - contents: [encodeWithVariantPrefix([content])], - }) + for (const offer of offered) { + if (offer.status === 'fulfilled') { + const [peer, res] = offer.value as [ENR, Uint8Array] + if (res.length > 0) { + try { + const decoded = PortalWireMessageType.deserialize(res) + if (decoded.selector === MessageCodes.ACCEPT) { + const msg = decoded.value as AcceptMessage + if (msg.contentKeys.get(0) === true) { + this.logger.extend(`gossipContent`)( + `${toHexString(contentKey)} accepted by ${shortId(peer.nodeId)}`, + ) + accepted++ + this.logger.extend(`gossipContent`)(`accepted: ${accepted}`) + const id = new DataView(msg.connectionId.buffer).getUint16(0, false) + void this.handleNewRequest({ + networkId: this.networkId, + contentKeys: [contentKey], + peerId: peer.nodeId, + connectionId: id, + requestCode: RequestCode.OFFER_WRITE, + contents: [encodeWithVariantPrefix([content])], + }) + } } + } catch { + /** Noop */ } - } catch { - /** Noop */ } } } + this.logger.extend(`gossipContent`)(`total: accepted: ${accepted}`) return accepted } diff --git a/packages/portalnetwork/src/networks/state/state.ts b/packages/portalnetwork/src/networks/state/state.ts index e84ae7660..caa50dfcf 100644 --- a/packages/portalnetwork/src/networks/state/state.ts +++ b/packages/portalnetwork/src/networks/state/state.ts @@ -43,7 +43,7 @@ export class StateNetwork extends BaseNetwork { super({ client, db, radius, maxStorage, networkId: NetworkId.StateNetwork }) this.networkId = NetworkId.StateNetwork this.logger = debug(this.enr.nodeId.slice(0, 5)).extend('Portal').extend('StateNetwork') - this.stateDB = new StateDB(client.db.db) + this.stateDB = new StateDB(this.db.db) this.routingTable.setLogger(this.logger) } diff --git a/packages/portalnetwork/src/networks/state/statedb.ts b/packages/portalnetwork/src/networks/state/statedb.ts index 327b73127..1e9cba308 100644 --- a/packages/portalnetwork/src/networks/state/statedb.ts +++ b/packages/portalnetwork/src/networks/state/statedb.ts @@ -33,14 +33,11 @@ export class StateDB { * @returns true if content is stored successfully */ async storeContent(contentKey: Uint8Array, content: Uint8Array) { + const log = this.logger.extend('storeContent') + log(`called with contentKey: ${toHexString(contentKey)} and content: [${content.length} bytes]`) const dbKey = getDatabaseKey(contentKey) const dbContent = getDatabaseContent(keyType(contentKey), content) await this.db.put(dbKey, dbContent) - this.logger( - `storeContent (${content.length}) bytes: \ncontentKey: ${toHexString( - contentKey, - )}\ndbKey: 0x${dbKey}\ndbSize: ${(await this.keys()).length}`, - ) return true } diff --git a/packages/portalnetwork/src/networks/state/util.ts b/packages/portalnetwork/src/networks/state/util.ts index 9e48e0f39..4f58217a0 100644 --- a/packages/portalnetwork/src/networks/state/util.ts +++ b/packages/portalnetwork/src/networks/state/util.ts @@ -9,9 +9,11 @@ import { AccountTrieNodeKey, AccountTrieNodeRetrieval, ContractCodeKey, + ContractCodeOffer, ContractRetrieval, StateNetworkContentType, StorageTrieNodeKey, + StorageTrieNodeOffer, StorageTrieNodeRetrieval, } from './types.js' @@ -201,10 +203,18 @@ export function getDatabaseContent(type: StateNetworkContentType, content: Uint8 dbContent = AccountTrieNodeRetrieval.deserialize(content).node break case StateNetworkContentType.ContractTrieNode: - dbContent = StorageTrieNodeRetrieval.deserialize(content).node + try { + dbContent = StorageTrieNodeRetrieval.deserialize(content).node + } catch { + dbContent = StorageTrieNodeOffer.deserialize(content).storageProof.slice(-1)[0] + } break case StateNetworkContentType.ContractByteCode: - dbContent = ContractRetrieval.deserialize(content).code + try { + dbContent = ContractCodeOffer.deserialize(content).code + } catch { + dbContent = ContractRetrieval.deserialize(content).code + } break } return bytesToUnprefixedHex(dbContent) diff --git a/packages/portalnetwork/test/integration/state.spec.ts b/packages/portalnetwork/test/integration/state.spec.ts index 695ddb855..3801d2149 100644 --- a/packages/portalnetwork/test/integration/state.spec.ts +++ b/packages/portalnetwork/test/integration/state.spec.ts @@ -91,10 +91,10 @@ describe('AccountTrieNode Gossip / Request', async () => { await new Promise((r) => setTimeout(r, 200)) const storedInNode1: Set = new Set() const storedInNode2: Set = new Set() - for await (const key of node1.db.db.keys()) { + for await (const key of network1.db.db.keys()) { storedInNode1.add(key) } - for await (const key of node2.db.db.keys()) { + for await (const key of network2.db.db.keys()) { storedInNode2.add(key) } @@ -113,7 +113,7 @@ describe('AccountTrieNode Gossip / Request', async () => { expect(requested?.value).instanceOf(Uint8Array) assert.deepEqual(requested!.value, expected, 'retrieved value is correct') }) - for await (const key of node2.db.db.keys()) { + for await (const key of network2.db.db.keys()) { storedInNode2.add(key) } it('should store some nodes in node2', async () => { @@ -192,9 +192,9 @@ describe('getAccount via network', async () => { expect(result.gossipCount).toEqual(3) }) const storedInNodes = await Promise.all( - clients.map(async (client) => { + networks.map(async (network) => { const stored: Set = new Set() - for await (const key of client.db.db.keys()) { + for await (const key of network.db.db.keys()) { stored.add(key) } return stored