@@ -123,6 +123,11 @@ type Builder struct {
123
123
// of our currently known best chain are sent over.
124
124
staleBlocks <- chan * chainview.FilteredBlock
125
125
126
+ // networkUpdates is a channel that carries new topology updates
127
+ // messages from outside the Builder to be processed by the
128
+ // networkHandler.
129
+ networkUpdates chan * routingMsg
130
+
126
131
// topologyClients maps a client's unique notification ID to a
127
132
// topologyClient client that contains its notification dispatch
128
133
// channel.
@@ -159,6 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
159
164
func NewBuilder (cfg * Config ) (* Builder , error ) {
160
165
return & Builder {
161
166
cfg : cfg ,
167
+ networkUpdates : make (chan * routingMsg ),
162
168
topologyClients : & lnutils.SyncMap [uint64 , * topologyClient ]{},
163
169
ntfnClientUpdates : make (chan * topologyClientUpdate ),
164
170
channelEdgeMtx : multimutex .NewMutex [uint64 ](),
@@ -707,8 +713,8 @@ func (b *Builder) handleNetworkUpdate(update *routingMsg) {
707
713
708
714
// networkHandler is the primary goroutine for the Builder. The roles of
709
715
// this goroutine include answering queries related to the state of the
710
- // network, pruning the graph on new block notification and registering new
711
- // topology clients.
716
+ // network, pruning the graph on new block notification, applying network
717
+ // updates, and registering new topology clients.
712
718
//
713
719
// NOTE: This MUST be run as a goroutine.
714
720
func (b * Builder ) networkHandler () {
@@ -728,6 +734,17 @@ func (b *Builder) networkHandler() {
728
734
}
729
735
730
736
select {
737
+ // A new fully validated network update has just arrived. As a
738
+ // result we'll modify the channel graph accordingly depending
739
+ // on the exact type of the message.
740
+ case update := <- b .networkUpdates :
741
+ b .wg .Add (1 )
742
+ go b .handleNetworkUpdate (update )
743
+
744
+ // TODO(roasbeef): remove all unconnected vertexes
745
+ // after N blocks pass with no corresponding
746
+ // announcements.
747
+
731
748
case chainUpdate , ok := <- b .staleBlocks :
732
749
// If the channel has been closed, then this indicates
733
750
// the daemon is shutting down, so we exit ourselves.
@@ -1091,12 +1108,14 @@ func (b *Builder) AddNode(node *models.LightningNode,
1091
1108
err : make (chan error , 1 ),
1092
1109
}
1093
1110
1094
- b .wg .Add (1 )
1095
- go b .handleNetworkUpdate (rMsg )
1096
-
1097
1111
select {
1098
- case err := <- rMsg .err :
1099
- return err
1112
+ case b .networkUpdates <- rMsg :
1113
+ select {
1114
+ case err := <- rMsg .err :
1115
+ return err
1116
+ case <- b .quit :
1117
+ return ErrGraphBuilderShuttingDown
1118
+ }
1100
1119
case <- b .quit :
1101
1120
return ErrGraphBuilderShuttingDown
1102
1121
}
@@ -1142,12 +1161,14 @@ func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
1142
1161
err : make (chan error , 1 ),
1143
1162
}
1144
1163
1145
- b .wg .Add (1 )
1146
- go b .handleNetworkUpdate (rMsg )
1147
-
1148
1164
select {
1149
- case err := <- rMsg .err :
1150
- return err
1165
+ case b .networkUpdates <- rMsg :
1166
+ select {
1167
+ case err := <- rMsg .err :
1168
+ return err
1169
+ case <- b .quit :
1170
+ return ErrGraphBuilderShuttingDown
1171
+ }
1151
1172
case <- b .quit :
1152
1173
return ErrGraphBuilderShuttingDown
1153
1174
}
@@ -1250,12 +1271,14 @@ func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
1250
1271
err : make (chan error , 1 ),
1251
1272
}
1252
1273
1253
- b .wg .Add (1 )
1254
- go b .handleNetworkUpdate (rMsg )
1255
-
1256
1274
select {
1257
- case err := <- rMsg .err :
1258
- return err
1275
+ case b .networkUpdates <- rMsg :
1276
+ select {
1277
+ case err := <- rMsg .err :
1278
+ return err
1279
+ case <- b .quit :
1280
+ return ErrGraphBuilderShuttingDown
1281
+ }
1259
1282
case <- b .quit :
1260
1283
return ErrGraphBuilderShuttingDown
1261
1284
}
0 commit comments