-
Notifications
You must be signed in to change notification settings - Fork 27
/
Copy pathscript.py
636 lines (562 loc) · 23.3 KB
/
script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
from io import BytesIO
from buidl.bech32 import decode_bech32, encode_bech32_checksum
from buidl.ecc import S256Point
from buidl.helper import (
decode_base58,
encode_base58_checksum,
encode_varstr,
hash160,
little_endian_to_int,
int_to_byte,
int_to_little_endian,
read_varstr,
sha256,
)
from buidl.op import (
number_to_op_code,
op_checksig_schnorr,
op_code_to_number,
op_equal,
op_hash160,
op_verify,
OP_CODE_FUNCTIONS,
OP_CODE_NAMES,
TAPROOT_OP_CODE_FUNCTIONS,
)
class Script:
    """An ordered list of Bitcoin Script commands.

    Each entry of ``commands`` is either an ``int`` (an op code) or a
    ``bytes`` object (a data element that is pushed onto the stack).
    """

    def __init__(self, commands=None):
        """Create a script from ``commands`` (an empty script when omitted)."""
        self.commands = [] if commands is None else commands
        # exact bytes this script was parsed from; only set by parse()
        self.raw = None

    def __repr__(self):
        """Return the commands as space-terminated op code names / hex data."""
        parts = []
        for command in self.commands:
            if isinstance(command, int):
                # unknown op codes are rendered with their numeric value
                parts.append(OP_CODE_NAMES.get(command) or f"OP_[{command}]")
            else:
                parts.append(command.hex())
        return "".join(f"{part} " for part in parts)

    def __eq__(self, other):
        """Two scripts are equal when their command lists are equal.

        Objects without a ``commands`` attribute are not comparable, so
        ``NotImplemented`` is returned and Python falls back to its default
        (``==`` yields False) instead of raising AttributeError.
        """
        try:
            other_commands = other.commands
        except AttributeError:
            return NotImplemented
        return self.commands == other_commands

    def __add__(self, other):
        """Return a new plain ``Script`` holding both command lists in order."""
        return Script(self.commands + other.commands)

    @classmethod
    def parse(cls, stream=None, raw=None):
        """Parse a script from a stream or from raw bytes.

        Args:
            stream: binary stream positioned at a varstr (length-prefixed)
                script; consumed with ``read_varstr``.
            raw: the script bytes themselves, without a length prefix.

        Returns:
            An instance of ``cls`` whose ``raw`` attribute holds the parsed
            bytes.

        Raises:
            ValueError: if both or neither of ``stream``/``raw`` are given.
        """
        if stream and raw:
            raise ValueError("provide exactly one of stream/raw, not both")
        if raw is None:
            if stream is None:
                raise ValueError("provide one of stream/raw")
            raw = read_varstr(stream)
        length = len(raw)
        s = BytesIO(raw)
        # OP_PUSHDATA1/2/4 -> width in bytes of the length field that follows
        pushdata_widths = {76: 1, 77: 2, 78: 4}
        commands = []
        # number of bytes the script has claimed so far (op codes, length
        # fields and promised data sizes)
        count = 0
        while count < length:
            current_byte = s.read(1)[0]
            count += 1
            if 1 <= current_byte <= 75:
                # direct push: the op code itself is the element length
                commands.append(s.read(current_byte))
                count += current_byte
            elif current_byte in pushdata_widths:
                width = pushdata_widths[current_byte]
                data_length = little_endian_to_int(s.read(width))
                commands.append(s.read(data_length))
                count += data_length + width
            else:
                # anything else is an op code
                commands.append(current_byte)
        obj = cls(commands)
        if count != length:
            # Would throw error, but Bitcoin Core will read the number of
            # bytes that are there (not what was promised)
            print(f"mismatch between length and consumed bytes {count} vs {length}")
        obj.raw = raw
        return obj

    @classmethod
    def parse_hex(cls, hex_str):
        """Helper method to make it easier to parse a hex string without converting to bytes"""
        return cls.parse(raw=bytes.fromhex(hex_str))

    def raw_serialize(self):
        """Serialize the script without a length prefix.

        When the script came from ``parse`` the original bytes are returned
        verbatim, even if ``commands`` was modified afterwards.

        Raises:
            ValueError: if a data element is longer than 520 bytes.
        """
        if self.raw:
            return self.raw
        chunks = []
        for command in self.commands:
            if isinstance(command, int):
                # an op code is a single byte
                chunks.append(int_to_byte(command))
                continue
            length = len(command)
            if length <= 75:
                # direct push; 75 is included, matching what parse() accepts
                chunks.append(int_to_byte(length))
            elif length < 0x100:
                # 76 is pushdata1
                chunks.append(int_to_byte(76))
                chunks.append(int_to_byte(length))
            elif length <= 520:
                # 77 is pushdata2
                chunks.append(int_to_byte(77))
                chunks.append(int_to_little_endian(length, 2))
            else:
                raise ValueError("too long a command")
            chunks.append(command)
        return b"".join(chunks)

    def serialize(self):
        """Serialize the script with its varstr length prefix."""
        return encode_varstr(self.raw_serialize())

    def evaluate(self, tx_obj, input_index):
        """Execute the script against input ``input_index`` of ``tx_obj``.

        Besides running op codes, this applies the p2sh rule and the witness
        program rules (v0 p2wpkh / p2wsh, v1 p2tr) as their patterns appear.

        Returns:
            True when execution finishes with a non-empty stack whose top
            element is not ``b""``; False otherwise (a diagnostic is printed
            for most failures).
        """
        # work on a copy: the p2sh / witness rules append commands as we go
        commands = self.commands[:]
        if tx_obj.tx_ins[input_index].witness:
            witness = tx_obj.tx_ins[input_index].witness.clone()
        else:
            # NOTE(review): the witness-program rules below use `witness`
            # without a None check — confirm segwit inputs always carry one
            witness = None
        stack = []
        altstack = []
        op_lookup = OP_CODE_FUNCTIONS
        while len(commands) > 0:
            command = commands.pop(0)
            if isinstance(command, int):
                # do what the op code says
                operation = op_lookup[command]
                if command in (99, 100):
                    # op_if/op_notif require the commands array
                    succeeded = operation(stack, commands)
                elif command in (107, 108):
                    # op_toaltstack/op_fromaltstack require the altstack
                    succeeded = operation(stack, altstack)
                elif command in (172, 173, 174, 175, 177, 178, 186):
                    # SIG ops (172-175, 186) and CLTV/CSV (177, 178)
                    # are operations that need the tx and input index
                    succeeded = operation(stack, tx_obj, input_index)
                else:
                    succeeded = operation(stack)
                if not succeeded:
                    print("bad op: {}".format(OP_CODE_NAMES[command]))
                    return False
            else:
                # add the command to the stack
                stack.append(command)
                # p2sh rule. if the next three commands are:
                # OP_HASH160 <20 byte hash> OP_EQUAL this is the RedeemScript
                # OP_HASH160 == 0xa9 and OP_EQUAL == 0x87
                if (
                    len(commands) == 3
                    and commands[0] == 0xA9
                    and isinstance(commands[1], bytes)
                    and len(commands[1]) == 20
                    and commands[2] == 0x87
                ):
                    redeem_script = encode_varstr(command)
                    # consume the three commands and run them by hand
                    commands.pop()
                    h160 = commands.pop()
                    commands.pop()
                    if not op_hash160(stack):
                        return False
                    stack.append(h160)
                    if not op_equal(stack):
                        return False
                    # final result should be a 1
                    if not op_verify(stack):
                        print("bad p2sh h160")
                        return False
                    # hashes match! now add the RedeemScript
                    stream = BytesIO(redeem_script)
                    commands.extend(Script.parse(stream).commands)
                # witness program version 0 rule. if stack commands are:
                # 0 <20 byte hash> this is p2wpkh
                if len(stack) == 2 and stack[0] == b"" and len(stack[1]) == 20:
                    h160 = stack.pop()
                    stack.pop()
                    commands.extend(witness.items)
                    commands.extend(P2PKHScriptPubKey(h160).commands)
                # witness program version 0 rule. if stack commands are:
                # 0 <32 byte hash> this is p2wsh
                elif len(stack) == 2 and stack[0] == b"" and len(stack[1]) == 32:
                    s256 = stack.pop()
                    stack.pop()
                    commands.extend(witness.items[:-1])
                    witness_script = witness.items[-1]
                    if s256 != sha256(witness_script):
                        print(
                            f"bad sha256 {s256.hex()} vs {sha256(witness_script).hex()}"
                        )
                        return False
                    # hashes match! now add the Witness Script
                    stream = BytesIO(encode_varstr(witness_script))
                    witness_script_commands = Script.parse(stream).commands
                    commands.extend(witness_script_commands)
                # witness program version 1 rule. if stack commands are:
                # 1 <32 byte hash> this is p2tr
                elif len(stack) == 2 and stack[0] == b"\x01" and len(stack[1]) == 32:
                    if len(witness) == 0:
                        print("stack in witness v1 empty")
                        return False
                    if witness.has_annex():
                        # ignore the annex
                        witness.items.pop()
                    if len(witness) == 1:
                        # this is a key path spend
                        sig = witness[0]
                        # pubkey is already at stack[1], so assign sig
                        stack[0] = sig
                        # honour the op's result like every other operation
                        # in this method instead of silently discarding it
                        if not op_checksig_schnorr(stack, tx_obj, input_index):
                            print("bad schnorr signature")
                            return False
                    elif len(witness) > 1:
                        # this is a script path spend
                        control_block = witness.control_block()
                        tap_script = witness.tap_script()
                        tweak_point = control_block.external_pubkey(tap_script)
                        # the tweak point should be what's on the stack
                        if tweak_point.parity != control_block.parity:
                            print("bad tweak point parity")
                            return False
                        if tweak_point.xonly() != stack.pop():
                            print("bad tweak point")
                            return False
                        # pop off the 1 and start fresh
                        stack.pop()
                        tap_script = witness.tap_script()
                        # everything but the last two witness items, then the
                        # tap script's own commands
                        commands = witness[:-2] + tap_script.commands[:]
                        op_lookup = TAPROOT_OP_CODE_FUNCTIONS
        if len(stack) == 0:
            return False
        if stack.pop() == b"":
            return False
        return True

    def is_p2pkh(self):
        """Returns whether the script follows the
        OP_DUP OP_HASH160 <20 byte hash> OP_EQUALVERIFY OP_CHECKSIG pattern."""
        # there should be exactly 5 commands
        # OP_DUP (0x76), OP_HASH160 (0xa9), 20-byte hash, OP_EQUALVERIFY (0x88),
        # OP_CHECKSIG (0xac)
        return (
            len(self.commands) == 5
            and self.commands[0] == 0x76
            and self.commands[1] == 0xA9
            and isinstance(self.commands[2], bytes)
            and len(self.commands[2]) == 20
            and self.commands[3] == 0x88
            and self.commands[4] == 0xAC
        )

    def is_p2sh(self):
        """Returns whether the script follows the
        OP_HASH160 <20 byte hash> OP_EQUAL pattern."""
        # there should be exactly 3 commands
        # OP_HASH160 (0xa9), 20-byte hash, OP_EQUAL (0x87)
        return (
            len(self.commands) == 3
            and self.commands[0] == 0xA9
            and isinstance(self.commands[1], bytes)
            and len(self.commands[1]) == 20
            and self.commands[2] == 0x87
        )

    def is_p2wpkh(self):
        """Returns whether the script follows the
        OP_0 <20 byte hash> pattern."""
        return (
            len(self.commands) == 2
            and self.commands[0] == 0x00
            and isinstance(self.commands[1], bytes)
            and len(self.commands[1]) == 20
        )

    def is_p2wsh(self):
        """Returns whether the script follows the
        OP_0 <32 byte hash> pattern."""
        return (
            len(self.commands) == 2
            and self.commands[0] == 0x00
            and isinstance(self.commands[1], bytes)
            and len(self.commands[1]) == 32
        )

    def is_p2tr(self):
        """Returns whether the script follows the
        OP_1 <32 byte hash> pattern."""
        return (
            len(self.commands) == 2
            and self.commands[0] == 0x51
            and isinstance(self.commands[1], bytes)
            and len(self.commands[1]) == 32
        )

    def has_op_return(self):
        """Returns whether op code 106 (OP_RETURN) appears among the commands."""
        return 106 in self.commands

    def is_witness_script(self):
        """Returns whether the script is a version 0 witness program
        (p2wpkh or p2wsh)."""
        return self.is_p2wpkh() or self.is_p2wsh()
