From 077fd0cc97eb672c9c3504ae8a38a7902f13c64a Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Tue, 18 Oct 2022 10:31:45 -0700 Subject: [PATCH 1/2] record a graph on each run --- go.mod | 4 +- go.sum | 9 ++- peer/peer.go | 6 ++ tests/virtual-payment.go | 27 ++++++- utils/graph-recorder.go | 168 +++++++++++++++++++++++++++++++++++++++ utils/monitor.go | 13 ++- utils/testing.go | 11 ++- 7 files changed, 230 insertions(+), 8 deletions(-) create mode 100644 utils/graph-recorder.go diff --git a/go.mod b/go.mod index 05cde447..e9e7751f 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/miguelmota/go-ethereum-hdwallet v0.1.1 github.com/multiformats/go-multiaddr v0.7.0 github.com/statechannels/go-nitro v0.0.0-20221009024643-ab7b1a648d10 + gonum.org/v1/gonum v0.12.0 ) require ( @@ -102,10 +103,11 @@ require ( github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef // indirect - golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b // indirect + golang.org/x/exp v0.0.0-20221012211006-4de253d81b95 // indirect golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/net v0.0.0-20220920183852-bf014ff85ad5 // indirect golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect + golang.org/x/text v0.3.8 // indirect golang.org/x/tools v0.1.12 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect diff --git a/go.sum b/go.sum index 2be2d9a3..f2fe632f 100644 --- a/go.sum +++ b/go.sum @@ -947,8 +947,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0 golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4= golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM= golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU= -golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b h1:SCE/18RnFsLrjydh/R/s5EVvHoZprqEQUuoxK8q2Pc4= -golang.org/x/exp v0.0.0-20220916125017-b168a2c6b86b/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= +golang.org/x/exp v0.0.0-20221012211006-4de253d81b95 h1:sBdrWpxhGDdTAYNqbgBLAR+ULAPPhfgncLr1X0lyWtg= +golang.org/x/exp v0.0.0-20221012211006-4de253d81b95/go.mod h1:cyybsKvd6eL0RnXn6p/Grxp8F5bW7iYuBgsNCOHpMYE= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -1145,8 +1145,9 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/time v0.0.0-20161028155119-f51c12702a4d/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -1222,6 +1223,8 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.6.0/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU= +gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o= +gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc= diff --git a/peer/peer.go b/peer/peer.go index 4315694d..04083a23 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -56,6 +56,12 @@ type MyInfo struct { PrivateKey ecdsa.PrivateKey } +// IsGraphRecorder returns true if an instance is responsible for recording the graph to file. +// We use the first payee instance as they are not too busy. +func IsGraphRecorder(seq int64, c config.RunConfig) bool { + return int64(c.NumHubs+c.NumPayers+c.NumPayees) == seq +} + // GetRole determines the role an instance will play based on the run config. func GetRole(seq int64, c config.RunConfig) Role { switch { diff --git a/tests/virtual-payment.go b/tests/virtual-payment.go index eb320e0f..f28897fb 100644 --- a/tests/virtual-payment.go +++ b/tests/virtual-payment.go @@ -2,6 +2,7 @@ package tests import ( "context" + "encoding/xml" "fmt" "math/big" "math/rand" @@ -95,8 +96,9 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err client.MustBarrier(ctx, contractSetup, runEnv.TestInstanceCount) nClient := nitro.New(ms, cs, store, logDestination, &engine.PermissivePolicy{}, runEnv.R()) + gr := utils.NewGraphRecorder(me.PeerInfo, peers, runConfig, client) - cm := utils.NewCompletionMonitor(&nClient, runEnv.RecordMessage) + cm := utils.NewCompletionMonitor(&nClient, gr, runEnv.RecordMessage) defer cm.Close() // We wait until everyone has chosen an address. @@ -105,7 +107,7 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err client.MustSignalAndWait(ctx, "message service connected", runEnv.TestInstanceCount) // Create ledger channels with all the hubs - ledgerIds := utils.CreateLedgerChannels(nClient, cm, utils.FINNEY_IN_WEI, me.PeerInfo, peers) + ledgerIds := utils.CreateLedgerChannels(nClient, cm, gr, utils.FINNEY_IN_WEI, me.PeerInfo, peers) client.MustSignalAndWait(ctx, sync.State("ledgerDone"), runEnv.TestInstanceCount) toSleep := runConfig.GetSleepDuration() @@ -139,6 +141,16 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err r := nClient.CreateVirtualPaymentChannel(selectedHubs, randomPayee.Address, 0, outcome) + participants := []types.Address{me.Address} + participants = append(participants, selectedHubs...) + participants = append(participants, randomPayee.Address) + gr.ObjectiveStatusUpdated(utils.ObjectiveStatusInfo{ + Id: r.Id, + Time: time.Now(), + Participants: participants, + ChannelId: r.ChannelId.String(), + Status: "Starting", + }) channelId = r.ChannelId cm.WaitForObjectivesToComplete([]protocols.ObjectiveId{r.Id}) @@ -218,6 +230,17 @@ func CreateVirtualPaymentTest(runEnv *runtime.RunEnv, init *run.InitContext) err } } + if peer.IsGraphRecorder(seq, runConfig) { + data, err := xml.Marshal(gr.Graph()) + if err != nil { + return err + } + err = os.WriteFile(fmt.Sprintf("./outputs/graph-%s.gexf", runEnv.TestRun), data, 0644) + if err != nil { + return err + } + + } client.MustSignalAndWait(ctx, "done", runEnv.TestInstanceCount) return nil diff --git a/utils/graph-recorder.go b/utils/graph-recorder.go new file mode 100644 index 00000000..c852c600 --- /dev/null +++ b/utils/graph-recorder.go @@ -0,0 +1,168 @@ +package utils + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/statechannels/go-nitro-testground/config" + "github.com/statechannels/go-nitro-testground/peer" + "github.com/statechannels/go-nitro/protocols" + "github.com/testground/sdk-go/sync" + + "gonum.org/v1/gonum/graph/formats/gexf12" +) + +type objectiveStatus string + +const ( + Starting objectiveStatus = "Starting" + Completed objectiveStatus = "Completed" +) + +// ObjectiveStatusInfo contains information about the status change of an objective +type ObjectiveStatusInfo struct { + Id protocols.ObjectiveId + Time time.Time + Participants []common.Address + ChannelId string + Status objectiveStatus +} + +// GraphRecorder is a utility for recording the state of the network in GEXF file format. +type GraphRecorder struct { + me peer.PeerInfo + peers []peer.PeerInfo + config config.RunConfig + graph *gexf12.Graph + syncClient sync.Client +} + +// Graph returns the GEXF graph struct +func (gr *GraphRecorder) Graph() *gexf12.Graph { + return gr.graph +} + +// NewGraphRecorder creates a new GraphRecorder +func NewGraphRecorder(me peer.PeerInfo, peers []peer.PeerInfo, config config.RunConfig, syncClient sync.Client) *GraphRecorder { + var graph *gexf12.Graph + if peer.IsGraphRecorder(me.Seq, config) { + graph = &gexf12.Graph{} + graph.TimeFormat = "dateTime" + graph.Start = time.Now().Format( + "2006-01-02T15:04:05-0700") + graph.DefaultEdgeType = "directed" + + // Set up attributes on the node and edges for channels and participants + graph.Attributes = []gexf12.Attributes{{ + Class: "node", + Attributes: []gexf12.Attribute{ + {ID: "address", Title: "address", Type: "string"}, {ID: "role", Title: "role", Type: "string"}}}, + { + Class: "edge", + Attributes: []gexf12.Attribute{ + {ID: "channelId", Title: "ChannelId", Type: "string"}, {ID: "channelType", Title: "ChannelType", Type: "string"}}}} + + // Add nodes for each participant + for _, p := range append(peers, me) { + attValues := gexf12.AttValues{AttValues: []gexf12.AttValue{ + {For: "address", Value: p.Address.String()}, + {For: "role", Value: fmt.Sprintf("%d", p.Role)}}} + + node := gexf12.Node{ID: p.Address.String(), + Label: p.Address.String()[0:6], + AttValues: &attValues, + Start: time.Now().Format( + "2006-01-02T15:04:05-0700")} + + graph.Nodes.Nodes = append(graph.Nodes.Nodes, node) + } + + } + + gr := &GraphRecorder{ + me: me, + peers: peers, + config: config, + graph: graph, + syncClient: syncClient, + } + // Start listening for changes from other participants + go gr.listenForShared(context.Background()) + + return gr +} + +// objectiveStatusChangedTopic returns the topic to share objective status changes on +func (gr *GraphRecorder) objectiveStatusChangedTopic() *sync.Topic { + return sync.NewTopic("objective-status-change", ObjectiveStatusInfo{}) +} + +// listenForShared listens for objective status changes from other participants +func (gr *GraphRecorder) listenForShared(ctx context.Context) { + if !peer.IsGraphRecorder(gr.me.Seq, gr.config) { + return + } + started := make(chan ObjectiveStatusInfo) + + gr.syncClient.MustSubscribe(ctx, gr.objectiveStatusChangedTopic(), started) + for { + select { + case s := <-started: + gr.ObjectiveStatusUpdated(s) + case <-ctx.Done(): + return + } + } +} + +// ObjectiveStatusUpdated is called when an objective status changes +// It uses this information to build a graph of the network +func (gr *GraphRecorder) ObjectiveStatusUpdated(info ObjectiveStatusInfo) { + if !peer.IsGraphRecorder(gr.me.Seq, gr.config) { + gr.syncClient.MustPublish(context.Background(), gr.objectiveStatusChangedTopic(), info) + } + + isCreateChannel := (strings.Contains(string(info.Id), "VirtualFund") || strings.Contains(string(info.Id), "DirectFund")) && info.Status == Starting + isCloseChannel := (strings.Contains(string(info.Id), "VirtualDefund") || strings.Contains(string(info.Id), "DirectDefund")) && info.Status == Completed + + if isCreateChannel { + channelType := "ledger" + if strings.Contains(string(info.Id), "VirtualFund") { + channelType = "virtual" + } + + attValues := gexf12.AttValues{ + AttValues: []gexf12.AttValue{ + {For: "channelId", Value: info.ChannelId}, + {For: "channelType", Value: channelType}}} + + for i := 0; (i + 1) < len(info.Participants); i++ { + gr.graph.Edges.Edges = append(gr.graph.Edges.Edges, + gexf12.Edge{ + ID: fmt.Sprintf("%s_%s", info.ChannelId, info.Participants[i]), + Label: string(info.ChannelId)[0:6], + Source: info.Participants[i].String(), + Target: info.Participants[i+1].String(), + Start: info.Time.Format( + "2006-01-02T15:04:05-0700"), + AttValues: &attValues, + }, + ) + } + + } + if isCloseChannel { + for i := 0; i < len(gr.graph.Edges.Edges); i++ { + + if strings.Contains(gr.graph.Edges.Edges[i].ID, info.ChannelId) { + + gr.graph.Edges.Edges[i].End = info.Time.Format( + "2006-01-02T15:04:05-0700") + } + } + } +} + diff --git a/utils/monitor.go b/utils/monitor.go index 976dd2f9..54d0d3c8 100644 --- a/utils/monitor.go +++ b/utils/monitor.go @@ -1,8 +1,10 @@ package utils import ( + "strings" "time" + "github.com/ethereum/go-ethereum/common" nitroclient "github.com/statechannels/go-nitro/client" "github.com/statechannels/go-nitro/client/engine/store/safesync" "github.com/statechannels/go-nitro/protocols" @@ -16,10 +18,11 @@ type CompletionMonitor struct { client *nitroclient.Client quit chan struct{} log func(msg string, a ...interface{}) + gr *GraphRecorder } // NewCompletionMonitor creates a new completion monitor -func NewCompletionMonitor(client *nitroclient.Client, logFunc func(msg string, a ...interface{})) *CompletionMonitor { +func NewCompletionMonitor(client *nitroclient.Client, gr *GraphRecorder, logFunc func(msg string, a ...interface{})) *CompletionMonitor { completed := safesync.Map[bool]{} @@ -28,6 +31,7 @@ func NewCompletionMonitor(client *nitroclient.Client, logFunc func(msg string, a client: client, quit: make(chan struct{}), log: logFunc, + gr: gr, } go c.watch() return c @@ -51,6 +55,13 @@ func (c *CompletionMonitor) watch() { for { select { case id := <-c.client.CompletedObjectives(): + c.gr.ObjectiveStatusUpdated(ObjectiveStatusInfo{ + Id: id, + Time: time.Now(), + Participants: []common.Address{}, + ChannelId: strings.Split(string(id), "-")[1], + Status: "Completed", + }) c.completed.Store(string(id), true) case <-c.quit: return diff --git a/utils/testing.go b/utils/testing.go index 53834a5f..0bf3c90d 100644 --- a/utils/testing.go +++ b/utils/testing.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "time" + "github.com/ethereum/go-ethereum/common" "github.com/statechannels/go-nitro-testground/config" "github.com/statechannels/go-nitro-testground/peer" "github.com/statechannels/go-nitro/channel/state/outcome" @@ -175,7 +176,7 @@ func SelectRandomHubs(hubs []peer.PeerInfo, numHubs int) []types.Address { // The funding for each channel will be set to amount for both participants. // This function blocks until all ledger channels have successfully been created. // If the participant is a hub we use the participant's Seq to determine the initiator of the ledger channel. -func CreateLedgerChannels(client nitro.Client, cm *CompletionMonitor, amount uint, me peer.PeerInfo, peers []peer.PeerInfo) []types.Destination { +func CreateLedgerChannels(client nitro.Client, cm *CompletionMonitor, gr *GraphRecorder, amount uint, me peer.PeerInfo, peers []peer.PeerInfo) []types.Destination { ids := []protocols.ObjectiveId{} cIds := []types.Destination{} hubs := peer.FilterByRole(peers, peer.Hub) @@ -200,6 +201,14 @@ func CreateLedgerChannels(client nitro.Client, cm *CompletionMonitor, amount uin }, }} r := client.CreateLedgerChannel(p.Address, 0, outcome) + + gr.ObjectiveStatusUpdated(ObjectiveStatusInfo{ + Id: r.Id, + Time: time.Now(), + Participants: []common.Address{me.Address, p.Address}, + ChannelId: r.ChannelId.String(), + Status: "Starting", + }) cIds = append(cIds, r.ChannelId) ids = append(ids, r.Id) } From 030cb64c25a7e2ab878fc6dd287f07a612b6daa0 Mon Sep 17 00:00:00 2001 From: Alex Gap Date: Thu, 10 Nov 2022 11:32:23 -0800 Subject: [PATCH 2/2] only update graph if am recorder --- utils/graph-recorder.go | 73 +++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/utils/graph-recorder.go b/utils/graph-recorder.go index c852c600..f3d5479f 100644 --- a/utils/graph-recorder.go +++ b/utils/graph-recorder.go @@ -89,8 +89,10 @@ func NewGraphRecorder(me peer.PeerInfo, peers []peer.PeerInfo, config config.Run graph: graph, syncClient: syncClient, } - // Start listening for changes from other participants - go gr.listenForShared(context.Background()) + if peer.IsGraphRecorder(gr.me.Seq, gr.config) { + // Start listening for changes from other participants + go gr.listenForShared(context.Background()) + } return gr } @@ -123,46 +125,47 @@ func (gr *GraphRecorder) listenForShared(ctx context.Context) { func (gr *GraphRecorder) ObjectiveStatusUpdated(info ObjectiveStatusInfo) { if !peer.IsGraphRecorder(gr.me.Seq, gr.config) { gr.syncClient.MustPublish(context.Background(), gr.objectiveStatusChangedTopic(), info) - } + } else { - isCreateChannel := (strings.Contains(string(info.Id), "VirtualFund") || strings.Contains(string(info.Id), "DirectFund")) && info.Status == Starting - isCloseChannel := (strings.Contains(string(info.Id), "VirtualDefund") || strings.Contains(string(info.Id), "DirectDefund")) && info.Status == Completed + isCreateChannel := (strings.Contains(string(info.Id), "VirtualFund") || strings.Contains(string(info.Id), "DirectFund")) && info.Status == Starting + isCloseChannel := (strings.Contains(string(info.Id), "VirtualDefund") || strings.Contains(string(info.Id), "DirectDefund")) && info.Status == Completed - if isCreateChannel { - channelType := "ledger" - if strings.Contains(string(info.Id), "VirtualFund") { - channelType = "virtual" - } + if isCreateChannel { + channelType := "ledger" + if strings.Contains(string(info.Id), "VirtualFund") { + channelType = "virtual" + } - attValues := gexf12.AttValues{ - AttValues: []gexf12.AttValue{ - {For: "channelId", Value: info.ChannelId}, - {For: "channelType", Value: channelType}}} - - for i := 0; (i + 1) < len(info.Participants); i++ { - gr.graph.Edges.Edges = append(gr.graph.Edges.Edges, - gexf12.Edge{ - ID: fmt.Sprintf("%s_%s", info.ChannelId, info.Participants[i]), - Label: string(info.ChannelId)[0:6], - Source: info.Participants[i].String(), - Target: info.Participants[i+1].String(), - Start: info.Time.Format( - "2006-01-02T15:04:05-0700"), - AttValues: &attValues, - }, - ) - } + attValues := gexf12.AttValues{ + AttValues: []gexf12.AttValue{ + {For: "channelId", Value: info.ChannelId}, + {For: "channelType", Value: channelType}}} + + for i := 0; (i + 1) < len(info.Participants); i++ { + + gr.graph.Edges.Edges = append(gr.graph.Edges.Edges, + gexf12.Edge{ + ID: fmt.Sprintf("%s_%s", info.ChannelId, info.Participants[i]), + Label: string(info.ChannelId)[0:6], + Source: info.Participants[i].String(), + Target: info.Participants[i+1].String(), + Start: info.Time.Format( + "2006-01-02T15:04:05-0700"), + AttValues: &attValues, + }, + ) + } - } - if isCloseChannel { - for i := 0; i < len(gr.graph.Edges.Edges); i++ { + } + if isCloseChannel { + for i := 0; i < len(gr.graph.Edges.Edges); i++ { - if strings.Contains(gr.graph.Edges.Edges[i].ID, info.ChannelId) { + if strings.Contains(gr.graph.Edges.Edges[i].ID, info.ChannelId) { - gr.graph.Edges.Edges[i].End = info.Time.Format( - "2006-01-02T15:04:05-0700") + gr.graph.Edges.Edges[i].End = info.Time.Format( + "2006-01-02T15:04:05-0700") + } } } } } -