diff --git a/.env.example b/.env.example index 167bcda2..705da033 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,5 @@ # Bitcoin network, testnet by default NETWORK=testnet -# LOGGER_LEVEL=info # Set /token/generate default domain param # DOMAIN=localhost @@ -31,9 +30,9 @@ JWT_SECRET= # JWT_DENYLIST= # Bitcoin data provider, support mempool and electrs -# use mempool.space as default, electrs as fallback -# change to electrs if you want to use electrs as default and mempool.space as fallback -BITCOIN_DATA_PROVIDER=mempool +# use electrs as default, mempool as fallback +# change to mempool if you want to use mempool.space as default and electrs as fallback +BITCOIN_DATA_PROVIDER=electrs # Bitcoin Mempool.space API URL # optinal when BITCOIN_DATA_PROVIDER=electrs BITCOIN_MEMPOOL_SPACE_API_URL=https://mempool.space @@ -51,10 +50,6 @@ CKB_RPC_URL=https://testnet.ckb.dev/rpc PAYMASTER_PRIVATE_KEY= # Paymaster cell capacity in shannons PAYMASTER_CELL_CAPACITY=31600000000 -# Paymaster cell queue preset count, used to refill paymaster cell. -PAYMASTER_CELL_PRESET_COUNT=500 -# Paymaster cell refill threshold, refill paymaster cell when the balance is less than this threshold. -PAYMASTER_CELL_REFILL_THRESHOLD=0.3 # Check the paymaster BTC UTXO when processing rgb++ ckb transaction PAYMASTER_RECEIVE_UTXO_CHECK=false # Paymaster bitcoin address, used to receive BTC from users @@ -66,8 +61,6 @@ PAYMASTER_BTC_CONTAINER_FEE_SATS=7000 UNLOCKER_CRON_SCHEDULE='*/5 * * * *' # BTCTimeLock cell unlock batch size UNLOCKER_CELL_BATCH_SIZE=100 -# BTCTimeLock cell unlocker monitor slug, used for monitoring unlocker status on sentry -UNLOCKER_MONITOR_SLUG=btctimelock-cells-unlocker # RGB++ CKB transaction Queue cron job delay in milliseconds # the /rgbpp/v1/transaction/ckb-tx endpoint is called, the transaction will be added to the queue diff --git a/README.md b/README.md index df128b63..98b325d2 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,6 @@ Update the configuration values: ```env # Bitcoin network, testnet by default NETWORK=testnet -# LOGGER_LEVEL=info # Set /token/generate default domain param # DOMAIN=localhost @@ -61,9 +60,9 @@ JWT_SECRET= # JWT_DENYLIST= # Bitcoin data provider, support mempool and electrs -# use mempool.space as default, electrs as fallback -# change to electrs if you want to use electrs as default and mempool.space as fallback -BITCOIN_DATA_PROVIDER=mempool +# use electrs as default, mempool as fallback +# change to mempool if you want to use mempool.space as default and electrs as fallback +BITCOIN_DATA_PROVIDER=electrs # Bitcoin Mempool.space API URL # optinal when BITCOIN_DATA_PROVIDER=electrs BITCOIN_MEMPOOL_SPACE_API_URL=https://mempool.space @@ -81,10 +80,6 @@ CKB_RPC_URL=https://testnet.ckb.dev/rpc PAYMASTER_PRIVATE_KEY= # Paymaster cell capacity in shannons PAYMASTER_CELL_CAPACITY=31600000000 -# Paymaster cell queue preset count, used to refill paymaster cell. -PAYMASTER_CELL_PRESET_COUNT=500 -# Paymaster cell refill threshold, refill paymaster cell when the balance is less than this threshold. -PAYMASTER_CELL_REFILL_THRESHOLD=0.3 # Check the paymaster BTC UTXO when processing rgb++ ckb transaction PAYMASTER_RECEIVE_UTXO_CHECK=false # Paymaster bitcoin address, used to receive BTC from users @@ -96,8 +91,6 @@ PAYMASTER_BTC_CONTAINER_FEE_SATS=7000 UNLOCKER_CRON_SCHEDULE='*/5 * * * *' # BTCTimeLock cell unlock batch size UNLOCKER_CELL_BATCH_SIZE=100 -# BTCTimeLock cell unlocker monitor slug, used for monitoring unlocker status on sentry -UNLOCKER_MONITOR_SLUG=btctimelock-cells-unlocker # RGB++ CKB transaction Queue cron job delay in milliseconds # the /rgbpp/v1/transaction/ckb-tx endpoint is called, the transaction will be added to the queue diff --git a/package.json b/package.json index dcfde9d9..d89db6b5 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "btc-assets-api", - "version": "2.1.0", + "version": "2.2.0", "title": "Bitcoin/RGB++ Assets API", "description": "", "main": "index.js", @@ -41,9 +41,9 @@ "@fastify/swagger-ui": "^3.0.0", "@immobiliarelabs/fastify-sentry": "^8.0.1", "@nervosnetwork/ckb-sdk-utils": "^0.109.1", - "@rgbpp-sdk/btc": "0.0.0-snap-20240430102443", - "@rgbpp-sdk/ckb": "0.0.0-snap-20240430102443", - "@rgbpp-sdk/service": "0.0.0-snap-20240430102443", + "@rgbpp-sdk/btc": "^0.1.0", + "@rgbpp-sdk/ckb": "^0.1.0", + "@rgbpp-sdk/service": "^0.1.0", "@sentry/node": "^7.102.1", "@sentry/profiling-node": "^7.102.1", "async-retry": "^1.3.3", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2ad9e282..da71d149 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -54,14 +54,14 @@ dependencies: specifier: ^0.109.1 version: 0.109.1 '@rgbpp-sdk/btc': - specifier: 0.0.0-snap-20240430102443 - version: 0.0.0-snap-20240430102443(@ckb-lumos/lumos@0.22.2) + specifier: ^0.1.0 + version: 0.1.0(@ckb-lumos/lumos@0.22.2) '@rgbpp-sdk/ckb': - specifier: 0.0.0-snap-20240430102443 - version: 0.0.0-snap-20240430102443(@ckb-lumos/lumos@0.22.2)(lodash@4.17.21) + specifier: ^0.1.0 + version: 0.1.0(@ckb-lumos/lumos@0.22.2)(lodash@4.17.21) '@rgbpp-sdk/service': - specifier: 0.0.0-snap-20240430102443 - version: 0.0.0-snap-20240430102443 + specifier: ^0.1.0 + version: 0.1.0 '@sentry/node': specifier: ^7.102.1 version: 7.102.1 @@ -1269,14 +1269,14 @@ packages: resolution: {integrity: sha512-2LuNTFBIO0m7kKIQvvPHN6UE63VjpmL9rnEEaOOaiSPbZK+zUOYIzBAWcED+3XYzhYsd/0mD57VdxAEqqV52CQ==} dev: true - /@rgbpp-sdk/btc@0.0.0-snap-20240430102443(@ckb-lumos/lumos@0.22.2): - resolution: {integrity: sha512-gUscYKTdTc/MonrLL6YlOlabZUgiQ29f6jIAh6LYY52qkAH48PKpJ+HwYr8yLvBdU/JMhijeRaj6PwXG/19AEQ==} + /@rgbpp-sdk/btc@0.1.0(@ckb-lumos/lumos@0.22.2): + resolution: {integrity: sha512-9talyTo3bhHpbLmPNXakMJ7J4NplVKhYYVXBf44z8vEbqi37iFLxYJf+vPrLk5UzXkI9XH5Grx6ty3mrp/jBug==} dependencies: '@bitcoinerlab/secp256k1': 1.1.1 '@ckb-lumos/codec': 0.22.2 '@nervosnetwork/ckb-types': 0.109.1 - '@rgbpp-sdk/ckb': 0.0.0-snap-20240430102443(@ckb-lumos/lumos@0.22.2)(lodash@4.17.21) - '@rgbpp-sdk/service': 0.0.0-snap-20240430102443 + '@rgbpp-sdk/ckb': 0.1.0(@ckb-lumos/lumos@0.22.2)(lodash@4.17.21) + '@rgbpp-sdk/service': 0.1.0 bip32: 4.0.0 bitcoinjs-lib: 6.1.5 ecpair: 2.1.0 @@ -1286,8 +1286,8 @@ packages: - debug dev: false - /@rgbpp-sdk/ckb@0.0.0-snap-20240430102443(@ckb-lumos/lumos@0.22.2)(lodash@4.17.21): - resolution: {integrity: sha512-bAeOk1NlZmbuKiT+3DKwrMbtawtB8FiHnFGsIps4/DAWd0CFi5MThQ19bJXa/CL6RKYsPoyI3RWfm3BtHdQ9FQ==} + /@rgbpp-sdk/ckb@0.1.0(@ckb-lumos/lumos@0.22.2)(lodash@4.17.21): + resolution: {integrity: sha512-C+wBxf9VxBD/YhbjxKDExHfbV8QoT0yjjXnHoMoMmadalIwEhzM6a3vnJc++PA8Iv5RslAPlmnncvZfe/yrFVw==} dependencies: '@ckb-lumos/base': 0.22.2 '@ckb-lumos/codec': 0.22.2 @@ -1295,7 +1295,7 @@ packages: '@nervosnetwork/ckb-sdk-core': 0.109.1 '@nervosnetwork/ckb-sdk-utils': 0.109.1 '@nervosnetwork/ckb-types': 0.109.1 - '@rgbpp-sdk/service': 0.0.0-snap-20240430102443 + '@rgbpp-sdk/service': 0.1.0 '@spore-sdk/core': 0.2.0-beta.8(@ckb-lumos/lumos@0.22.2)(lodash@4.17.21) axios: 1.6.8 camelcase-keys: 7.0.2 @@ -1306,8 +1306,8 @@ packages: - lodash dev: false - /@rgbpp-sdk/service@0.0.0-snap-20240430102443: - resolution: {integrity: sha512-9lWF246i7LVYHjVhbeyWLldzKTN6e7kgsIMBBCOVcMsZ8NvDFoosRabtdKLdD6Vh3SS+As2aC0oyL22uqG3iAA==} + /@rgbpp-sdk/service@0.1.0: + resolution: {integrity: sha512-POOjjBeSkqTfddclvxVCAeCd/gXXFyo0kXHsZxjOVwybgWKDOTVBxpTKv3AfEYIHcYiQgI52MuCdPLbCgpTp3g==} dependencies: '@ckb-lumos/base': 0.22.2 '@ckb-lumos/codec': 0.22.2 diff --git a/src/@types/fastify/index.d.ts b/src/@types/fastify/index.d.ts index ab4ab505..15d4b3dd 100644 --- a/src/@types/fastify/index.d.ts +++ b/src/@types/fastify/index.d.ts @@ -4,6 +4,8 @@ import Paymaster from '../../services/paymaster'; import SPVClient from '../../services/spv'; import CKBClient from '../../services/ckb'; import BitcoinClient from '../../services/bitcoin'; +import RgbppCollector from '../../services/rgbpp'; +import UTXOSyncer from '../../services/utxo'; declare module 'fastify' { // eslint-disable-next-line @typescript-eslint/no-unused-vars @@ -15,5 +17,7 @@ declare module 'fastify' { spv: SPVClient; paymaster: Paymaster; transactionProcessor: TransactionProcessor; + rgbppCollector: RgbppCollector; + utxoSyncer: UTXOSyncer; } } diff --git a/src/container.ts b/src/container.ts index 6c60eae6..0b0f8147 100644 --- a/src/container.ts +++ b/src/container.ts @@ -8,6 +8,8 @@ import Unlocker from './services/unlocker'; import SPVClient from './services/spv'; import CKBClient from './services/ckb'; import BitcoinClient from './services/bitcoin'; +import RgbppCollector from './services/rgbpp'; +import UTXOSyncer from './services/utxo'; export interface Cradle { env: typeof env; @@ -19,6 +21,8 @@ export interface Cradle { paymaster: Paymaster; unlocker: Unlocker; transactionProcessor: TransactionProcessor; + rgbppCollector: RgbppCollector; + utxoSyncer: UTXOSyncer; } const container = createContainer({ @@ -40,6 +44,8 @@ container.register({ paymaster: asClass(Paymaster).singleton(), transactionProcessor: asClass(TransactionProcessor).singleton(), unlocker: asClass(Unlocker).singleton(), + rgbppCollector: asClass(RgbppCollector).singleton(), + utxoSyncer: asClass(UTXOSyncer).singleton(), }); export default container; diff --git a/src/env.ts b/src/env.ts index fd3b3024..5f7f00bf 100644 --- a/src/env.ts +++ b/src/env.ts @@ -161,6 +161,48 @@ const envSchema = z .enum(['true', 'false']) .default('false') .transform((value) => value === 'true'), + + /** + * UTXO sync data cache enable flag, used to cache the UTXO sync data + * enable by default + */ + UTXO_SYNC_DATA_CACHE_ENABLE: z + .enum(['true', 'false']) + .default('true') + .transform((value) => value === 'true'), + /** + * UTXO sync repeat base duration, used to set the UTXO sync repeat interval + * repeat job start interval is 10 seconds by default + */ + UTXO_SYNC_REPEAT_BASE_DURATION: z.coerce.number().default(10 * 1000), + /** + * UTXO sync repeat max duration, used to maximum the UTXO sync repeat interval + * 1 hour by default + */ + UTXO_SYNC_REPEAT_MAX_DURATION: z.coerce.number().default(60 * 60 * 1000), + /** + * UTXO sync repeat expired duration, used to remove the expired UTXO sync job + * 336 hours by default + */ + UTXO_SYNC_REPEAT_EXPRIED_DURATION: z.coerce.number().default(336 * 60 * 60 * 1000), + /** + * UTXO sync data cache expire duration, used to cache the UTXO sync data + * 30 minutes by default + */ + UTXO_SYNC_DATA_CACHE_EXPIRE: z.coerce.number().default(30 * 60 * 1000), + + /** + * RGB++ collect data cache enable flag, used to cache the RGB++ collect data + * enable by default + */ + RGBPP_COLLECT_DATA_CACHE_ENABLE: z + .enum(['true', 'false']) + .default('true') + .transform((value) => value === 'true'), + /** + * RGB++ collect data cache expire duration, used to cache the RGB++ collect data + */ + RGBPP_COLLECT_DATA_CACHE_EXPIRE: z.coerce.number().default(30 * 60 * 1000), }) .and( z.union([ diff --git a/src/plugins/cron.ts b/src/plugins/cron.ts index 6f476a41..f973007a 100644 --- a/src/plugins/cron.ts +++ b/src/plugins/cron.ts @@ -3,6 +3,8 @@ import TransactionProcessor from '../services/transaction'; import cron from 'fastify-cron'; import { Env } from '../env'; import Unlocker from '../services/unlocker'; +import RgbppCollector from '../services/rgbpp'; +import UTXOSyncer from '../services/utxo'; export default fp(async (fastify) => { try { @@ -48,10 +50,10 @@ export default fp(async (fastify) => { fastify.addHook('onReady', async () => { transactionProcessor.startProcess({ onActive: (job) => { - fastify.log.info(`Job active: ${job.id}`); + fastify.log.info(`[TransactionProcessor] job active: ${job.id}`); }, onCompleted: (job) => { - fastify.log.info(`Job completed: ${job.id}`); + fastify.log.info(`[TransactionProcessor] job completed: ${job.id}`); }, }); }); @@ -78,6 +80,45 @@ export default fp(async (fastify) => { }, }; + if (env.UTXO_SYNC_DATA_CACHE_ENABLE) { + const utxoSyncer: UTXOSyncer = fastify.container.resolve('utxoSyncer'); + fastify.addHook('onReady', async () => { + utxoSyncer.startProcess({ + onActive: (job) => { + fastify.log.info(`[UTXOSyncer] job active: ${job.id}`); + }, + onCompleted: async (job) => { + fastify.log.info(`[UTXOSyncer] job completed: ${job.id}`); + if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) { + const { btcAddress, utxos } = job.returnvalue; + const rgbppCollector: RgbppCollector = fastify.container.resolve('rgbppCollector'); + await rgbppCollector.enqueueCollectJob(btcAddress, utxos, true); + } + }, + }); + }); + fastify.addHook('onClose', async () => { + utxoSyncer.closeProcess(); + }); + } + + if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE) { + const rgbppCollector: RgbppCollector = fastify.container.resolve('rgbppCollector'); + fastify.addHook('onReady', async () => { + rgbppCollector.startProcess({ + onActive: (job) => { + fastify.log.info(`[RgbppCollector] job active: ${job.id}`); + }, + onCompleted: (job) => { + fastify.log.info(`[RgbppCollector] job completed: ${job.id}`); + }, + }); + }); + fastify.addHook('onClose', async () => { + rgbppCollector.closeProcess(); + }); + } + // processing unlock BTC_TIME_LOCK cells const unlocker: Unlocker = fastify.container.resolve('unlocker'); const monitorSlug = env.UNLOCKER_MONITOR_SLUG; diff --git a/src/routes/bitcoin/address.ts b/src/routes/bitcoin/address.ts index 515c6183..b65b8b7a 100644 --- a/src/routes/bitcoin/address.ts +++ b/src/routes/bitcoin/address.ts @@ -4,8 +4,11 @@ import { Balance, Transaction, UTXO } from './types'; import validateBitcoinAddress from '../../utils/validators'; import { ZodTypeProvider } from 'fastify-type-provider-zod'; import z from 'zod'; +import { Env } from '../../env'; const addressRoutes: FastifyPluginCallback, Server, ZodTypeProvider> = (fastify, _, done) => { + const env: Env = fastify.container.resolve('env'); + fastify.addHook('preHandler', async (request) => { const { address } = request.params as { address: string }; const valid = validateBitcoinAddress(address); @@ -25,6 +28,10 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType }), querystring: z.object({ min_satoshi: z.coerce.number().optional().describe('The minimum value of the UTXO in satoshi'), + no_cache: z + .enum(['true', 'false']) + .default('false') + .describe('Whether to disable cache to get utxos, default is false'), }), response: { 200: Balance, @@ -33,8 +40,15 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType }, async (request) => { const { address } = request.params; - const { min_satoshi } = request.query; - const utxos = await fastify.bitcoin.getAddressTxsUtxo({ address }); + const { min_satoshi, no_cache } = request.query; + + let utxosCache = null; + if (env.UTXO_SYNC_DATA_CACHE_ENABLE && no_cache !== 'true') { + utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(address); + await fastify.utxoSyncer.enqueueSyncJob(address); + } + const utxos = utxosCache ? utxosCache : await fastify.bitcoin.getAddressTxsUtxo({ address }); + return utxos.reduce( (acc: Balance, utxo: UTXO) => { if (utxo.status.confirmed) { @@ -74,6 +88,10 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType .default('true') .describe('Only return confirmed UTXOs'), min_satoshi: z.coerce.number().optional().describe('The minimum value of the UTXO in satoshi'), + no_cache: z + .enum(['true', 'false']) + .default('false') + .describe('Whether to disable cache to get utxos, default is false'), }), response: { 200: z.array(UTXO), @@ -82,8 +100,17 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType }, async function (request) { const { address } = request.params; - const { only_confirmed, min_satoshi } = request.query; - let utxos = await fastify.bitcoin.getAddressTxsUtxo({ address }); + const { only_confirmed, min_satoshi, no_cache } = request.query; + + let utxosCache = null; + if (env.UTXO_SYNC_DATA_CACHE_ENABLE && no_cache !== 'true') { + utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(address); + await fastify.utxoSyncer.enqueueSyncJob(address); + } + let utxos = utxosCache ? utxosCache : await fastify.bitcoin.getAddressTxsUtxo({ address }); + if (utxosCache) { + fastify.log.debug(`[UTXO] get utxos from cache: ${address}`); + } // compatible with the case where only_confirmed is undefined if (only_confirmed === 'true' || only_confirmed === 'undefined') { diff --git a/src/routes/bitcoin/index.ts b/src/routes/bitcoin/index.ts index 1fc8e084..5195d984 100644 --- a/src/routes/bitcoin/index.ts +++ b/src/routes/bitcoin/index.ts @@ -8,9 +8,11 @@ import container from '../../container'; import { ZodTypeProvider } from 'fastify-type-provider-zod'; import BitcoinClient from '../../services/bitcoin'; import feesRoutes from './fees'; +import UTXOSyncer from '../../services/utxo'; const bitcoinRoutes: FastifyPluginCallback, Server, ZodTypeProvider> = (fastify, _, done) => { fastify.decorate('bitcoin', container.resolve('bitcoin')); + fastify.decorate('utxoSyncer', container.resolve('utxoSyncer')); fastify.register(infoRoute); fastify.register(blockRoutes, { prefix: '/block' }); diff --git a/src/routes/cron/collect-rgbpp-cells.ts b/src/routes/cron/collect-rgbpp-cells.ts new file mode 100644 index 00000000..5d9236ce --- /dev/null +++ b/src/routes/cron/collect-rgbpp-cells.ts @@ -0,0 +1,48 @@ +import pino from 'pino'; +import { FastifyPluginCallback } from 'fastify'; +import { Server } from 'http'; +import { ZodTypeProvider } from 'fastify-type-provider-zod'; +import container from '../../container'; +import { VERCEL_MAX_DURATION } from '../../constants'; +import RgbppCollector from '../../services/rgbpp'; + +const collectRgbppCellsCronRoute: FastifyPluginCallback, Server, ZodTypeProvider> = ( + fastify, + _, + done, +) => { + fastify.get( + '/collect-rgbpp-cells', + { + schema: { + tags: ['Cron Task'], + description: 'Run UTXO sync cron task to update data cache, used for serverless deployment', + }, + }, + async () => { + const logger = container.resolve('logger'); + const rgbppCollector: RgbppCollector = container.resolve('rgbppCollector'); + try { + await new Promise((resolve) => { + setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000); + rgbppCollector.startProcess({ + onActive: (job) => { + logger.info(`[rgbppCollector] Job active: ${job.id}`); + }, + onCompleted: (job) => { + logger.info(`[rgbppCollector] Job completed: ${job.id}`); + }, + }); + }); + await rgbppCollector.pauseProcess(); + await rgbppCollector.closeProcess(); + } catch (err) { + logger.error(err); + fastify.Sentry.captureException(err); + } + }, + ); + done(); +}; + +export default collectRgbppCellsCronRoute; diff --git a/src/routes/cron/index.ts b/src/routes/cron/index.ts index 72e2b527..fb1222c6 100644 --- a/src/routes/cron/index.ts +++ b/src/routes/cron/index.ts @@ -1,12 +1,16 @@ -import { FastifyPluginCallback } from "fastify"; -import { ZodTypeProvider } from "fastify-type-provider-zod"; -import { Server } from "http"; -import processTransactionsCronRoute from "./process-transactions"; -import unlockCellsCronRoute from "./unlock-cells"; +import { FastifyPluginCallback } from 'fastify'; +import { ZodTypeProvider } from 'fastify-type-provider-zod'; +import { Server } from 'http'; +import processTransactionsCronRoute from './process-transactions'; +import unlockCellsCronRoute from './unlock-cells'; +import syncUTXOCronRoute from './sync-utxo'; +import collectRgbppCellsCronRoute from './collect-rgbpp-cells'; const cronRoutes: FastifyPluginCallback, Server, ZodTypeProvider> = (fastify, _, done) => { fastify.register(processTransactionsCronRoute); fastify.register(unlockCellsCronRoute); + fastify.register(syncUTXOCronRoute); + fastify.register(collectRgbppCellsCronRoute); done(); }; diff --git a/src/routes/cron/process-transactions.ts b/src/routes/cron/process-transactions.ts index 1c273c2d..b1853668 100644 --- a/src/routes/cron/process-transactions.ts +++ b/src/routes/cron/process-transactions.ts @@ -27,10 +27,10 @@ const processTransactionsCronRoute: FastifyPluginCallback, setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000); transactionProcessor.startProcess({ onActive: (job) => { - logger.info(`Job active: ${job.id}`); + logger.info(`[TransactionProcessor] Job active: ${job.id}`); }, onCompleted: (job) => { - logger.info(`Job completed: ${job.id}`); + logger.info(`[TransactionProcessor] Job completed: ${job.id}`); }, }); }); diff --git a/src/routes/cron/sync-utxo.ts b/src/routes/cron/sync-utxo.ts new file mode 100644 index 00000000..7c64d055 --- /dev/null +++ b/src/routes/cron/sync-utxo.ts @@ -0,0 +1,44 @@ +import pino from 'pino'; +import { FastifyPluginCallback } from 'fastify'; +import { Server } from 'http'; +import { ZodTypeProvider } from 'fastify-type-provider-zod'; +import container from '../../container'; +import { VERCEL_MAX_DURATION } from '../../constants'; +import UTXOSyncer from '../../services/utxo'; + +const syncUTXOCronRoute: FastifyPluginCallback, Server, ZodTypeProvider> = (fastify, _, done) => { + fastify.get( + '/sync-utxo', + { + schema: { + tags: ['Cron Task'], + description: 'Run UTXO sync cron task to update data cache, used for serverless deployment', + }, + }, + async () => { + const logger = container.resolve('logger'); + const utxoSyncer: UTXOSyncer = container.resolve('utxoSyncer'); + try { + await new Promise((resolve) => { + setTimeout(resolve, (VERCEL_MAX_DURATION - 10) * 1000); + utxoSyncer.startProcess({ + onActive: (job) => { + logger.info(`[UTXOSyncer] Job active: ${job.id}`); + }, + onCompleted: (job) => { + logger.info(`[UTXOSyncer] Job completed: ${job.id}`); + }, + }); + }); + await utxoSyncer.pauseProcess(); + await utxoSyncer.closeProcess(); + } catch (err) { + logger.error(err); + fastify.Sentry.captureException(err); + } + }, + ); + done(); +}; + +export default syncUTXOCronRoute; diff --git a/src/routes/rgbpp/address.ts b/src/routes/rgbpp/address.ts index 3ece1881..9601843a 100644 --- a/src/routes/rgbpp/address.ts +++ b/src/routes/rgbpp/address.ts @@ -3,16 +3,14 @@ import { Server } from 'http'; import validateBitcoinAddress from '../../utils/validators'; import { ZodTypeProvider } from 'fastify-type-provider-zod'; import { Cell, Script } from './types'; -import { buildRgbppLockArgs, genRgbppLockScript } from '@rgbpp-sdk/ckb/lib/utils/rgbpp'; -import { CKBIndexerQueryOptions } from '@ckb-lumos/ckb-indexer/lib/type'; import { blockchain } from '@ckb-lumos/base'; -import { UTXO } from '../../services/bitcoin/schema'; -import pLimit from 'p-limit'; -import asyncRetry from 'async-retry'; import z from 'zod'; +import { serializeScript } from '@nervosnetwork/ckb-sdk-utils'; import { Env } from '../../env'; const addressRoutes: FastifyPluginCallback, Server, ZodTypeProvider> = (fastify, _, done) => { + const env: Env = fastify.container.resolve('env'); + fastify.addHook('preHandler', async (request) => { const { btc_address } = request.params as { btc_address: string }; const valid = validateBitcoinAddress(btc_address); @@ -21,34 +19,6 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType } }); - const env: Env = fastify.container.resolve('env'); - const limit = pLimit(env.CKB_RPC_MAX_CONCURRENCY); - - async function getRgbppAssetsByUtxo(utxo: UTXO, typeScript?: Script) { - try { - const { txid, vout } = utxo; - const args = buildRgbppLockArgs(vout, txid); - - const query: CKBIndexerQueryOptions = { - lock: genRgbppLockScript(args, process.env.NETWORK === 'mainnet'), - }; - - if (typeScript) { - query.type = typeScript; - } - - const collector = fastify.ckb.indexer.collector(query).collect(); - const cells: Cell[] = []; - for await (const cell of collector) { - cells.push(cell); - } - return cells; - } catch (e) { - fastify.Sentry.captureException(e); - throw e; - } - } - fastify.get( '/:btc_address/assets', { @@ -70,6 +40,10 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType - as a hex string: '0x...' (You can pack by @ckb-lumos/codec blockchain.Script.pack({ "codeHash": "0x...", ... })) `, ), + no_cache: z + .enum(['true', 'false']) + .default('false') + .describe('Whether to disable cache to get RGB++ assets, default is false'), }), response: { 200: z.array(Cell), @@ -78,8 +52,7 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType }, async (request) => { const { btc_address } = request.params; - const { type_script } = request.query; - const utxos = await fastify.bitcoin.getAddressTxsUtxo({ address: btc_address }); + const { type_script, no_cache } = request.query; let typeScript: Script | undefined = undefined; if (type_script) { @@ -90,17 +63,32 @@ const addressRoutes: FastifyPluginCallback, Server, ZodType } } - const cells = await Promise.all( - utxos.map((utxo) => - limit(() => - asyncRetry(() => getRgbppAssetsByUtxo(utxo, typeScript), { - retries: 2, - onRetry: (e, attempt) => fastify.log.warn(`[getRgbppAssetsByUtxo] ${e.message} retry ${attempt}`), - }), - ), - ), - ); - return cells.flat(); + let utxosCache = null; + if (env.UTXO_SYNC_DATA_CACHE_ENABLE && no_cache !== 'true') { + utxosCache = await fastify.utxoSyncer.getUTXOsFromCache(btc_address); + await fastify.utxoSyncer.enqueueSyncJob(btc_address); + } + const utxos = utxosCache ? utxosCache : await fastify.bitcoin.getAddressTxsUtxo({ address: btc_address }); + + let rgbppCache = null; + if (env.RGBPP_COLLECT_DATA_CACHE_ENABLE && no_cache !== 'true') { + rgbppCache = await fastify.rgbppCollector.getRgbppCellsFromCache(btc_address); + await fastify.rgbppCollector.enqueueCollectJob(btc_address, utxos); + } + + if (rgbppCache) { + fastify.log.debug(`[RGB++] get cells from cache: ${btc_address}`); + if (typeScript) { + return rgbppCache.filter( + (cell) => cell.cellOutput.type && serializeScript(cell.cellOutput.type) === serializeScript(typeScript!), + ); + } + return rgbppCache; + } + + const rgbppUtxoCellsParis = await fastify.rgbppCollector.collectRgbppUtxoCellsPairs(utxos, typeScript); + const cells = rgbppUtxoCellsParis.map((pair) => pair.cells).flat(); + return cells; }, ); diff --git a/src/routes/rgbpp/assets.ts b/src/routes/rgbpp/assets.ts index ff9a2273..58340565 100644 --- a/src/routes/rgbpp/assets.ts +++ b/src/routes/rgbpp/assets.ts @@ -2,9 +2,8 @@ import { FastifyPluginCallback } from 'fastify'; import { ZodTypeProvider } from 'fastify-type-provider-zod'; import { Server } from 'http'; import z from 'zod'; -import { buildRgbppLockArgs, genRgbppLockScript } from '@rgbpp-sdk/ckb/lib/utils/rgbpp'; import { Cell } from './types'; -import { CKBIndexerQueryOptions } from '@ckb-lumos/ckb-indexer/lib/type'; +import { UTXO } from '../../services/bitcoin/schema'; const assetsRoute: FastifyPluginCallback, Server, ZodTypeProvider> = (fastify, _, done) => { fastify.get( @@ -24,18 +23,20 @@ const assetsRoute: FastifyPluginCallback, Server, ZodTypePr async (request) => { const { btc_txid } = request.params; const transaction = await fastify.bitcoin.getTx({ txid: btc_txid }); - const cells: Cell[] = []; - for (let index = 0; index < transaction.vout.length; index++) { - const args = buildRgbppLockArgs(index, btc_txid); - const query: CKBIndexerQueryOptions = { - lock: genRgbppLockScript(args, process.env.NETWORK === 'mainnet'), - }; - const collector = fastify.ckb.indexer.collector(query).collect(); - for await (const cell of collector) { - cells.push(cell); - } - } - return cells; + + const utxos = transaction.vout.map((vout, index) => { + return { + txid: btc_txid, + vout: index, + value: vout.value, + status: { + confirmed: true, + }, + } as UTXO; + }); + + const batchCells = await fastify.rgbppCollector.getRgbppCellsByBatchRequest(utxos); + return batchCells.flat(); }, ); @@ -56,19 +57,18 @@ const assetsRoute: FastifyPluginCallback, Server, ZodTypePr }, async (request) => { const { btc_txid, vout } = request.params; - const args = buildRgbppLockArgs(vout, btc_txid); - const lockScript = genRgbppLockScript(args, process.env.NETWORK === 'mainnet'); - - const collector = fastify.ckb.indexer.collector({ - lock: lockScript, - }); + const utxo: UTXO = { + txid: btc_txid, + vout, + // We don't need the value here, so we just set it to 0 + value: 0, + status: { + confirmed: true, + }, + }; - const collect = collector.collect(); - const cells: Cell[] = []; - for await (const cell of collect) { - cells.push(cell); - } - return cells; + const batchCells = await fastify.rgbppCollector.getRgbppCellsByBatchRequest([utxo]); + return batchCells.flat(); }, ); diff --git a/src/routes/rgbpp/index.ts b/src/routes/rgbpp/index.ts index 15c22921..7a670fb9 100644 --- a/src/routes/rgbpp/index.ts +++ b/src/routes/rgbpp/index.ts @@ -11,6 +11,8 @@ import paymasterRoutes from './paymaster'; const rgbppRoutes: FastifyPluginCallback, Server, ZodTypeProvider> = (fastify, _, done) => { fastify.decorate('transactionProcessor', container.resolve('transactionProcessor')); fastify.decorate('paymaster', container.resolve('paymaster')); + fastify.decorate('rgbppCollector', container.resolve('rgbppCollector')); + fastify.decorate('utxoSyncer', container.resolve('utxoSyncer')); fastify.decorate('ckb', container.resolve('ckb')); fastify.decorate('bitcoin', container.resolve('bitcoin')); fastify.decorate('spv', container.resolve('spv')); diff --git a/src/routes/token/generate.ts b/src/routes/token/generate.ts index d9aabee0..1844733c 100644 --- a/src/routes/token/generate.ts +++ b/src/routes/token/generate.ts @@ -36,8 +36,22 @@ const generateRoute: FastifyPluginCallback, Server, ZodType async (request) => { const { app, domain } = request.body; const uuid = randomUUID(); - const token = fastify.jwt.sign({ sub: app, aud: domain, jti: uuid }); - return { id: uuid, token }; + + try { + // Ensure the domain is a valid URL and extract the host + const url = domain.startsWith('http') ? domain : `https://${domain}`; + const { host, pathname } = new URL(url); + + if (pathname !== '/') { + throw new Error('Must be a valid domain without path'); + } + + const token = fastify.jwt.sign({ sub: app, aud: host, jti: uuid }); + return { id: uuid, token }; + } catch (e) { + fastify.Sentry.captureException(e); + throw new Error('Failed to generate token: ' + (e as Error).message); + } }, ); done(); diff --git a/src/services/base/data-cache.ts b/src/services/base/data-cache.ts new file mode 100644 index 00000000..4b03dd1e --- /dev/null +++ b/src/services/base/data-cache.ts @@ -0,0 +1,51 @@ +import { Redis } from 'ioredis'; +import { z } from 'zod'; + +interface IDataCacheOptions { + prefix: string; + schema: z.ZodType; + expire: number; +} + +class DataCacheError extends Error { + constructor(message: string) { + super(message); + this.name = 'DataCacheError'; + } +} + +export default class DataCache { + private redis: Redis; + private prefix: string; + private schema: z.ZodType; + private expire: number; + + constructor(redis: Redis, options: IDataCacheOptions) { + this.redis = redis; + this.prefix = options.prefix; + this.schema = options.schema; + this.expire = options.expire; + } + + public async set(btcAddress: string, data: unknown) { + const parsed = this.schema.safeParse(data); + if (!parsed.success) { + throw new DataCacheError(parsed.error.message); + } + const key = `data-cache:${this.prefix}:${btcAddress}`; + await this.redis.set(key, JSON.stringify(parsed.data), 'PX', this.expire); + return parsed.data; + } + + public async get(btcAddress: string): Promise { + const key = `data-cache:${this.prefix}:${btcAddress}`; + const data = await this.redis.get(key); + if (data) { + const parsed = this.schema.safeParse(JSON.parse(data)); + if (parsed.success) { + return parsed.data; + } + } + return null; + } +} diff --git a/src/services/base/queue-worker.ts b/src/services/base/queue-worker.ts new file mode 100644 index 00000000..b618c699 --- /dev/null +++ b/src/services/base/queue-worker.ts @@ -0,0 +1,107 @@ +import { Job, JobsOptions, Queue, QueueOptions, Worker, WorkerOptions } from 'bullmq'; +import Redis from 'ioredis'; +import * as Sentry from '@sentry/node'; + +interface IQueueWorkerOptions { + name: string; + connection: Redis; + queue?: Omit; + worker?: Omit; +} + +interface IProcessCallbacks { + onActive?: (job: Job) => void; + onCompleted?: (job: Job) => void; + onFailed?: (job: Job | undefined, err: Error) => void; +} + +export default abstract class BaseQueueWorker { + protected queue: Queue; + protected worker: Worker; + + constructor(options: IQueueWorkerOptions) { + const { name, connection, queue, worker } = options; + this.queue = new Queue(name, { + connection, + ...queue, + }); + this.worker = new Worker( + name, + async (job: Job) => { + const span = Sentry.startInactiveSpan({ name: this.constructor.name, op: 'process' }); + const returnvalue = await this.process(job); + span?.end(); + return returnvalue; + }, + { + connection, + autorun: false, + ...worker, + }, + ); + } + + abstract process(job: Job): Promise; + + /** + * Add a job to the queue + * @param jobId - the job id + * @param data - the data for the job + */ + public async addJob(jobId: string, data: T, options?: Omit) { + const job = await this.queue.add(jobId, data, { + ...options, + jobId, + }); + return job; + } + + /** + * Get the queue job counts + */ + public async getQueueJobCounts() { + const counts = await this.queue.getJobCounts(); + return counts; + } + + /** + * Check if the worker is running + */ + public async isWorkerRunning() { + return this.worker.isRunning(); + } + + /** + * Start the process + * @param callbacks - the callbacks for the process + * - onCompleted: the callback when the job is completed + * - onFailed: the callback when the job is failed + */ + public async startProcess(callbacks?: IProcessCallbacks): Promise { + if (callbacks?.onActive) { + this.worker.on('active', callbacks?.onActive); + } + if (callbacks?.onCompleted) { + this.worker.on('completed', callbacks.onCompleted); + } + if (callbacks?.onFailed) { + this.worker.on('failed', callbacks.onFailed); + } + await this.worker.run(); + } + + /** + * Pause the process + */ + public async pauseProcess(): Promise { + await this.worker.pause(); + } + + /** + * Close the process + */ + public async closeProcess(): Promise { + await this.worker.close(); + await this.queue.close(); + } +} diff --git a/src/services/rgbpp.ts b/src/services/rgbpp.ts new file mode 100644 index 00000000..3a3b6358 --- /dev/null +++ b/src/services/rgbpp.ts @@ -0,0 +1,218 @@ +import { UTXO } from './bitcoin/schema'; +import pLimit from 'p-limit'; +import asyncRetry from 'async-retry'; +import { Cradle } from '../container'; +import { IndexerCell, buildRgbppLockArgs, genRgbppLockScript } from '@rgbpp-sdk/ckb'; +import * as Sentry from '@sentry/node'; +import { RPC, Script } from '@ckb-lumos/lumos'; +import { Job } from 'bullmq'; +import { z } from 'zod'; +import { Cell } from '../routes/rgbpp/types'; +import BaseQueueWorker from './base/queue-worker'; +import DataCache from './base/data-cache'; +import { groupBy } from 'lodash'; + +type GetCellsParams = Parameters; +type SearchKey = GetCellsParams[0]; +type CKBBatchRequest = { exec: () => Promise<{ objects: IndexerCell[] }[]> }; + +type RgbppUtxoCellsPair = { + utxo: UTXO; + cells: Cell[]; +}; + +interface IRgbppCollectRequest { + btcAddress: string; + utxos: UTXO[]; +} + +interface IRgbppCollectJobReturn { + [key: string]: Cell[]; +} + +export interface IProcessCallbacks { + onActive?: (job: Job) => void; + onCompleted?: (job: Job) => void; + onFailed?: (job: Job | undefined, err: Error) => void; +} + +export const RGBPP_COLLECTOR_QUEUE_NAME = 'rgbpp-collector-queue'; + +class RgbppCollectorError extends Error { + constructor(message: string) { + super(message); + this.name = 'RgbppCollectorError'; + } +} + +/** + * RgbppCollector is used to collect the cells for the utxos. + * The cells are stored in the cache with the btc address as the key, + * will be recollect when the utxos are updated or new collect job is enqueued. + */ +export default class RgbppCollector extends BaseQueueWorker { + private limit: pLimit.Limit; + private dataCache: DataCache; + + constructor(private cradle: Cradle) { + super({ + name: RGBPP_COLLECTOR_QUEUE_NAME, + connection: cradle.redis, + worker: { + lockDuration: 60_000, + removeOnComplete: { count: 0 }, + removeOnFail: { count: 0 }, + }, + }); + this.dataCache = new DataCache(cradle.redis, { + prefix: 'rgbpp-collector-data', + schema: z.record(z.array(Cell)), + expire: cradle.env.RGBPP_COLLECT_DATA_CACHE_EXPIRE, + }); + this.limit = pLimit(100); + } + + /** + * Capture the exception to the sentry scope with the btc address and utxos + * @param job - the job that failed + * @param err - the error + */ + private captureJobExceptionToSentryScope(job: Job, err: Error) { + const { btcAddress, utxos } = job.data; + Sentry.withScope((scope) => { + scope.setTag('btcAddress', btcAddress); + scope.setContext('utxos', { + utxos: JSON.stringify(utxos), + }); + this.cradle.logger.error(err); + scope.captureException(err); + }); + } + + /** + * Get the rgbpp cells batch request for the utxos + * @param utxos - the utxos to collect + * @param typeScript - the type script to filter the cells + */ + public async getRgbppCellsByBatchRequest(utxos: UTXO[], typeScript?: Script) { + const batchRequest: CKBBatchRequest = this.cradle.ckb.rpc.createBatchRequest( + utxos.map((utxo) => { + const { txid, vout } = utxo; + const args = buildRgbppLockArgs(vout, txid); + const searchKey: SearchKey = { + script: genRgbppLockScript(args, process.env.NETWORK === 'mainnet'), + scriptType: 'lock', + }; + if (typeScript) { + searchKey.filter = { + script: typeScript, + }; + } + // TOOD: In extreme cases, the num of search target cells may be more than limit=0x64=100 + // Priority: Low + const params: GetCellsParams = [searchKey, 'desc', '0x64']; + return ['getCells', ...params]; + }), + ); + const result = await batchRequest.exec(); + const cells = result.map(({ objects }) => { + return objects.map((indexerCell) => { + const { output, outPoint, outputData, blockNumber, txIndex } = indexerCell; + return { + outPoint, + cellOutput: output, + data: outputData, + blockNumber, + txIndex, + } as Cell; + }); + }); + return cells; + } + + /** + * Get the rgbpp cells from cache + * @param btcAddress - the btc address + */ + public async getRgbppCellsFromCache(btcAddress: string) { + const data = await this.dataCache.get(btcAddress); + if (!data) { + return null; + } + return Object.values(data).flat(); + } + + /** + * Collect the cells for the utxos, return the utxo and the cells + * @param utxos - the utxos to collect + * @param typeScript - the type script to filter the cells + */ + public async collectRgbppUtxoCellsPairs(utxos: UTXO[], typeScript?: Script): Promise { + const bucketSize = Math.ceil(utxos.length / this.cradle.env.CKB_RPC_MAX_CONCURRENCY); + // split the utxos into buckets, every bucket has almost the same size + const buckets = groupBy(utxos, () => Math.floor(Math.random() * bucketSize)) as Record; + const data = await Promise.all( + Object.values(buckets).map((group: UTXO[]) => { + return this.limit(() => + asyncRetry( + async () => { + const batchCells = await this.getRgbppCellsByBatchRequest(group, typeScript); + return batchCells.map((cells, index: number) => { + const utxo = group[index]; + return { utxo, cells }; + }); + }, + { + retries: 2, + }, + ), + ); + }), + ); + const pairs = data.flat().filter(({ cells }) => cells.length > 0); + return pairs; + } + + /** + * Enqueue a collect job to the queue + * @param utxos - the utxos to collect + */ + public async enqueueCollectJob( + btcAddress: string, + utxos: UTXO[], + allowDuplicate?: boolean, + ): Promise> { + let jobId = btcAddress; + if (allowDuplicate) { + // add a timestamp to the job id to allow duplicate jobs + // used for the case that the utxos are updated + jobId = `${btcAddress}:${Date.now()}`; + } + return this.addJob(jobId, { btcAddress, utxos }); + } + + /** + * Process the collect job, collect the cells for the utxos + * concurrently controlled by the CKB_RPC_MAX_CONCURRENCY + * retry 2 times if failed, and return the utxo and cells + */ + public async process(job: Job) { + try { + const { btcAddress, utxos } = job.data; + const pairs = await this.collectRgbppUtxoCellsPairs(utxos); + const data = pairs.reduce((acc, { utxo, cells }) => { + const key = `${utxo.txid}:${utxo.vout}`; + acc[key] = cells; + return acc; + }, {} as IRgbppCollectJobReturn); + this.dataCache.set(btcAddress, data); + return data; + } catch (e) { + const { message, stack } = e as Error; + const error = new RgbppCollectorError(message); + error.stack = stack; + this.captureJobExceptionToSentryScope(job, error); + throw e; + } + } +} diff --git a/src/services/transaction.ts b/src/services/transaction.ts index b311197f..4e2f4b30 100644 --- a/src/services/transaction.ts +++ b/src/services/transaction.ts @@ -22,7 +22,7 @@ import { } from '@rgbpp-sdk/ckb/lib/utils/rgbpp'; import * as Sentry from '@sentry/node'; import { Transaction as BitcoinTransaction } from 'bitcoinjs-lib'; -import { DelayedError, Job, Queue, Worker } from 'bullmq'; +import { DelayedError, Job } from 'bullmq'; import { Cradle } from '../container'; import { Transaction } from '../routes/bitcoin/types'; import { CKBRawTransaction, CKBVirtualResult } from '../routes/rgbpp/types'; @@ -35,6 +35,8 @@ import { JwtPayload } from '../plugins/jwt'; import { serializeCellDep } from '@nervosnetwork/ckb-sdk-utils'; import { BitcoinClientAPIError } from './bitcoin'; import { HttpStatusCode } from 'axios'; +import BaseQueueWorker from './base/queue-worker'; +import { Env } from '../env'; export interface ITransactionRequest { txid: string; @@ -95,22 +97,35 @@ class OpReturnNotFoundError extends Error { * - add paymaster cell and sign the CKB transaction if needed * - sending CKB transaction to the network and waiting for confirmation */ -export default class TransactionProcessor implements ITransactionProcessor { +export default class TransactionProcessor + extends BaseQueueWorker + implements ITransactionProcessor +{ private cradle: Cradle; - private queue: Queue; - private worker: Worker; constructor(cradle: Cradle) { - this.cradle = cradle; - this.queue = new Queue(TRANSACTION_QUEUE_NAME, { - connection: cradle.redis, - defaultJobOptions: this.defaultJobOptions, - }); - this.worker = new Worker(TRANSACTION_QUEUE_NAME, this.process.bind(this), { + const defaultJobOptions = TransactionProcessor.getDefaultJobOptions(cradle.env); + super({ + name: TRANSACTION_QUEUE_NAME, connection: cradle.redis, - autorun: false, - concurrency: 10, + queue: { + defaultJobOptions, + }, + worker: { + concurrency: 10, + }, }); + this.cradle = cradle; + } + + public static getDefaultJobOptions(env: Env) { + return { + attempts: env.TRANSACTION_QUEUE_JOB_ATTEMPTS, + backoff: { + type: 'exponential', + delay: env.TRANSACTION_QUEUE_JOB_DELAY, + }, + }; } private get isMainnet() { @@ -125,16 +140,6 @@ export default class TransactionProcessor implements ITransactionProcessor { return getBtcTimeLockScript(this.isMainnet); } - public get defaultJobOptions() { - return { - attempts: this.cradle.env.TRANSACTION_QUEUE_JOB_ATTEMPTS, - backoff: { - type: 'exponential', - delay: this.cradle.env.TRANSACTION_QUEUE_JOB_DELAY, - }, - }; - } - private isRgbppLock(lock: CKBComponents.Script) { return lock.codeHash === this.rgbppLockScript.codeHash && lock.hashType === this.rgbppLockScript.hashType; } @@ -343,7 +348,7 @@ export default class TransactionProcessor implements ITransactionProcessor { private async appendSporeCobuildWitness(signedTx: CKBRawTransaction) { const inputs = await Promise.all( signedTx.inputs.map(async (input) => { - return this.cradle.ckb.rpc.getLiveCell(input.previousOutput!, false); + return this.cradle.ckb.rpc.getLiveCell(input.previousOutput!, true); }), ); const sporeLiveCells = inputs @@ -457,6 +462,19 @@ export default class TransactionProcessor implements ITransactionProcessor { this.cradle.logger.info(`[TransactionProcessor] Mark paymaster cell as spent: ${txHash}`); await this.cradle.paymaster.markPaymasterCellAsSpent(txid, signedTx!); } + + // trigger the UTXO sync job if the cache is enabled + // after the transaction is confirmed, the UTXO sync job will be triggered to sync the UTXO data + // then the RGB++ cells cache will be updated with the latest UTXO data + if (this.cradle.env.UTXO_SYNC_DATA_CACHE_ENABLE) { + try { + const addresses = btcTx.vout.map((vout) => vout.scriptpubkey_address).filter((address) => address); + await Promise.all(addresses.map((address) => this.cradle.utxoSyncer.enqueueSyncJob(address!))); + } catch (err) { + // ignore the error if enqueue sync job failed, to avoid the transaction failed + // already catch the error inside the utxo syncer + } + } return txHash; } catch (err) { // fix the pool rejected transaction by increasing the fee rate @@ -540,21 +558,6 @@ export default class TransactionProcessor implements ITransactionProcessor { } } - /** - * Get the queue job counts - */ - public async getQueueJobCounts() { - const counts = await this.queue.getJobCounts(); - return counts; - } - - /** - * Check if the worker is running - */ - public async isWorkerRunning() { - return this.worker.isRunning(); - } - /** * Enqueue a transaction request to the Queue, waiting for processing * @param request - the transaction request @@ -598,37 +601,4 @@ export default class TransactionProcessor implements ITransactionProcessor { ); return results; } - - /** - * Start the transaction process - * @param callbacks - the callbacks for the process - * - onCompleted: the callback when the job is completed - * - onFailed: the callback when the job is failed - */ - public async startProcess(callbacks?: IProcessCallbacks): Promise { - if (callbacks?.onActive) { - this.worker.on('active', callbacks?.onActive); - } - if (callbacks?.onCompleted) { - this.worker.on('completed', callbacks.onCompleted); - } - if (callbacks?.onFailed) { - this.worker.on('failed', callbacks.onFailed); - } - await this.worker.run(); - } - - /** - * Pause the transaction process - */ - public async pauseProcess(): Promise { - await this.worker.pause(); - } - - /** - * Close the transaction process - */ - public async closeProcess(): Promise { - await this.worker.close(); - } } diff --git a/src/services/utxo.ts b/src/services/utxo.ts new file mode 100644 index 00000000..9402745b --- /dev/null +++ b/src/services/utxo.ts @@ -0,0 +1,195 @@ +import { sha256 } from 'bitcoinjs-lib/src/crypto'; +import { Cradle } from '../container'; +import BaseQueueWorker from './base/queue-worker'; +import { UTXO } from './bitcoin/schema'; +import { z } from 'zod'; +import { Job, RepeatOptions } from 'bullmq'; +import * as Sentry from '@sentry/node'; +import DataCache from './base/data-cache'; +import { throttle } from 'lodash'; +import validateBitcoinAddress from '../utils/validators'; + +interface IUTXOSyncRequest { + btcAddress: string; +} + +interface IUTXOSyncJobReturn { + btcAddress: string; + utxos: UTXO[]; + // use sha256(latest_txs_id) as the key, so we can check if the data is updated + txsHash: string; +} + +export const UTXO_SYNCER_QUEUE_NAME = 'utxo-syncer-queue'; + +class UTXOSyncerError extends Error { + constructor(message: string) { + super(message); + this.name = 'UTXOSyncerError'; + } +} + +/** + * UTXOSyncer is used to sync the utxos for the btc address. + * The utxos are stored in the cache with the btc address as the key, + * will be resync when the btc address txs are updated. + */ +export default class UTXOSyncer extends BaseQueueWorker { + private cradle: Cradle; + private dataCache: DataCache; + + constructor(cradle: Cradle) { + const defaultJobOptions = UTXOSyncer.getDefaultJobOptions(cradle); + const repeatStrategy = UTXOSyncer.getRepeatStrategy(cradle); + super({ + name: UTXO_SYNCER_QUEUE_NAME, + connection: cradle.redis, + queue: { + defaultJobOptions, + settings: { + repeatStrategy, + }, + }, + worker: { + lockDuration: 60_000, + removeOnComplete: { count: 0 }, + removeOnFail: { count: 0 }, + settings: { + repeatStrategy, + }, + }, + }); + this.cradle = cradle; + this.dataCache = new DataCache(cradle.redis, { + prefix: 'utxo-syncer-data', + schema: z.object({ + btcAddress: z.string(), + utxos: z.array(UTXO), + txsHash: z.string(), + }), + expire: cradle.env.UTXO_SYNC_DATA_CACHE_EXPIRE, + }); + } + + public static getDefaultJobOptions(cradle: Cradle) { + return { + attempts: 2, + backoff: { + type: 'exponential', + delay: cradle.env.UTXO_SYNC_REPEAT_BASE_DURATION, + }, + }; + } + + public static getRepeatStrategy(cradle: Cradle) { + return (millis: number, opts: RepeatOptions) => { + const { count = 0 } = opts; + if (count === 0) { + // immediately process the job when first added + return millis; + } + // Exponential increase the repeat interval, with a maximum of maxDuration + // For default values (base=10s, max=3600s), the interval will be 10s, 20s, 40s, 80s, 160s, ..., 3600s, 3600s, ... + const baseDuration = cradle.env.UTXO_SYNC_REPEAT_BASE_DURATION; + const maxDuration = cradle.env.UTXO_SYNC_REPEAT_MAX_DURATION; + // Add some random delay to avoid all jobs being processed at the same time + const duration = Math.min(Math.pow(2, count) * baseDuration, maxDuration) + Math.random() * 1000; + cradle.logger.info(`[UTXOSyncer] Repeat job ${opts.jobId} in ${duration}ms`); + return millis + duration; + }; + } + + /** + * Capture the job exception to Sentry with the btcAddress tag + * @param job - the job that failed + * @param err - the error that caused the job to fail + */ + private captureJobExceptionToSentryScope(job: Job, err: Error) { + const { btcAddress } = job.data; + Sentry.withScope((scope) => { + scope.setTag('btcAddress', btcAddress); + this.cradle.logger.error(err); + scope.captureException(err); + }); + } + + public async getUTXOsFromCache(btcAddress: string) { + const data = await this.dataCache.get(btcAddress); + if (!data) { + return null; + } + return data.utxos; + } + + private async _enqueueSyncJob(btcAddress: string) { + if (!validateBitcoinAddress(btcAddress)) { + throw new UTXOSyncerError(`Invalid btc address: ${btcAddress}`); + } + + const jobs = await this.queue.getRepeatableJobs(); + const repeatableJob = jobs.find((job) => job.name === btcAddress); + + if (repeatableJob) { + // Remove the existing repeatable job to update the start date, let the job be processed immediately + this.cradle.logger.info(`[UTXOSyncer] Remove existing repeatable job for ${btcAddress}`); + await this.queue.removeRepeatableByKey(repeatableJob.key); + } + + return this.addJob( + btcAddress, + { btcAddress }, + { + repeat: { + pattern: 'exponential', + // bullmq will end the repeat job when the end date is reached + // https://github.com/taskforcesh/bullmq/blob/cce0774cffcee591407eee4d4530daa37aab3eca/src/classes/repeat.ts#L51 + endDate: Date.now() + this.cradle.env.UTXO_SYNC_REPEAT_EXPRIED_DURATION, + }, + }, + ); + } + + private enqueueSyncJobThrottle = throttle((address) => this._enqueueSyncJob(address), 1000, { + leading: true, + }); + + /** + * Enqueue a sync job for the btc address, with a throttle to avoid too many jobs being enqueued at the same time + */ + public enqueueSyncJob(btcAddress: string) { + this.cradle.logger.info(`[UTXOSyncer] Enqueue sync job for ${btcAddress}, ${Date.now()}`); + return this.enqueueSyncJobThrottle(btcAddress); + } + + public async process(job: Job): Promise { + try { + const { btcAddress } = job.data; + if (!validateBitcoinAddress(btcAddress)) { + if (job.repeatJobKey) { + await this.queue.removeRepeatableByKey(job.repeatJobKey); + } + throw new Error(`Invalid btc address: ${btcAddress}`); + } + + const txs = await this.cradle.bitcoin.getAddressTxs({ address: btcAddress }); + const txsHash = sha256(Buffer.from(txs.map((tx) => tx.txid).join(','))).toString(); + + // check if the data is updated + const cached = await this.dataCache.get(btcAddress); + if (cached && txsHash === cached.txsHash) { + this.cradle.logger.info(`[UTXOSyncer] ${btcAddress} is up to date, skip sync job`); + return cached; + } + + const utxos = await this.cradle.bitcoin.getAddressTxsUtxo({ address: btcAddress }); + const data = { btcAddress, utxos, txsHash }; + return this.dataCache.set(btcAddress, data); + } catch (e) { + const { message, stack } = e as Error; + const error = new UTXOSyncerError(message); + error.stack = stack; + this.captureJobExceptionToSentryScope(job, error); + throw e; + } + } +} diff --git a/test/app.test.ts b/test/app.test.ts index a27bfa22..cde10e5f 100644 --- a/test/app.test.ts +++ b/test/app.test.ts @@ -38,6 +38,8 @@ test('`/docs/json` - 200', async () => { '/rgbpp/v1/paymaster/info', '/cron/process-transactions', '/cron/unlock-cells', + '/cron/sync-utxo', + '/cron/collect-rgbpp-cells', ]); await fastify.close(); diff --git a/test/routes/__snapshots__/token.test.ts.snap b/test/routes/__snapshots__/token.test.ts.snap index 86b6214d..db66a933 100644 --- a/test/routes/__snapshots__/token.test.ts.snap +++ b/test/routes/__snapshots__/token.test.ts.snap @@ -11,3 +11,15 @@ exports[`\`/token/generate\` - 400 1`] = ` } ]" `; + +exports[`\`/token/generate\` - without params 1`] = ` +"[ + { + "code": "invalid_type", + "expected": "object", + "received": "null", + "path": [], + "message": "Expected object, received null" + } +]" +`; diff --git a/test/routes/token.test.ts b/test/routes/token.test.ts index 591ba71a..3466da5e 100644 --- a/test/routes/token.test.ts +++ b/test/routes/token.test.ts @@ -1,7 +1,27 @@ import { expect, test } from 'vitest'; import { buildFastify } from '../../src/app'; -test('`/token/generate` - 400', async () => { +test('`/token/generate` - successfuly', async () => { + const fastify = buildFastify(); + await fastify.ready(); + + const response = await fastify.inject({ + method: 'POST', + url: '/token/generate', + payload: { + app: 'test', + domain: 'test.com', + }, + }); + const data = response.json(); + + expect(response.statusCode).toBe(200); + expect(data.token).toBeDefined(); + + await fastify.close(); +}); + +test('`/token/generate` - without params', async () => { const fastify = buildFastify(); await fastify.ready(); @@ -17,7 +37,7 @@ test('`/token/generate` - 400', async () => { await fastify.close(); }); -test('`/token/generate` - 200', async () => { +test('`/token/generate` - invalid domain', async () => { const fastify = buildFastify(); await fastify.ready(); @@ -26,7 +46,67 @@ test('`/token/generate` - 200', async () => { url: '/token/generate', payload: { app: 'test', - domain: 'test.com', + domain: '\\', + }, + }); + const data = response.json(); + + expect(response.statusCode).toBe(500); + expect(data.message).toEqual('Failed to generate token: Invalid URL'); + + await fastify.close(); +}); + +test('`/token/generate` - with pathname', async () => { + const fastify = buildFastify(); + await fastify.ready(); + + const response = await fastify.inject({ + method: 'POST', + url: '/token/generate', + payload: { + app: 'test', + domain: 'http://test.com/abc', + }, + }); + const data = response.json(); + + expect(response.statusCode).toBe(500); + expect(data.message).toEqual('Failed to generate token: Must be a valid domain without path'); + + await fastify.close(); +}); + +test('`/token/generate` - with protocol', async () => { + const fastify = buildFastify(); + await fastify.ready(); + + const response = await fastify.inject({ + method: 'POST', + url: '/token/generate', + payload: { + app: 'test', + domain: 'https://test.com', + }, + }); + const data = response.json(); + + expect(response.statusCode).toBe(200); + expect(data.token).toBeDefined(); + + await fastify.close(); +}); + +test('`/token/generate` - with port', async () => { + const fastify = buildFastify(); + await fastify.ready(); + + const response = await fastify.inject({ + method: 'POST', + url: '/token/generate', + payload: { + app: 'test', + domain: 'test.com:3000', }, }); const data = response.json(); diff --git a/test/services/__snapshots__/rgbpp.test.ts.snap b/test/services/__snapshots__/rgbpp.test.ts.snap new file mode 100644 index 00000000..dea78f97 --- /dev/null +++ b/test/services/__snapshots__/rgbpp.test.ts.snap @@ -0,0 +1,166 @@ +// Vitest Snapshot v1, https://vitest.dev/guide/snapshot.html + +exports[`RgbppCollector > collectRgbppUtxoCellsPairs: should return the utxo and the cells 1`] = ` +[ + { + "cells": [ + { + "blockNumber": "0x123", + "cellOutput": { + "capacity": "0x123", + "lock": { + "args": "0x0000000009b1c4d969cc25cbbde6f73bf2920b62a00916f1148493ef28bfb677f0a94f2c", + "codeHash": "0x61ca7a4796a4eb19ca4f0d065cb9b10ddcf002f10f7cbb810c706cb6bb5c3248", + "hashType": "type", + }, + "type": { + "args": "0x", + "codeHash": "0x25c29dc317811a6f6f3985a7a9ebc4838bd388d19d0feeecf0bcd60f6c0975bb", + "hashType": "type", + }, + }, + "data": "0x", + "outPoint": { + "index": "0x0", + "txHash": "0x", + }, + "txIndex": "0x0", + }, + ], + "utxo": { + "status": { + "confirmed": true, + }, + "txid": "0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109", + "value": 100000000, + "vout": 0, + }, + }, + { + "cells": [ + { + "blockNumber": "0x456", + "cellOutput": { + "capacity": "0x123", + "lock": { + "args": "0x0100000009b1c4d969cc25cbbde6f73bf2920b62a00916f1148493ef28bfb677f0a94f2c", + "codeHash": "0x61ca7a4796a4eb19ca4f0d065cb9b10ddcf002f10f7cbb810c706cb6bb5c3248", + "hashType": "type", + }, + "type": { + "args": "0x", + "codeHash": "0x25c29dc317811a6f6f3985a7a9ebc4838bd388d19d0feeecf0bcd60f6c0975bb", + "hashType": "type", + }, + }, + "data": "0x", + "outPoint": { + "index": "0x0", + "txHash": "0x", + }, + "txIndex": "0x0", + }, + ], + "utxo": { + "status": { + "confirmed": true, + }, + "txid": "0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109", + "value": 200000000, + "vout": 1, + }, + }, +] +`; + +exports[`RgbppCollector > getRgbppCellsBatchRequest: should be filtered by typeScript 1`] = ` +[ + [ + "getCells", + { + "filter": { + "script": { + "args": "0x", + "codeHash": "0x25c29dc317811a6f6f3985a7a9ebc4838bd388d19d0feeecf0bcd60f6c0975bb", + "hashType": "data", + }, + }, + "script": { + "args": "0x0000000009b1c4d969cc25cbbde6f73bf2920b62a00916f1148493ef28bfb677f0a94f2c", + "codeHash": "0x61ca7a4796a4eb19ca4f0d065cb9b10ddcf002f10f7cbb810c706cb6bb5c3248", + "hashType": "type", + }, + "scriptType": "lock", + }, + "desc", + "0x64", + ], + [ + "getCells", + { + "filter": { + "script": { + "args": "0x", + "codeHash": "0x25c29dc317811a6f6f3985a7a9ebc4838bd388d19d0feeecf0bcd60f6c0975bb", + "hashType": "data", + }, + }, + "script": { + "args": "0x0100000009b1c4d969cc25cbbde6f73bf2920b62a00916f1148493ef28bfb677f0a94f2c", + "codeHash": "0x61ca7a4796a4eb19ca4f0d065cb9b10ddcf002f10f7cbb810c706cb6bb5c3248", + "hashType": "type", + }, + "scriptType": "lock", + }, + "desc", + "0x64", + ], +] +`; + +exports[`RgbppCollector > getRgbppCellsBatchRequest: should return the batch request for the utxos 1`] = ` +[ + [ + "getCells", + { + "script": { + "args": "0x0000000009b1c4d969cc25cbbde6f73bf2920b62a00916f1148493ef28bfb677f0a94f2c", + "codeHash": "0x61ca7a4796a4eb19ca4f0d065cb9b10ddcf002f10f7cbb810c706cb6bb5c3248", + "hashType": "type", + }, + "scriptType": "lock", + }, + "desc", + "0x64", + ], + [ + "getCells", + { + "script": { + "args": "0x0100000009b1c4d969cc25cbbde6f73bf2920b62a00916f1148493ef28bfb677f0a94f2c", + "codeHash": "0x61ca7a4796a4eb19ca4f0d065cb9b10ddcf002f10f7cbb810c706cb6bb5c3248", + "hashType": "type", + }, + "scriptType": "lock", + }, + "desc", + "0x64", + ], +] +`; + +exports[`RgbppCollector > getRgbppCellsByBatchRequest: should be filtered by typeScript 1`] = ` +[ + [], + [], +] +`; + +exports[`RgbppCollector > getRgbppCellsByBatchRequest: should return the batch request for the utxos 1`] = ` +[ + [], + [], +] +`; + +exports[`RgbppCollector > getRgbppCellsByBatchRequest: should return the utxo and the cells 1`] = `[]`; diff --git a/test/services/rgbpp.test.ts b/test/services/rgbpp.test.ts new file mode 100644 index 00000000..b1d187d1 --- /dev/null +++ b/test/services/rgbpp.test.ts @@ -0,0 +1,91 @@ +import container from '../../src/container'; +import { describe, test, beforeEach, afterEach, vi, expect } from 'vitest'; +import RgbppCollector from '../../src/services/rgbpp'; +import { UTXO } from '../../src/services/bitcoin/schema'; +import { Script } from '@ckb-lumos/base'; +// import { IndexerCell } from '@ckb-lumos/ckb-indexer/lib/type'; +// import { buildRgbppLockArgs, genRgbppLockScript } from '@rgbpp-sdk/ckb'; + +// const xudtTypeScript = { +// codeHash: '0x25c29dc317811a6f6f3985a7a9ebc4838bd388d19d0feeecf0bcd60f6c0975bb', +// hashType: 'type', +// args: '0x', +// }; + +describe('RgbppCollector', () => { + let rgbppCollector: RgbppCollector; + + beforeEach(async () => { + const cradle = container.cradle; + rgbppCollector = new RgbppCollector(cradle); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + test('getRgbppCellsByBatchRequest: should return the batch request for the utxos', async () => { + const utxos: UTXO[] = [ + { + txid: '0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109', + vout: 0, + value: 100000000, + status: { confirmed: true }, + }, + { + txid: '0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109', + vout: 1, + value: 200000000, + status: { confirmed: true }, + }, + ]; + + const cells = await rgbppCollector.getRgbppCellsByBatchRequest(utxos); + expect(cells).toMatchSnapshot(); + }); + + test('getRgbppCellsByBatchRequest: should be filtered by typeScript', async () => { + const utxos: UTXO[] = [ + { + txid: '0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109', + vout: 0, + value: 100000000, + status: { confirmed: true }, + }, + { + txid: '0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109', + vout: 1, + value: 200000000, + status: { confirmed: true }, + }, + ]; + + const typeScript: Script = { + codeHash: '0x25c29dc317811a6f6f3985a7a9ebc4838bd388d19d0feeecf0bcd60f6c0975bb', + hashType: 'data', + args: '0x', + }; + + const cells = await rgbppCollector.getRgbppCellsByBatchRequest(utxos, typeScript); + expect(cells).toMatchSnapshot(); + }); + + test('getRgbppCellsByBatchRequest: should return the utxo and the cells', async () => { + const utxos: UTXO[] = [ + { + txid: '0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109', + vout: 0, + value: 100000000, + status: { confirmed: true }, + }, + { + txid: '0x2c4fa9f077b6bf28ef938414f11609a0620b92f23bf7e6bdcb25cc69d9c4b109', + vout: 1, + value: 200000000, + status: { confirmed: true }, + }, + ]; + const rgbppUtxoCellsPairs = await rgbppCollector.collectRgbppUtxoCellsPairs(utxos); + expect(rgbppUtxoCellsPairs).toMatchSnapshot(); + }); +}); diff --git a/test/services/utxo.test.ts b/test/services/utxo.test.ts new file mode 100644 index 00000000..83af452e --- /dev/null +++ b/test/services/utxo.test.ts @@ -0,0 +1,60 @@ +import container, { Cradle } from '../../src/container'; +import { describe, test, beforeEach, afterEach, vi, expect } from 'vitest'; +import UTXOSyncer from '../../src/services/utxo'; + +describe('UTXOSyncer', () => { + let cradle: Cradle; + let utxoSyncer: UTXOSyncer; + + beforeEach(async () => { + cradle = container.cradle; + utxoSyncer = new UTXOSyncer(cradle); + }); + + afterEach(() => { + vi.clearAllMocks(); + }); + + test('getRepetStrategy: should be return current time when first run', () => { + const strategy = UTXOSyncer.getRepeatStrategy(cradle); + const now = Date.now(); + const excuteAt = strategy(now, { count: 0 }); + expect(excuteAt).toBe(now); + }); + + test('getRepetStrategy: should be exponential increase the repeat interval', () => { + const strategy = UTXOSyncer.getRepeatStrategy(container.cradle); + const now = Date.now(); + const excuteAt = strategy(now, { count: 2 }); + expect(excuteAt).toBeGreaterThan(now + cradle.env.UTXO_SYNC_REPEAT_BASE_DURATION * 2 ** 2); + }); + + test('getRepetStrategy: should be return maxDuration when interval is greater than maxDuration', () => { + const strategy = UTXOSyncer.getRepeatStrategy(container.cradle); + cradle.env.UTXO_SYNC_REPEAT_MAX_DURATION = 60 * 1000; + const now = Date.now(); + const excuteAt = strategy(now, { count: 100 }); + expect(excuteAt).toBeGreaterThan(now + cradle.env.UTXO_SYNC_REPEAT_MAX_DURATION); + }); + + test('enqueueSyncJob: should be add job to queue', async () => { + const spy = vi.spyOn(utxoSyncer, 'addJob'); + await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + expect(spy).toHaveBeenCalled(); + }); + + test('enqueueSyncJob: should not be remove repeat job when enqueued duplicate jobs', async () => { + await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + const spy = vi.spyOn(utxoSyncer['queue'], 'removeRepeatableByKey'); + await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + expect(spy).not.toHaveBeenCalled(); + }); + + test('enqueueSyncJob: should be remove repeat job when is exists', async () => { + await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + await new Promise((resolve) => setTimeout(resolve, 1100)); + const spy = vi.spyOn(utxoSyncer['queue'], 'removeRepeatableByKey'); + await utxoSyncer.enqueueSyncJob('tb1quqtqsh5jrlr9p5wnpu3rs883lqh4avpwc766x3'); + expect(spy).toHaveBeenCalled(); + }); +});