class ScriptPubKey(Script):
    """Represents a ScriptPubKey in a transaction"""

    @classmethod
    def parse(cls, s=None, raw=None):
        """Parse a ScriptPubKey and return the most specific subclass.

        Args:
            s: binary stream positioned at a varstr-prefixed script.
            raw: the script bytes without a length prefix. Accepting this
                keeps the inherited ``parse_hex`` (which calls
                ``cls.parse(raw=...)``) usable on this class.

        Returns:
            A P2PKH/P2SH/P2WPKH/P2WSH/P2TR ScriptPubKey when the commands
            match that template, otherwise the generic parsed object.

        Raises:
            ValueError: propagated from ``Script.parse`` when both or neither
                of ``s``/``raw`` are supplied.
        """
        script_pubkey = super().parse(s, raw=raw)
        commands = script_pubkey.commands
        if script_pubkey.is_p2pkh():
            return P2PKHScriptPubKey(commands[2])
        if script_pubkey.is_p2sh():
            return P2SHScriptPubKey(commands[1])
        if script_pubkey.is_p2wpkh():
            return P2WPKHScriptPubKey(commands[1])
        if script_pubkey.is_p2wsh():
            return P2WSHScriptPubKey(commands[1])
        if script_pubkey.is_p2tr():
            return P2TRScriptPubKey(commands[1])
        # not a recognised template: hand back the generic object
        return script_pubkey

    def redeem_script(self):
        """Convert this ScriptPubKey to its RedeemScript equivalent"""
        # the RedeemScript is built from the same command list object
        return RedeemScript(self.commands)
