Skip to content

Commit

Permalink
State-interop fixes (#602)
Browse files Browse the repository at this point in the history
* state: implement findNodes RPC method

* RPC: remove 33byte expectation for contentKeys

* RPC: fix listener in state_findContent

* portalnetwork: hacky fix for network differences in contentAdded event

* state: fixes

* gossip: parallel gossipContent

* fix integration test
  • Loading branch information
ScottyPoi authored Jul 15, 2024
1 parent 30eefe9 commit 7ba54fc
Show file tree
Hide file tree
Showing 8 changed files with 87 additions and 50 deletions.
26 changes: 25 additions & 1 deletion packages/cli/src/rpc/modules/portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ export class portal {
[validators.enr],
[validators.array(validators.distance)],
])
this.stateFindNodes = middleware(this.stateFindNodes.bind(this), 2, [
[validators.enr],
[validators.array(validators.distance)],
])
this.historySendFindNodes = middleware(this.historySendFindNodes.bind(this), 2, [
[validators.dstId],
[validators.array(validators.distance)],
Expand Down Expand Up @@ -532,6 +536,26 @@ export class portal {
this.logger(enrs)
return res?.enrs.map((v) => ENR.decode(v).encodeTxt())
}
async stateFindNodes(params: [string, number[]]) {
const [enr, distances] = params
const dstId = ENR.decodeTxt(enr).nodeId
this.logger(`stateFindNodes request received with these distances [${distances.toString()}]`)
this.logger(`sending stateFindNodes request to ${shortId(dstId)}`)
if (!isValidId(dstId)) {
return {
code: INVALID_PARAMS,
message: 'invalid node id',
}
}
const res = await this._state.sendFindNodes(enr, distances)
if (!res) {
return []
}
const enrs = res?.enrs.map((v) => ENR.decode(v).encodeTxt())
this.logger(`stateFindNodes request returned ${res?.total} enrs:`)
this.logger(enrs)
return res?.enrs.map((v) => ENR.decode(v).encodeTxt())
}
async historySendFindNodes(params: [string, number[]]) {
const [dstId, distances] = params
this.logger(`portal_historySendFindNodes`)
Expand Down Expand Up @@ -691,7 +715,7 @@ export class portal {
this._state.on(
'ContentAdded',
(hash: string, _contentType: StateNetworkContentType, value: Uint8Array) => {
if (hash.slice(2) === contentKey.slice(4)) {
if (hash === contentKey) {
clearTimeout(timeout)
resolve(value)
}
Expand Down
6 changes: 0 additions & 6 deletions packages/cli/src/rpc/validators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,6 @@ export const validators = {
message: `invalid argument ${index}: hex string without 0x prefix`,
}
}
if (params[index].length !== 68) {
return {
code: INVALID_PARAMS,
message: `invalid argument ${index}: content key must be 33 bytes`,
}
}
}
},

Expand Down
16 changes: 6 additions & 10 deletions packages/portalnetwork/src/networks/contentLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { hexToBytes, short } from '@ethereumjs/util'

import { serializedContentKeyToContentId, shortId } from '../util/index.js'

import { HistoryNetworkContentType } from './history/types.js'
import { getContentKey } from './history/util.js'

import type { BaseNetwork } from './network.js'
Expand Down Expand Up @@ -118,16 +117,13 @@ export class ContentLookup {
peer.hasContent = true
return new Promise((resolve) => {
let timeout: any = undefined
const utpDecoder = (
contentKey: string,
contentType: HistoryNetworkContentType,
content: Uint8Array,
) => {
const utpDecoder = (contentKey: string, contentType: number, content: Uint8Array) => {
const _contentKey = getContentKey(contentType, fromHexString(contentKey))
if (_contentKey === toHexString(this.contentKey)) {
this.logger(
`Received content for this contentType: ${HistoryNetworkContentType[contentType]} + contentKey: ${toHexString(this.contentKey)}`,
)
if (
contentKey === toHexString(this.contentKey) ||
_contentKey === toHexString(this.contentKey)
) {
this.logger(`Received content for this contentKey: ${toHexString(this.contentKey)}`)
this.network.removeListener('ContentAdded', utpDecoder)
clearTimeout(timeout)
this.content = { content, utp: true }
Expand Down
56 changes: 36 additions & 20 deletions packages/portalnetwork/src/networks/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -828,32 +828,48 @@ export abstract class BaseNetwork extends EventEmitter {
selector: MessageCodes.OFFER,
value: offerMsg,
})
const offered = await Promise.allSettled(
peers.map(async (peer) => {
this.logger.extend(`gossipContent`)(
`Offering ${toHexString(contentKey)} to ${shortId(peer.nodeId)}`,
)
const res = await this.sendMessage(peer, payload, this.networkId)
return [peer, res]
}),
)
let accepted = 0
for (const peer of peers) {
const res = await this.sendMessage(peer, payload, this.networkId)
if (res.length > 0) {
try {
const decoded = PortalWireMessageType.deserialize(res)
if (decoded.selector === MessageCodes.ACCEPT) {
const msg = decoded.value as AcceptMessage
if (msg.contentKeys.get(0) === true) {
accepted++
const id = new DataView(msg.connectionId.buffer).getUint16(0, false)
await this.handleNewRequest({
networkId: this.networkId,
contentKeys: [contentKey],
peerId: peer.nodeId,
connectionId: id,
requestCode: RequestCode.OFFER_WRITE,
contents: [encodeWithVariantPrefix([content])],
})
for (const offer of offered) {
if (offer.status === 'fulfilled') {
const [peer, res] = offer.value as [ENR, Uint8Array]
if (res.length > 0) {
try {
const decoded = PortalWireMessageType.deserialize(res)
if (decoded.selector === MessageCodes.ACCEPT) {
const msg = decoded.value as AcceptMessage
if (msg.contentKeys.get(0) === true) {
this.logger.extend(`gossipContent`)(
`${toHexString(contentKey)} accepted by ${shortId(peer.nodeId)}`,
)
accepted++
this.logger.extend(`gossipContent`)(`accepted: ${accepted}`)
const id = new DataView(msg.connectionId.buffer).getUint16(0, false)
void this.handleNewRequest({
networkId: this.networkId,
contentKeys: [contentKey],
peerId: peer.nodeId,
connectionId: id,
requestCode: RequestCode.OFFER_WRITE,
contents: [encodeWithVariantPrefix([content])],
})
}
}
} catch {
/** Noop */
}
} catch {
/** Noop */
}
}
}
this.logger.extend(`gossipContent`)(`total: accepted: ${accepted}`)
return accepted
}

Expand Down
2 changes: 1 addition & 1 deletion packages/portalnetwork/src/networks/state/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class StateNetwork extends BaseNetwork {
super({ client, db, radius, maxStorage, networkId: NetworkId.StateNetwork })
this.networkId = NetworkId.StateNetwork
this.logger = debug(this.enr.nodeId.slice(0, 5)).extend('Portal').extend('StateNetwork')
this.stateDB = new StateDB(client.db.db)
this.stateDB = new StateDB(this.db.db)
this.routingTable.setLogger(this.logger)
}

Expand Down
7 changes: 2 additions & 5 deletions packages/portalnetwork/src/networks/state/statedb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,11 @@ export class StateDB {
* @returns true if content is stored successfully
*/
async storeContent(contentKey: Uint8Array, content: Uint8Array) {
const log = this.logger.extend('storeContent')
log(`called with contentKey: ${toHexString(contentKey)} and content: [${content.length} bytes]`)
const dbKey = getDatabaseKey(contentKey)
const dbContent = getDatabaseContent(keyType(contentKey), content)
await this.db.put(dbKey, dbContent)
this.logger(
`storeContent (${content.length}) bytes: \ncontentKey: ${toHexString(
contentKey,
)}\ndbKey: 0x${dbKey}\ndbSize: ${(await this.keys()).length}`,
)
return true
}

Expand Down
14 changes: 12 additions & 2 deletions packages/portalnetwork/src/networks/state/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import {
AccountTrieNodeKey,
AccountTrieNodeRetrieval,
ContractCodeKey,
ContractCodeOffer,
ContractRetrieval,
StateNetworkContentType,
StorageTrieNodeKey,
StorageTrieNodeOffer,
StorageTrieNodeRetrieval,
} from './types.js'

Expand Down Expand Up @@ -201,10 +203,18 @@ export function getDatabaseContent(type: StateNetworkContentType, content: Uint8
dbContent = AccountTrieNodeRetrieval.deserialize(content).node
break
case StateNetworkContentType.ContractTrieNode:
dbContent = StorageTrieNodeRetrieval.deserialize(content).node
try {
dbContent = StorageTrieNodeRetrieval.deserialize(content).node
} catch {
dbContent = StorageTrieNodeOffer.deserialize(content).storageProof.slice(-1)[0]
}
break
case StateNetworkContentType.ContractByteCode:
dbContent = ContractRetrieval.deserialize(content).code
try {
dbContent = ContractCodeOffer.deserialize(content).code
} catch {
dbContent = ContractRetrieval.deserialize(content).code
}
break
}
return bytesToUnprefixedHex(dbContent)
Expand Down
10 changes: 5 additions & 5 deletions packages/portalnetwork/test/integration/state.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@ describe('AccountTrieNode Gossip / Request', async () => {
await new Promise((r) => setTimeout(r, 200))
const storedInNode1: Set<string> = new Set()
const storedInNode2: Set<string> = new Set()
for await (const key of node1.db.db.keys()) {
for await (const key of network1.db.db.keys()) {
storedInNode1.add(key)
}
for await (const key of node2.db.db.keys()) {
for await (const key of network2.db.db.keys()) {
storedInNode2.add(key)
}

Expand All @@ -113,7 +113,7 @@ describe('AccountTrieNode Gossip / Request', async () => {
expect(requested?.value).instanceOf(Uint8Array)
assert.deepEqual(requested!.value, expected, 'retrieved value is correct')
})
for await (const key of node2.db.db.keys()) {
for await (const key of network2.db.db.keys()) {
storedInNode2.add(key)
}
it('should store some nodes in node2', async () => {
Expand Down Expand Up @@ -192,9 +192,9 @@ describe('getAccount via network', async () => {
expect(result.gossipCount).toEqual(3)
})
const storedInNodes = await Promise.all(
clients.map(async (client) => {
networks.map(async (network) => {
const stored: Set<string> = new Set()
for await (const key of client.db.db.keys()) {
for await (const key of network.db.db.keys()) {
stored.add(key)
}
return stored
Expand Down

0 comments on commit 7ba54fc

Please sign in to comment.