Skip to content

Commit b3e6167

Browse files
committed
double_tx_relay: use miniwallet
1 parent 6381616 commit b3e6167

File tree

1 file changed

+61
-200
lines changed

1 file changed

+61
-200
lines changed

src/scenarios/double_tx_relay.py

Lines changed: 61 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
#!/usr/bin/env python3
2-
import asyncio
3-
import random
4-
import math
52
import time
63
from datetime import datetime
74
from decimal import Decimal
8-
9-
from warnet.test_framework_bridge import WarnetTestFramework
10-
from scenarios.utils import ensure_miner, next_block_delta, get_block_reward_sats
11-
from test_framework.test_node import TestNode
12-
import concurrent.futures
135
from threading import Lock
146

7+
from test_framework.authproxy import JSONRPCException
8+
from test_framework.blocktools import COINBASE_MATURITY
9+
from test_framework.wallet import MiniWallet
10+
from warnet.test_framework_bridge import WarnetTestFramework
1511

1612
BLOCKS_WAIT_TILL_SPENDABLE = 101
1713
MIN_UTXO_AMOUNT = Decimal(0.001)
@@ -31,48 +27,9 @@ def add_options(self, parser):
3127
"--txproductionrate",
3228
dest="txproductionrate",
3329
default=5,
34-
# action="store_true",
30+
type=int,
3531
help="Rate per second at which transactions are generated. (default 5 tx/s)",
3632
)
37-
parser.add_argument(
38-
"--speedyinitialblockrewards",
39-
dest="speedyinitialblockrewards",
40-
default=True,
41-
type=bool,
42-
help="Mines blocks quickly at first to get the first rewards available. (default True)",
43-
)
44-
45-
def enqueue_block(self, block_and_node):
46-
self.block_queue.append(block_and_node)
47-
48-
def dequeue_spendable_block_if_available(self):
49-
if len(self.block_queue) < BLOCKS_WAIT_TILL_SPENDABLE:
50-
return None
51-
return self.block_queue.pop(0)
52-
53-
def get_node_ip(self, node):
54-
# loop 3 times to get the ip address
55-
for n in range(3):
56-
ipaddress = node.getnetworkinfo()["localaddresses"][n]["address"]
57-
if ipaddress != "0.0.0.0" and ipaddress != "::":
58-
return ipaddress
59-
return None
60-
61-
def connect_nodes(self):
62-
nodes_without_peers = []
63-
for node in self.nodes:
64-
try:
65-
if node.getpeerinfo() == []:
66-
nodes_without_peers.append(node.index)
67-
random_node = random.choice(self.nodes)
68-
ipaddress = self.get_node_ip(random_node)
69-
node.addnode(f"{ipaddress}:18444", "add")
70-
except Exception:
71-
pass
72-
if len(nodes_without_peers) == 0:
73-
return False
74-
self.log.info("Nodes without peers: %s", nodes_without_peers)
75-
return True
7633