class P2PKHScriptPubKey(ScriptPubKey):
    """ScriptPubKey of the form
    OP_DUP OP_HASH160 <hash160> OP_EQUALVERIFY OP_CHECKSIG."""

    def __init__(self, h160):
        """Build the script around ``h160``; raises TypeError unless it is bytes."""
        super().__init__()
        if isinstance(h160, bytes):
            # OP_DUP, OP_HASH160, <hash>, OP_EQUALVERIFY, OP_CHECKSIG
            self.commands = [0x76, 0xA9, h160, 0x88, 0xAC]
        else:
            raise TypeError("To initialize P2PKHScriptPubKey, a hash160 is needed")

    def hash160(self):
        """Return the hash160 element embedded in the script."""
        return self.commands[2]

    def address(self, network="mainnet"):
        """Return the base58check address for this script.

        The version byte is 0x00 for "mainnet" and 0x6f for any other
        network name.
        """
        version = b"\x00" if network == "mainnet" else b"\x6f"
        payload = version + self.hash160()
        return encode_base58_checksum(payload)
class P2SHScriptPubKey(ScriptPubKey):
    """ScriptPubKey of the form OP_HASH160 <hash160> OP_EQUAL."""

    def __init__(self, h160):
        """Build the script around ``h160``; raises TypeError unless it is bytes."""
        super().__init__()
        if isinstance(h160, bytes):
            # OP_HASH160, <hash>, OP_EQUAL
            self.commands = [0xA9, h160, 0x87]
        else:
            raise TypeError("To initialize P2SHScriptPubKey, a hash160 is needed")

    def hash160(self):
        """Return the hash160 element embedded in the script."""
        return self.commands[1]

    def address(self, network="mainnet"):
        """Return the base58check address for this script.

        The version byte is 0x05 for "mainnet" and 0xc4 for any other
        network name.
        """
        version = b"\x05" if network == "mainnet" else b"\xc4"
        payload = version + self.hash160()
        return encode_base58_checksum(payload)
