Skip to content

Commit

Permalink
fix bestblock definition
Browse files Browse the repository at this point in the history
  • Loading branch information
shunjizhan committed Dec 12, 2023
1 parent ca59c76 commit 9414255
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 70 deletions.
4 changes: 2 additions & 2 deletions examples/docker-compose-bodhi-stack.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ services:
POSTGRES_PASSWORD: postgres

subquery-node:
image: acala/evm-subql:2.7.13
image: acala/evm-subql:2.7.16
container_name: subquery-node
ports:
- 3000:3000
Expand Down Expand Up @@ -69,7 +69,7 @@ services:
- --indexer=http://subquery-node:3000

eth-rpc-adapter-server:
image: acala/eth-rpc-adapter:2.7.13
image: acala/eth-rpc-adapter:2.7.16
container_name: eth-rpc-adapter-server
restart: always
depends_on:
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"start:chain": "docker compose up --build -- chain-ready",
"start:eth-rpc-adapter": "docker compose up --build -- eth-rpc-adapter-ready",
"start:eth-rpc-adapter-subql": "docker compose up --build -- eth-rpc-adapter-with-subql-ready",
"feed-tx": "yarn e2e:feed-tx && yarn e2e:feed-tx-2",
"e2e:feed-tx": "yarn workspace evm-waffle-example-dex run test",
"e2e:feed-tx-2": "yarn workspace evm-waffle-example-e2e run test",
"e2e:eth-providers": "yarn start:chain; yarn e2e:feed-tx; yarn start:eth-rpc-adapter; yarn workspace @acala-network/eth-providers run test:e2e",
Expand Down
1 change: 0 additions & 1 deletion packages/eth-providers/src/base-provider-dd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ const TRACE_METHODS = [
'_getMaxTargetBlock',
'_getSubqlMissedLogs',
'getLogs',
'_waitForCache',
'getIndexerMetadata',
'healthCheck',
'addEventListener',
Expand Down
86 changes: 38 additions & 48 deletions packages/eth-providers/src/base-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ import {
runWithRetries,
runWithTiming,
sendTx,
sleep,
sortObjByKey,
subqlReceiptAdapter,
throwNotImplemented,
Expand Down Expand Up @@ -341,14 +340,17 @@ export abstract class BaseProvider extends AbstractProvider {
}

get bestBlockHash() {
return firstValueFrom(this.best$).then(({ hash }) => hash);
return this.blockCache.lastCachedBlock.hash;
}

get bestBlockNumber() {
return firstValueFrom(this.best$).then(({ number }) => number);
return this.blockCache.lastCachedBlock.number;
}

get finalizedBlockHash() {
return firstValueFrom(this.finalized$).then(({ hash }) => hash);
}

get finalizedBlockNumber() {
return firstValueFrom(this.finalized$).then(({ number }) => number);
}
Expand All @@ -362,15 +364,21 @@ export abstract class BaseProvider extends AbstractProvider {
this.finalizedHead$ = this.api.rx.rpc.chain.subscribeFinalizedHeads();

const headSub = this.head$.subscribe(header => {
this.best$.next({ hash: header.hash.toHex(), number: header.number.toNumber() });
this.best$.next({
hash: header.hash.toHex(),
number: header.number.toNumber(),
});
});

const finalizedSub = this.finalizedHead$.subscribe(header => {
this.finalizedBlockHashes.add(header.hash.toHex());
this.finalized$.next({ hash: header.hash.toHex(), number: header.number.toNumber() });
this.finalized$.next({
hash: header.hash.toHex(),
number: header.number.toNumber(),
});
});

await firstValueFrom(this.head$);
const firstBlock = await firstValueFrom(this.head$);
await firstValueFrom(this.finalizedHead$);

const safeHead$ = this.safeMode
Expand All @@ -393,6 +401,11 @@ export abstract class BaseProvider extends AbstractProvider {
this.#finalizedHeadTasks.set(header.hash.toHex(), task);
});

this.blockCache.setlastCachedBlock({
hash: firstBlock.hash.toHex(),
number: firstBlock.number.toNumber(),
});

return () => {
headSub.unsubscribe();
finalizedSub.unsubscribe();
Expand All @@ -410,7 +423,7 @@ export abstract class BaseProvider extends AbstractProvider {
const receipts = await getAllReceiptsAtBlock(this.api, blockHash);
// update block cache
this.blockCache.addReceipts(blockHash, receipts);
this.blockCache.setlastCachedHeight(blockNumber);
this.blockCache.setlastCachedBlock({ hash: blockHash, number: blockNumber });

// eth_subscribe
await this._notifySubscribers(header, receipts);
Expand Down Expand Up @@ -558,11 +571,9 @@ export abstract class BaseProvider extends AbstractProvider {
isReady = async (): Promise<void> => {
try {
await this.api.isReadyOrError;
await this.getNetwork();

if (!this.#subscription) {
this.#subscription = this.startSubscriptions();
}
this.#subscription ??= this.startSubscriptions();

// wait for subscription to happen
await this.#subscription;
} catch (e) {
Expand Down Expand Up @@ -593,27 +604,27 @@ export abstract class BaseProvider extends AbstractProvider {
};

getNetwork = async (): Promise<Network> => {
if (!this.network) {
this.network = {
name: this.api.runtimeVersion.specName.toString(),
chainId: await this.chainId(),
};
}
await this.isReady();

this.network ??= {
name: this.api.runtimeVersion.specName.toString(),
chainId: await this.chainId(),
};

return this.network;
};

netVersion = async (): Promise<string> => {
return this.api.consts.evmAccounts.chainId.toString();
};
netVersion = async (): Promise<string> =>
this.api.consts.evmAccounts.chainId.toString();

chainId = async (): Promise<number> => {
return this.api.consts.evmAccounts.chainId.toNumber();
};
chainId = async (): Promise<number> =>
this.api.consts.evmAccounts.chainId.toNumber();

getBlockNumber = async (): Promise<number> => {
return this.safeMode ? this.finalizedBlockNumber : this.bestBlockNumber;
};
getBlockNumber = async (): Promise<number> => (
this.safeMode
? this.finalizedBlockNumber
: this.bestBlockNumber
);

getBlockData = async (_blockTag: BlockTag | Promise<BlockTag>, full?: boolean): Promise<BlockData> => {
const blockTag = await this._ensureSafeModeBlockTagFinalization(_blockTag);
Expand Down Expand Up @@ -1820,16 +1831,14 @@ export abstract class BaseProvider extends AbstractProvider {

_getSubqlMissedLogs = async (toBlock: number, filter: SanitizedLogFilter): Promise<Log[]> => {
const targetBlock = await this._getMaxTargetBlock(toBlock);
const lastProcessedHeight = await this._checkSubqlHeight(); // all missed logs should be in cache
const lastProcessedHeight = await this._checkSubqlHeight();
const missedBlockCount = targetBlock - lastProcessedHeight;
if (missedBlockCount <= 0) return [];

const firstMissedBlock = lastProcessedHeight + 1;
const missedBlocks = Array.from({ length: missedBlockCount }, (_, i) => firstMissedBlock + i);
const missedBlockHashes = await Promise.all(missedBlocks.map(this._getBlockHash.bind(this)));

await this._waitForCache(targetBlock);

// no need to filter by blocknumber anymore, since these logs are from missedBlocks directly
return missedBlockHashes
.map(this.blockCache.getLogsAtBlock.bind(this))
Expand All @@ -1855,25 +1864,6 @@ export abstract class BaseProvider extends AbstractProvider {
.map(log => this.formatter.filterLog(log));
};

_waitForCache = async (_targetBlock: number) => {
const CACHE_MAX_WAIT_BLOCKS = 2;

const targetBlock = await this._getMaxTargetBlock(_targetBlock);
let lastCachedHeight = await this.subql.getLastProcessedHeight();
if (targetBlock - lastCachedHeight > CACHE_MAX_WAIT_BLOCKS){
return logger.throwError(
'blockCache is not synced to target block, please wait for it to catch up',
Logger.errors.SERVER_ERROR,
{ targetBlock, lastCachedHeight }
);
}

while (lastCachedHeight < targetBlock) {
await sleep(1000);
lastCachedHeight = this.blockCache.lastCachedHeight;
}
};

getIndexerMetadata = async (): Promise<_Metadata | undefined> => {
return this.subql?.getIndexerMetadata();
};
Expand Down
11 changes: 7 additions & 4 deletions packages/eth-providers/src/utils/BlockCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { FullReceipt } from './receiptHelper';

export type TxHashToReceipt = Record<string, FullReceipt>;
export type BlockHashToReceipts = Record<string, FullReceipt[]>;
export type Block = {
number: number;
hash: string;
}

export interface CacheInspect {
maxCachedBlocks: number;
Expand All @@ -17,18 +21,17 @@ export class BlockCache {
txHashToReceipt: TxHashToReceipt;
cachedBlockHashes: string[];
maxCachedBlocks: number;
lastCachedHeight: number;
lastCachedBlock: Block;

constructor(maxCachedBlocks = 200) {
this.txHashToReceipt = {};
this.blockHashToReceipts = {};
this.cachedBlockHashes = [];
this.maxCachedBlocks = maxCachedBlocks;
this.lastCachedHeight = -1;
this.lastCachedBlock = null;
}

setlastCachedHeight = (blockNumber: number) =>
(this.lastCachedHeight = blockNumber);
setlastCachedBlock = (block: Block) => (this.lastCachedBlock = block);

// automatically preserve a sliding window of ${maxCachedBlocks} blocks
addReceipts = (blockHash: string, receipts: FullReceipt[]): void => {
Expand Down
44 changes: 29 additions & 15 deletions packages/eth-rpc-adapter/src/__tests__/e2e/endpoint.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,31 @@ describe('endpoint', () => {
const DETERMINISTIC_SETUP_TOTAL_TXS = 12;
const DETERMINISTIC_SETUP_TOTAL_LOGS = 13;
let tries = 0;
let [allTxReceipts, allLogs] = await Promise.all([subql.getAllTxReceipts(), subql.getAllLogs()]);
let [allTxReceipts, allLogs] = await Promise.all([
subql.getAllTxReceipts(),
subql.getAllLogs(),
]);
while (
(allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS || allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS) &&
tries++ < 10
tries++ < 5 &&
(
allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS ||
allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS
)
) {
console.log(`let's give subql a little bit more time to index, retrying #${tries} in 5s ...`);
await sleep(10000);
[allTxReceipts, allLogs] = await Promise.all([subql.getAllTxReceipts(), subql.getAllLogs()]);
console.log(allTxReceipts.length, allLogs.length);
console.log(`let's give subql a little bit more time to index, retrying #${tries} in 3s ...`);
await sleep(3000);

[allTxReceipts, allLogs] = await Promise.all([
subql.getAllTxReceipts(),
subql.getAllLogs(),
]);
}

if (allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS || allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS) {
if (
allTxReceipts.length < DETERMINISTIC_SETUP_TOTAL_TXS ||
allLogs.length < DETERMINISTIC_SETUP_TOTAL_LOGS
) {
throw new Error(`
test env setup failed!
expected ${DETERMINISTIC_SETUP_TOTAL_TXS} Txs in subql but got ${allTxReceipts.length}
Expand Down Expand Up @@ -222,7 +236,7 @@ describe('endpoint', () => {
describe('eth_getLogs', () => {
const ALL_BLOCK_RANGE_FILTER = { fromBlock: 'earliest' };

describe('when no filter', () => {
describe.concurrent('when no filter', () => {
it('returns all logs from latest block', async () => {
const res = (await eth_getLogs([{}])).data.result;
expect(res.length).to.equal(2);
Expand All @@ -231,7 +245,7 @@ describe('endpoint', () => {
});
});

describe('filter by address', () => {
describe.concurrent('filter by address', () => {
it('returns correct logs', async () => {
/* ---------- single address ---------- */
for (const log of allLogs) {
Expand All @@ -251,7 +265,7 @@ describe('endpoint', () => {
});
});

describe('filter by block number', () => {
describe.concurrent('filter by block number', () => {
it('returns correct logs', async () => {
const BIG_NUMBER = 88888888;
const BIG_NUMBER_HEX = '0x54C5638';
Expand Down Expand Up @@ -299,7 +313,7 @@ describe('endpoint', () => {
});
});

describe('filter by block tag', () => {
describe.concurrent('filter by block tag', () => {
it('returns correct logs for valid tag', async () => {
let res: Awaited<ReturnType<typeof eth_getLogs>>;
let expectedLogs: LogHexified[];
Expand Down Expand Up @@ -350,7 +364,7 @@ describe('endpoint', () => {
});
});

describe('filter by topics', () => {
describe.concurrent('filter by topics', () => {
it('returns correct logs', async () => {
let res: Awaited<ReturnType<typeof eth_getLogs>>;
let expectedLogs: LogHexified[];
Expand Down Expand Up @@ -396,7 +410,7 @@ describe('endpoint', () => {
});
});

describe('filter by blockhash', () => {
describe.concurrent('filter by blockhash', () => {
it('returns correct logs', async () => {
const allLogsFromSubql = await subql.getAllLogs().then(logs => logs.map(hexilifyLog));
for (const log of allLogsFromSubql) {
Expand All @@ -407,7 +421,7 @@ describe('endpoint', () => {
});
});

describe('filter by multiple params', () => {
describe.concurrent('filter by multiple params', () => {
it('returns correct logs', async () => {
let res: Awaited<ReturnType<typeof eth_getLogs>>;
let expectedLogs: LogHexified[];
Expand Down Expand Up @@ -474,7 +488,7 @@ describe('endpoint', () => {
expect(res2.data.result).to.deep.equal(res.data.result);
});

it('should throw correct error is subql is not synced', async () => {
it('should throw correct error if subql is not synced', async () => {
const curblockNum = await provider.getBlockNumber();
const pendings = [] as any[];
for (let i = 0; i < 5; i++) {
Expand Down

0 comments on commit 9414255

Please sign in to comment.