Skip to content

Commit 4f6cf12

Browse files
committed
polished getlogs for subql missed blocks
1 parent 8aae21f commit 4f6cf12

File tree

3 files changed

+88
-26
lines changed

3 files changed

+88
-26
lines changed

packages/eth-providers/src/base-provider-dd.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,12 @@ const TRACE_METHODS = [
4848
'getTransactionByHash',
4949
'getReceipt',
5050
'_getReceipt',
51+
'_checkSubqlHeight',
5152
'_sanitizeRawFilter',
53+
'_getMaxTargetBlock',
54+
'_getSubqlMissedLogs',
5255
'getLogs',
53-
'_waitForSubql',
56+
'_waitForCache',
5457
'getIndexerMetadata',
5558
'healthCheck',
5659
'addEventListener',

packages/eth-providers/src/base-provider.ts

Lines changed: 74 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ import {
7373
decodeEthGas,
7474
encodeGasLimit,
7575
filterLog,
76+
filterLogByAddress,
7677
filterLogByTopics,
7778
getAllReceiptsAtBlock,
7879
getHealthResult,
@@ -403,10 +404,13 @@ export abstract class BaseProvider extends AbstractProvider {
403404
_onNewHead = async ([header, attempts]: [Header, number]) => {
404405
attempts--;
405406
const blockHash = header.hash.toHex();
407+
const blockNumber = header.number.toNumber();
408+
406409
try {
407410
const receipts = await getAllReceiptsAtBlock(this.api, blockHash);
408411
// update block cache
409412
this.blockCache.addReceipts(blockHash, receipts);
413+
this.blockCache.setlastCachedHeight(blockNumber);
410414

411415
// eth_subscribe
412416
await this._notifySubscribers(header, receipts);
@@ -1742,16 +1746,44 @@ export abstract class BaseProvider extends AbstractProvider {
17421746
await this._isBlockCanonical(txFromCache.blockHash, txFromCache.blockNumber)
17431747
) return txFromCache;
17441748

1745-
// smallest block number to make sure there is no gap between subql and block cache
1746-
const subqlTargetBlock = await this.bestBlockNumber - this.blockCache.cachedBlockHashes.length + 1;
1747-
await this._waitForSubql(subqlTargetBlock);
1749+
// make sure there is no gap between subql and cache
1750+
await this._checkSubqlHeight(this.blockCache.cachedBlockHashes.length);
17481751

17491752
const txFromSubql = await this.subql?.getTxReceiptByHash(txHash);
17501753
return txFromSubql
17511754
? subqlReceiptAdapter(txFromSubql)
17521755
: null;
17531756
};
17541757

1758+
_checkSubqlHeight = async (_maxMissedBlockCount: number): Promise<number> => {
1759+
if (!this.subql) return;
1760+
1761+
/* ---------------
1762+
usually subql is delayed for a couple blocks
1763+
so it doesn't make sense to check too small range (less than 5 block)
1764+
this can also prevent throwing error when provider just started
1765+
--------------- */
1766+
const SUBQL_DELAYED_OK_RANGE = 5;
1767+
const maxMissedBlockCount = Math.max(SUBQL_DELAYED_OK_RANGE, _maxMissedBlockCount);
1768+
1769+
const lastProcessedHeight = await this.subql.getLastProcessedHeight();
1770+
const minSubqlHeight = await this.bestBlockNumber - maxMissedBlockCount;
1771+
if (lastProcessedHeight < minSubqlHeight) {
1772+
return logger.throwError(
1773+
'subql indexer height is less than the minimum height required',
1774+
Logger.errors.SERVER_ERROR,
1775+
{
1776+
lastProcessedHeight,
1777+
minSubqlHeight,
1778+
maxMissedBlockCount,
1779+
curHeight: await this.bestBlockNumber,
1780+
}
1781+
);
1782+
}
1783+
1784+
return lastProcessedHeight;
1785+
};
1786+
17551787
_sanitizeRawFilter = async (rawFilter: LogFilter): Promise<SanitizedLogFilter> => {
17561788
const { fromBlock, toBlock, blockHash, address, topics } = rawFilter;
17571789
const filter: SanitizedLogFilter = {
@@ -1788,6 +1820,33 @@ export abstract class BaseProvider extends AbstractProvider {
17881820
return filter;
17891821
};
17901822

1823+
_getMaxTargetBlock = async (toBlock: number): Promise<number> => {
1824+
const upperBound = await this.finalizedBlockNumber;
1825+
return Math.min(toBlock, upperBound);
1826+
};
1827+
1828+
_getSubqlMissedLogs = async (toBlock: number, filter: SanitizedLogFilter): Promise<Log[]> => {
1829+
const SUBQL_MAX_MISSED_BLOCKS = 20;
1830+
1831+
const targetBlock = await this._getMaxTargetBlock(toBlock);
1832+
const lastProcessedHeight = await this._checkSubqlHeight(SUBQL_MAX_MISSED_BLOCKS);
1833+
const missedBlockCount = targetBlock - lastProcessedHeight;
1834+
if (missedBlockCount <= 0) return [];
1835+
1836+
const firstMissedBlock = lastProcessedHeight + 1;
1837+
const missedBlocks = Array.from({ length: missedBlockCount }, (_, i) => firstMissedBlock + i);
1838+
const missedBlockHashes = await Promise.all(missedBlocks.map(this._getBlockHash.bind(this)));
1839+
1840+
await this._waitForCache(targetBlock);
1841+
1842+
// all logs should be in cache since missedBlockCount <= 20
1843+
// no need to filter by blocknumber anymore, since these logs are from missedBlocks directly
1844+
return missedBlockHashes
1845+
.map(this.blockCache.getLogsAtBlock.bind(this))
1846+
.flat()
1847+
.filter(log => filterLogByAddress(log, filter.address));
1848+
};
1849+
17911850
// Bloom-filter Queries
17921851
getLogs = async (rawFilter: LogFilter): Promise<Log[]> => {
17931852
if (!this.subql) {
@@ -1798,40 +1857,30 @@ export abstract class BaseProvider extends AbstractProvider {
17981857

17991858
const filter = await this._sanitizeRawFilter(rawFilter);
18001859

1801-
await this._waitForSubql(filter.toBlock);
1802-
18031860
const subqlLogs = await this.subql.getFilteredLogs(filter); // only filtered by blockNumber and address
1804-
return subqlLogs
1861+
const extraLogs = await this._getSubqlMissedLogs(filter.toBlock, filter);
1862+
1863+
return subqlLogs.concat(extraLogs)
18051864
.filter(log => filterLogByTopics(log, filter.topics))
18061865
.map(log => this.formatter.filterLog(log));
18071866
};
18081867

1809-
/* ----------
1810-
make sure subql already indexed to target block number
1811-
currently unfinalized blocks indexing is NOT enabled
1812-
so upper bound would be finalized block number
1813-
---------- */
1814-
_waitForSubql = async (_targetBlock: number) => {
1815-
if (!this.subql) return;
1816-
1817-
const SUBQL_MAX_WAIT_BLOCKS = 3;
1818-
1819-
const upperBound = await this.finalizedBlockNumber;
1820-
const targetBlock = Math.min(_targetBlock, upperBound);
1868+
_waitForCache = async (_targetBlock: number) => {
1869+
const CACHE_MAX_WAIT_BLOCKS = 2;
18211870

1822-
let lastProcessedHeight = await this.subql.getLastProcessedHeight();
1823-
if (targetBlock - lastProcessedHeight > SUBQL_MAX_WAIT_BLOCKS) {
1871+
const targetBlock = await this._getMaxTargetBlock(_targetBlock);
1872+
let lastCachedHeight = await this.subql.getLastProcessedHeight();
1873+
if (targetBlock - lastCachedHeight > CACHE_MAX_WAIT_BLOCKS){
18241874
return logger.throwError(
1825-
`subql indexer is not synced to target block, please wait for it to catch up. Estimated ${(targetBlock - lastProcessedHeight) * 12}s remaining ...`,
1875+
'blockCache is not synced to target block, please wait for it to catch up',
18261876
Logger.errors.SERVER_ERROR,
1827-
{ targetBlock, lastProcessedHeight }
1877+
{ targetBlock, lastCachedHeight }
18281878
);
18291879
}
18301880

1831-
// wait at most 3 * 12 = 36s
1832-
while (lastProcessedHeight < targetBlock) {
1881+
while (lastCachedHeight < targetBlock) {
18331882
await sleep(1000);
1834-
lastProcessedHeight = await this.subql.getLastProcessedHeight();
1883+
lastCachedHeight = this.blockCache.lastCachedHeight;
18351884
}
18361885
};
18371886

packages/eth-providers/src/utils/BlockCache.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { Log } from '@ethersproject/abstract-provider';
2+
13
import { FullReceipt } from './receiptHelper';
24

35
export type TxHashToReceipt = Record<string, FullReceipt>;
@@ -15,14 +17,19 @@ export class BlockCache {
1517
txHashToReceipt: TxHashToReceipt;
1618
cachedBlockHashes: string[];
1719
maxCachedBlocks: number;
20+
lastCachedHeight: number;
1821

1922
constructor(maxCachedBlocks = 200) {
2023
this.txHashToReceipt = {};
2124
this.blockHashToReceipts = {};
2225
this.cachedBlockHashes = [];
2326
this.maxCachedBlocks = maxCachedBlocks;
27+
this.lastCachedHeight = -1;
2428
}
2529

30+
setlastCachedHeight = (blockNumber: number) =>
31+
(this.lastCachedHeight = blockNumber);
32+
2633
// automatically preserve a sliding window of ${maxCachedBlocks} blocks
2734
addReceipts = (blockHash: string, receipts: FullReceipt[]): void => {
2835
this.blockHashToReceipts[blockHash] = receipts;
@@ -51,6 +58,9 @@ export class BlockCache {
5158
getReceiptAtBlock = (txHash: string, blockHash: string): FullReceipt | null =>
5259
this.getAllReceiptsAtBlock(blockHash).find(r => r.transactionHash === txHash) ?? null;
5360

61+
getLogsAtBlock = (blockHash: string): Log[] =>
62+
this.getAllReceiptsAtBlock(blockHash).map(r => r.logs).flat();
63+
5464
inspect = (): CacheInspect => ({
5565
maxCachedBlocks: this.maxCachedBlocks,
5666
cachedBlocksCount: Object.keys(this.blockHashToReceipts).length,

0 commit comments

Comments
 (0)