class RedeemScript(Script):
    """Subclass that represents a RedeemScript for p2sh"""

    def is_p2sh_multisig(self):
        """Returns whether the last command is OP_CHECKMULTISIG (174).

        An empty script is reported as not multisig rather than raising
        IndexError, so get_quorum/signing_pubkeys fail with their ValueError.
        """
        return bool(self.commands) and self.commands[-1] == 174

    def hash160(self):
        """Returns the hash160 of the serialization of the RedeemScript"""
        return hash160(self.raw_serialize())

    def script_pubkey(self):
        """Returns the ScriptPubKey that this RedeemScript corresponds to"""
        return P2SHScriptPubKey(self.hash160())

    def address(self, network="mainnet"):
        """Returns the p2sh address for this RedeemScript"""
        return self.script_pubkey().address(network)

    @classmethod
    def convert(cls, raw_redeem_script):
        """Parse a RedeemScript from its raw (non length-prefixed) bytes."""
        # parse() reads a varstr from a stream, so add the length prefix
        stream = BytesIO(encode_varstr(raw_redeem_script))
        return cls.parse(stream)

    @classmethod
    def create_p2sh_multisig(
        cls,
        quorum_m,
        pubkey_hexes,
        sort_keys=True,
        expected_addr=None,
        expected_addr_network="mainnet",
    ):
        """
        Create a p2sh RedeemScript using a configured threshold (quorum_m) of child public keys (in hex).
        To use a custom order of pubkeys, feed them in order and set sort_keys=False
        For safety, you may pass in an expected_addr and the method will throw an error if the derived address doesn't match the expected one.

        Raises:
            ValueError: if quorum_m is not exactly an int, if the m-of-n is
                out of range, or if the derived address differs from
                expected_addr.
        """
        # safety checks (the exact type test also rejects bool)
        if type(quorum_m) is not int:
            raise ValueError(f"quorum_m must be of type int: {quorum_m}")
        quorum_n = len(pubkey_hexes)
        if quorum_m < 1 or quorum_m > quorum_n:
            raise ValueError(f"Invalid m-of-n: {quorum_m}-of-{quorum_n}")
        if sort_keys:
            pubkey_hexes = sorted(pubkey_hexes)
        # layout: <m> <pubkey>... <n> OP_CHECKMULTISIG
        commands = [number_to_op_code(quorum_m)]
        # we want the pubkeys in binary (not hex)
        commands.extend(bytes.fromhex(pubkey_hex) for pubkey_hex in pubkey_hexes)
        commands.append(number_to_op_code(quorum_n))
        commands.append(174)  # OP_CHECKMULTISIG
        to_return = cls(commands)
        if expected_addr:
            calculated_addr = to_return.address(network=expected_addr_network)
            if expected_addr != calculated_addr:
                raise ValueError(
                    f"Expected address {expected_addr} but calculated {calculated_addr}"
                )
        return to_return

    def get_quorum(self):
        """
        Return the m-of-n of this multisig, as in 2-of-3 or 3-of-5

        Raises:
            ValueError: if the script does not end in OP_CHECKMULTISIG.
        """
        if not self.is_p2sh_multisig():
            raise ValueError(f"Not p2sh multisig: {self}")
        quorum_m = op_code_to_number(self.commands[0])
        # everything except the m op code, the n op code and
        # OP_CHECKMULTISIG is counted as a pubkey
        quorum_n = len(self.commands) - 3
        return quorum_m, quorum_n

    def signing_pubkeys(self):
        """
        The pubkeys needed to sign this transaction, typically children derived from xpubs

        Raises:
            ValueError: if the script does not end in OP_CHECKMULTISIG.
        """
        if not self.is_p2sh_multisig():
            raise ValueError(f"Not p2sh multisig: {self}")
        # drop the leading m and the trailing n / OP_CHECKMULTISIG
        return self.commands[1:-2]
