Skip to content

Commit

Permalink
graph/db: move Topology client management to ChannelGraph
Browse files Browse the repository at this point in the history
  • Loading branch information
ellemouton committed Feb 19, 2025
1 parent 71c7e9d commit dc553c3
Show file tree
Hide file tree
Showing 9 changed files with 208 additions and 158 deletions.
4 changes: 2 additions & 2 deletions autopilot/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/btcsuite/btcd/btcec/v2"
"github.com/btcsuite/btcd/wire"
"github.com/lightningnetwork/lnd/graph"
graphdb "github.com/lightningnetwork/lnd/graph/db"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
)
Expand Down Expand Up @@ -36,7 +36,7 @@ type ManagerCfg struct {

// SubscribeTopology is used to get a subscription for topology changes
// on the network.
SubscribeTopology func() (*graph.TopologyClient, error)
SubscribeTopology func() (*graphdb.TopologyClient, error)
}

// Manager is struct that manages an autopilot agent, making it possible to
Expand Down
117 changes: 6 additions & 111 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ type Builder struct {
started atomic.Bool
stopped atomic.Bool

ntfnClientCounter atomic.Uint64
bestHeight atomic.Uint32
bestHeight atomic.Uint32

cfg *Config

Expand All @@ -123,22 +122,6 @@ type Builder struct {
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock

// topologyUpdate is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
topologyUpdate chan any

// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
// channel.
topologyClients *lnutils.SyncMap[uint64, *topologyClient]

// ntfnClientUpdates is a channel that's used to send new updates to
// topology notification clients to the Builder. Updates either
// add a new notification client, or cancel notifications for an
// existing client.
ntfnClientUpdates chan *topologyClientUpdate

// channelEdgeMtx is a mutex we use to make sure we process only one
// ChannelEdgePolicy at a time for a given channelID, to ensure
// consistency between the various database accesses.
Expand All @@ -163,14 +146,11 @@ var _ ChannelGraphSource = (*Builder)(nil)
// NewBuilder constructs a new Builder.
func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{
cfg: cfg,
topologyUpdate: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
cfg: cfg,
channelEdgeMtx: multimutex.NewMutex[uint64](),
statTicker: ticker.New(defaultStatInterval),
stats: new(builderStats),
quit: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -656,28 +636,6 @@ func (b *Builder) pruneZombieChans() error {
return nil
}

// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done()

topChange := &TopologyChange{}
err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}

if topChange.isEmpty() {
return
}

b.notifyTopologyChange(topChange)
}

// networkHandler is the primary goroutine for the Builder. The roles of
// this goroutine include answering queries related to the state of the
// network, pruning the graph on new block notification, applying network
Expand All @@ -701,16 +659,6 @@ func (b *Builder) networkHandler() {
}

select {
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-b.topologyUpdate:
b.wg.Add(1)
go b.handleTopologyUpdate(update)

// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
// announcements.

case chainUpdate, ok := <-b.staleBlocks:
// If the channel has been closed, then this indicates
// the daemon is shutting down, so we exit ourselves.
Expand Down Expand Up @@ -783,31 +731,6 @@ func (b *Builder) networkHandler() {
" processed.", chainUpdate.Height)
}

// A new notification client update has arrived. We're either
// gaining a new client, or cancelling notifications for an
// existing client.
case ntfnUpdate := <-b.ntfnClientUpdates:
clientID := ntfnUpdate.clientID

if ntfnUpdate.cancel {
client, ok := b.topologyClients.LoadAndDelete(
clientID,
)
if ok {
close(client.exit)
client.wg.Wait()

close(client.ntfnChan)
}

continue
}

b.topologyClients.Store(clientID, &topologyClient{
ntfnChan: ntfnUpdate.ntfnChan,
exit: make(chan struct{}),
})

// The graph prune ticker has ticked, so we'll examine the
// state of the known graph to filter out any zombie channels
// for pruning.
Expand Down Expand Up @@ -934,16 +857,6 @@ func (b *Builder) updateGraphWithClosedChannels(
log.Infof("Block %v (height=%v) closed %v channels", chainUpdate.Hash,
blockHeight, len(chansClosed))

if len(chansClosed) == 0 {
return err
}

// Notify all currently registered clients of the newly closed channels.
closeSummaries := createCloseSummaries(blockHeight, chansClosed...)
b.notifyTopologyChange(&TopologyChange{
ClosedChannels: closeSummaries,
})

return nil
}

Expand Down Expand Up @@ -1073,12 +986,6 @@ func (b *Builder) AddNode(node *models.LightningNode,
return err
}

select {
case b.topologyUpdate <- node:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down Expand Up @@ -1129,12 +1036,6 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
return err
}

select {
case b.topologyUpdate <- edge:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down Expand Up @@ -1242,12 +1143,6 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
return err
}

select {
case b.topologyUpdate <- update:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

Expand Down
Loading

0 comments on commit dc553c3

Please sign in to comment.