@@ -15,6 +15,7 @@ import (
15
15
"sort"
16
16
"strconv"
17
17
"strings"
18
+ "sync"
18
19
"sync/atomic"
19
20
"time"
20
21
@@ -27,6 +28,7 @@ type lwdStreamer struct {
27
28
cache * common.BlockCache
28
29
chainName string
29
30
pingEnable bool
31
+ mutex sync.Mutex
30
32
walletrpc.UnimplementedCompactTxStreamerServer
31
33
}
32
34
@@ -57,12 +59,15 @@ func checkTaddress(taddr string) error {
57
59
58
60
// GetLatestBlock returns the height of the best chain, according to zcashd.
59
61
func (s * lwdStreamer ) GetLatestBlock (ctx context.Context , placeholder * walletrpc.ChainSpec ) (* walletrpc.BlockID , error ) {
62
+ // Lock to ensure we return consistent height and hash
63
+ s .mutex .Lock ()
64
+ defer s .mutex .Unlock ()
60
65
latestBlock := s .cache .GetLatestHeight ()
61
- latestHash := s .cache .GetLatestHash ()
62
66
63
67
if latestBlock == - 1 {
64
68
return nil , errors .New ("cache is empty, server is probably not yet ready" )
65
69
}
70
+ latestHash := s .cache .GetLatestHash ()
66
71
67
72
return & walletrpc.BlockID {Height : uint64 (latestBlock ), Hash : latestHash }, nil
68
73
}
@@ -151,11 +156,10 @@ func (s *lwdStreamer) GetBlock(ctx context.Context, id *walletrpc.BlockID) (*wal
151
156
// 'end' inclusively.
152
157
func (s * lwdStreamer ) GetBlockRange (span * walletrpc.BlockRange , resp walletrpc.CompactTxStreamer_GetBlockRangeServer ) error {
153
158
blockChan := make (chan * walletrpc.CompactBlock )
154
- errChan := make (chan error )
155
159
if span .Start == nil || span .End == nil {
156
160
return errors .New ("must specify start and end heights" )
157
161
}
158
-
162
+ errChan := make ( chan error )
159
163
go common .GetBlockRange (s .cache , blockChan , errChan , int (span .Start .Height ), int (span .End .Height ))
160
164
161
165
for {
@@ -410,6 +414,9 @@ var mempoolList []string
410
414
var lastMempool time.Time
411
415
412
416
func (s * lwdStreamer ) GetMempoolTx (exclude * walletrpc.Exclude , resp walletrpc.CompactTxStreamer_GetMempoolTxServer ) error {
417
+ s .mutex .Lock ()
418
+ defer s .mutex .Unlock ()
419
+
413
420
if time .Since (lastMempool ).Seconds () >= 2 {
414
421
lastMempool = time .Now ()
415
422
// Refresh our copy of the mempool.
0 commit comments