class SegwitPubKey(ScriptPubKey):
    """Shared address helpers for the segwit ScriptPubKey subclasses."""

    def address(self, network="mainnet"):
        """Return the bech32 address of this ScriptPubKey.

        The raw serialization of the script is passed to
        ``encode_bech32_checksum`` as the witness program.
        """
        return encode_bech32_checksum(self.raw_serialize(), network)

    def p2sh_address(self, network="mainnet"):
        """Return the p2sh address obtained by using this ScriptPubKey as a
        RedeemScript."""
        wrapped = self.redeem_script()
        return wrapped.address(network)
class P2WPKHScriptPubKey(SegwitPubKey):
    """ScriptPubKey of the form OP_0 <hash160>."""

    def __init__(self, h160):
        """Build the script around ``h160``; raises TypeError unless it is bytes."""
        super().__init__()
        if isinstance(h160, bytes):
            # OP_0 followed by the hash
            self.commands = [0x00, h160]
        else:
            raise TypeError("To initialize P2WPKHScriptPubKey, a hash160 is needed")
class P2WSHScriptPubKey(SegwitPubKey):
    """ScriptPubKey of the form OP_0 <sha256>."""

    def __init__(self, s256):
        """Build the script around ``s256``; raises TypeError unless it is bytes."""
        super().__init__()
        if isinstance(s256, bytes):
            # OP_0 followed by the hash
            self.commands = [0x00, s256]
        else:
            raise TypeError("To initialize P2WSHScriptPubKey, a sha256 is needed")
