Skip to content

To add bandwidth metrics #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 15 commits into
base: master
Choose a base branch
from
6 changes: 3 additions & 3 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"time"

dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/chiangmaioneluv/go-libp2p/core/network"
"github.com/chiangmaioneluv/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/peerstore"
)

var BootstrapPeers = dht.DefaultBootstrapPeers
Expand Down
72 changes: 68 additions & 4 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package p2pd
import (
"context"
"fmt"
"github.com/chiangmaioneluv/go-libp2p/core/network"
"github.com/chiangmaioneluv/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/protocol"
"io"
"net"
"time"

"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

"github.com/learning-at-home/go-libp2p-daemon/internal/utils"
pb "github.com/learning-at-home/go-libp2p-daemon/pb"

Expand Down Expand Up @@ -124,6 +123,14 @@ func (d *Daemon) handleConn(c net.Conn) {
return
}

case pb.Request_BANDWIDTH_METRICS:
res := d.doBandwidthMetrics(&req)
err := w.WriteMsg(res)
if err != nil {
log.Debugw("error writing response", "error", err)
return
}

case pb.Request_CONNMANAGER:
res := d.doConnManager(&req)
err := w.WriteMsg(res)
Expand Down Expand Up @@ -329,6 +336,63 @@ func (d *Daemon) doRemoveStreamHandler(req *pb.Request) *pb.Response {
return okResponse()
}

func (d *Daemon) doBandwidthMetrics(req *pb.Request) *pb.Response {
if d.bandwidth_metrics == nil {
log.Debugw("error getting bandwidth metrics: daemon option is off")
return errorResponseString("error getting bandwidth metrics: daemon option is off")
}
selfRateIn := 0.0
selfRateOut := 0.0
if req.Bwr.GetForSelf() {
stats := d.bandwidth_metrics.GetBandwidthTotals()
selfRateIn = stats.RateIn
selfRateOut = stats.RateOut
}
res := okResponse()
res.Bwr = &pb.BandwidthMetricsResponse{
SelfRateIn: &selfRateIn,
SelfRateOut: &selfRateOut,
}

if req.Bwr.GetForAllPeers() {
peerStats := d.bandwidth_metrics.GetBandwidthByPeer()
peers := make([]*pb.PeerInfo, len(peerStats))
i := 0
for id, stats := range peerStats {
rateIn := stats.RateIn
rateOut := stats.RateOut
peers[i] = &pb.PeerInfo{
Id: []byte(id),
Ratein: &rateIn,
Rateout: &rateOut,
}
i++
}
res.Bwr.Peers = peers
} else {
peers := make([]*pb.PeerInfo, len(req.Bwr.Ids))
i := 0
for _, id := range req.Bwr.Ids {
peer_id, err := peer.IDFromBytes([]byte(id))
if err != nil {
log.Debugw("error parsing peer ID", "error", err)
return errorResponse(err)
}
stats := d.bandwidth_metrics.GetBandwidthForPeer(peer_id)

peers[i] = &pb.PeerInfo{
Id: []byte(id),
Ratein: &stats.RateIn,
Rateout: &stats.RateOut,
}
i++
}
res.Bwr.Peers = peers
}

return res
}

func (d *Daemon) doListPeers(req *pb.Request) *pb.Response {
conns := d.host.Network().Conns()
peers := make([]*pb.PeerInfo, len(conns))
Expand Down
2 changes: 1 addition & 1 deletion connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/peer"

pb "github.com/learning-at-home/go-libp2p-daemon/pb"
)
Expand Down
27 changes: 19 additions & 8 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
"github.com/learning-at-home/go-libp2p-daemon/config"
"github.com/learning-at-home/go-libp2p-daemon/internal/utils"

"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/protocol/circuitv2/relay"
"github.com/chiangmaioneluv/go-libp2p"
"github.com/chiangmaioneluv/go-libp2p/core/host"
"github.com/chiangmaioneluv/go-libp2p/core/metrics"
"github.com/chiangmaioneluv/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/protocol"
"github.com/chiangmaioneluv/go-libp2p/core/routing"
"github.com/chiangmaioneluv/go-libp2p/p2p/host/resource-manager"
"github.com/chiangmaioneluv/go-libp2p/p2p/protocol/circuitv2/relay"

multierror "github.com/hashicorp/go-multierror"
logging "github.com/ipfs/go-log"
Expand Down Expand Up @@ -64,13 +65,16 @@ type Daemon struct {
cancelTerminateTimer context.CancelFunc

persistentConnMsgMaxSize int

bandwidth_metrics *metrics.BandwidthCounter
}

func NewDaemon(
ctx context.Context,
maddr ma.Multiaddr,
dhtMode string,
relayDiscovery bool,
bandwidthMetricsEnabled bool,
trustedRelays []string,
persistentConnMsgMaxSize int,
opts ...libp2p.Option,
Expand All @@ -81,7 +85,7 @@ func NewDaemon(
registeredUnaryProtocols: make(map[protocol.ID]*utils.RoundRobin),
persistentConnMsgMaxSize: persistentConnMsgMaxSize,
}
// setup resource usage limits; see https://github.com/libp2p/go-libp2p/tree/master/p2p/host/resource-manager
// setup resource usage limits; see https://github.com/chiangmaioneluv/go-libp2p/tree/master/p2p/host/resource-manager
rm, err := rcmgr.NewResourceManager(rcmgr.NewFixedLimiter(rcmgr.InfiniteLimits))
if err != nil {
panic(err)
Expand All @@ -101,6 +105,13 @@ func NewDaemon(
opts = append(opts, libp2p.Routing(d.DHTRoutingFactory(dhtOpts)))
}

if bandwidthMetricsEnabled {
d.bandwidth_metrics = metrics.NewBandwidthCounter()
opts = append(opts, libp2p.BandwidthReporter(d.bandwidth_metrics))
} else {
d.bandwidth_metrics = nil
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: d.bandwidth_metrics are nil be default anyway.
ignore this comment if you wish

}

h, err := libp2p.New(opts...)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (

pb "github.com/learning-at-home/go-libp2p-daemon/pb"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/crypto"
"github.com/chiangmaioneluv/go-libp2p/core/peer"

cid "github.com/ipfs/go-cid"
)
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ go 1.20

require (
github.com/cenkalti/backoff/v4 v4.2.1
github.com/chiangmaioneluv/go-libp2p v1.6.0
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.4.0
github.com/hashicorp/go-multierror v1.1.1
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-log v1.0.5
github.com/libp2p/go-libp2p v0.32.1
github.com/libp2p/go-libp2p-kad-dht v0.25.1
github.com/libp2p/go-libp2p-pubsub v0.10.0
github.com/multiformats/go-multiaddr v0.12.0
Expand All @@ -22,6 +23,7 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/chiangmaioneluv/go-maxflow-metrics v1.6.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
Expand Down Expand Up @@ -54,7 +56,7 @@ require (
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
github.com/libp2p/go-libp2p v0.32.0 // indirect
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
github.com/libp2p/go-libp2p-kbucket v0.6.3 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
Expand Down
11 changes: 8 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chiangmaioneluv/go-libp2p v1.6.0 h1:cUYFTHlxTRY+2lrNTspDVgEKPZgGDUjQ7zbmzpAtLB8=
github.com/chiangmaioneluv/go-libp2p v1.6.0/go.mod h1:5QHuTD/TXMeLbWXu94HSfPXzFfJB06l42v3MatLgYBk=
github.com/chiangmaioneluv/go-maxflow-metrics v1.6.0 h1:itVY5W8fexp9ie+mVGPHtEe79Ad1bmsqZzpBytSRLqc=
github.com/chiangmaioneluv/go-maxflow-metrics v1.6.0/go.mod h1:W0WI9SkSPxMVwF0Z7SBWKhPjN31am5vEvzDCwn+e21s=
github.com/cilium/ebpf v0.2.0/go.mod h1:To2CFviqOWL/M0gIMsvSMlqe7em/l1ALkX1PyjrX2Qs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
Expand Down Expand Up @@ -98,6 +102,7 @@ github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QD
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
Expand Down Expand Up @@ -188,9 +193,8 @@ github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QT
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
github.com/libp2p/go-cidranger v1.1.0/go.mod h1:KWZTfSr+r9qEo9OkI9/SIEeAtw+NNoU0dXIXt15Okic=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.32.1 h1:wy1J4kZIZxOaej6NveTWCZmHiJ/kY7GoAqXgqNCnPps=
github.com/libp2p/go-libp2p v0.32.1/go.mod h1:hXXC3kXPlBZ1eu8Q2hptGrMB4mZ3048JUoS4EKaHW5c=
github.com/libp2p/go-libp2p v0.32.0 h1:86I4B7nBUPIyTgw3+5Ibq6K7DdKRCuZw8URCfPc1hQM=
github.com/libp2p/go-libp2p v0.32.0/go.mod h1:hXXC3kXPlBZ1eu8Q2hptGrMB4mZ3048JUoS4EKaHW5c=
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
github.com/libp2p/go-libp2p-asn-util v0.3.0/go.mod h1:B1mcOrKUE35Xq/ASTmQ4tN3LNzVVaMNmq2NACuqyB9w=
github.com/libp2p/go-libp2p-kad-dht v0.25.1 h1:ofFNrf6MMEy4vi3R1VbJ7LOcTn3Csh0cDcaWHTxtWNA=
Expand Down Expand Up @@ -549,6 +553,7 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 1 addition & 1 deletion identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package p2pd
import (
"os"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/chiangmaioneluv/go-libp2p/core/crypto"
)

func ReadIdentity(path string) (crypto.PrivKey, error) {
Expand Down
4 changes: 2 additions & 2 deletions p2p-keygen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import (

p2pd "github.com/learning-at-home/go-libp2p-daemon"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/crypto"
"github.com/chiangmaioneluv/go-libp2p/core/peer"
)

func main() {
Expand Down
4 changes: 2 additions & 2 deletions p2pclient/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
"net"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/crypto"
"github.com/chiangmaioneluv/go-libp2p/core/peer"

ggio "github.com/gogo/protobuf/io"
cid "github.com/ipfs/go-cid"
Expand Down
2 changes: 1 addition & 1 deletion p2pclient/p2pclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"errors"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/peer"

ggio "github.com/gogo/protobuf/io"
logging "github.com/ipfs/go-log"
Expand Down
2 changes: 1 addition & 1 deletion p2pclient/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"fmt"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/peer"

ggio "github.com/gogo/protobuf/io"
pb "github.com/learning-at-home/go-libp2p-daemon/pb"
Expand Down
2 changes: 1 addition & 1 deletion p2pclient/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"net"

"github.com/hashicorp/go-multierror"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/peer"

ggio "github.com/gogo/protobuf/io"
proto "github.com/gogo/protobuf/proto"
Expand Down
6 changes: 3 additions & 3 deletions p2pclient/unary_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/google/uuid"
"github.com/learning-at-home/go-libp2p-daemon/internal/utils"
pb "github.com/learning-at-home/go-libp2p-daemon/pb"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/chiangmaioneluv/go-libp2p/core/network"
"github.com/chiangmaioneluv/go-libp2p/core/peer"
"github.com/chiangmaioneluv/go-libp2p/core/protocol"

ggio "github.com/gogo/protobuf/io"
)
Expand Down
15 changes: 8 additions & 7 deletions p2pd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ import (
"strings"
"time"

"github.com/libp2p/go-libp2p"
"github.com/chiangmaioneluv/go-libp2p"

p2pd "github.com/learning-at-home/go-libp2p-daemon"
config "github.com/learning-at-home/go-libp2p-daemon/config"
ps "github.com/libp2p/go-libp2p-pubsub"
network "github.com/libp2p/go-libp2p/core/network"
network "github.com/chiangmaioneluv/go-libp2p/core/network"

"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr"
noise "github.com/libp2p/go-libp2p/p2p/security/noise"
tls "github.com/libp2p/go-libp2p/p2p/security/tls"
"github.com/chiangmaioneluv/go-libp2p/p2p/muxer/yamux"
connmgr "github.com/chiangmaioneluv/go-libp2p/p2p/net/connmgr"
noise "github.com/chiangmaioneluv/go-libp2p/p2p/security/noise"
tls "github.com/chiangmaioneluv/go-libp2p/p2p/security/tls"
multiaddr "github.com/multiformats/go-multiaddr"
promhttp "github.com/prometheus/client_golang/prometheus/promhttp"

Expand Down Expand Up @@ -120,6 +120,7 @@ func main() {
persistentConnMaxMsgSize := flag.Int("persistentConnMaxMsgSize", 4*1024*1024,
"Max size for persistent connection messages (bytes). Default: 4 MiB")
muxer := flag.String("muxer", "yamux", "muxer to use for connections")
bandwidthMetricsEnabled := flag.Bool("bandwidthMetrics", true, "Enables collection of bandwidth rate metrics")

flag.Parse()

Expand Down Expand Up @@ -388,7 +389,7 @@ func main() {
// start daemon
d, err := p2pd.NewDaemon(
defaultCtx, &c.ListenAddr, c.DHT.Mode,
c.Relay.Discovery, trustedRelays, *persistentConnMaxMsgSize,
c.Relay.Discovery, *bandwidthMetricsEnabled, trustedRelays, *persistentConnMaxMsgSize,
opts...)
if err != nil {
log.Fatal(err)
Expand Down
Loading