@@ -12,6 +12,7 @@ import (
12
12
"errors"
13
13
"io"
14
14
"regexp"
15
+ "sort"
15
16
"strconv"
16
17
"strings"
17
18
"sync/atomic"
@@ -391,6 +392,134 @@ func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.C
391
392
return err
392
393
}
393
394
395
+ // Key is 32-byte txid (as a 64-character string), data is pointer to compact tx.
396
+ var mempoolMap * map [string ]* walletrpc.CompactTx
397
+ var mempoolList []string
398
+
399
+ // Last time we pulled a copy of the mempool from zcashd.
400
+ var lastMempool time.Time
401
+
402
+ func (s * lwdStreamer ) GetMempoolTx (exclude * walletrpc.Exclude , resp walletrpc.CompactTxStreamer_GetMempoolTxServer ) error {
403
+ if time .Now ().Sub (lastMempool ).Seconds () >= 2 {
404
+ lastMempool = time .Now ()
405
+ // Refresh our copy of the mempool.
406
+ params := make ([]json.RawMessage , 0 )
407
+ result , rpcErr := common .RawRequest ("getrawmempool" , params )
408
+ if rpcErr != nil {
409
+ return rpcErr
410
+ }
411
+ err := json .Unmarshal (result , & mempoolList )
412
+ if err != nil {
413
+ return err
414
+ }
415
+ newmempoolMap := make (map [string ]* walletrpc.CompactTx )
416
+ if mempoolMap == nil {
417
+ mempoolMap = & newmempoolMap
418
+ }
419
+ for _ , txidstr := range mempoolList {
420
+ if ctx , ok := (* mempoolMap )[txidstr ]; ok {
421
+ // This ctx has already been fetched, copy pointer to it.
422
+ newmempoolMap [txidstr ] = ctx
423
+ continue
424
+ }
425
+ txidJSON , err := json .Marshal (txidstr )
426
+ if err != nil {
427
+ return err
428
+ }
429
+ // The "0" is because we only need the raw hex, which is returned as
430
+ // just a hex string, and not even a json string (with quotes).
431
+ params := []json.RawMessage {txidJSON , json .RawMessage ("0" )}
432
+ result , rpcErr := common .RawRequest ("getrawtransaction" , params )
433
+ if rpcErr != nil {
434
+ // Not an error; mempool transactions can disappear
435
+ continue
436
+ }
437
+ // strip the quotes
438
+ var txStr string
439
+ err = json .Unmarshal (result , & txStr )
440
+ if err != nil {
441
+ return err
442
+ }
443
+
444
+ // conver to binary
445
+ txBytes , err := hex .DecodeString (txStr )
446
+ if err != nil {
447
+ return err
448
+ }
449
+ tx := parser .NewTransaction ()
450
+ txdata , err := tx .ParseFromSlice (txBytes )
451
+ if len (txdata ) > 0 {
452
+ return errors .New ("extra data deserializing transaction" )
453
+ }
454
+ newmempoolMap [txidstr ] = & walletrpc.CompactTx {}
455
+ if tx .HasShieldedElements () {
456
+ newmempoolMap [txidstr ] = tx .ToCompact ( /* height */ 0 )
457
+ }
458
+ }
459
+ mempoolMap = & newmempoolMap
460
+ }
461
+ excludeHex := make ([]string , len (exclude .Txid ))
462
+ for i := 0 ; i < len (exclude .Txid ); i ++ {
463
+ excludeHex [i ] = hex .EncodeToString (parser .Reverse (exclude .Txid [i ]))
464
+ }
465
+ for _ , txid := range MempoolFilter (mempoolList , excludeHex ) {
466
+ tx := (* mempoolMap )[txid ]
467
+ if len (tx .Hash ) > 0 {
468
+ err := resp .Send (tx )
469
+ if err != nil {
470
+ return err
471
+ }
472
+ }
473
+ }
474
+ return nil
475
+ }
476
+
477
+ // Return the subset of items that aren't excluded, but
478
+ // if more than one item matches an exclude entry, return
479
+ // all those items.
480
+ func MempoolFilter (items , exclude []string ) []string {
481
+ sort .Slice (items , func (i , j int ) bool {
482
+ return items [i ] < items [j ]
483
+ })
484
+ sort .Slice (exclude , func (i , j int ) bool {
485
+ return exclude [i ] < exclude [j ]
486
+ })
487
+ // Determine how many items match each exclude item.
488
+ nmatches := make ([]int , len (exclude ))
489
+ // is the exclude string less than the item string?
490
+ lessthan := func (e , i string ) bool {
491
+ l := len (e )
492
+ if l > len (i ) {
493
+ l = len (i )
494
+ }
495
+ return e < i [0 :l ]
496
+ }
497
+ ei := 0
498
+ for _ , item := range items {
499
+ for ei < len (exclude ) && lessthan (exclude [ei ], item ) {
500
+ ei ++
501
+ }
502
+ match := ei < len (exclude ) && strings .HasPrefix (item , exclude [ei ])
503
+ if match {
504
+ nmatches [ei ]++
505
+ }
506
+ }
507
+
508
+ // Add each item that isn't uniquely excluded to the results.
509
+ tosend := make ([]string , 0 )
510
+ ei = 0
511
+ for _ , item := range items {
512
+ for ei < len (exclude ) && lessthan (exclude [ei ], item ) {
513
+ ei ++
514
+ }
515
+ match := ei < len (exclude ) && strings .HasPrefix (item , exclude [ei ])
516
+ if ! match || nmatches [ei ] > 1 {
517
+ tosend = append (tosend , item )
518
+ }
519
+ }
520
+ return tosend
521
+ }
522
+
394
523
func getAddressUtxos (arg * walletrpc.GetAddressUtxosArg , f func (* walletrpc.GetAddressUtxosReply ) error ) error {
395
524
for _ , a := range arg .Addresses {
396
525
if err := checkTaddress (a ); err != nil {
@@ -501,6 +630,8 @@ func (s *DarksideStreamer) Reset(ctx context.Context, ms *walletrpc.DarksideMeta
501
630
if err != nil {
502
631
return nil , err
503
632
}
633
+ mempoolMap = nil
634
+ mempoolList = nil
504
635
return & walletrpc.Empty {}, nil
505
636
}
506
637
0 commit comments