Skip to content

Commit cbaeb93

Browse files
committedJan 30, 2020
client: silence logger errors if connection was gracefully closed
kademlia: add logs for evictions and insertions into routing table kademlia/table: have Update return true if an id is newly inserted into a bucket, and false if the id was just shifted to the head of the bucket node, client: remove references to prior noise.Binder which was renamed to noise.Protocol node, protocol, kademlia/protocol: add callback should any arbitrary ping or dial to a peer fails, which kademlia uses to evict peers cmd/chat: add chat example kademlia/table, protocol, client: add more docs
1 parent 7986d07 commit cbaeb93

File tree

10 files changed

+463
-39
lines changed

10 files changed

+463
-39
lines changed
 

‎client.go

+50-16
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/aes"
66
"crypto/cipher"
7+
"encoding/hex"
78
"errors"
89
"fmt"
910
"github.com/oasislabs/ed25519"
@@ -90,7 +91,7 @@ func newClient(node *Node) *Client {
9091
clientDone: make(chan struct{}),
9192
}
9293

93-
c.logger.Logger = node.logger
94+
c.SetLogger(node.logger)
9495

9596
return c
9697
}
@@ -251,9 +252,9 @@ func (c *Client) outbound(ctx context.Context, addr string) {
251252
return
252253
}
253254

254-
conn.(*net.TCPConn).SetNoDelay(false)
255-
conn.(*net.TCPConn).SetWriteBuffer(10000)
256-
conn.(*net.TCPConn).SetReadBuffer(10000)
255+
_ = conn.(*net.TCPConn).SetNoDelay(false)
256+
_ = conn.(*net.TCPConn).SetWriteBuffer(10000)
257+
_ = conn.(*net.TCPConn).SetReadBuffer(10000)
257258

