diff --git a/autopilot/manager.go b/autopilot/manager.go index dba4cc6cc5..0463f98d99 100644 --- a/autopilot/manager.go +++ b/autopilot/manager.go @@ -6,7 +6,7 @@ import ( "github.com/btcsuite/btcd/btcec/v2" "github.com/btcsuite/btcd/wire" - "github.com/lightningnetwork/lnd/graph" + graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" ) @@ -36,7 +36,7 @@ type ManagerCfg struct { // SubscribeTopology is used to get a subscription for topology changes // on the network. - SubscribeTopology func() (*graph.TopologyClient, error) + SubscribeTopology func() (*graphdb.TopologyClient, error) } // Manager is struct that manages an autopilot agent, making it possible to diff --git a/graph/builder.go b/graph/builder.go index 3e11155535..f92b523b00 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -109,8 +109,7 @@ type Builder struct { started atomic.Bool stopped atomic.Bool - ntfnClientCounter atomic.Uint64 - bestHeight atomic.Uint32 + bestHeight atomic.Uint32 cfg *Config @@ -123,22 +122,6 @@ type Builder struct { // of our currently known best chain are sent over. staleBlocks <-chan *chainview.FilteredBlock - // topologyUpdates is a channel that carries new topology updates - // messages from outside the Builder to be processed by the - // networkHandler. - topologyUpdates chan any - - // topologyClients maps a client's unique notification ID to a - // topologyClient client that contains its notification dispatch - // channel. - topologyClients *lnutils.SyncMap[uint64, *topologyClient] - - // ntfnClientUpdates is a channel that's used to send new updates to - // topology notification clients to the Builder. Updates either - // add a new notification client, or cancel notifications for an - // existing client. - ntfnClientUpdates chan *topologyClientUpdate - // channelEdgeMtx is a mutex we use to make sure we process only one // ChannelEdgePolicy at a time for a given channelID, to ensure // consistency between the various database accesses. @@ -163,14 +146,11 @@ var _ ChannelGraphSource = (*Builder)(nil) // NewBuilder constructs a new Builder. func NewBuilder(cfg *Config) (*Builder, error) { return &Builder{ - cfg: cfg, - topologyUpdates: make(chan any), - topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, - ntfnClientUpdates: make(chan *topologyClientUpdate), - channelEdgeMtx: multimutex.NewMutex[uint64](), - statTicker: ticker.New(defaultStatInterval), - stats: new(builderStats), - quit: make(chan struct{}), + cfg: cfg, + channelEdgeMtx: multimutex.NewMutex[uint64](), + statTicker: ticker.New(defaultStatInterval), + stats: new(builderStats), + quit: make(chan struct{}), }, nil } @@ -656,28 +636,6 @@ func (b *Builder) pruneZombieChans() error { return nil } -// handleTopologyUpdate is responsible for sending any topology changes -// notifications to registered clients. -// -// NOTE: must be run inside goroutine. -func (b *Builder) handleTopologyUpdate(update any) { - defer b.wg.Done() - - topChange := &TopologyChange{} - err := addToTopologyChange(b.cfg.Graph, topChange, update) - if err != nil { - log.Errorf("unable to update topology change notification: %v", - err) - return - } - - if topChange.isEmpty() { - return - } - - b.notifyTopologyChange(topChange) -} - // networkHandler is the primary goroutine for the Builder. The roles of // this goroutine include answering queries related to the state of the // network, pruning the graph on new block notification, applying network @@ -701,16 +659,6 @@ func (b *Builder) networkHandler() { } select { - // A new fully validated topology update has just arrived. - // We'll notify any registered clients. - case update := <-b.topologyUpdates: - b.wg.Add(1) - go b.handleTopologyUpdate(update) - - // TODO(roasbeef): remove all unconnected vertexes - // after N blocks pass with no corresponding - // announcements. - case chainUpdate, ok := <-b.staleBlocks: // If the channel has been closed, then this indicates // the daemon is shutting down, so we exit ourselves. @@ -783,31 +731,6 @@ func (b *Builder) networkHandler() { " processed.", chainUpdate.Height) } - // A new notification client update has arrived. We're either - // gaining a new client, or cancelling notifications for an - // existing client. - case ntfnUpdate := <-b.ntfnClientUpdates: - clientID := ntfnUpdate.clientID - - if ntfnUpdate.cancel { - client, ok := b.topologyClients.LoadAndDelete( - clientID, - ) - if ok { - close(client.exit) - client.wg.Wait() - - close(client.ntfnChan) - } - - continue - } - - b.topologyClients.Store(clientID, &topologyClient{ - ntfnChan: ntfnUpdate.ntfnChan, - exit: make(chan struct{}), - }) - // The graph prune ticker has ticked, so we'll examine the // state of the known graph to filter out any zombie channels // for pruning. @@ -934,16 +857,6 @@ func (b *Builder) updateGraphWithClosedChannels( log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash, blockHeight, len(chansClosed)) - if len(chansClosed) == 0 { - return err - } - - // Notify all currently registered clients of the newly closed channels. - closeSummaries := createCloseSummaries(blockHeight, chansClosed...) - b.notifyTopologyChange(&TopologyChange{ - ClosedChannels: closeSummaries, - }) - return nil } @@ -1067,12 +980,6 @@ func (b *Builder) AddNode(node *models.LightningNode, return err } - select { - case b.topologyUpdates <- node: - case <-b.quit: - return ErrGraphBuilderShuttingDown - } - return nil } @@ -1117,12 +1024,6 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, return err } - select { - case b.topologyUpdates <- edge: - case <-b.quit: - return ErrGraphBuilderShuttingDown - } - return nil } @@ -1224,12 +1125,6 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, return err } - select { - case b.topologyUpdates <- update: - case <-b.quit: - return ErrGraphBuilderShuttingDown - } - return nil } diff --git a/graph/db/graph.go b/graph/db/graph.go index b2aeaa433f..c6639e86f2 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -12,10 +12,15 @@ import ( "github.com/lightningnetwork/lnd/batch" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/kvdb" + "github.com/lightningnetwork/lnd/lnutils" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) +// ErrChanGraphShuttingDown indicates that the ChannelGraph has shutdown or is +// busy shutting down. +var ErrChanGraphShuttingDown = fmt.Errorf("ChannelGraph shutting down") + // Config is a struct that holds all the necessary dependencies for a // ChannelGraph. type Config struct { @@ -41,6 +46,24 @@ type ChannelGraph struct { *KVStore + ntfnClientCounter atomic.Uint64 + + // topologyUpdate is a channel that carries new topology updates + // messages from outside the ChannelGraph to be processed by the + // networkHandler. + topologyUpdate chan any + + // topologyClients maps a client's unique notification ID to a + // topologyClient client that contains its notification dispatch + // channel. + topologyClients *lnutils.SyncMap[uint64, *topologyClient] + + // ntfnClientUpdates is a channel that's used to send new updates to + // topology notification clients to the ChannelGraph. Updates either + // add a new notification client, or cancel notifications for an + // existing client. + ntfnClientUpdates chan *topologyClientUpdate + quit chan struct{} wg sync.WaitGroup } @@ -60,8 +83,11 @@ func NewChannelGraph(cfg *Config, options ...ChanGraphOption) (*ChannelGraph, } g := &ChannelGraph{ - KVStore: store, - quit: make(chan struct{}), + KVStore: store, + topologyUpdate: make(chan any), + topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, + ntfnClientUpdates: make(chan *topologyClientUpdate), + quit: make(chan struct{}), } // The graph cache can be turned off (e.g. for mobile users) for a @@ -90,6 +116,9 @@ func (c *ChannelGraph) Start() error { } } + c.wg.Add(1) + go c.handleTopologySubscriptions() + return nil } @@ -108,6 +137,57 @@ func (c *ChannelGraph) Stop() error { return nil } +// handleTopologySubscriptions ensures that topology client subscriptions, +// subscription cancellations and topology notifications are handled +// synchronously. +// +// NOTE: this MUST be run in a goroutine. +func (c *ChannelGraph) handleTopologySubscriptions() { + defer c.wg.Done() + + for { + select { + // A new fully validated topology update has just arrived. + // We'll notify any registered clients. + case update := <-c.topologyUpdate: + c.wg.Add(1) + go c.handleTopologyUpdate(update) + + // TODO(roasbeef): remove all unconnected vertexes + // after N blocks pass with no corresponding + // announcements. + + // A new notification client update has arrived. We're either + // gaining a new client, or cancelling notifications for an + // existing client. + case ntfnUpdate := <-c.ntfnClientUpdates: + clientID := ntfnUpdate.clientID + + if ntfnUpdate.cancel { + client, ok := c.topologyClients.LoadAndDelete( + clientID, + ) + if ok { + close(client.exit) + client.wg.Wait() + + close(client.ntfnChan) + } + + continue + } + + c.topologyClients.Store(clientID, &topologyClient{ + ntfnChan: ntfnUpdate.ntfnChan, + exit: make(chan struct{}), + }) + + case <-c.quit: + return + } + } +} + // populateCache loads the entire channel graph into the in-memory graph cache. // // NOTE: This should only be called if the graphCache has been constructed. @@ -226,6 +306,17 @@ func (c *ChannelGraph) AddLightningNode(node *models.LightningNode, ) } + // No need to send topology updates for shell nodes. + if !node.HaveNodeAnnouncement { + return nil + } + + select { + case c.topologyUpdate <- node: + case <-c.quit: + return ErrChanGraphShuttingDown + } + return nil } @@ -262,6 +353,12 @@ func (c *ChannelGraph) AddChannelEdge(edge *models.ChannelEdgeInfo, c.graphCache.AddChannel(edge, nil, nil) } + select { + case c.topologyUpdate <- edge: + case <-c.quit: + return ErrChanGraphShuttingDown + } + return nil } @@ -391,6 +488,17 @@ func (c *ChannelGraph) PruneGraph(spentOutputs []*wire.OutPoint, c.graphCache.Stats()) } + if len(edges) != 0 { + // Notify all currently registered clients of the newly closed + // channels. + closeSummaries := createCloseSummaries( + blockHeight, edges..., + ) + c.notifyTopologyChange(&TopologyChange{ + ClosedChannels: closeSummaries, + }) + } + return edges, nil } @@ -498,16 +606,20 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, return err } - if c.graphCache == nil { - return nil - } + if c.graphCache != nil { + var isUpdate1 bool + if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 { + isUpdate1 = true + } - var isUpdate1 bool - if edge.ChannelFlags&lnwire.ChanUpdateDirection == 0 { - isUpdate1 = true + c.graphCache.UpdatePolicy(edge, from, to, isUpdate1) } - c.graphCache.UpdatePolicy(edge, from, to, isUpdate1) + select { + case c.topologyUpdate <- edge: + case <-c.quit: + return ErrChanGraphShuttingDown + } return nil } diff --git a/graph/db/graph_test.go b/graph/db/graph_test.go index dc41f02a20..85b86b8027 100644 --- a/graph/db/graph_test.go +++ b/graph/db/graph_test.go @@ -964,6 +964,23 @@ func randEdgePolicy(chanID uint64, db kvdb.Backend) *models.ChannelEdgePolicy { return newEdgePolicy(chanID, db, update) } +func copyEdgePolicy(p *models.ChannelEdgePolicy) *models.ChannelEdgePolicy { + return &models.ChannelEdgePolicy{ + SigBytes: p.SigBytes, + ChannelID: p.ChannelID, + LastUpdate: p.LastUpdate, + MessageFlags: p.MessageFlags, + ChannelFlags: p.ChannelFlags, + TimeLockDelta: p.TimeLockDelta, + MinHTLC: p.MinHTLC, + MaxHTLC: p.MaxHTLC, + FeeBaseMSat: p.FeeBaseMSat, + FeeProportionalMillionths: p.FeeProportionalMillionths, + ToNode: p.ToNode, + ExtraOpaqueData: p.ExtraOpaqueData, + } +} + func newEdgePolicy(chanID uint64, db kvdb.Backend, updateTime int64) *models.ChannelEdgePolicy { @@ -2929,6 +2946,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { if err := graph.UpdateEdgePolicy(edge1); err != nil { t.Fatalf("unable to update edge: %v", err) } + edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions. edge2 := randEdgePolicy(chanID.ToUint64(), graph.db) edge2.ChannelFlags = 1 @@ -2937,6 +2955,7 @@ func TestChannelEdgePruningUpdateIndexDeletion(t *testing.T) { if err := graph.UpdateEdgePolicy(edge2); err != nil { t.Fatalf("unable to update edge: %v", err) } + edge2 = copyEdgePolicy(edge2) // Avoid read/write race conditions. // checkIndexTimestamps is a helper function that checks the edge update // index only includes the given timestamps. @@ -4044,6 +4063,7 @@ func TestGraphCacheForEachNodeChannel(t *testing.T) { 253, 217, 3, 8, 0, 0, 0, 10, 0, 0, 0, 20, } require.NoError(t, graph.UpdateEdgePolicy(edge1)) + edge1 = copyEdgePolicy(edge1) // Avoid read/write race conditions. directedChan := getSingleChannel() require.NotNil(t, directedChan) diff --git a/graph/notifications.go b/graph/db/notifications.go similarity index 91% rename from graph/notifications.go rename to graph/db/notifications.go index 76eabdb02f..7d54a74319 100644 --- a/graph/notifications.go +++ b/graph/db/notifications.go @@ -1,4 +1,4 @@ -package graph +package graphdb import ( "fmt" @@ -54,16 +54,16 @@ type topologyClientUpdate struct { // topology occurs. Changes that will be sent at notifications include: new // nodes appearing, node updating their attributes, new channels, channels // closing, and updates in the routing policies of a channel's directed edges. -func (b *Builder) SubscribeTopology() (*TopologyClient, error) { +func (c *ChannelGraph) SubscribeTopology() (*TopologyClient, error) { // If the router is not yet started, return an error to avoid a // deadlock waiting for it to handle the subscription request. - if !b.started.Load() { + if !c.started.Load() { return nil, fmt.Errorf("router not started") } // We'll first atomically obtain the next ID for this client from the // incrementing client ID counter. - clientID := b.ntfnClientCounter.Add(1) + clientID := c.ntfnClientCounter.Add(1) log.Debugf("New graph topology client subscription, client %v", clientID) @@ -71,12 +71,12 @@ func (b *Builder) SubscribeTopology() (*TopologyClient, error) { ntfnChan := make(chan *TopologyChange, 10) select { - case b.ntfnClientUpdates <- &topologyClientUpdate{ + case c.ntfnClientUpdates <- &topologyClientUpdate{ cancel: false, clientID: clientID, ntfnChan: ntfnChan, }: - case <-b.quit: + case <-c.quit: return nil, errors.New("ChannelRouter shutting down") } @@ -84,11 +84,11 @@ func (b *Builder) SubscribeTopology() (*TopologyClient, error) { TopologyChanges: ntfnChan, Cancel: func() { select { - case b.ntfnClientUpdates <- &topologyClientUpdate{ + case c.ntfnClientUpdates <- &topologyClientUpdate{ cancel: true, clientID: clientID, }: - case <-b.quit: + case <-c.quit: return } }, @@ -114,7 +114,7 @@ type topologyClient struct { // notifyTopologyChange notifies all registered clients of a new change in // graph topology in a non-blocking. -func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) { +func (c *ChannelGraph) notifyTopologyChange(topologyDiff *TopologyChange) { // notifyClient is a helper closure that will send topology updates to // the given client. notifyClient := func(clientID uint64, client *topologyClient) bool { @@ -127,23 +127,22 @@ func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) { len(topologyDiff.ChannelEdgeUpdates), len(topologyDiff.ClosedChannels)) - go func(c *topologyClient) { - defer c.wg.Done() + go func(t *topologyClient) { + defer t.wg.Done() select { // In this case we'll try to send the notification // directly to the upstream client consumer. - case c.ntfnChan <- topologyDiff: + case t.ntfnChan <- topologyDiff: // If the client cancels the notifications, then we'll // exit early. - case <-c.exit: + case <-t.exit: // Similarly, if the ChannelRouter itself exists early, // then we'll also exit ourselves. - case <-b.quit: - + case <-c.quit: } }(client) @@ -154,7 +153,29 @@ func (b *Builder) notifyTopologyChange(topologyDiff *TopologyChange) { // Range over the set of active clients, and attempt to send the // topology updates. - b.topologyClients.Range(notifyClient) + c.topologyClients.Range(notifyClient) +} + +// handleTopologyUpdate is responsible for sending any topology changes +// notifications to registered clients. +// +// NOTE: must be run inside goroutine. +func (c *ChannelGraph) handleTopologyUpdate(update any) { + defer c.wg.Done() + + topChange := &TopologyChange{} + err := c.addToTopologyChange(topChange, update) + if err != nil { + log.Errorf("unable to update topology change notification: %v", + err) + return + } + + if topChange.isEmpty() { + return + } + + c.notifyTopologyChange(topChange) } // TopologyChange represents a new set of modifications to the channel graph. @@ -310,8 +331,8 @@ type ChannelEdgeUpdate struct { // constitutes. This function will also fetch any required auxiliary // information required to create the topology change update from the graph // database. -func addToTopologyChange(graph DB, update *TopologyChange, - msg interface{}) error { +func (c *ChannelGraph) addToTopologyChange(update *TopologyChange, + msg any) error { switch m := msg.(type) { @@ -345,7 +366,7 @@ func addToTopologyChange(graph DB, update *TopologyChange, // We'll need to fetch the edge's information from the database // in order to get the information concerning which nodes are // being connected. - edgeInfo, _, _, err := graph.FetchChannelEdgesByID(m.ChannelID) + edgeInfo, _, _, err := c.FetchChannelEdgesByID(m.ChannelID) if err != nil { return errors.Errorf("unable fetch channel edge: %v", err) diff --git a/graph/notifications_test.go b/graph/notifications_test.go index 807b3fea7e..0e2ec7afba 100644 --- a/graph/notifications_test.go +++ b/graph/notifications_test.go @@ -469,7 +469,7 @@ func TestEdgeUpdateNotification(t *testing.T) { // With the channel edge now in place, we'll subscribe for topology // notifications. - ntfnClient, err := ctx.builder.SubscribeTopology() + ntfnClient, err := ctx.graph.SubscribeTopology() require.NoError(t, err, "unable to subscribe for channel notifications") // Create random policy edges that are stemmed to the channel id @@ -489,7 +489,8 @@ func TestEdgeUpdateNotification(t *testing.T) { t.Fatalf("unable to add edge update: %v", err) } - assertEdgeCorrect := func(t *testing.T, edgeUpdate *ChannelEdgeUpdate, + assertEdgeCorrect := func(t *testing.T, + edgeUpdate *graphdb.ChannelEdgeUpdate, edgeAnn *models.ChannelEdgePolicy) { if edgeUpdate.ChanID != edgeAnn.ChannelID { @@ -659,7 +660,7 @@ func TestNodeUpdateNotification(t *testing.T) { } // Create a new client to receive notifications. - ntfnClient, err := ctx.builder.SubscribeTopology() + ntfnClient, err := ctx.graph.SubscribeTopology() require.NoError(t, err, "unable to subscribe for channel notifications") // Change network topology by adding the updated info for the two nodes @@ -672,7 +673,7 @@ func TestNodeUpdateNotification(t *testing.T) { } assertNodeNtfnCorrect := func(t *testing.T, ann *models.LightningNode, - nodeUpdate *NetworkNodeUpdate) { + nodeUpdate *graphdb.NetworkNodeUpdate) { nodeKey, _ := ann.PubKey() @@ -699,9 +700,10 @@ func TestNodeUpdateNotification(t *testing.T) { t.Fatalf("node alias doesn't match: expected %v, got %v", ann.Alias, nodeUpdate.Alias) } - if nodeUpdate.Color != EncodeHexColor(ann.Color) { - t.Fatalf("node color doesn't match: expected %v, got %v", - EncodeHexColor(ann.Color), nodeUpdate.Color) + if nodeUpdate.Color != graphdb.EncodeHexColor(ann.Color) { + t.Fatalf("node color doesn't match: expected %v, "+ + "got %v", graphdb.EncodeHexColor(ann.Color), + nodeUpdate.Color) } } @@ -793,7 +795,7 @@ func TestNotificationCancellation(t *testing.T) { ctx := createTestCtxSingleNode(t, startingBlockHeight) // Create a new client to receive notifications. - ntfnClient, err := ctx.builder.SubscribeTopology() + ntfnClient, err := ctx.graph.SubscribeTopology() require.NoError(t, err, "unable to subscribe for channel notifications") // We'll create the utxo for a new channel. @@ -919,7 +921,7 @@ func TestChannelCloseNotification(t *testing.T) { // With the channel edge now in place, we'll subscribe for topology // notifications. - ntfnClient, err := ctx.builder.SubscribeTopology() + ntfnClient, err := ctx.graph.SubscribeTopology() require.NoError(t, err, "unable to subscribe for channel notifications") // Next, we'll simulate the closure of our channel by generating a new @@ -1002,7 +1004,9 @@ func TestEncodeHexColor(t *testing.T) { } for _, tc := range colorTestCases { - encoded := EncodeHexColor(color.RGBA{tc.R, tc.G, tc.B, 0}) + encoded := graphdb.EncodeHexColor( + color.RGBA{tc.R, tc.G, tc.B, 0}, + ) if (encoded == tc.encoded) != tc.isValid { t.Fatalf("incorrect color encoding, "+ "want: %v, got: %v", tc.encoded, encoded) diff --git a/pilot.go b/pilot.go index 11333a0722..8cbf23cc65 100644 --- a/pilot.go +++ b/pilot.go @@ -295,6 +295,6 @@ func initAutoPilot(svr *server, cfg *lncfg.AutoPilot, }, nil }, SubscribeTransactions: svr.cc.Wallet.SubscribeTransactions, - SubscribeTopology: svr.graphBuilder.SubscribeTopology, + SubscribeTopology: svr.graphDB.SubscribeTopology, }, nil } diff --git a/rpcserver.go b/rpcserver.go index 7236a5dad5..692ba74d2b 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -48,7 +48,6 @@ import ( "github.com/lightningnetwork/lnd/feature" "github.com/lightningnetwork/lnd/fn/v2" "github.com/lightningnetwork/lnd/funding" - "github.com/lightningnetwork/lnd/graph" graphdb "github.com/lightningnetwork/lnd/graph/db" "github.com/lightningnetwork/lnd/graph/db/models" "github.com/lightningnetwork/lnd/htlcswitch" @@ -3294,7 +3293,7 @@ func (r *rpcServer) GetInfo(_ context.Context, // TODO(roasbeef): add synced height n stuff isTestNet := chainreg.IsTestnet(&r.cfg.ActiveNetParams) - nodeColor := graph.EncodeHexColor(nodeAnn.RGBColor) + nodeColor := graphdb.EncodeHexColor(nodeAnn.RGBColor) version := build.Version() + " commit=" + build.Commit return &lnrpc.GetInfoResponse{ @@ -6886,7 +6885,7 @@ func marshalNode(node *models.LightningNode) *lnrpc.LightningNode { PubKey: hex.EncodeToString(node.PubKeyBytes[:]), Addresses: nodeAddrs, Alias: node.Alias, - Color: graph.EncodeHexColor(node.Color), + Color: graphdb.EncodeHexColor(node.Color), Features: features, CustomRecords: customRecords, } @@ -7084,7 +7083,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription, // First, we start by subscribing to a new intent to receive // notifications from the channel router. - client, err := r.server.graphBuilder.SubscribeTopology() + client, err := r.server.graphDB.SubscribeTopology() if err != nil { return err } @@ -7137,7 +7136,7 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription, // returned by the router to the form of notifications expected by the current // gRPC service. func marshallTopologyChange( - topChange *graph.TopologyChange) *lnrpc.GraphTopologyUpdate { + topChange *graphdb.TopologyChange) *lnrpc.GraphTopologyUpdate { // encodeKey is a simple helper function that converts a live public // key into a hex-encoded version of the compressed serialization for diff --git a/server.go b/server.go index dba560bb2d..deae1f5915 100644 --- a/server.go +++ b/server.go @@ -368,7 +368,7 @@ type server struct { // updatePersistentPeerAddrs subscribes to topology changes and stores // advertised addresses for any NodeAnnouncements from our persisted peers. func (s *server) updatePersistentPeerAddrs() error { - graphSub, err := s.graphBuilder.SubscribeTopology() + graphSub, err := s.graphDB.SubscribeTopology() if err != nil { return err }