Skip to content

Commit 04c7610

Browse files
authored
Merge pull request #9595 from yyforyongyu/fix-gossip-syncer
multi: fix flakes and gossip syncer
2 parents a673826 + faf8ce1 commit 04c7610

File tree

6 files changed

+41
-39
lines changed

6 files changed

+41
-39
lines changed

discovery/syncer.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -571,15 +571,11 @@ func (g *GossipSyncer) channelGraphSyncer() {
571571
// First, we'll attempt to continue our channel
572572
// synchronization by continuing to send off another
573573
// query chunk.
574-
done, err := g.synchronizeChanIDs()
575-
if err != nil {
576-
log.Errorf("Unable to sync chan IDs: %v", err)
577-
}
574+
done := g.synchronizeChanIDs()
578575

579576
// If this wasn't our last query, then we'll need to
580577
// transition to our waiting state.
581578
if !done {
582-
g.setSyncState(waitingQueryChanReply)
583579
continue
584580
}
585581

@@ -736,14 +732,15 @@ func (g *GossipSyncer) sendGossipTimestampRange(firstTimestamp time.Time,
736732
// been queried for with a response received. We'll chunk our requests as
737733
// required to ensure they fit into a single message. We may re-enter this
738734
// state in the case that chunking is required.
739-
func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
735+
func (g *GossipSyncer) synchronizeChanIDs() bool {
740736
// If we're in this state yet there are no more new channels to query
741737
// for, then we'll transition to our final synced state and return true
742738
// to signal that we're fully synchronized.
743739
if len(g.newChansToQuery) == 0 {
744740
log.Infof("GossipSyncer(%x): no more chans to query",
745741
g.cfg.peerPub[:])
746-
return true, nil
742+
743+
return true
747744
}
748745

749746
// Otherwise, we'll issue our next chunked query to receive replies
@@ -767,15 +764,21 @@ func (g *GossipSyncer) synchronizeChanIDs() (bool, error) {
767764
log.Infof("GossipSyncer(%x): querying for %v new channels",
768765
g.cfg.peerPub[:], len(queryChunk))
769766

767+
// Change the state before sending the query msg.
768+
g.setSyncState(waitingQueryChanReply)
769+
770770
// With our chunk obtained, we'll send over our next query, then return
771771
// false indicating that we're not yet fully synced.
772772
err := g.cfg.sendToPeer(&lnwire.QueryShortChanIDs{
773773
ChainHash: g.cfg.chainHash,
774774
EncodingType: lnwire.EncodingSortedPlain,
775775
ShortChanIDs: queryChunk,
776776
})
777+
if err != nil {
778+
log.Errorf("Unable to sync chan IDs: %v", err)
779+
}
777780

778-
return false, err
781+
return false
779782
}
780783

781784
// isLegacyReplyChannelRange determines where a ReplyChannelRange message is

discovery/syncer_test.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1478,10 +1478,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
14781478

14791479
for i := 0; i < chunkSize*2; i += 2 {
14801480
// With our set up complete, we'll request a sync of chan ID's.
1481-
done, err := syncer.synchronizeChanIDs()
1482-
if err != nil {
1483-
t.Fatalf("unable to sync chan IDs: %v", err)
1484-
}
1481+
done := syncer.synchronizeChanIDs()
14851482

14861483
// At this point, we shouldn't yet be done as only 2 items
14871484
// should have been queried for.
@@ -1528,8 +1525,7 @@ func TestGossipSyncerSynchronizeChanIDs(t *testing.T) {
15281525
}
15291526

15301527
// If we issue another query, the syncer should tell us that it's done.
1531-
done, err := syncer.synchronizeChanIDs()
1532-
require.NoError(t, err, "unable to sync chan IDs")
1528+
done := syncer.synchronizeChanIDs()
15331529
if done {
15341530
t.Fatalf("syncer should be finished!")
15351531
}

docs/release-notes/release-notes-0.19.0.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@
8282
could lead to our ChannelUpdate rate limiting logic being prematurely
8383
triggered.
8484

85+
* [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/9595) where the
86+
initial graph sync query may fail due to inconsistent state.
87+
8588
# New Features
8689

8790
* [Support](https://github.com/lightningnetwork/lnd/pull/8390) for

itest/lnd_forward_interceptor_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -438,12 +438,6 @@ func testForwardInterceptorRestart(ht *lntest.HarnessTest) {
438438
require.Equal(ht, lntest.CustomRecordsWithUnendorsed(customRecords),
439439
packet.InWireCustomRecords)
440440

441-
err = carolInterceptor.Send(&routerrpc.ForwardHtlcInterceptResponse{
442-
IncomingCircuitKey: packet.IncomingCircuitKey,
443-
Action: actionResume,
444-
})
445-
require.NoError(ht, err, "failed to send request")
446-
447441
// And now we forward the payment at Carol, expecting only an
448442
// endorsement signal in our incoming custom records.
449443
packet = ht.ReceiveHtlcInterceptor(carolInterceptor)

lnrpc/routerrpc/forward_interceptor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/lightningnetwork/lnd/htlcswitch"
99
"github.com/lightningnetwork/lnd/lnrpc"
1010
"github.com/lightningnetwork/lnd/lntypes"
11+
"github.com/lightningnetwork/lnd/lnutils"
1112
"github.com/lightningnetwork/lnd/lnwire"
1213
"google.golang.org/grpc/codes"
1314
"google.golang.org/grpc/status"
@@ -60,6 +61,9 @@ func (r *forwardInterceptor) run() error {
6061
return err
6162
}
6263

64+
log.Tracef("Received packet from stream: %v",
65+
lnutils.SpewLogClosure(resp))
66+
6367
if err := r.resolveFromClient(resp); err != nil {
6468
return err
6569
}
@@ -73,7 +77,8 @@ func (r *forwardInterceptor) run() error {
7377
func (r *forwardInterceptor) onIntercept(
7478
htlc htlcswitch.InterceptedPacket) error {
7579

76-
log.Tracef("Sending intercepted packet to client %v", htlc)
80+
log.Tracef("Sending intercepted packet to client %v",
81+
lnutils.SpewLogClosure(htlc))
7782

7883
inKey := htlc.IncomingCircuit
7984

tor/controller_test.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -319,14 +319,20 @@ func TestReconnectSucceed(t *testing.T) {
319319
// Close the old conn before reconnection.
320320
require.NoError(t, proxy.serverConn.Close())
321321

322-
// Accept the connection inside a goroutine. We will also write some
323-
// data so that the reconnection can succeed. We will mock three writes
324-
// and two reads inside our proxy server,
325-
// - write protocol info
326-
// - read auth info
327-
// - write auth challenge
328-
// - read auth challenge
329-
// - write OK
322+
// Accept the connection inside a goroutine. When the client makes a
323+
// reconnection, the message flow is:
324+
// 1. the client sends the command PROTOCOLINFO to the server.
325+
// 2. the server responds with its protocol version.
326+
// 3. the client reads the response and sends the command AUTHENTICATE
327+
// to the server.
328+
// 4. the server responds with the authentication info.
329+
//
330+
// From the server's PoV, we need to mock two reads and two writes
331+
// inside the connection,
332+
// 1. read the command PROTOCOLINFO sent from the client.
333+
// 2. write protocol info so the client can read it.
334+
// 3. read the command AUTHENTICATE sent from the client.
335+
// 4. write auth challenge so the client can read it.
330336
go func() {
331337
// Accept the new connection.
332338
server, err := proxy.server.Accept()
@@ -335,6 +341,11 @@ func TestReconnectSucceed(t *testing.T) {
335341
t.Logf("server listening on %v, client listening on %v",
336342
server.LocalAddr(), server.RemoteAddr())
337343

344+
// Read the protocol command from the client.
345+
buf := make([]byte, 65535)
346+
_, err = server.Read(buf)
347+
require.NoError(t, err)
348+
338349
// Write the protocol info.
339350
resp := "250-PROTOCOLINFO 1\n" +
340351
"250-AUTH METHODS=NULL\n" +
@@ -343,23 +354,13 @@ func TestReconnectSucceed(t *testing.T) {
343354
require.NoErrorf(t, err, "failed to write protocol info")
344355

345356
// Read the auth info from the client.
346-
buf := make([]byte, 65535)
347357
_, err = server.Read(buf)
348358
require.NoError(t, err)
349359

350360
// Write the auth challenge.
351361
resp = "250 AUTHCHALLENGE SERVERHASH=fake\n"
352362
_, err = server.Write([]byte(resp))
353363
require.NoErrorf(t, err, "failed to write auth challenge")
354-
355-
// Read the auth challenge resp from the client.
356-
_, err = server.Read(buf)
357-
require.NoError(t, err)
358-
359-
// Write OK resp.
360-
resp = "250 OK\n"
361-
_, err = server.Write([]byte(resp))
362-
require.NoErrorf(t, err, "failed to write response auth")
363364
}()
364365

365366
// Reconnect should succeed.

0 commit comments

Comments
 (0)