Skip to content

Commit 72d748f

Browse files
Acquires the reorg semaphore before the next reorg
1 parent 3cc04f8 commit 72d748f

File tree

1 file changed

+41
-31
lines changed

1 file changed

+41
-31
lines changed

electrumx/server/block_processor.py

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ def __init__(self, env: "Env", db: DB, daemon: Daemon, notifications: "Notificat
255255
self.touched = set()
256256
self.semaphore = asyncio.Semaphore()
257257
self.reorg_count = 0
258+
self.reorg_processing = False
258259
self.height = -1
259260
self.tip = None # type: Optional[bytes]
260261
self.tip_advanced_event = asyncio.Event()
@@ -354,6 +355,10 @@ async def check_and_advance_blocks(self, raw_blocks):
354355
hprevs = [self.coin.header_prevhash(h) for h in headers]
355356
chain = [self.tip] + [self.coin.header_hash(h) for h in headers[:-1]]
356357

358+
if self.reorg_processing:
359+
self.logger.warning("Awaiting the previous reorg to finish...")
360+
await self.semaphore.acquire()
361+
357362
if hprevs == chain:
358363
start = time.monotonic()
359364
await self.run_in_thread_with_lock(self.advance_blocks, blocks)
@@ -387,38 +392,42 @@ async def reorg_chain(self, count=None):
387392
388393
Count is the number of blocks to simulate a reorg, or None for
389394
a real reorg."""
395+
self.reorg_processing = True
390396
if count is None:
391397
self.logger.info("chain reorg detected")
392398
else:
393399
self.logger.info(f"faking a reorg of {count:,d} blocks")
394-
await self.flush(True)
395-
396-
async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]:
397-
heights = range(last_height, last_height - len(hex_hashes), -1)
398-
try:
399-
blocks = [self.db.read_raw_block(height) for height in heights]
400-
self.logger.info(f"read {len(blocks)} blocks from disk")
401-
return blocks
402-
except FileNotFoundError:
403-
return await self.daemon.raw_blocks(hex_hashes)
404-
405-
def flush_backup():
406-
# self.touched can include other addresses which is
407-
# harmless, but remove None.
408-
self.touched.discard(None)
409-
self.db.flush_backup(self.flush_data(), self.touched)
410-
411-
_start, last, hashes = await self.reorg_hashes(count)
412-
# Reverse and convert to hex strings.
413-
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
414-
for hex_hashes in chunks(hashes, 50):
415-
raw_blocks = await get_raw_blocks(last, hex_hashes)
416-
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
417-
await self.run_in_thread_with_lock(flush_backup)
418-
last -= len(raw_blocks)
419-
await self.prefetcher.reset_height(self.height)
420-
self.backed_up_event.set()
421-
self.backed_up_event.clear()
400+
try:
401+
await self.flush(True, force=True)
402+
403+
async def get_raw_blocks(last_height, hex_hashes) -> Sequence[bytes]:
404+
heights = range(last_height, last_height - len(hex_hashes), -1)
405+
try:
406+
blocks = [self.db.read_raw_block(height) for height in heights]
407+
self.logger.info(f"read {len(blocks)} blocks from disk")
408+
return blocks
409+
except FileNotFoundError:
410+
return await self.daemon.raw_blocks(hex_hashes)
411+
412+
def flush_backup():
413+
# self.touched can include other addresses which is
414+
# harmless, but remove None.
415+
self.touched.discard(None)
416+
self.db.flush_backup(self.flush_data(), self.touched)
417+
418+
_start, last, hashes = await self.reorg_hashes(count)
419+
# Reverse and convert to hex strings.
420+
hashes = [hash_to_hex_str(hash) for hash in reversed(hashes)]
421+
for hex_hashes in chunks(hashes, 50):
422+
raw_blocks = await get_raw_blocks(last, hex_hashes)
423+
await self.run_in_thread_with_lock(self.backup_blocks, raw_blocks)
424+
await self.run_in_thread_with_lock(flush_backup)
425+
last -= len(raw_blocks)
426+
await self.prefetcher.reset_height(self.height)
427+
self.backed_up_event.set()
428+
self.backed_up_event.clear()
429+
finally:
430+
self.reorg_processing = False
422431

423432
async def reorg_hashes(self, count):
424433
"""Return a pair (start, last, hashes) of blocks to back up during a
@@ -504,9 +513,9 @@ def flush_data(self):
504513
self.op_data_cache,
505514
)
506515

507-
async def flush(self, flush_utxos):
516+
async def flush(self, flush_utxos, force=False):
508517
def flush():
509-
self.db.flush_dbs(self.flush_data(), flush_utxos, self.estimate_txs_remaining)
518+
self.db.flush_dbs(self.flush_data(), flush_utxos, self.estimate_txs_remaining, force=force)
510519

511520
await self.run_in_thread_with_lock(flush)
512521

@@ -564,6 +573,7 @@ def advance_blocks(self, blocks):
564573
self.undo_infos.append((undo_info, height))
565574
self.atomicals_undo_infos.append((atomicals_undo_info, height))
566575
self.db.write_raw_block(block.raw, height)
576+
self.logger.debug(f'processed block {height} with {len(block.transactions)} txs')
567577

568578
headers = [block.header for block in blocks]
569579
self.height = height
@@ -3990,7 +4000,7 @@ def backup_blocks(self, raw_blocks: Sequence[bytes]):
39904000
The blocks should be in order of decreasing height, starting at.
39914001
self.height. A flush is performed once the blocks are backed up.
39924002
"""
3993-
self.db.assert_flushed(self.flush_data())
4003+
# self.db.assert_flushed(self.flush_data())
39944004
assert self.height >= len(raw_blocks)
39954005
genesis_activation = self.coin.GENESIS_ACTIVATION
39964006

0 commit comments

Comments
 (0)