Skip to content

Commit be743a3

Browse files
committed
feat(reku): storable checkpoint
refactor: optimize autocc, simplify polling, getBlock, ignoreLogs
1 parent b73c7f6 commit be743a3

File tree

5 files changed

+92
-64
lines changed

5 files changed

+92
-64
lines changed

packages/reku/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ i.e. It starts with 'realtime' mode by default.
2626

2727
Options:
2828
- `store`?: the Store used to cache the <txhash, logindex> that already processed.
29-
- `storeKeyPrefix`?: set the prefix to all keys when set key-value to store (cache), e.g. key = prefix+'txHashList', prefix can be "project:app:" to form a "project:app:txHashList" redis key., defult: ''
29+
- `storeKeyPrefix`?: set the prefix to all keys when set key-value to store (cache), e.g. key = prefix+'txHashList', prefix can be "project:app:network:" to form a "project:app:network:txHashList" redis key., defult: ''
3030
- `storeTtl`?: the ttl for <txhash, logindex> record in store, defualt: no limit
3131
- `batchBlocksCount`?: how many blocks to get per `getLogs` check, in readtime mode it waits until the new block num >= `batchBlocksCount`.
3232
- `delayBlockFromLatest`?: mostly for realtime mode; each time cc wait until `latest height > toBlock + delayBlockFromLatest`, default: 1

packages/reku/event/crosschecker/autochecker.ts

Lines changed: 29 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { BaseCrossChecker } from './basechecker'
88

