Skip to content

Commit

Permalink
gracefully handle shutdowns
Browse files Browse the repository at this point in the history
  • Loading branch information
mouseless0x committed Feb 16, 2025
1 parent a7b5a5d commit b8d1503
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 2 deletions.
16 changes: 14 additions & 2 deletions src/cli/setupServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,9 +378,21 @@ export const setupServer = async ({
"dumping mempool before shutdown"
)

// mark all executors as processed
for (const account of senderManager.getActiveWallets()) {
senderManager.markWalletProcessed(account)
}

process.exit(0)
}

process.on("SIGINT", gracefulShutdown)
process.on("SIGTERM", gracefulShutdown)
const signals = [
"SIGINT",
"SIGTERM",
"unhandledRejection",
"uncaughtException"
]
signals.forEach((signal) => {
process.on(signal, async () => await gracefulShutdown(signal))
})
}
9 changes: 9 additions & 0 deletions src/executor/senderManager/createMemorySenderManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ export const createMemorySenderManager = ({
)

metrics.walletsAvailable.set(availableWallets.length)
},
getActiveWallets: () => {
// Active wallets are those that are in the total pool but not in the available pool
return wallets.filter(
(wallet) =>
!availableWallets.some(
(available) => available.address === wallet.address
)
)
}
}
}
10 changes: 10 additions & 0 deletions src/executor/senderManager/createRedisSenderManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ export const createRedisSenderManager = async ({
entries: wallets.map((w) => w.address)
})

// Track active wallets for this instance
const activeWallets = new Set<Account>()

return {
getAllWallets: () => [...wallets],
getWallet: async () => {
Expand All @@ -79,6 +82,8 @@ export const createRedisSenderManager = async ({
throw new Error("wallet not found")
}

activeWallets.add(wallet)

logger.trace(
{ executor: wallet.address },
"got wallet from sender manager"
Expand All @@ -91,11 +96,16 @@ export const createRedisSenderManager = async ({
return wallet
},
markWalletProcessed: async (wallet: Account) => {
activeWallets.delete(wallet)

redisQueue.push(wallet.address).then(() => {
redisQueue.llen().then((len) => {
metrics.walletsAvailable.set(len)
})
})
},
getActiveWallets: () => {
return [...activeWallets]
}
}
}
1 change: 1 addition & 0 deletions src/executor/senderManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export type SenderManager = {
getAllWallets: () => Account[]
getWallet: () => Promise<Account>
markWalletProcessed: (wallet: Account) => Promise<void>
getActiveWallets: () => Account[]
}

export const getSenderManager = async ({
Expand Down

0 comments on commit b8d1503

Please sign in to comment.