Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Persistence Layer on top of PubSub #33

Merged
merged 28 commits into from
Aug 19, 2019
Merged
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
43ebd02
removed bootstrapping functionality
aschmahmann May 12, 2019
f57d48c
start pubsub validator changes to implement LWW pubsub
aschmahmann May 22, 2019
e2a5fca
Merge branch 'master' into feat/persistence
aschmahmann May 31, 2019
ff89016
Implement persistence --temporary
aschmahmann Jun 14, 2019
90c980a
Added message protobuf that was missing from previous commit
aschmahmann Jun 21, 2019
3ae7b1f
Use changes from ongoing pubsub PRs
aschmahmann Jul 3, 2019
6fb9435
removed unused protobufs
aschmahmann Jul 3, 2019
f3f8dd4
records past EOL should fail
aschmahmann Jul 3, 2019
9e9d778
reorder imports
aschmahmann Jul 3, 2019
a24054d
Improved the get-latest protocol (protobufs for request and response,…
aschmahmann Aug 9, 2019
092d0e1
better context cancel in get-latest protocol
aschmahmann Aug 9, 2019
3bf24fa
restore bootstrapping ... for now
aschmahmann Aug 9, 2019
e9b0864
In get-latest tests wait a bit after connecting hosts so they have ti…
aschmahmann Aug 9, 2019
f85f2bc
Changed get-latest protocol to have responses with a status code.
aschmahmann Aug 9, 2019
9756263
get-latest responds with ERR message even when sender sends an incorr…
aschmahmann Aug 9, 2019
a4d4d6c
Removed ERR from protobuf and we just reset the stream when we encoun…
aschmahmann Aug 9, 2019
97ac549
protobuf Makefile supports spaces in path name
aschmahmann Aug 9, 2019
b0eeb77
fixed potential goroutine leak. switched order of protobuf fields.
aschmahmann Aug 11, 2019
4fbc97b
Small Makefile refactor
aschmahmann Aug 11, 2019
aa737d1
changed get-latest protocol to be called fetch. some refactoring
aschmahmann Aug 12, 2019
ded823d
renamed protocol files to match protocol rename
aschmahmann Aug 12, 2019
76d8fca
made function passed into the fetch protocol a typedef
aschmahmann Aug 14, 2019
1913974
renames in the fetch protobufs
aschmahmann Aug 15, 2019
fdbeaec
Makefile is more MSYS friendly but you still need a weird GOPATH
aschmahmann Aug 15, 2019
b303f91
Updated go.mod to use unreleased version of pubsub. Refactored Fetch …
aschmahmann Aug 16, 2019
0787cb5
rebroadcast initial delay using timer
aschmahmann Aug 16, 2019
41f6fb8
Added Error status code to Fetch protobuf. Currently unused.
aschmahmann Aug 16, 2019
981b38c
use go-libp2p-pubsub v0.1.1. Fix `Fetch` function to be a pointer rec…
aschmahmann Aug 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Implement persistence --temporary
aschmahmann committed Jun 14, 2019

Unverified

The committer email address is not verified.
commit ff890161a5a23096d884eecdd48f092851286689
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
module github.com/libp2p/go-libp2p-pubsub-router

require (
github.com/ipfs/go-cid v0.0.2
github.com/gogo/protobuf v1.2.1
github.com/ipfs/go-datastore v0.0.5
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/ipfs/go-ipfs-util v0.0.1
github.com/ipfs/go-log v0.0.1
github.com/libp2p/go-libp2p-blankhost v0.1.1
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-net v0.1.0
github.com/libp2p/go-libp2p-pubsub v0.1.0
github.com/libp2p/go-libp2p-record v0.1.0
github.com/libp2p/go-libp2p-routing-helpers v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
)

replace github.com/libp2p/go-libp2p-pubsub v0.1.0 => ../../libp2p/go-libp2p-pubsub
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -86,11 +86,14 @@ github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
github.com/libp2p/go-libp2p-discovery v0.1.0/go.mod h1:4F/x+aldVHjHDHuX85x1zWoFTGElt8HnoDzwkFZm29g=
github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
github.com/libp2p/go-libp2p-mplex v0.2.1 h1:E1xaJBQnbSiTHGI1gaBKmKhu1TUKkErKJnE8iGvirYI=
github.com/libp2p/go-libp2p-mplex v0.2.1/go.mod h1:SC99Rxs8Vuzrf/6WhmH41kNn13TiYdAWNYHrwImKLnE=
github.com/libp2p/go-libp2p-net v0.1.0 h1:3t23V5cR4GXcNoFriNoZKFdUZEUDZgUkvfwkD2INvQE=
github.com/libp2p/go-libp2p-net v0.1.0/go.mod h1:R5VZbutk75tkC5YJJS61OCO1NWoajxYjCEV2RoHh3FY=
github.com/libp2p/go-libp2p-peer v0.2.0 h1:EQ8kMjaCUwt/Y5uLgjT8iY2qg0mGUT0N1zUjer50DsY=
github.com/libp2p/go-libp2p-peer v0.2.0/go.mod h1:RCffaCvUyW2CJmG2gAWVqwePwW7JMgxjsHm7+J5kjWY=
github.com/libp2p/go-libp2p-peerstore v0.1.0 h1:MKh7pRNPHSh1fLPj8u/M/s/napdmeNpoi9BRy9lPN0E=
244 changes: 217 additions & 27 deletions pubsub.go
Original file line number Diff line number Diff line change
@@ -4,28 +4,31 @@ import (
"bytes"
"context"
"encoding/base64"
"errors"
"sync"
"time"

"encoding/binary"
"fmt"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/libp2p/go-libp2p-net"

pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
dshelp "github.com/ipfs/go-ipfs-ds-help"
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-pubsub-router/pb"
"io"
"sync"
"time"
)

var log = logging.Logger("pubsub-valuestore")

const OnJoinProto = protocol.ID("/psOnJoin/0.0.1")

type watchGroup struct {
// Note: this chan must be buffered, see notifyWatchers
listeners map[chan []byte]struct{}
@@ -36,6 +39,11 @@ type PubsubValueStore struct {
ds ds.Datastore
ps *pubsub.PubSub

host host.Host

rebroadcastInitialDelay time.Duration
rebroadcastInterval time.Duration

// Map of keys to subscriptions.
//
// If a key is present but the subscription is nil, we've bootstrapped
@@ -60,17 +68,44 @@ func KeyToTopic(key string) string {
// The constructor interface is complicated by the need to bootstrap the pubsub topic.
// This could be greatly simplified if the pubsub implementation handled bootstrap itself
func NewPubsubValueStore(ctx context.Context, host host.Host, cr routing.ContentRouting, ps *pubsub.PubSub, validator record.Validator) *PubsubValueStore {
return &PubsubValueStore{
psValueStore := &PubsubValueStore{
ctx: ctx,

ds: dssync.MutexWrap(ds.NewMapDatastore()),
ps: ps,
ds: dssync.MutexWrap(ds.NewMapDatastore()),
ps: ps,
host: host,
rebroadcastInitialDelay: 100 * time.Millisecond,
rebroadcastInterval: time.Second,

subs: make(map[string]*pubsub.Subscription),
watching: make(map[string]*watchGroup),

Validator: validator,
}

host.SetStreamHandler(OnJoinProto, func(s net.Stream) {
defer net.FullClose(s)

msgData, err := readBytes(s)
if err != nil {
return
}

msg := pb.RequestLatest{}
if msg.Unmarshal(msgData) != nil {
return
}

response, err := psValueStore.getLocal(*msg.Identifier)
if err != nil {
return
}

if writeBytes(s, response) != nil {
return
}
})
return psValueStore
}

// Publish publishes an IPNS record through pubsub with default TTL
@@ -84,29 +119,39 @@ func (p *PubsubValueStore) PutValue(ctx context.Context, key string, value []byt
}

log.Debugf("PubsubPublish: publish value for key", key)
return p.ps.Publish(topic, value)
done := make(chan error, 1)
go func() {
done <- p.ps.Publish(topic, value)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Publish is really going to need to take a context: libp2p/go-libp2p-pubsub#184 (comment).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, removing all the bootstrapping code makes #28 really obvious. It also makes it easy to see that Publish's lack of a context, while potentially workable, is quite problematic if we expect Publish to be blocking after libp2p/go-libp2p-pubsub#184 lands.

}()

select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}

func (p *PubsubValueStore) isBetter(key string, val []byte) bool {
func (p *PubsubValueStore) isBetter(key string, val []byte) (isBetter bool, isEqual bool) {
if p.Validator.Validate(key, val) != nil {
return false
return false, false
}

old, err := p.getLocal(key)

// Same record is not better
if old != nil && bytes.Equal(old, val) {
return false
return false, true
}

if err != nil {
// If the old one is invalid and is not identical to the new record we assume the new one is better.
// Perhaps we should have levels of invalid (e.g. EOL vs bad formatting)
return true
return true, false
}

i, err := p.Validator.Select(key, [][]byte{val, old})
return err == nil && i == 0
return err == nil && i == 0, false
}

func (p *PubsubValueStore) Subscribe(key string) error {
@@ -125,8 +170,14 @@ func (p *PubsubValueStore) Subscribe(key string) error {
// record hasn't expired.
//
// Also, make sure to do this *before* subscribing.
_ = p.ps.RegisterTopicValidator(topic, func(ctx context.Context, _ peer.ID, msg *pubsub.Message) bool {
return p.isBetter(key, msg.GetData())
myID := p.host.ID()
_ = p.ps.RegisterTopicValidator(topic, func(ctx context.Context, src peer.ID, msg *pubsub.Message) bool {
isBetter, isEqual := p.isBetter(key, msg.GetData())

if src == myID && isEqual {
return true
}
return isBetter
})

sub, err := p.ps.Subscribe(topic)
@@ -147,11 +198,32 @@ func (p *PubsubValueStore) Subscribe(key string) error {
go p.handleSubscription(sub, key)
p.mx.Unlock()

go p.rebroadcast(key)
log.Debugf("PubsubResolve: subscribed to %s", key)

return nil
}

func (p *PubsubValueStore) rebroadcast(key string) {
topic := KeyToTopic(key)
time.Sleep(p.rebroadcastInitialDelay)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit for later: this should use a select and a time.After.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, although if we really wanted we might be able to up efficiency slightly by reusing a Timer instead of a ticker.


ticker := time.NewTicker(p.rebroadcastInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
val, _ := p.getLocal(key)
if val != nil {
_ = p.ps.Publish(topic, val)
}
case <-p.ctx.Done():
return
}
}
}

func (p *PubsubValueStore) getLocal(key string) ([]byte, error) {
val, err := p.ds.Get(dshelp.NewKeyFromBinary([]byte(key)))
if err != nil {
@@ -268,7 +340,7 @@ func (p *PubsubValueStore) Cancel(name string) (bool, error) {
p.watchLk.Lock()
if _, wok := p.watching[name]; wok {
p.watchLk.Unlock()
return false, errors.New("key has active subscriptions")
return false, fmt.Errorf("key has active subscriptions")
}
p.watchLk.Unlock()

@@ -284,22 +356,92 @@ func (p *PubsubValueStore) Cancel(name string) (bool, error) {
func (p *PubsubValueStore) handleSubscription(sub *pubsub.Subscription, key string) {
defer sub.Cancel()

for {
msg, err := sub.Next(p.ctx)
if err != nil {
if err != context.Canceled {
log.Warningf("PubsubResolve: subscription error in %s: %s", key, err.Error())
newMsg := make(chan []byte)
go func() {
for {
data, err := p.handleNewMsgs(sub, key)
if err != nil {
close(newMsg)
return
}
return
newMsg <- data
}
}()

newPeerData := make(chan []byte)
go func() {
for {
data, err := p.handleNewPeer(sub, key)
if err != nil {
close(newPeerData)
return
}
newPeerData <- data
}
}()

for {
var data []byte
select {
case data = <-newMsg:
case data = <-newPeerData:
}
if p.isBetter(key, msg.GetData()) {
err := p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), msg.GetData())

if isBetter, _ := p.isBetter(key, data); isBetter {
err := p.ds.Put(dshelp.NewKeyFromBinary([]byte(key)), data)
if err != nil {
log.Warningf("PubsubResolve: error writing update for %s: %s", key, err)
}
p.notifyWatchers(key, msg.GetData())
p.notifyWatchers(key, data)
}
}
}

// handleNewMsgs blocks until the next message arrives on sub and returns its
// payload. If sub.Next fails, the error is returned unchanged; any failure
// other than context.Canceled is also logged as a warning tagged with key.
func (p *PubsubValueStore) handleNewMsgs(sub *pubsub.Subscription, key string) ([]byte, error) {
	next, nextErr := sub.Next(p.ctx)
	if nextErr == nil {
		return next.GetData(), nil
	}

	// Cancellation is the normal shutdown path, so it is not worth a warning.
	if nextErr != context.Canceled {
		log.Warningf("PubsubResolve: subscription error in %s: %s", key, nextErr.Error())
	}
	return nil, nextErr
}

// handleNewPeer waits for the next peer to join the topic behind sub and asks
// that peer, over a new OnJoinProto stream, for its value for key.
//
// It returns the raw response payload read from the stream. An error is
// returned when waiting for a peer join fails, when the stream cannot be
// opened within 10 seconds, or when marshalling, sending the request or
// reading the response fails. Failures of sub.NextPeerJoin other than
// context.Canceled are also logged as a warning.
func (p *PubsubValueStore) handleNewPeer(sub *pubsub.Subscription, key string) ([]byte, error) {
	// Named pid rather than peer so the imported peer package is not shadowed.
	pid, err := sub.NextPeerJoin(p.ctx)
	if err != nil {
		if err != context.Canceled {
			log.Warningf("PubsubNewPeer: subscription error in %s: %s", key, err.Error())
		}
		return nil, err
	}

	// Always release the timeout context; dropping the cancel func would keep
	// its timer alive until the deadline fires or p.ctx is cancelled.
	peerCtx, cancel := context.WithTimeout(p.ctx, time.Second*10)
	defer cancel()

	s, err := p.host.NewStream(peerCtx, pid, OnJoinProto)
	if err != nil {
		return nil, err
	}
	defer net.FullClose(s)

	msg := pb.RequestLatest{Identifier: &key}
	msgData, err := msg.Marshal()
	if err != nil {
		return nil, err
	}

	// Propagate the write error itself. Reusing the outer err here would
	// return (nil, nil) because Marshal has already succeeded.
	if err := writeBytes(s, msgData); err != nil {
		return nil, err
	}

	// NOTE(review): presumably this half-closes our write side so the remote
	// handler sees the end of the request while we can still read the reply;
	// confirm against the stream implementation. The error is intentionally
	// ignored, as before: a broken stream surfaces in readBytes below.
	s.Close()

	response, err := readBytes(s)
	if err != nil {
		return nil, err
	}

	return response, nil
}

func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
@@ -318,3 +460,51 @@ func (p *PubsubValueStore) notifyWatchers(key string, data []byte) {
}
}
}

// sizeLengthBytes is the width, in bytes, of the little-endian uint64 length
// prefix that writeBytes emits and readBytes consumes in front of each message.
const sizeLengthBytes = 8

// readNumBytesFromReader reads exactly numBytes bytes from r.
//
// On success it returns a slice of length numBytes. If r fails or ends before
// numBytes bytes have been read, it returns a nil slice together with the
// error from io.ReadFull: io.EOF when nothing was read, io.ErrUnexpectedEOF on
// a short read, or the reader's own error.
func readNumBytesFromReader(r io.Reader, numBytes uint64) ([]byte, error) {
	data := make([]byte, numBytes)
	// io.ReadFull reports a nil error only when the buffer was filled
	// completely, so no separate length comparison is needed.
	if _, err := io.ReadFull(r, data); err != nil {
		return nil, err
	}
	return data, nil
}

// readBytes reads one length-prefixed message from r.
//
// Wire format: a sizeLengthBytes-byte little-endian unsigned length followed
// by that many bytes of payload. It returns the payload, or a nil slice and an
// error if either part cannot be read in full or the declared length exceeds
// the maximum accepted size.
func readBytes(r io.Reader) ([]byte, error) {
	// The length prefix is supplied by whoever is on the other end of r, so it
	// must be bounded before it is used as an allocation size; otherwise a
	// single bogus header can request an enormous buffer or make the slice
	// allocation panic.
	// NOTE(review): 1 MiB was picked to mirror the pubsub message size cap;
	// confirm it is comfortably above the largest record served here.
	const maxMessageSize = 1 << 20

	sizeData, err := readNumBytesFromReader(r, sizeLengthBytes)
	if err != nil {
		return nil, err
	}

	size := binary.LittleEndian.Uint64(sizeData)
	if size > maxMessageSize {
		return nil, fmt.Errorf("message size %d exceeds maximum of %d bytes", size, maxMessageSize)
	}

	data, err := readNumBytesFromReader(r, size)
	if err != nil {
		return nil, err
	}

	return data, nil
}

// writeBytes sends data to w as one length-prefixed message: first a
// sizeLengthBytes-byte little-endian encoding of len(data), then data itself.
// It returns the first write error encountered, or nil if both writes succeed.
func writeBytes(w io.Writer, data []byte) error {
	header := make([]byte, sizeLengthBytes)
	binary.LittleEndian.PutUint64(header, uint64(len(data)))

	// Stop before sending the payload if the header could not be written.
	if _, headerErr := w.Write(header); headerErr != nil {
		return headerErr
	}

	_, payloadErr := w.Write(data)
	return payloadErr
}