99
export class AutoCrossChecker extends BaseCrossChecker {
1010
cache: CrossCheckerCacheManager | undefined = undefined
11-
checkpointBlockNumber = 0
11+
checkpointBlockNumber: number | undefined
1212

1313
constructor(
1414
provider: Providers,
@@ -41,6 +41,11 @@ export class AutoCrossChecker extends BaseCrossChecker {
4141
}
4242
}
4343

44+
async setCheckpoint(cp: number) {
45+
this.checkpointBlockNumber = cp
46+
await this.cache!.setCheckpoint(cp)
47+
}
48+
4449
// TODO: keep type hint of inside AutoCrossCheckParam
4550
/**
4651
* real time auto crosscheck, start from the lastest block
@@ -52,17 +57,20 @@ export class AutoCrossChecker extends BaseCrossChecker {
5257
this.cache = new CrossCheckerCacheManager(options?.store, { keyPrefix: options?.storeKeyPrefix, logger: this.logger, ttl: options?.storeTtl })
5358

5459
const latestblocknum = await retryOnNull(async () => await this.provider.provider?.getBlockNumber())
60+
61+
// resume checkpoint priority: options.fromBlock > cache > latestblocknum + 1
62+
const defaultInitCheckpoint = await this.cache.getCheckpoint() ?? latestblocknum + 1
63+
5564
const {
56-
fromBlock = latestblocknum + 1,
65+
fromBlock = defaultInitCheckpoint,
5766
batchBlocksCount = 10,
5867
pollingInterval = 3000,
5968
blockInterval = ETH_BLOCK_INTERVAL,
6069
delayBlockFromLatest = 1,
61-
toBlock, ignoreLogs,
70+
toBlock,
6271
} = options
6372

64-
// init checkpoint block num
65-
this.checkpointBlockNumber = fromBlock
73+
await this.setCheckpoint(fromBlock)
6674

6775
const ccrOptions: CrossCheckRangeParam = {
6876
...options,
@@ -72,26 +80,12 @@ export class AutoCrossChecker extends BaseCrossChecker {
7280
await options.onMissingLog(log)
7381
this.cache!.addLogs([log])
7482
},
75-
fromBlock: -1, // placeholder
76-
toBlock: -1, // placeholder
77-
}
78-
79-
if (ignoreLogs)
80-
await this.cache.addLogs(ignoreLogs)
81-
82-
// initialize the ignore logs from redis
83-
ccrOptions.ignoreLogs = await this.cache.getLogs()
84-
85-
const updateCCROptions = async (ccrOptions: any) => {
86-
// iterate block range
87-
ccrOptions.fromBlock = this.checkpointBlockNumber
88-
// batchBlocksCount should > 0
89-
ccrOptions.toBlock = ccrOptions.fromBlock + batchBlocksCount - 1
83+
fromBlock,
84+
toBlock: fromBlock + batchBlocksCount - 1,
9085
}
9186

9287
const waitNextCrosscheck = async (): Promise<boolean> => {
93-
// TODO: use blockNumber for performance
94-
const latestblocknum = (await retryOnNull(async () => await this.provider.provider?.getBlock('latest'))).number
88+
const latestblocknum = await retryOnNull(async () => await this.provider.provider?.getBlockNumber())
9589
this.logger.info('[*] ccrOptions: fromBlock', ccrOptions.fromBlock, ', toBlock', ccrOptions.toBlock, ', latestblocknum', latestblocknum)
9690
if (ccrOptions.toBlock + delayBlockFromLatest > latestblocknum) {
9791
// sleep until the toBlock
@@ -106,22 +100,28 @@ export class AutoCrossChecker extends BaseCrossChecker {
106100
? () => { ccrOptions.toBlock = Math.min(ccrOptions.toBlock, toBlock); return true }
107101
: waitNextCrosscheck
108102

103+
const updateCCROptions = async (ccrOptions: any) => {
104+
// only set after cc succ
105+
await this.setCheckpoint(ccrOptions.toBlock + 1)
106+
// iterate block range
107+
ccrOptions.fromBlock = this.checkpointBlockNumber
108+
// batchBlocksCount should > 0
109+
ccrOptions.toBlock = ccrOptions.fromBlock + batchBlocksCount - 1
110+
}
111+
109112
const endingCondition = toBlock
110113
// ends on up to options.toBlock
111-
? () => this.checkpointBlockNumber > toBlock
114+
? () => this.checkpointBlockNumber! > toBlock
112115
// never ends if options.toBlock is not provided
113116
: () => false
114117

115118
// TODO: replace polling with schedule cron
116119
await polling(async () => {
117-
await updateCCROptions(ccrOptions)
118-
119120
if (await waitOrUpdateToBlock()) {
120121
await this.crossCheckRange(ccrOptions)
121-
// only set after cc succ
122-
this.checkpointBlockNumber = ccrOptions.toBlock + 1
122+
// only update options after cc succ
123+
await updateCCROptions(ccrOptions)
123124
}
124-
125125
return endingCondition()
126126
}, pollingInterval)
127127
}
@@ -130,7 +130,7 @@ export class AutoCrossChecker extends BaseCrossChecker {
130130
const newlogs = await super.diff(logs, ignoreLogs)
131131
const res: ethers.Log[] = []
132132
for (const log of newlogs) {
133-
const key = this.cache!.encodeKey(log)
133+
const key = this.cache!.encodeLogKey(log)
134134
const logExist = await this.cache!.has(key)
135135
if (!logExist)
136136
res.push(log)

packages/reku/event/crosschecker/cache/manager.ts

Lines changed: 40 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,39 +26,13 @@ export class CrossCheckerCacheManager extends SimpleStoreManager {
2626
this.logger = options?.logger ?? logger
2727
}
2828

29-
/**
30-
* @dev can add this.style: string = 'redis' when supporting other store type
31-
* @param log
32-
* @returns
33-
*/
34-
encodeKey(log: SimpleLog): string {
35-
const key = log.index && !this.noLogIndex ? `${this.storeKeyPrefix + log.transactionHash}:${log.index}` : this.storeKeyPrefix + log.transactionHash
36-
logger.debug('cc-cm-encodeKey', key)
37-
return key
38-
}
39-
40-
decodeKey(key: string): SimpleLog {
41-
logger.debug('cc-cm-decodeKey', key)
42-
if (!key.startsWith(this.storeKeyPrefix))
43-
throw new Error(`The prefix ${this.storeKeyPrefix} is not a prefix of ${key}`)
44-
45-
const _noprefix_key = key.slice(this.storeKeyPrefix.length)
46-
47-
const parts = _noprefix_key.split(':')
48-
if (parts.length > 2)
49-
throw new Error(`wrong key format when decoding, expecting ${this.storeKeyPrefix}+xx:xx, getting ${key}`)
50-
51-
const log = { transactionHash: parts[0], index: parts.length === 2 ? parseInt(parts[1]) : undefined }
52-
return log
53-
}
54-
5529
/**
5630
* add log into store record that can indicate a log
5731
* @param log
5832
*/
5933
async addLog(log: SimpleLog) {
6034
this.logger.debug('cache manager - addLog:', log.transactionHash, log.index)
61-
const key = this.encodeKey(log)
35+
const key = this.encodeLogKey(log)
6236
await this.set(key, true, this.ttl)
6337
}
6438

@@ -83,8 +57,46 @@ export class CrossCheckerCacheManager extends SimpleStoreManager {
8357
this.logger.debug('cachemanager-getLogs:', keys)
8458
const logs: SimpleLog[] = []
8559
for (const key of keys)
86-
logs.push(this.decodeKey(key))
60+
logs.push(this.decodeLogKey(key))
8761

8862
return logs
8963
}
64+
65+
/**
66+
* @dev can add this.style: string = 'redis' when supporting other store type
67+
* @param log
68+
* @returns
69+
*/
70+
encodeLogKey(log: SimpleLog): string {
71+
const key = log.index && !this.noLogIndex ? `${this.storeKeyPrefix + log.transactionHash}:${log.index}` : this.storeKeyPrefix + log.transactionHash
72+
logger.debug('cc-cm-encodeKey', key)
73+
return key
74+
}
75+
76+
decodeLogKey(key: string): SimpleLog {
77+
logger.debug('cc-cm-decodeKey', key)
78+
if (!key.startsWith(this.storeKeyPrefix))
79+
throw new Error(`The prefix ${this.storeKeyPrefix} is not a prefix of ${key}`)
80+
81+
const _noprefix_key = key.slice(this.storeKeyPrefix.length)
82+
83+
const parts = _noprefix_key.split(':')
84+
if (parts.length > 2)
85+
throw new Error(`wrong key format when decoding, expecting ${this.storeKeyPrefix}+xx:xx, getting ${key}`)
86+
87+
const log = { transactionHash: parts[0], index: parts.length === 2 ? parseInt(parts[1]) : undefined }
88+
return log
89+
}
90+
91+
/**
92+
* ttl: no limit
93+
* @param checkpoint
94+
*/
95+
async setCheckpoint(checkpoint: number) {
96+
await this.set(`${this.storeKeyPrefix}checkpoint`, checkpoint)
97+
}
98+
99+
async getCheckpoint(): Promise<number | undefined> {
100+
return await this.get(`${this.storeKeyPrefix}checkpoint`)
101+
}
90102
}