258259
c.conn = conn
259260
c.startTimeout(ctx)
@@ -265,8 +266,10 @@ func (c *Client) outbound(ctx context.Context, addr string) {
265266

266267
c.handleLoop()
267268

268-
for _, binder := range c.node.binders {
269-
binder.OnPeerLeave(c)
269+
c.Logger().Debug("Peer connection closed.")
270+
271+
for _, protocol := range c.node.protocols {
272+
protocol.OnPeerLeave(c)
270273
}
271274
}
272275

@@ -296,8 +299,8 @@ func (c *Client) inbound(conn net.Conn, addr string) {
296299

297300
c.handleLoop()
298301

299-
for _, binder := range c.node.binders {
300-
binder.OnPeerLeave(c)
302+
for _, protocol := range c.node.protocols {
303+
protocol.OnPeerLeave(c)
301304
}
302305
}
303306

@@ -496,8 +499,17 @@ func (c *Client) handshake(ctx context.Context) {
496499

497500
c.id = id
498501

499-
for _, binder := range c.node.binders {
500-
binder.OnPeerJoin(c)
502+
c.SetLogger(c.Logger().With(
503+
zap.String("peer_id", id.ID.String()),
504+
zap.String("peer_addr", id.Address),
505+
zap.String("remote_addr", c.conn.RemoteAddr().String()),
506+
zap.String("session_key", hex.EncodeToString(shared[:])),
507+
))
508+
509+
c.Logger().Debug("Peer connection opened.")
510+
511+
for _, protocol := range c.node.protocols {
512+
protocol.OnPeerJoin(c)
501513
}
502514
}
503515

@@ -507,13 +519,15 @@ func (c *Client) handleLoop() {
507519
for {
508520
msg, err := c.recv(context.Background())
509521
if err != nil {
510-
c.logger.Warn("Got an error deserializing a message from a peer.", zap.Error(err))
522+
if !isEOF(err) {
523+
c.Logger().Warn("Got an error deserializing a message from a peer.", zap.Error(err))
524+
}
511525
c.reportError(err)
512526
break
513527
}
514528

515-
for _, binder := range c.node.binders {
516-
binder.OnMessageRecv(c)
529+
for _, protocol := range c.node.protocols {
530+
protocol.OnMessageRecv(c)
517531
}
518532

519533
if ch := c.requests.findRequest(msg.nonce); ch != nil {
@@ -524,7 +538,7 @@ func (c *Client) handleLoop() {
524538

525539
for _, handler := range c.node.handlers {
526540
if err = handler(HandlerContext{client: c, msg: msg}); err != nil {
527-
c.logger.Warn("Got an error executing a message handler.", zap.Error(err))
541+
c.Logger().Warn("Got an error executing a message handler.", zap.Error(err))
528542
c.reportError(err)
529543
break
530544
}
@@ -542,7 +556,9 @@ func (c *Client) writeLoop(conn net.Conn) {
542556
defer close(c.writerDone)
543557

544558
if err := c.writer.loop(conn); err != nil {
545-
c.logger.Warn("Got an error while sending messages.", zap.Error(err))
559+
if !isEOF(err) {
560+
c.Logger().Warn("Got an error while sending messages.", zap.Error(err))
561+
}
546562
c.reportError(err)
547563
c.close()
548564
}
@@ -552,8 +568,26 @@ func (c *Client) readLoop(conn net.Conn) {
552568
defer close(c.readerDone)
553569

554570
if err := c.reader.loop(conn); err != nil {
555-
c.logger.Warn("Got an error while reading incoming messages.", zap.Error(err))
571+
if !isEOF(err) {
572+
c.Logger().Warn("Got an error while reading incoming messages.", zap.Error(err))
573+
}
556574
c.reportError(err)
557575
c.close()
558576
}
559577
}
578+
579+
func isEOF(err error) bool {
580+
if errors.Is(err, io.EOF) {
581+
return true
582+
}
583+
584+
var netErr *net.OpError
585+
586+
if errors.As(err, &netErr) {
587+
if netErr.Err.Error() == "use of closed network connection" {
588+
return true
589+
}
590+
}
591+
592+
return false
593+
}

‎cmd/chat/go.mod

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
module github.com/perlin-network/noise/cmd/chat
2+
3+
replace github.com/perlin-network/noise => ../../
4+
5+
go 1.13
6+
7+
require (
8+
github.com/perlin-network/noise v0.0.0-00010101000000-000000000000
9+
github.com/spf13/pflag v1.0.5
10+
go.uber.org/zap v1.13.0
11+
)

‎cmd/chat/go.sum

+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
2+
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
3+
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412 h1:w1UutsfOrms1J05zt7ISrnJIXKzwaspym5BTKGx93EI=
4+
github.com/agl/ed25519 v0.0.0-20170116200512-5312a6153412/go.mod h1:WPjqKcmVOxf0XSf3YxCJs6N6AOSrOx3obionmG7T0y0=
5+
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
6+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
7+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
8+
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
9+
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
10+
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
11+
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
12+
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
13+
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
14+
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
15+
github.com/oasislabs/ed25519 v0.0.0-20191122104632-9d9ffc15f526 h1:xKlK+m6tNFucKVOP4V0GDgU4IgaLbS+HRoiVbN3W8Y4=
16+
github.com/oasislabs/ed25519 v0.0.0-20191122104632-9d9ffc15f526/go.mod h1:xIpCyrK2ouGA4QBGbiNbkoONrvJ00u9P3QOkXSOAC0c=
17+
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
18+
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
19+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
20+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
21+
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
22+
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
23+
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
24+
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
25+
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
26+
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
27+
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
28+
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
29+
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
30+
go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
31+
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
32+
go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
33+
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
34+
go.uber.org/goleak v1.0.0 h1:qsup4IcBdlmsnGfqyLl4Ntn3C2XCCuKAE7DwHpScyUo=
35+
go.uber.org/goleak v1.0.0/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
36+
go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc=
37+
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
38+
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
39+
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
40+
go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU=
41+
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
42+
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
43+
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
44+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
45+
golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba h1:9bFeDpN3gTqNanMVqNcoR/pJQuP5uroC3t1D7eXozTE=
46+
golang.org/x/crypto v0.0.0-20191119213627-4f8c1d86b1ba/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
47+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs=
48+
golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
49+
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f h1:J5lckAjkw6qYlOZNj90mLYNTEKDvWeuc1yieZ8qUzUE=
50+
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
51+
golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc=
52+
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
53+
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
54+
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
55+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
56+
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
57+
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
58+
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
59+
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
60+
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
61+
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
62+
golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
63+
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5 h1:hKsoRgsbwY1NafxrwTs+k64bikrLBkAgPir1TNCj3Zs=
64+
golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
65+
golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
66+
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
67+
golang.org/x/tools v0.0.0-20200129045341-207d3de1faaf h1:mFgR10kFfr83r2+nXf0GZC2FKrFhMSs9NdJ0YdEaGiY=
68+
golang.org/x/tools v0.0.0-20200129045341-207d3de1faaf/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
69+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
70+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
71+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
72+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
73+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
74+
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
75+
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
76+
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
77+
honnef.co/go/tools v0.0.1-2019.2.3 h1:3JgtbtFHMiCmsznwGVTUWbgGov+pVqnlf1dEJTNAXeM=
78+
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=

‎cmd/chat/main.go

+196
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package main
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"github.com/perlin-network/noise"
9+
"github.com/perlin-network/noise/kademlia"
10+
"github.com/spf13/pflag"
11+
"io"
12+
"os"
13+
"os/signal"
14+
"strings"
15+
"time"
16+
)
17+
18+
type chatMessage struct {
19+
contents string
20+
}
21+
22+
func (m chatMessage) Marshal() []byte {
23+
return []byte(m.contents)
24+
}
25+
26+
func unmarshalChatMessage(buf []byte) (chatMessage, error) {
27+
return chatMessage{contents: strings.ToValidUTF8(string(buf), "")}, nil
28+
}
29+
30+
var (
31+
hostFlag = pflag.IPP("host", "h", nil, "binding host")
32+
portFlag = pflag.Uint16P("port", "p", 0, "binding port")
33+
addressFlag = pflag.StringP("address", "a", "", "publicly reachable network address")
34+
)
35+
36+
func check(err error) {
37+
if err != nil {
38+
panic(err)
39+
}
40+
}
41+
42+
const printedLength = 8
43+
44+
func main() {
45+
pflag.Parse()
46+
47+
node, err := noise.NewNode(
48+
noise.WithNodeBindHost(*hostFlag),
49+
noise.WithNodeBindPort(*portFlag),
50+
noise.WithNodeAddress(*addressFlag),
51+
)
52+
check(err)
53+
54+
defer node.Close()
55+
56+
node.RegisterMessage(chatMessage{}, unmarshalChatMessage)
57+
58+
node.Handle(func(ctx noise.HandlerContext) error {
59+
if ctx.IsRequest() {
60+
return nil
61+
}
62+
63+
obj, err := ctx.DecodeMessage()
64+
if err != nil {
65+
return nil
66+
}
67+
68+
msg, ok := obj.(chatMessage)
69+
if !ok {
70+
return nil
71+
}
72+
73+
fmt.Printf("%s(%s)> %s\n", ctx.ID().Address, ctx.ID().ID.String()[:printedLength], msg.contents)
74+
75+
return nil
76+
})
77+
78+
overlay := kademlia.NewProtocol()
79+
node.Bind(overlay)
80+
81+
check(node.Listen())
82+
83+
fmt.Printf("Your ID is %s(%s). Type '/discover' to attempt to discover new peers, or '/peers' to list"+
84+
" out all peers you are connected to.\n", node.ID().Address, node.ID().ID.String()[:printedLength])
85+
86+
peers := make([]*noise.Client, 0, pflag.NArg())
87+
88+
for _, peerAddr := range pflag.Args() {
89+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
90+
peer, err := node.Ping(ctx, peerAddr)
91+
cancel()
92+
93+
if err != nil {
94+
fmt.Printf("Failed to ping bootstrap node (%s). Skipping... [error: %s]\n", peerAddr, err)
95+
continue
96+
}
97+
98+
peers = append(peers, peer)
99+
}
100+
101+
ids := overlay.Discover()
102+
103+
var str []string
104+
for _, id := range ids {
105+
str = append(str, fmt.Sprintf("%s(%s)", id.Address, id.ID.String()[:printedLength]))
106+
}
107+
108+
if len(ids) > 0 {
109+
fmt.Printf("Discovered %d peer(s): [%v]\n", len(ids), strings.Join(str, ", "))
110+
} else {
111+
fmt.Printf("Did not discover any peers.\n")
112+
}
113+
114+
go func() {
115+
r := bufio.NewReader(os.Stdin)
116+
117+
for {
118+
buf, _, err := r.ReadLine()
119+
if err != nil {
120+
if errors.Is(err, io.EOF) {
121+
return
122+
}
123+
124+
check(err)
125+
}
126+
127+
line := string(buf)
128+
129+
if len(line) == 0 {
130+
continue
131+
}
132+
133+
switch line {
134+
case "/discover":
135+
ids := overlay.Discover()
136+
137+
var str []string
138+
for _, id := range ids {
139+
str = append(str, fmt.Sprintf("%s(%s)", id.Address, id.ID.String()[:printedLength]))
140+
}
141+
142+
if len(ids) > 0 {
143+
fmt.Printf("Discovered %d peer(s): [%v]\n", len(ids), strings.Join(str, ", "))
144+
} else {
145+
fmt.Printf("Did not discover any peers.\n")
146+
}
147+
148+
continue
149+
case "/peers":
150+
ids := overlay.Table().Peers()
151+
152+
var str []string
153+
for _, id := range ids {
154+
str = append(str, fmt.Sprintf("%s(%s)", id.Address, id.ID.String()[:printedLength]))
155+
}
156+
157+
fmt.Printf("You know %d peer(s): [%v]\n", len(ids), strings.Join(str, ", "))
158+
159+
continue
160+
default:
161+
}
162+
163+
if strings.HasPrefix(line, "/") {
164+
fmt.Printf("Your ID is %s(%s). Type '/discover' to attempt to discover new "+
165+
"peers, or '/peers' to list out all peers you are connected to.\n",
166+
node.ID().Address,
167+
node.ID().ID.String()[:printedLength],
168+
)
169+
170+
continue
171+
}
172+
173+
for _, id := range overlay.Table().Peers() {
174+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
175+
err := node.SendMessage(ctx, id.Address, chatMessage{contents: line})
176+
cancel()
177+
178+
if err != nil {
179+
fmt.Printf("Failed to send message to %s(%s). Skipping... [error: %s]\n",
180+
id.Address,
181+
id.ID.String()[:printedLength],
182+
err,
183+
)
184+
continue
185+
}
186+
}
187+
}
188+
}()
189+
190+
c := make(chan os.Signal, 1)
191+
signal.Notify(c, os.Interrupt)
192+
<-c
193+
194+
check(os.Stdin.Close())
195+
println()
196+
}

‎kademlia/protocol.go

+37-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"github.com/perlin-network/noise"
10+
"go.uber.org/zap"
1011
"time"
1112
)
1213

@@ -86,7 +87,15 @@ func (p *Protocol) Table() *Table {
8687
// peer ID at the tail of the bucket is evicted and your peer ID is inserted to the head of the bucket.
8788
func (p *Protocol) Ack(id noise.ID) {
8889
for {
89-
if err := p.table.Update(id); err == nil {
90+
inserted, err := p.table.Update(id)
91+
if err == nil {
92+
if inserted {
93+
p.node.Logger().Debug("Peer was inserted into routing table.",
94+
zap.String("peer_id", id.String()),
95+
zap.String("peer_addr", id.Address),
96+
)
97+
}
98+
9099
return
91100
}
92101

@@ -98,15 +107,32 @@ func (p *Protocol) Ack(id noise.ID) {
98107
cancel()
99108

100109
if err != nil {
101-
p.table.Delete(last.ID)
110+
if p.table.Delete(last.ID) {
111+
p.node.Logger().Debug("Peer was evicted from routing table by failing to be pinged.",
112+
zap.String("peer_id", last.ID.String()),
113+
zap.String("peer_addr", last.Address),
114+
zap.Error(err),
115+
)
116+
}
102117
continue
103118
}
104119

105120
if _, ok := pong.(Pong); !ok {
106-
p.table.Delete(last.ID)
121+
if p.table.Delete(last.ID) {
122+
p.node.Logger().Debug("Peer was evicted from routing table by failing to be pinged.",
123+
zap.String("peer_id", last.ID.String()),
124+
zap.String("peer_addr", last.Address),
125+
zap.Error(err),
126+
)
127+
}
107128
continue
108129
}
109130

131+
p.node.Logger().Debug("Peer failed to be inserted into routing table as it's intended bucket is full.",
132+
zap.String("peer_id", id.String()),
133+
zap.String("peer_addr", id.Address),
134+
)
135+
110136
return
111137
}
112138
}
@@ -118,7 +144,14 @@ func (p *Protocol) OnPeerJoin(client *noise.Client) {
118144
}
119145

120146
// OnPeerLeave implements noise.Protocol and does nothing.
121-
func (p *Protocol) OnPeerLeave(client *noise.Client) {
147+
func (p *Protocol) OnPeerLeave(*noise.Client) {
148+
}
149+
150+
// OnPingFailed implements noise.Protocol and evicts peers that your node has failed to dial.
151+
func (p *Protocol) OnPingFailed(addr string, err error) {
152+
if p.table.DeleteByAddress(addr) {
153+
p.node.Logger().Debug("Peer was evicted from routing table by failing to be dialed.", zap.Error(err))
154+
}
122155
}
123156

124157
// OnMessageSent implements noise.Protocol and attempts to push the position in which the clients ID resides in

‎kademlia/protocol_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ func TestTableEviction(t *testing.T) {
9090

9191
before := overlay.Table().Bucket(nodes[0].ID().ID)
9292
assert.Len(t, before, kademlia.BucketSize)
93+
assert.EqualValues(t, kademlia.BucketSize+1, overlay.Table().NumEntries())
94+
assert.EqualValues(t, overlay.Table().NumEntries(), len(overlay.Table().Entries()))
9395

9496
// Close the node that is at the bottom of the bucket.
9597

@@ -114,6 +116,8 @@ func TestTableEviction(t *testing.T) {
114116

115117
after := overlay.Table().Bucket(nodes[0].ID().ID)
116118
assert.Len(t, after, kademlia.BucketSize)
119+
assert.EqualValues(t, kademlia.BucketSize+1, overlay.Table().NumEntries())
120+
assert.EqualValues(t, overlay.Table().NumEntries(), len(overlay.Table().Entries()))
117121

118122
assert.EqualValues(t, after[0].Address, follower.Addr())
119123
assert.NotContains(t, after, nodes[0].ID())

‎kademlia/table.go

+53-7
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,15 @@ type Table struct {
1212

1313
entries [noise.SizePublicKey * 8][]noise.ID
1414
self noise.ID
15+
size int
1516
}
1617

1718
// NewTable instantiates a new routing table whose XOR distance metric is defined with respect to some
1819
// given ID.
1920
func NewTable(self noise.ID) *Table {
2021
table := &Table{self: self}
2122

22-
if err := table.Update(self); err != nil {
23+
if _, err := table.Update(self); err != nil {
2324
panic(err)
2425
}
2526

@@ -41,10 +42,11 @@ func (t *Table) Bucket(target noise.PublicKey) []noise.ID {
4142

4243
// Update attempts to insert the target node/peer ID into this routing table. If the bucket it was expected
4344
// to be inserted within is full, ErrBucketFull is returned. If the ID already exists in its respective routing
44-
// table bucket, it is moved to the head of the bucket.
45-
func (t *Table) Update(target noise.ID) error {
45+
// table bucket, it is moved to the head of the bucket and false is returned. If the ID has yet to exist, it is
46+
// appended to the head of its intended bucket and true is returned.
47+
func (t *Table) Update(target noise.ID) (bool, error) {
4648
if target.ID == noise.ZeroPublicKey {
47-
return nil
49+
return false, nil
4850
}
4951

5052
t.Lock()
@@ -55,18 +57,19 @@ func (t *Table) Update(target noise.ID) error {
5557
for i, id := range t.entries[idx] {
5658
if id.ID == target.ID { // Found the target ID already inside the routing table.
5759
t.entries[idx] = append(append([]noise.ID{target}, t.entries[idx][:i]...), t.entries[idx][i+1:]...)
58-
return nil
60+
return false, nil
5961
}
6062
}
6163

6264
if len(t.entries[idx]) < BucketSize { // The bucket is not yet under full capacity.
6365
t.entries[idx] = append([]noise.ID{target}, t.entries[idx]...)
64-
return nil
66+
t.size++
67+
return true, nil
6568
}
6669

6770
// The bucket is at full capacity. Return ErrBucketFull.
6871

69-
return fmt.Errorf("cannot insert id %x into routing table: %w", target.ID, ErrBucketFull)
72+
return false, fmt.Errorf("cannot insert id %x into routing table: %w", target.ID, ErrBucketFull)
7073
}
7174

7275
// Recorded returns true if target is already recorded in this routing table.
@@ -93,13 +96,32 @@ func (t *Table) Delete(target noise.PublicKey) bool {
9396
for i, id := range t.entries[idx] {
9497
if id.ID == target {
9598
t.entries[idx] = append(t.entries[idx][:i], t.entries[idx][i+1:]...)
99+
t.size--
96100
return true
97101
}
98102
}
99103

100104
return false
101105
}
102106

107+
// DeleteByAddress removes the first occurrence of an id with target as its address from this routing table.
108+
func (t *Table) DeleteByAddress(target string) bool {
109+
t.Lock()
110+
defer t.Unlock()
111+
112+
for i, bucket := range t.entries {
113+
for j, id := range bucket {
114+
if id.Address == target {
115+
t.entries[i] = append(t.entries[i][:j], t.entries[i][j+1:]...)
116+
t.size--
117+
return true
118+
}
119+
}
120+
}
121+
122+
return false
123+
}
124+
103125
// Peers returns BucketSize closest peer IDs to the ID which this routing table's distance metric is defined against.
104126
func (t *Table) Peers() []noise.ID {
105127
return t.FindClosest(t.self.ID, BucketSize)
@@ -143,6 +165,30 @@ func (t *Table) FindClosest(target noise.PublicKey, k int) []noise.ID {
143165
return closest
144166
}
145167

168+
// Entries returns all stored ids in this routing table.
169+
func (t *Table) Entries() []noise.ID {
170+
t.RLock()
171+
defer t.RUnlock()
172+
173+
entries := make([]noise.ID, 0, t.size)
174+
175+
for _, bucket := range t.entries {
176+
for _, id := range bucket {
177+
entries = append(entries, id)
178+
}
179+
}
180+
181+
return entries
182+
}
183+
184+
// NumEntries returns the total amount of ids stored in this routing table.
185+
func (t *Table) NumEntries() int {
186+
t.RLock()
187+
defer t.RUnlock()
188+
189+
return t.size
190+
}
191+
146192
func (t *Table) getBucketIndex(target noise.PublicKey) int {
147193
l := PrefixLen(XOR(target[:], t.self.ID[:]))
148194
if l == noise.SizePublicKey*8 {

‎mod.go

+3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ type Protocol interface {
3232
// has been terminated.
3333
OnPeerLeave(client *Client)
3434

35+
// OnPingFailed is called whenever any attempt by a node to dial a peer at addr fails.
36+
OnPingFailed(addr string, err error)
37+
3538
// OnMessageSent is called whenever a message or request is successfully sent to a peer.
3639
OnMessageSent(client *Client)
3740

‎msg.go

+6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"encoding/binary"
55
"errors"
66
"go.uber.org/atomic"
7+
"go.uber.org/zap"
78
"io"
89
)
910

@@ -43,6 +44,11 @@ func (ctx *HandlerContext) ID() ID {
4344
return ctx.client.ID()
4445
}
4546

47+
// Logger returns the logger instance associated to the inbound/outbound peer being handled.
48+
func (ctx *HandlerContext) Logger() *zap.Logger {
49+
return ctx.client.Logger()
50+
}
51+
4652
// Data returns the raw bytes that some peer has sent to you.
4753
//
4854
// Data may be called concurrently.

‎node.go

+25-12
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,9 @@ type Node struct {
4343
outbound *clientMap
4444
inbound *clientMap
4545

46-
codec *codec
47-
binders []Protocol
48-
handlers []Handler
46+
codec *codec
47+
protocols []Protocol
48+
handlers []Handler
4949

5050
kill chan error
5151
}
@@ -140,8 +140,8 @@ func (n *Node) Listen() error {
140140

141141
n.id = NewID(n.publicKey, n.host, n.port)
142142

143-
for _, binder := range n.binders {
144-
if err = binder.Bind(n); err != nil {
143+
for _, protocol := range n.protocols {
144+
if err = protocol.Bind(n); err != nil {
145145
return err
146146
}
147147
}
@@ -153,6 +153,13 @@ func (n *Node) Listen() error {
153153
n.listening.Store(true)
154154
defer n.listening.Store(false)
155155

156+
n.logger.Info("Listening for incoming peers.",
157+
zap.String("bind_addr", addr.String()),
158+
zap.String("id_addr", n.id.Address),
159+
zap.String("public_key", n.publicKey.String()),
160+
zap.String("private_key", n.privateKey.String()),
161+
)
162+
156163
for {
157164
conn, err := n.listener.Accept()
158165
if err != nil {
@@ -260,8 +267,8 @@ func (n *Node) Send(ctx context.Context, addr string, data []byte) error {
260267
return err
261268
}
262269

263-
for _, binder := range c.node.binders {
264-
binder.OnMessageSent(c)
270+
for _, protocol := range c.node.protocols {
271+
protocol.OnMessageSent(c)
265272
}
266273

267274
return nil
@@ -292,8 +299,8 @@ func (n *Node) Request(ctx context.Context, addr string, data []byte) ([]byte, e
292299
return nil, err
293300
}
294301

295-
for _, binder := range c.node.binders {
296-
binder.OnMessageSent(c)
302+
for _, protocol := range c.node.protocols {
303+
protocol.OnMessageSent(c)
297304
}
298305

299306
return msg.data, nil
@@ -368,7 +375,13 @@ func (n *Node) dialIfNotExists(ctx context.Context, addr string) (*Client, error
368375
}
369376
}
370377

371-
return nil, fmt.Errorf("attempted to dial %s several times but failed: %w", addr, err)
378+
err = fmt.Errorf("attempted to dial %s several times but failed: %w", addr, err)
379+
380+
for _, protocol := range n.protocols {
381+
protocol.OnPingFailed(addr, err)
382+
}
383+
384+
return nil, err
372385
}
373386

374387
// Bind registers a Protocol to this node, which implements callbacks for all events this node can emit throughout
@@ -377,12 +390,12 @@ func (n *Node) dialIfNotExists(ctx context.Context, addr string) (*Client, error
377390
// for new peers, Bind silently returns and does nothing.
378391
//
379392
// Bind may be called concurrently.
380-
func (n *Node) Bind(binders ...Protocol) {
393+
func (n *Node) Bind(protocols ...Protocol) {
381394
if n.listening.Load() {
382395
return
383396
}
384397

385-
n.binders = append(n.binders, binders...)
398+
n.protocols = append(n.protocols, protocols...)
386399
}
387400

388401
// Handle registers a Handler to this node, which is executed every time this node receives a message from an

0 commit comments

Comments
 (0)
Please sign in to comment.