class P2TRScriptPubKey(ScriptPubKey):
    """ScriptPubKey of the form OP_1 <x-only point>."""

    def __init__(self, point):
        """Build the script from an ``S256Point`` or already-encoded bytes.

        Raises:
            TypeError: if ``point`` is neither an S256Point nor bytes.
        """
        super().__init__()
        if isinstance(point, S256Point):
            # a point object contributes its x-only encoding
            encoded = point.xonly()
        elif isinstance(point, bytes):
            encoded = point
        else:
            raise TypeError(
                f"To initialize P2TRScriptPubKey, a point is needed {point}"
            )
        # OP_1 followed by the encoded point
        self.commands = [0x51, encoded]

    def address(self, network="mainnet"):
        """return the bech32m address for p2tr"""
        # the raw serialization is handed over as the witness program;
        # presumably encode_bech32_checksum picks bech32m from the witness
        # version — verify against buidl.bech32
        program = self.raw_serialize()
        return encode_bech32_checksum(program, network)
class WitnessScript(Script):
    """Subclass that represents a WitnessScript for p2wsh"""

    @classmethod
    def convert(cls, raw_witness_script):
        """Parse a WitnessScript from its raw (non length-prefixed) bytes."""
        # parse() reads a varstr from a stream, so add the length prefix
        stream = BytesIO(encode_varstr(raw_witness_script))
        return cls.parse(stream)

    def sha256(self):
        """Returns the sha256 of the raw serialization for witness program"""
        return sha256(self.raw_serialize())

    def script_pubkey(self):
        """Generates the ScriptPubKey for p2wsh"""
        return P2WSHScriptPubKey(self.sha256())

    def address(self, network="mainnet"):
        """Generates a p2wsh address"""
        # the witness program is the serialized p2wsh ScriptPubKey
        witness_program = self.script_pubkey().raw_serialize()
        return encode_bech32_checksum(witness_program, network)

    def p2sh_address(self, network="mainnet"):
        """Generates a p2sh-p2wsh address"""
        # the RedeemScript is the p2wsh ScriptPubKey
        redeem_script = self.script_pubkey().redeem_script()
        return redeem_script.address(network)

    def is_p2wsh_multisig(self):
        """Returns whether the script looks like <m> ... <n> OP_CHECKMULTISIG.

        The first and second-to-last commands must be op codes (ints) and the
        last command must be OP_CHECKMULTISIG. Scripts with fewer than two
        commands, or whose last command is a data element or an unknown op
        code, return False instead of raising IndexError/KeyError.
        """
        if len(self.commands) < 2:
            return False
        last = self.commands[-1]
        return (
            isinstance(last, int)
            and OP_CODE_NAMES.get(last) == "OP_CHECKMULTISIG"
            and isinstance(self.commands[0], int)
            and isinstance(self.commands[-2], int)
        )

    def get_quorum(self):
        """
        Return the m-of-n of this multisig, as in 2-of-3 or 3-of-5

        Raises:
            ValueError: if the script is not a multisig witness script.
        """
        if not self.is_p2wsh_multisig():
            raise ValueError(f"Not a multisig witness script: {self}")
        # m and n are read from the op code names, e.g. "OP_2" -> 2
        quorum_m = OP_CODE_NAMES[self.commands[0]].split("OP_")[1]
        quorum_n = OP_CODE_NAMES[self.commands[-2]].split("OP_")[1]
        return int(quorum_m), int(quorum_n)
def address_to_script_pubkey(s):
    """Convert an address string into the matching ScriptPubKey object.

    The address type is chosen from the leading characters and, for
    bech32-style addresses, the total string length.

    Raises:
        RuntimeError: if the prefix/length combination is not recognised.
    """
    first_char = s[:1]
    if first_char in ("1", "m", "n"):
        # p2pkh
        return P2PKHScriptPubKey(decode_base58(s))
    if first_char in ("2", "3"):
        # p2sh
        return P2SHScriptPubKey(decode_base58(s))

    prefix = s[:4]
    # (p2wpkh length, p2wsh length) for witness v0 addresses, per prefix
    if prefix in ("bc1q", "tb1q"):
        v0_lengths = (42, 62)
    elif s[:6] == "bcrt1q":
        v0_lengths = (44, 64)
    else:
        v0_lengths = None

    if v0_lengths is not None:
        p2wpkh_length, p2wsh_length = v0_lengths
        if len(s) == p2wpkh_length:
            # p2wpkh
            return P2WPKHScriptPubKey(decode_bech32(s)[2])
        if len(s) == p2wsh_length:
            # p2wsh
            return P2WSHScriptPubKey(decode_bech32(s)[2])
    elif prefix in ("bc1p", "tb1p") and len(s) == 62:
        # p2tr
        return P2TRScriptPubKey(decode_bech32(s)[2])
    raise RuntimeError(f"unknown type of address: {s}")