Skip to content

Commit 7d5866b

Browse files
author
Antoine Eddi
authored
Merge pull request #11 from aeddi/fix/lost-messages
2 parents 25edabf + b71218b commit 7d5866b

File tree

9 files changed

+91
-46
lines changed

9 files changed

+91
-46
lines changed

orbitdb.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func (n *NewOrbitDBOptions) applyDefaults() {
7171
}
7272

7373
if n.MessageKeystore == nil {
74-
n.MessageKeystore = cryptoutil.NewMessageKeystore(datastoreutil.NewNamespacedDatastore(n.Datastore, datastore.NewKey(datastoreutil.NamespaceMessageKeystore)))
74+
n.MessageKeystore = cryptoutil.NewMessageKeystore(datastoreutil.NewNamespacedDatastore(n.Datastore, datastore.NewKey(datastoreutil.NamespaceMessageKeystore)), n.Logger)
7575
}
7676

7777
if n.DeviceKeystore == nil {
@@ -237,7 +237,7 @@ func (s *BertyOrbitDB) openAccountGroup(ctx context.Context, options *orbitdb.Cr
237237

238238
gc, err := s.OpenGroup(ctx, g, options)
239239
if err != nil {
240-
return nil, errcode.TODO.Wrap(err)
240+
return nil, errcode.ErrGroupOpen.Wrap(err)
241241
}
242242

243243
l.Debug("Opened account group", tyber.FormatStepLogFields(ctx, []tyber.Detail{})...)

pkg/bertypush/push_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (opts *PushHandlerOpts) applyPushDefaults() error {
9898
}
9999

100100
if opts.MessageKeystore == nil {
101-
opts.MessageKeystore = cryptoutil.NewMessageKeystore(datastoreutil.NewNamespacedDatastore(opts.RootDatastore, ds.NewKey(datastoreutil.NamespaceMessageKeystore)))
101+
opts.MessageKeystore = cryptoutil.NewMessageKeystore(datastoreutil.NewNamespacedDatastore(opts.RootDatastore, ds.NewKey(datastoreutil.NamespaceMessageKeystore)), opts.Logger)
102102
}
103103

104104
return nil

pkg/cryptoutil/keystore_message.go

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@ import (
1010
"github.com/ipfs/go-datastore"
1111
dssync "github.com/ipfs/go-datastore/sync"
1212
"github.com/libp2p/go-libp2p/core/crypto"
13+
"go.uber.org/zap"
1314
"golang.org/x/crypto/nacl/secretbox"
1415

1516
"berty.tech/weshnet/pkg/errcode"
17+
"berty.tech/weshnet/pkg/logutil"
1618
"berty.tech/weshnet/pkg/protocoltypes"
1719
)
1820

@@ -22,6 +24,7 @@ type MessageKeystore struct {
2224
lock sync.Mutex
2325
preComputedKeysCount int
2426
store *dssync.MutexDatastore
27+
logger *zap.Logger
2528
}
2629

2730
type DecryptInfo struct {
@@ -204,6 +207,10 @@ func (m *MessageKeystore) registerChainKey(ctx context.Context, g *protocoltypes
204207

205208
if _, err := m.GetDeviceChainKey(ctx, groupPK, devicePK); err == nil {
206209
// Device is already registered, ignore it
210+
m.logger.Debug("device already registered in group",
211+
logutil.PrivateBinary("devicePK", logutil.CryptoKeyToBytes(devicePK)),
212+
logutil.PrivateBinary("groupPK", logutil.CryptoKeyToBytes(groupPK)),
213+
)
207214
return nil
208215
}
209216

@@ -227,8 +234,7 @@ func (m *MessageKeystore) registerChainKey(ctx context.Context, g *protocoltypes
227234
devicePKBytes, err := devicePK.Raw()
228235
if err == nil {
229236
if err := m.UpdatePushGroupReferences(ctx, devicePKBytes, ds.Counter, g); err != nil {
230-
// TODO: log
231-
_ = err
237+
m.logger.Error("updating push group references failed", zap.Error(err))
232238
}
233239
}
234240

@@ -330,47 +336,49 @@ type computedKey struct {
330336
}
331337

332338
func (m *MessageKeystore) putPrecomputedKeys(ctx context.Context, groupPK, device crypto.PubKey, preComputedKeys ...computedKey) error {
333-
if m == nil || len(preComputedKeys) == 0 {
339+
if m == nil {
334340
return errcode.ErrInvalidInput
335341
}
336342

337-
deviceRaw, err := device.Raw()
338-
if err != nil {
339-
return errcode.ErrSerialization.Wrap(err)
340-
}
343+
if len(preComputedKeys) != 0 {
344+
deviceRaw, err := device.Raw()
345+
if err != nil {
346+
return errcode.ErrSerialization.Wrap(err)
347+
}
341348

342-
groupRaw, err := groupPK.Raw()
343-
if err != nil {
344-
return errcode.ErrSerialization.Wrap(err)
345-
}
349+
groupRaw, err := groupPK.Raw()
350+
if err != nil {
351+
return errcode.ErrSerialization.Wrap(err)
352+
}
353+
354+
batch, err := m.store.Batch(ctx)
355+
if err == datastore.ErrBatchUnsupported {
356+
for _, preComputedKey := range preComputedKeys {
357+
id := idForCachedKey(groupRaw, deviceRaw, preComputedKey.counter)
358+
359+
if err := m.store.Put(ctx, id, preComputedKey.mk[:]); err != nil {
360+
return errcode.ErrMessageKeyPersistencePut.Wrap(err)
361+
}
362+
}
363+
364+
return nil
365+
} else if err != nil {
366+
return errcode.ErrMessageKeyPersistencePut.Wrap(err)
367+
}
346368

347-
batch, err := m.store.Batch(ctx)
348-
if err == datastore.ErrBatchUnsupported {
349369
for _, preComputedKey := range preComputedKeys {
350370
id := idForCachedKey(groupRaw, deviceRaw, preComputedKey.counter)
351371

352-
if err := m.store.Put(ctx, id, preComputedKey.mk[:]); err != nil {
372+
if err := batch.Put(ctx, id, preComputedKey.mk[:]); err != nil {
353373
return errcode.ErrMessageKeyPersistencePut.Wrap(err)
354374
}
355375
}
356376

357-
return nil
358-
} else if err != nil {
359-
return errcode.ErrMessageKeyPersistencePut.Wrap(err)
360-
}
361-
362-
for _, preComputedKey := range preComputedKeys {
363-
id := idForCachedKey(groupRaw, deviceRaw, preComputedKey.counter)
364-
365-
if err := batch.Put(ctx, id, preComputedKey.mk[:]); err != nil {
377+
if err := batch.Commit(ctx); err != nil {
366378
return errcode.ErrMessageKeyPersistencePut.Wrap(err)
367379
}
368380
}
369381

370-
if err := batch.Commit(ctx); err != nil {
371-
return errcode.ErrMessageKeyPersistencePut.Wrap(err)
372-
}
373-
374382
return nil
375383
}
376384

@@ -644,18 +652,23 @@ func (m *MessageKeystore) updateCurrentKey(ctx context.Context, groupPK, pk cryp
644652
}
645653

646654
// NewMessageKeystore instantiate a new MessageKeystore
647-
func NewMessageKeystore(s datastore.Datastore) *MessageKeystore {
655+
func NewMessageKeystore(s datastore.Datastore, logger *zap.Logger) *MessageKeystore {
656+
if logger == nil {
657+
logger = zap.NewNop()
658+
}
659+
648660
return &MessageKeystore{
649661
preComputedKeysCount: 100,
650662
store: dssync.MutexWrap(s),
663+
logger: logger.Named("message-ks"),
651664
}
652665
}
653666

654667
// nolint:deadcode,unused // NewInMemMessageKeystore instantiate a new MessageKeystore, useful for testing
655-
func NewInMemMessageKeystore() (*MessageKeystore, func()) {
668+
func NewInMemMessageKeystore(logger *zap.Logger) (*MessageKeystore, func()) {
656669
ds := dssync.MutexWrap(datastore.NewMapDatastore())
657670

658-
return NewMessageKeystore(ds), func() { _ = ds.Close() }
671+
return NewMessageKeystore(ds, logger), func() { _ = ds.Close() }
659672
}
660673

661674
func (m *MessageKeystore) OpenOutOfStoreMessage(ctx context.Context, envelope *protocoltypes.OutOfStoreMessage, groupPublicKey []byte) ([]byte, bool, error) {
@@ -777,12 +790,12 @@ func (m *MessageKeystore) UpdatePushGroupReferences(ctx context.Context, deviceP
777790
for i := 0; i < len(refsExisting); i++ {
778791
ref, err := CreatePushGroupReference(devicePK, refsExisting[i], groupPushSecret)
779792
if err != nil {
780-
// TODO: log
793+
m.logger.Error("creating existing push group reference failed", logutil.PrivateBinary("ref", ref), zap.Error(err))
781794
continue
782795
}
783796

784797
if err := m.store.Delete(ctx, m.refKey(ref)); err != nil {
785-
// TODO: log
798+
m.logger.Error("deleting existing push group reference failed", logutil.PrivateBinary("ref", ref), zap.Error(err))
786799
continue
787800
}
788801
}
@@ -791,12 +804,12 @@ func (m *MessageKeystore) UpdatePushGroupReferences(ctx context.Context, deviceP
791804
for i := 0; i < len(refsToCreate); i++ {
792805
ref, err := CreatePushGroupReference(devicePK, refsToCreate[i], groupPushSecret)
793806
if err != nil {
794-
// TODO: log
807+
m.logger.Error("creating new push group reference failed", logutil.PrivateBinary("ref", ref), zap.Error(err))
795808
continue
796809
}
797810

798811
if err := m.store.Put(ctx, m.refKey(ref), group.GetPublicKey()); err != nil {
799-
// TODO: log
812+
m.logger.Error("putting new push group reference failed", logutil.PrivateBinary("ref", ref), zap.Error(err))
800813
continue
801814
}
802815
}

pkg/cryptoutil/keystore_message_utils_test.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/libp2p/go-libp2p/core/crypto"
1414
"github.com/stretchr/testify/assert"
1515
"github.com/stretchr/testify/require"
16+
"go.uber.org/zap"
1617

1718
"berty.tech/weshnet"
1819
"berty.tech/weshnet/pkg/cryptoutil"
@@ -74,6 +75,7 @@ func mustMessageHeaders(t testing.TB, sk crypto.PrivKey, counter uint64, payload
7475
}
7576

7677
func Test_EncryptMessagePayload(t *testing.T) {
78+
logger := zap.NewNop()
7779
ctx, cancel := context.WithCancel(context.Background())
7880
defer cancel()
7981

@@ -100,10 +102,10 @@ func Test_EncryptMessagePayload(t *testing.T) {
100102

101103
omd2, err := acc2.MemberDeviceForGroup(g)
102104

103-
mkh1, cleanup := cryptoutil.NewInMemMessageKeystore()
105+
mkh1, cleanup := cryptoutil.NewInMemMessageKeystore(logger)
104106
defer cleanup()
105107

106-
mkh2, cleanup := cryptoutil.NewInMemMessageKeystore()
108+
mkh2, cleanup := cryptoutil.NewInMemMessageKeystore(logger)
107109
defer cleanup()
108110

109111
gc1 := weshnet.NewContextGroup(g, nil, nil, mkh1, omd1, nil)
@@ -263,14 +265,15 @@ func Test_EncryptMessagePayload(t *testing.T) {
263265
}
264266

265267
func Test_EncryptMessageEnvelope(t *testing.T) {
268+
logger := zap.NewNop()
266269
ctx, cancel := context.WithCancel(context.Background())
267270
defer cancel()
268271

269272
g, _, err := weshnet.NewGroupMultiMember()
270273
assert.NoError(t, err)
271274

272275
acc1 := cryptoutil.NewDeviceKeystore(keystore.NewMemKeystore(), nil)
273-
mkh1, cleanup := cryptoutil.NewInMemMessageKeystore()
276+
mkh1, cleanup := cryptoutil.NewInMemMessageKeystore(logger)
274277
defer cleanup()
275278

276279
omd1, err := acc1.MemberDeviceForGroup(g)
@@ -285,7 +288,7 @@ func Test_EncryptMessageEnvelope(t *testing.T) {
285288
assert.NoError(t, err)
286289

287290
acc2 := cryptoutil.NewDeviceKeystore(keystore.NewMemKeystore(), nil)
288-
mkh2, cleanup := cryptoutil.NewInMemMessageKeystore()
291+
mkh2, cleanup := cryptoutil.NewInMemMessageKeystore(logger)
289292
defer cleanup()
290293

291294
omd2, err := acc2.MemberDeviceForGroup(g)
@@ -318,6 +321,7 @@ func Test_EncryptMessageEnvelope(t *testing.T) {
318321
}
319322

320323
func Test_EncryptMessageEnvelopeAndDerive(t *testing.T) {
324+
logger := zap.NewNop()
321325
ctx, cancel := context.WithCancel(context.Background())
322326
defer cancel()
323327

@@ -336,10 +340,10 @@ func Test_EncryptMessageEnvelopeAndDerive(t *testing.T) {
336340
ds1, err := cryptoutil.NewDeviceSecret()
337341
assert.NoError(t, err)
338342

339-
mkh1, cleanup := cryptoutil.NewInMemMessageKeystore()
343+
mkh1, cleanup := cryptoutil.NewInMemMessageKeystore(logger)
340344
defer cleanup()
341345

342-
mkh2, cleanup := cryptoutil.NewInMemMessageKeystore()
346+
mkh2, cleanup := cryptoutil.NewInMemMessageKeystore(logger)
343347
defer cleanup()
344348

345349
err = mkh1.RegisterChainKey(ctx, g, omd1.PrivateDevice().GetPublic(), ds1, true)

pkg/logutil/crypto_utils.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package logutil
2+
3+
import (
4+
"encoding/base64"
5+
6+
"github.com/libp2p/go-libp2p/core/crypto"
7+
)
8+
9+
func CryptoKeyToBytes(key crypto.Key) []byte {
10+
keyBytes, err := key.Raw()
11+
if err != nil {
12+
return []byte{0}
13+
}
14+
15+
return keyBytes
16+
}
17+
18+
func CryptoKeyToBase64(key crypto.Key) string {
19+
bytes := CryptoKeyToBytes(key)
20+
return base64.StdEncoding.EncodeToString(bytes)
21+
}

pkg/tyber/step.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package tyber
22

33
import (
44
"context"
5+
"runtime/debug"
56

67
"go.uber.org/zap"
78
"go.uber.org/zap/zapcore"
@@ -33,6 +34,12 @@ func FormatStepLogFields(ctx context.Context, details []Detail, mutators ...Step
3334
for _, m := range mutators {
3435
s = m(s)
3536
}
37+
38+
// Add debug if a there is no parent trace ID
39+
if s.ParentTraceID == noTraceID {
40+
s.Details = append(s.Details, Detail{Name: "StackTrace", Description: string(debug.Stack())})
41+
}
42+
3643
return []zapcore.Field{
3744
zap.String("tyberLogType", string(StepType)),
3845
zap.Any("step", s),

service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (opts *Opts) applyPushDefaults() error {
123123
}
124124

125125
if opts.MessageKeystore == nil {
126-
opts.MessageKeystore = cryptoutil.NewMessageKeystore(datastoreutil.NewNamespacedDatastore(opts.RootDatastore, ds.NewKey(datastoreutil.NamespaceMessageKeystore)))
126+
opts.MessageKeystore = cryptoutil.NewMessageKeystore(datastoreutil.NewNamespacedDatastore(opts.RootDatastore, ds.NewKey(datastoreutil.NamespaceMessageKeystore)), opts.Logger)
127127
}
128128

129129
return nil

store_message.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (m *MessageStore) processMessageLoop(ctx context.Context) {
217217
device.queue.Add(message)
218218
_ = m.emitters.groupCacheMessage.Emit(*message)
219219
} else {
220-
m.logger.Error("unable to prcess message", zap.Error(err))
220+
m.logger.Error("unable to process message", zap.Error(err))
221221
}
222222

223223
return

testing.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ func CreatePeersWithGroupTest(ctx context.Context, t testing.TB, pathBase string
383383
require.NoError(t, err, "deviceKeystore from existing keys")
384384
}
385385

386-
mk, cleanupMessageKeystore := cryptoutil.NewInMemMessageKeystore()
386+
mk, cleanupMessageKeystore := cryptoutil.NewInMemMessageKeystore(logger)
387387

388388
db, err := NewBertyOrbitDB(ctx, ca.API(), &NewOrbitDBOptions{
389389
NewOrbitDBOptions: orbitdb.NewOrbitDBOptions{

0 commit comments

Comments
 (0)