packages/reku/tests/.env.example

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
############### General config ###############
2+
3+
# general - provider url
4+
MAINNET_WSS=""
5+
MAINNET_HTTP=""
6+
SEPOLIA_WSS=""
7+
SEPOLIA_HTTP=""
8+
9+
# general - redis
10+
REDIS_HOST="localhost"
11+
REDIS_PORT=6379
12+
REDIS_TTL=7200000 # 2 hour in ms

packages/reku/tests/crosscheck.test.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,23 @@
22
import { ethers } from 'ethers'
33
import { describe, expect, it, vi } from 'vitest'
44
import { Logger, setLogger } from '@ora-io/utils'
5+
import dotenv from 'dotenv'
56
import { AutoCrossChecker } from '../event/crosschecker/autochecker'
67
import { BaseCrossChecker } from '../event/crosschecker/basechecker'
78
import type { BaseCrossCheckParam } from '../event/crosschecker/interface'
9+
dotenv.config({ path: './packages/reku/tests/.env' })
810

9-
const ETHEREUM_RPC_URL = 'https://rpc.ankr.com/eth'
1011
const CONTRACT_ADDRESS = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
1112

13+
const chain = 'mainnet'
14+
15+
const httpProvider = new ethers.JsonRpcProvider(
16+
process.env[`${chain.toUpperCase()}_HTTP`]!,
17+
)
18+
1219
setLogger(new Logger('debug', 'reku-tests'))
1320

1421
export async function crossCheckerTest() {
15-
const rpcUrl = ETHEREUM_RPC_URL
16-
const provider = new ethers.JsonRpcProvider(rpcUrl)
17-
1822
const onMissingLog = async (log: any) => {
1923
console.log('onMissingLog', log)
2024
}
@@ -33,7 +37,7 @@ export async function crossCheckerTest() {
3337
'0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef',
3438
]
3539

36-
const cc = new BaseCrossChecker(provider)
40+
const cc = new BaseCrossChecker(httpProvider)
3741
await cc.crossCheckRange({
3842
onMissingLog,
3943
ignoreLogs,
@@ -44,7 +48,7 @@ export async function crossCheckerTest() {
4448
})
4549

4650
// choose catchup mode / realtime mode / catchup then realtime mode by fromBlock & toBlock
47-
const acc = new AutoCrossChecker(provider)
51+
const acc = new AutoCrossChecker(httpProvider)
4852
await acc.start({
4953
onMissingLog,
5054
ignoreLogs,

0 commit comments

Comments
 (0)