7734
def run_test(self):
7835
self.log.info("Starting Double TX Relay Scenario")
@@ -81,160 +38,64 @@ def run_test(self):
8138
self.sent_txs = 0 # protected by mutex
8239
self.time_of_last_log = datetime.now() # protected by mutex
8340
self.failed_txs = 0 # protected by mutex
84-
85-
self.log.info("Checking all nodes have a peer")
86-
# ensure all nodes have a peer
87-
while self.connect_nodes():
88-
self.log.info("Waiting for all nodes to have a peer")
89-
time.sleep(5)
90-
91-
if self.options.speedyinitialblockrewards:
92-
self.log.info("Generating initial blocks and rewards")
93-
for node in self.nodes:
94-
self.generate_block(self.nodes[0], node)
95-
for _ in range(BLOCKS_WAIT_TILL_SPENDABLE):
96-
self.generate_block(self.nodes[0], self.nodes[0])
97-
98-
self.log.info("Starting block mining and tx sending in real time")
99-
100-
asyncio.run(self.mainasync())
101-
102-
def generate_block(self, generating_node: TestNode, receiving_node: TestNode):
103-
try:
104-
self.log.info("Generating block for node: %s", receiving_node.index)
105-
wallet = ensure_miner(receiving_node)
106-
addr = wallet.getnewaddress(address_type="bech32m")
107-
block = self.generatetoaddress(generating_node, 1, addr)[0]
108-
# allow time for block to propagate
109-
time.sleep(3)
110-
self.enqueue_block((block, receiving_node))
111-
spendable_block_and_node = self.dequeue_spendable_block_if_available()
112-
if spendable_block_and_node is not None:
113-
self.split_block_up(spendable_block_and_node)
114-
except Exception as e:
115-
self.log.error("Error generating block: %s", e)
116-
117-
def split_block_up(self, block_and_node):
118-
block = block_and_node[0]
119-
node = block_and_node[1]
120-
try:
121-
# TODO: check if block is orphaned?
122-
height = node.getblock(block)["height"]
123-
reward = get_block_reward_sats(height)
124-
self.log.info(
125-
"Splitting %s sat block reward at height %s reward up for node: %s",
126-
reward,
127-
height,
128-
node.index,
129-
)
130-
wallet = ensure_miner(node)
131-
addresses = {}
132-
for _ in range(math.floor(reward / 10000000) - 1):
133-
addresses[wallet.getnewaddress(address_type="bech32m")] = "0.1"
134-
node.send(outputs=addresses)
135-
except Exception as e:
136-
self.log.error("Error splitting block up: %s", e)
137-
138-
async def mainasync(self):
139-
await asyncio.gather(self.mineblocks(), self.sendtransactions())
140-
141-
async def mineblocks(self):
142-
print(
143-
f"Starting process to select a random node to mine a block roughly every {AVERAGE_BLOCK_TIME} seconds"
144-
)
145-
while True:
146-
try:
147-
# trunk-ignore(bandit/B311)
148-
node = random.choice(self.nodes)
149-
self.generate_block(node, node)
150-
delay = next_block_delta(AVERAGE_BLOCK_TIME)
151-
self.log.info(f"Waiting {'%.0f' % delay} seconds to mine next block")
152-
await asyncio.sleep(delay)
153-
except Exception as e:
154-
self.log.error("Error mining blocks: %s", e)
155-
await asyncio.sleep(10)
156-
157-
async def sendtransactions(self):
158-
print(
41+
self.node_details = {}
42+
self.miniwallet = MiniWallet(self.nodes[0])
43+
num_blocks = COINBASE_MATURITY + 100
44+
self.log.info(f"Generating {num_blocks} blocks...")
45+
self.generate(self.miniwallet, num_blocks)
46+
self.log.info("Rescanning utxos")
47+
self.miniwallet.rescan_utxos()
48+
49+
split_num = 1000
50+
confirmed_utxos = self.miniwallet.get_utxos(confirmed_only=True)
51+
print(f"Starting with {len(confirmed_utxos)} confirmed utxos, now splitting each into 1000...")
52+
for i, utxo in enumerate(confirmed_utxos):
53+
tx = self.miniwallet.send_self_transfer_multi(from_node=self.nodes[0], utxos_to_spend=[utxo], num_outputs=split_num)
54+
self.log.info(f"Sent tx {i} of {len(confirmed_utxos)}: {tx['txid']}")
55+
self.generate(self.miniwallet, 6)
56+
self.miniwallet.rescan_utxos()
57+
58+
confirmed_utxos = self.miniwallet.get_utxos(confirmed_only=True)
59+
self.log.info(f"Now got {len(confirmed_utxos)} utxos")
60+
61+
self.log.info(
15962
f"Starting process to send transactions at a rate of {self.options.txproductionrate} tx/s"
16063
)
161-
with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
162-
while True:
163-
executor.submit(self.send_transaction)
164-
# await asyncio.sleep(10)
165-
await asyncio.sleep(1 / int(self.options.txproductionrate))
166-
167-
def send_transaction(self):
168-
try:
169-
# trunk-ignore(bandit/B311)
170-
from_node = random.choice(self.nodes)
171-
# trunk-ignore(bandit/B311)
172-
to_node = random.choice(self.nodes)
173-
wallet = ensure_miner(from_node)
174-
utxos = wallet.listunspent(include_unsafe=False, query_options={"maximumAmount": 0.1})
175-
if len(utxos) == 0:
176-
return
177-
utxo = random.choice(utxos)
178-
fee_rate = Decimal("0.00001000")
179-
try:
180-
multiplier = Decimal(math.exp(random.random() * 0.2 - 0.1))
181-
fee_rate = from_node.estimatesmartfee(5)["feerate"] * multiplier
182-
fee_rate = fee_rate.quantize(Decimal(".00000001"))
183-
184-
# trunk-ignore(bandit/B110)
185-
except Exception:
186-
pass
187-
# 1 in 1 out taproot transaction should be 111 vbytes
188-
fee = fee_rate * Decimal("0.111")
189-
if fee < Decimal("0.00000111"):
190-
fee = Decimal("0.00000111")
191-
amount = Decimal(utxo["amount"])
192-
amount = amount - fee
193-
amount = amount.quantize(Decimal(".00000001"))
194-
195-
raw_transaction_inputs = [
196-
{
197-
"txid": utxo["txid"],
198-
"vout": utxo["vout"],
199-
},
200-
]
201-
raw_transaction_outputs = [
202-
{
203-
to_node.getnewaddress(address_type="bech32m"): amount,
204-
},
205-
]
206-
if amount < Decimal(MIN_UTXO_AMOUNT):
207-
raw_transaction_outputs = [
208-
{
209-
"data": "",
210-
},
211-
]
212-
tx = from_node.createrawtransaction(
213-
inputs=raw_transaction_inputs, outputs=raw_transaction_outputs
214-
)
215-
tx = from_node.signrawtransactionwithwallet(tx)["hex"]
216-
from_node.sendrawtransaction(tx)
217-
# runs in thread pool so mutex is needed
218-
with self.mutex:
219-
self.sent_txs += 1
220-
seconds_since_log = (datetime.now() - self.time_of_last_log).total_seconds()
221-
if seconds_since_log > 59:
222-
self.log.info(
223-
f"Sent roughly {'%.2f' % (self.sent_txs / 60)} transactions per second over the last minute"
224-
)
225-
self.log.info(
226-
f"Roughly {'%.2f' % (self.failed_txs / 60)} transactions per second have failed over the last minute"
227-
)
228-
self.sent_txs = 0
229-
self.failed_txs = 0
230-
self.time_of_last_log = datetime.now()
231-
except Exception as e:
232-
with self.mutex:
233-
self.failed_txs += 1
234-
self.log.error("Error sending transaction: %s", e)
235-
236-
237-
# bcli -rpcwallet=test-wallet -named send outputs="{\"$ADDRESS\": 1}" fee_rate=10
64+
tx_count = 0
65+
start_time = time.time()
66+
interval = 1 / self.options.txproductionrate
67+
time_to_next_tx = 0
68+
next_log_time = start_time + 10
69+
70+
while True:
71+
for node in self.nodes:
72+
time.sleep(time_to_next_tx)
73+
74+
tx_start_time = time.time()
75+
if len(confirmed_utxos) == 0:
76+
self.generate(self.miniwallet, 1)
77+
self.miniwallet.rescan_utxos()
78+
confirmed_utxos = self.miniwallet.get_utxos(confirmed_only=True)
79+
utxo = confirmed_utxos.pop()
80+
try:
81+
self.miniwallet.send_self_transfer(from_node=node, utxo_to_spend=utxo)
82+
tx_count += 1
83+
except JSONRPCException as e:
84+
self.log.warning(f"tx failed: {e}, continuing")
85+
86+
# Adjust next sleep
87+
tx_end_time = time.time()
88+
tx_duration = tx_end_time - tx_start_time
89+
time_to_next_tx = max(0, interval - tx_duration)
90+
91+
current_time = time.time()
92+
elapsed_time = current_time - start_time
93+
94+
if current_time >= next_log_time:
95+
tps = tx_count / elapsed_time
96+
self.log.info(f"Transactions per second (TPS): {tps}")
97+
next_log_time += 10
98+
23899

239100
if __name__ == "__main__":
240101
print("Running Double TX Relay Scenario")

0 commit comments

Comments
 (0)