From 941425599bed58c6e7e17789072ccae5e8f4597a Mon Sep 17 00:00:00 2001 From: Shunji Zhan Date: Tue, 12 Dec 2023 15:57:49 +0800 Subject: [PATCH] fix bestblock definition --- examples/docker-compose-bodhi-stack.yml | 4 +- package.json | 1 + .../eth-providers/src/base-provider-dd.ts | 1 - packages/eth-providers/src/base-provider.ts | 86 ++++++++----------- .../eth-providers/src/utils/BlockCache.ts | 11 ++- .../src/__tests__/e2e/endpoint.test.ts | 44 ++++++---- 6 files changed, 77 insertions(+), 70 deletions(-) diff --git a/examples/docker-compose-bodhi-stack.yml b/examples/docker-compose-bodhi-stack.yml index 40121b80a..e23166dd9 100644 --- a/examples/docker-compose-bodhi-stack.yml +++ b/examples/docker-compose-bodhi-stack.yml @@ -22,7 +22,7 @@ services: POSTGRES_PASSWORD: postgres subquery-node: - image: acala/evm-subql:2.7.13 + image: acala/evm-subql:2.7.16 container_name: subquery-node ports: - 3000:3000 @@ -69,7 +69,7 @@ services: - --indexer=http://subquery-node:3000 eth-rpc-adapter-server: - image: acala/eth-rpc-adapter:2.7.13 + image: acala/eth-rpc-adapter:2.7.16 container_name: eth-rpc-adapter-server restart: always depends_on: diff --git a/package.json b/package.json index 0e035bfe8..1089cca6f 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "start:chain": "docker compose up --build -- chain-ready", "start:eth-rpc-adapter": "docker compose up --build -- eth-rpc-adapter-ready", "start:eth-rpc-adapter-subql": "docker compose up --build -- eth-rpc-adapter-with-subql-ready", + "feed-tx": "yarn e2e:feed-tx && yarn e2e:feed-tx-2", "e2e:feed-tx": "yarn workspace evm-waffle-example-dex run test", "e2e:feed-tx-2": "yarn workspace evm-waffle-example-e2e run test", "e2e:eth-providers": "yarn start:chain; yarn e2e:feed-tx; yarn start:eth-rpc-adapter; yarn workspace @acala-network/eth-providers run test:e2e", diff --git a/packages/eth-providers/src/base-provider-dd.ts b/packages/eth-providers/src/base-provider-dd.ts index 96e898d3e..4a9cce73d 100644 --- a/packages/eth-providers/src/base-provider-dd.ts +++ b/packages/eth-providers/src/base-provider-dd.ts @@ -53,7 +53,6 @@ const TRACE_METHODS = [ '_getMaxTargetBlock', '_getSubqlMissedLogs', 'getLogs', - '_waitForCache', 'getIndexerMetadata', 'healthCheck', 'addEventListener', diff --git a/packages/eth-providers/src/base-provider.ts b/packages/eth-providers/src/base-provider.ts index 495eb4cbe..0bae90c7a 100644 --- a/packages/eth-providers/src/base-provider.ts +++ b/packages/eth-providers/src/base-provider.ts @@ -88,7 +88,6 @@ import { runWithRetries, runWithTiming, sendTx, - sleep, sortObjByKey, subqlReceiptAdapter, throwNotImplemented, @@ -341,14 +340,17 @@ export abstract class BaseProvider extends AbstractProvider { } get bestBlockHash() { - return firstValueFrom(this.best$).then(({ hash }) => hash); + return this.blockCache.lastCachedBlock.hash; } + get bestBlockNumber() { - return firstValueFrom(this.best$).then(({ number }) => number); + return this.blockCache.lastCachedBlock.number; } + get finalizedBlockHash() { return firstValueFrom(this.finalized$).then(({ hash }) => hash); } + get finalizedBlockNumber() { return firstValueFrom(this.finalized$).then(({ number }) => number); } @@ -362,15 +364,21 @@ export abstract class BaseProvider extends AbstractProvider { this.finalizedHead$ = this.api.rx.rpc.chain.subscribeFinalizedHeads(); const headSub = this.head$.subscribe(header => { - this.best$.next({ hash: header.hash.toHex(), number: header.number.toNumber() }); + this.best$.next({ + hash: header.hash.toHex(), + number: header.number.toNumber(), + }); }); const finalizedSub = this.finalizedHead$.subscribe(header => { this.finalizedBlockHashes.add(header.hash.toHex()); - this.finalized$.next({ hash: header.hash.toHex(), number: header.number.toNumber() }); + this.finalized$.next({ + hash: header.hash.toHex(), + number: header.number.toNumber(), + }); }); - await firstValueFrom(this.head$); + const firstBlock = await firstValueFrom(this.head$); await firstValueFrom(this.finalizedHead$); const safeHead$ = this.safeMode @@ -393,6 +401,11 @@ export abstract class BaseProvider extends AbstractProvider { this.#finalizedHeadTasks.set(header.hash.toHex(), task); }); + this.blockCache.setlastCachedBlock({ + hash: firstBlock.hash.toHex(), + number: firstBlock.number.toNumber(), + }); + return () => { headSub.unsubscribe(); finalizedSub.unsubscribe(); @@ -410,7 +423,7 @@ export abstract class BaseProvider extends AbstractProvider { const receipts = await getAllReceiptsAtBlock(this.api, blockHash); // update block cache this.blockCache.addReceipts(blockHash, receipts); - this.blockCache.setlastCachedHeight(blockNumber); + this.blockCache.setlastCachedBlock({ hash: blockHash, number: blockNumber }); // eth_subscribe await this._notifySubscribers(header, receipts); @@ -558,11 +571,9 @@ export abstract class BaseProvider extends AbstractProvider { isReady = async (): Promise => { try { await this.api.isReadyOrError; - await this.getNetwork(); - if (!this.#subscription) { - this.#subscription = this.startSubscriptions(); - } + this.#subscription ??= this.startSubscriptions(); + // wait for subscription to happen await this.#subscription; } catch (e) { @@ -593,27 +604,27 @@ export abstract class BaseProvider extends AbstractProvider { }; getNetwork = async (): Promise => { - if (!this.network) { - this.network = { - name: this.api.runtimeVersion.specName.toString(), - chainId: await this.chainId(), - }; - } + await this.isReady(); + + this.network ??= { + name: this.api.runtimeVersion.specName.toString(), + chainId: await this.chainId(), + }; return this.network; }; - netVersion = async (): Promise => { - return this.api.consts.evmAccounts.chainId.toString(); - }; + netVersion = async (): Promise => + this.api.consts.evmAccounts.chainId.toString(); - chainId = async (): Promise => { - return this.api.consts.evmAccounts.chainId.toNumber(); - }; + chainId = async (): Promise => + this.api.consts.evmAccounts.chainId.toNumber(); - getBlockNumber = async (): Promise => { - return this.safeMode ? this.finalizedBlockNumber : this.bestBlockNumber; - }; + getBlockNumber = async (): Promise => ( + this.safeMode + ? this.finalizedBlockNumber + : this.bestBlockNumber + ); getBlockData = async (_blockTag: BlockTag | Promise, full?: boolean): Promise => { const blockTag = await this._ensureSafeModeBlockTagFinalization(_blockTag); @@ -1820,7 +1831,7 @@ export abstract class BaseProvider extends AbstractProvider { _getSubqlMissedLogs = async (toBlock: number, filter: SanitizedLogFilter): Promise => { const targetBlock = await this._getMaxTargetBlock(toBlock); - const lastProcessedHeight = await this._checkSubqlHeight(); // all missed logs should be in cache + const lastProcessedHeight = await this._checkSubqlHeight(); const missedBlockCount = targetBlock - lastProcessedHeight; if (missedBlockCount <= 0) return []; @@ -1828,8 +1839,6 @@ export abstract class BaseProvider extends AbstractProvider { const missedBlocks = Array.from({ length: missedBlockCount }, (_, i) => firstMissedBlock + i); const missedBlockHashes = await Promise.all(missedBlocks.map(this._getBlockHash.bind(this))); - await this._waitForCache(targetBlock); - // no need to filter by blocknumber anymore, since these logs are from missedBlocks directly return missedBlockHashes .map(this.blockCache.getLogsAtBlock.bind(this)) @@ -1855,25 +1864,6 @@ export abstract class BaseProvider extends AbstractProvider { .map(log => this.formatter.filterLog(log)); }; - _waitForCache = async (_targetBlock: number) => { - const CACHE_MAX_WAIT_BLOCKS = 2; - - const targetBlock = await this._getMaxTargetBlock(_targetBlock); - let lastCachedHeight = await this.subql.getLastProcessedHeight(); - if (targetBlock - lastCachedHeight > CACHE_MAX_WAIT_BLOCKS){ - return logger.throwError( - 'blockCache is not synced to target block, please wait for it to catch up', - Logger.errors.SERVER_ERROR, - { targetBlock, lastCachedHeight } - ); - } - - while (lastCachedHeight < targetBlock) { - await sleep(1000); - lastCachedHeight = this.blockCache.lastCachedHeight; - } - }; - getIndexerMetadata = async (): Promise<_Metadata | undefined> => { return this.subql?.getIndexerMetadata(); }; diff --git a/packages/eth-providers/src/utils/BlockCache.ts b/packages/eth-providers/src/utils/BlockCache.ts index ccffa0ef3..a9b342f4e 100644 --- a/packages/eth-providers/src/utils/BlockCache.ts +++ b/packages/eth-providers/src/utils/BlockCache.ts @@ -4,6 +4,10 @@ import { FullReceipt } from './receiptHelper'; export type TxHashToReceipt = Record; export type BlockHashToReceipts = Record; +export type Block = { + number: number; + hash: string; +} export interface CacheInspect { maxCachedBlocks: number; @@ -17,18 +21,17 @@ export class BlockCache { txHashToReceipt: TxHashToReceipt; cachedBlockHashes: string[]; maxCachedBlocks: number; - lastCachedHeight: number; + lastCachedBlock: Block; constructor(maxCachedBlocks = 200) { this.txHashToReceipt = {}; this.blockHashToReceipts = {}; this.cachedBlockHashes = []; this.maxCachedBlocks = maxCachedBlocks; - this.lastCachedHeight = -1; + this.lastCachedBlock = null; } - setlastCachedHeight = (blockNumber: number) => - (this.lastCachedHeight = blockNumber); + setlastCachedBlock = (block: Block) => (this.lastCachedBlock = block); // automatically preserve a sliding window of ${maxCachedBlocks} blocks addReceipts = (blockHash: string, receipts: FullReceipt[]): void => { diff --git a/packages/eth-rpc-adapter/src/__tests__/e2e/endpoint.test.ts b/packages/eth-rpc-adapter/src/__tests__/e2e/endpoint.test.ts index 5d0f74cea..6af522df3 100644 --- a/packages/eth-rpc-adapter/src/__tests__/e2e/endpoint.test.ts +++ b/packages/eth-rpc-adapter/src/__tests__/e2e/endpoint.test.ts @@ -107,17 +107,31 @@ describe('endpoint', () => { const DETERMINISTIC_SETUP_TOTAL_TXS = 12; const DETERMINISTIC_SETUP_TOTAL_LOGS = 13; let tries = 0; - let [allTxReceipts, allLogs] = await Promise.all([subql.getAllTxReceipts(), subql.getAllLogs()]); + let [allTxReceipts, allLogs] = await Promise.all([ + subql.getAllTxReceipts(), + subql.getAllLogs(), + ]); while ( - (allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS || allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS) && - tries++ < 10 + tries++ < 5 && + ( + allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS || + allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS + ) ) { - console.log(`let's give subql a little bit more time to index, retrying #${tries} in 5s ...`); - await sleep(10000); - [allTxReceipts, allLogs] = await Promise.all([subql.getAllTxReceipts(), subql.getAllLogs()]); + console.log(allTxReceipts.length, allLogs.length); + console.log(`let's give subql a little bit more time to index, retrying #${tries} in 3s ...`); + await sleep(3000); + + [allTxReceipts, allLogs] = await Promise.all([ + subql.getAllTxReceipts(), + subql.getAllLogs(), + ]); } - if (allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS || allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS) { + if ( + allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS || + allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS + ) { throw new Error(` test env setup failed! expected ${DETERMINISTIC_SETUP_TOTAL_TXS} Txs in subql but got ${allTxReceipts.length} @@ -222,7 +236,7 @@ describe('endpoint', () => { describe('eth_getLogs', () => { const ALL_BLOCK_RANGE_FILTER = { fromBlock: 'earliest' }; - describe('when no filter', () => { + describe.concurrent('when no filter', () => { it('returns all logs from latest block', async () => { const res = (await eth_getLogs([{}])).data.result; expect(res.length).to.equal(2); @@ -231,7 +245,7 @@ describe('endpoint', () => { }); }); - describe('filter by address', () => { + describe.concurrent('filter by address', () => { it('returns correct logs', async () => { /* ---------- single address ---------- */ for (const log of allLogs) { @@ -251,7 +265,7 @@ describe('endpoint', () => { }); }); - describe('filter by block number', () => { + describe.concurrent('filter by block number', () => { it('returns correct logs', async () => { const BIG_NUMBER = 88888888; const BIG_NUMBER_HEX = '0x54C5638'; @@ -299,7 +313,7 @@ describe('endpoint', () => { }); }); - describe('filter by block tag', () => { + describe.concurrent('filter by block tag', () => { it('returns correct logs for valid tag', async () => { let res: Awaited>; let expectedLogs: LogHexified[]; @@ -350,7 +364,7 @@ describe('endpoint', () => { }); }); - describe('filter by topics', () => { + describe.concurrent('filter by topics', () => { it('returns correct logs', async () => { let res: Awaited>; let expectedLogs: LogHexified[]; @@ -396,7 +410,7 @@ describe('endpoint', () => { }); }); - describe('filter by blockhash', () => { + describe.concurrent('filter by blockhash', () => { it('returns correct logs', async () => { const allLogsFromSubql = await subql.getAllLogs().then(logs => logs.map(hexilifyLog)); for (const log of allLogsFromSubql) { @@ -407,7 +421,7 @@ describe('endpoint', () => { }); }); - describe('filter by multiple params', () => { + describe.concurrent('filter by multiple params', () => { it('returns correct logs', async () => { let res: Awaited>; let expectedLogs: LogHexified[]; @@ -474,7 +488,7 @@ describe('endpoint', () => { expect(res2.data.result).to.deep.equal(res.data.result); }); - it('should throw correct error is subql is not synced', async () => { + it('should throw correct error if subql is not synced', async () => { const curblockNum = await provider.getBlockNumber(); const pendings = [] as any[]; for (let i = 0; i < 5; i++) {