Skip to content

Commit 61ee175

Browse files
committed
Add transaction API for map primitive
1 parent 0431c9e commit 61ee175

File tree

6 files changed

+344
-9
lines changed

6 files changed

+344
-9
lines changed

go.mod

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ module github.com/atomix/go-sdk
33
go 1.19
44

55
require (
6-
github.com/atomix/atomix/api v0.9.2
7-
github.com/atomix/atomix/protocols/rsm v0.5.7
8-
github.com/atomix/atomix/runtime v0.9.0
6+
github.com/atomix/atomix/api v1.1.0
7+
github.com/atomix/atomix/protocols/rsm v1.1.0
8+
github.com/atomix/atomix/runtime v1.1.0
99
github.com/atomix/atomix/sidecar v0.4.4
1010
github.com/gogo/protobuf v1.3.2
1111
github.com/golang/protobuf v1.5.2

go.sum

+6-6
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,12 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
22
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
33
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
44
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
5-
github.com/atomix/atomix/api v0.9.2 h1:BpT4zsXiKQWoIHQzGeCDQuC4BxcWvfrsMrPuUoSNLTI=
6-
github.com/atomix/atomix/api v0.9.2/go.mod h1:Fz8zXQH6n28U0NTu5xctKhkNrN5RsWgX56lrMhqXlPg=
7-
github.com/atomix/atomix/protocols/rsm v0.5.7 h1:hAaMCXT+F2DIN+e9B2r3nDPElJvEjCJE+llITVdy6jU=
8-
github.com/atomix/atomix/protocols/rsm v0.5.7/go.mod h1:SgkK3PVLqx1KmNjxw8OY7AY7T6t6TuPLmnP6Rr/oFAQ=
9-
github.com/atomix/atomix/runtime v0.9.0 h1:yDViPymoOkR5gMJkk25CZP2Tjf2j1JMAWNd/I38ws8I=
10-
github.com/atomix/atomix/runtime v0.9.0/go.mod h1:abB/WaP50Fm6MVi8CHRsfIOt8urDzcpQvMDFmhhlVgY=
5+
github.com/atomix/atomix/api v1.1.0 h1:zUbuD4yPu+jBT8NkxvDKx+m8QiRqhVmFUMgRvQoC1Tc=
6+
github.com/atomix/atomix/api v1.1.0/go.mod h1:Fz8zXQH6n28U0NTu5xctKhkNrN5RsWgX56lrMhqXlPg=
7+
github.com/atomix/atomix/protocols/rsm v1.1.0 h1:IFsU/VqoFjjRWRc+ET0B0aYqMG3+oTzDwuiYhVbBQVo=
8+
github.com/atomix/atomix/protocols/rsm v1.1.0/go.mod h1:TT5+SaXyrpS3CAHeGYftmi9DEybCsI8bZY/sFMJijTQ=
9+
github.com/atomix/atomix/runtime v1.1.0 h1:K1fUQqfngOqkXTk+1CwbPLO9STQ6gUVe6qXLlg9ageM=
10+
github.com/atomix/atomix/runtime v1.1.0/go.mod h1:7PtAhumBMs3TE3L/qXUSr4cNqqOWKGBFTvcHw+ZIQZ8=
1111
github.com/atomix/atomix/sidecar v0.4.4 h1:CjLPA3p1V83Tx2yxUGz4JLyQkeyXC8/1xXaiZVknJsY=
1212
github.com/atomix/atomix/sidecar v0.4.4/go.mod h1:psygbD10K+EdWS1XINexXP0QmiAd+yY2tGkIGqrVMHM=
1313
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=

pkg/primitive/map/client.go

+145
Original file line numberDiff line numberDiff line change
@@ -368,3 +368,148 @@ func (m *mapClient) Events(ctx context.Context, opts ...EventsOption) (EventStre
368368
return nil, ctx.Err()
369369
}
370370
}
371+
372+
func (m *mapClient) Transaction(ctx context.Context) Transaction[string, []byte] {
373+
return &transactionClient{
374+
mapClient: m,
375+
ctx: ctx,
376+
}
377+
}
378+
379+
type transactionClient struct {
380+
*mapClient
381+
ctx context.Context
382+
operations []mapv1.CommitRequest_Operation
383+
}
384+
385+
func (t *transactionClient) Put(key string, value []byte, opts ...PutOption) Transaction[string, []byte] {
386+
request := &mapv1.PutRequest{
387+
Key: key,
388+
Value: value,
389+
}
390+
for _, opt := range opts {
391+
opt.beforePut(request)
392+
}
393+
t.operations = append(t.operations, mapv1.CommitRequest_Operation{
394+
Operation: &mapv1.CommitRequest_Operation_Put{
395+
Put: &mapv1.CommitRequest_Put{
396+
Key: request.Key,
397+
Value: request.Value,
398+
TTL: request.TTL,
399+
PrevVersion: request.PrevVersion,
400+
},
401+
},
402+
})
403+
return t
404+
}
405+
406+
func (t *transactionClient) Insert(key string, value []byte, opts ...InsertOption) Transaction[string, []byte] {
407+
request := &mapv1.InsertRequest{
408+
Key: key,
409+
Value: value,
410+
}
411+
for _, opt := range opts {
412+
opt.beforeInsert(request)
413+
}
414+
t.operations = append(t.operations, mapv1.CommitRequest_Operation{
415+
Operation: &mapv1.CommitRequest_Operation_Insert{
416+
Insert: &mapv1.CommitRequest_Insert{
417+
Key: request.Key,
418+
Value: request.Value,
419+
TTL: request.TTL,
420+
},
421+
},
422+
})
423+
return t
424+
}
425+
426+
func (t *transactionClient) Update(key string, value []byte, opts ...UpdateOption) Transaction[string, []byte] {
427+
request := &mapv1.UpdateRequest{
428+
Key: key,
429+
Value: value,
430+
}
431+
for _, opt := range opts {
432+
opt.beforeUpdate(request)
433+
}
434+
t.operations = append(t.operations, mapv1.CommitRequest_Operation{
435+
Operation: &mapv1.CommitRequest_Operation_Update{
436+
Update: &mapv1.CommitRequest_Update{
437+
Key: request.Key,
438+
Value: request.Value,
439+
TTL: request.TTL,
440+
PrevVersion: request.PrevVersion,
441+
},
442+
},
443+
})
444+
return t
445+
}
446+
447+
func (t *transactionClient) Remove(key string, opts ...RemoveOption) Transaction[string, []byte] {
448+
request := &mapv1.RemoveRequest{
449+
Key: key,
450+
}
451+
for _, opt := range opts {
452+
opt.beforeRemove(request)
453+
}
454+
t.operations = append(t.operations, mapv1.CommitRequest_Operation{
455+
Operation: &mapv1.CommitRequest_Operation_Remove{
456+
Remove: &mapv1.CommitRequest_Remove{
457+
Key: request.Key,
458+
PrevVersion: request.PrevVersion,
459+
},
460+
},
461+
})
462+
return t
463+
}
464+
465+
func (t *transactionClient) Commit() ([]*Entry[string, []byte], error) {
466+
request := &mapv1.CommitRequest{
467+
ID: runtimev1.PrimitiveID{
468+
Name: t.Name(),
469+
},
470+
Operations: t.operations,
471+
}
472+
response, err := t.client.Commit(t.ctx, request)
473+
if err != nil {
474+
return nil, err
475+
}
476+
entries := make([]*Entry[string, []byte], len(t.operations))
477+
for i, operation := range t.operations {
478+
result := response.Results[i]
479+
switch o := operation.Operation.(type) {
480+
case *mapv1.CommitRequest_Operation_Put:
481+
entries[i] = &Entry[string, []byte]{
482+
Key: o.Put.Key,
483+
Versioned: primitive.Versioned[[]byte]{
484+
Value: o.Put.Value,
485+
Version: primitive.Version(result.GetPut().Version),
486+
},
487+
}
488+
case *mapv1.CommitRequest_Operation_Insert:
489+
entries[i] = &Entry[string, []byte]{
490+
Key: o.Insert.Key,
491+
Versioned: primitive.Versioned[[]byte]{
492+
Value: o.Insert.Value,
493+
Version: primitive.Version(result.GetInsert().Version),
494+
},
495+
}
496+
case *mapv1.CommitRequest_Operation_Update:
497+
entries[i] = &Entry[string, []byte]{
498+
Key: o.Update.Key,
499+
Versioned: primitive.Versioned[[]byte]{
500+
Value: o.Update.Value,
501+
Version: primitive.Version(result.GetUpdate().Version),
502+
},
503+
}
504+
case *mapv1.CommitRequest_Operation_Remove:
505+
entries[i] = &Entry[string, []byte]{
506+
Key: o.Remove.Key,
507+
Versioned: primitive.Versioned[[]byte]{
508+
Value: result.GetRemove().Value.Value,
509+
Version: primitive.Version(result.GetRemove().Value.Version),
510+
},
511+
}
512+
}
513+
}
514+
return entries, nil
515+
}

pkg/primitive/map/interfaces.go

+20
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,26 @@ type Map[K scalar.Scalar, V any] interface {
6161
// This is a non-blocking method. If the method returns without error, map events will be pushed onto
6262
// the given channel in the order in which they occur.
6363
Events(ctx context.Context, opts ...EventsOption) (EventStream[K, V], error)
64+
65+
Transaction(ctx context.Context) Transaction[K, V]
66+
}
67+
68+
// Transaction is a map transaction
69+
type Transaction[K scalar.Scalar, V any] interface {
70+
// Put sets a key/value pair in the map
71+
Put(key K, value V, opts ...PutOption) Transaction[K, V]
72+
73+
// Insert sets a key/value pair in the map
74+
Insert(key K, value V, opts ...InsertOption) Transaction[K, V]
75+
76+
// Update sets a key/value pair in the map
77+
Update(key K, value V, opts ...UpdateOption) Transaction[K, V]
78+
79+
// Remove removes a key from the map
80+
Remove(key K, opts ...RemoveOption) Transaction[K, V]
81+
82+
// Commit commits the transaction and returns the results
83+
Commit() ([]*Entry[K, V], error)
6484
}
6585

6686
type EntryStream[K scalar.Scalar, V any] stream.Stream[*Entry[K, V]]

pkg/primitive/map/map_test.go

+92
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,98 @@ func TestMapEntries(t *testing.T) {
5454
}
5555
}
5656

57+
func TestMapTransactions(t *testing.T) {
58+
testMapTransactions(t, runtimev1.RoutingRule{Names: []string{"*"}})
59+
}
60+
61+
func TestCachingMapTransactions(t *testing.T) {
62+
config := mapv1.Config{
63+
Cache: mapv1.CacheConfig{
64+
Enabled: true,
65+
Size_: 3,
66+
},
67+
}
68+
bytes, err := json.Marshal(config)
69+
assert.NoError(t, err)
70+
testMapTransactions(t, runtimev1.RoutingRule{
71+
Names: []string{"*"},
72+
Config: &gogotypes.Any{
73+
Value: bytes,
74+
},
75+
})
76+
}
77+
78+
func TestMirroredMapTransactions(t *testing.T) {
79+
config := mapv1.Config{
80+
Cache: mapv1.CacheConfig{
81+
Enabled: true,
82+
},
83+
}
84+
bytes, err := json.Marshal(config)
85+
assert.NoError(t, err)
86+
testMapTransactions(t, runtimev1.RoutingRule{
87+
Names: []string{"*"},
88+
Config: &gogotypes.Any{
89+
Value: bytes,
90+
},
91+
})
92+
}
93+
94+
func testMapTransactions(t *testing.T, rule runtimev1.RoutingRule) {
95+
logging.SetLevel(logging.DebugLevel)
96+
97+
client := test.NewClient(rule)
98+
defer client.Close()
99+
100+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
101+
defer cancel()
102+
103+
map1, err := NewBuilder[string, string](client, "test").
104+
Codec(types.Scalar[string]()).
105+
Get(ctx)
106+
assert.NoError(t, err)
107+
108+
map2, err := NewBuilder[string, string](client, "test").
109+
Codec(types.Scalar[string]()).
110+
Get(ctx)
111+
assert.NoError(t, err)
112+
113+
kv, err := map1.Get(context.Background(), "foo")
114+
assert.Error(t, err)
115+
assert.True(t, errors.IsNotFound(err))
116+
assert.Nil(t, kv)
117+
118+
_, err = map1.Transaction(context.Background()).
119+
Put("foo", "bar").
120+
Insert("bar", "baz").
121+
Update("baz", "bar").
122+
Commit()
123+
assert.Error(t, err)
124+
assert.True(t, errors.IsNotFound(err))
125+
126+
entries, err := map1.Transaction(context.Background()).
127+
Put("foo", "bar").
128+
Insert("bar", "baz").
129+
Insert("baz", "foo").
130+
Commit()
131+
assert.NoError(t, err)
132+
assert.Len(t, entries, 3)
133+
assert.Equal(t, "foo", entries[0].Key)
134+
assert.Equal(t, "bar", entries[0].Value)
135+
assert.Equal(t, "bar", entries[1].Key)
136+
assert.Equal(t, "baz", entries[1].Value)
137+
assert.Equal(t, "baz", entries[2].Key)
138+
assert.Equal(t, "foo", entries[2].Value)
139+
140+
kv, err = map1.Get(context.Background(), "foo")
141+
assert.NoError(t, err)
142+
assert.Equal(t, "bar", kv.Value)
143+
144+
kv, err = map2.Get(context.Background(), "bar")
145+
assert.NoError(t, err)
146+
assert.Equal(t, "baz", kv.Value)
147+
}
148+
57149
func TestMapOperations(t *testing.T) {
58150
testMapOperations(t, runtimev1.RoutingRule{Names: []string{"*"}})
59151
}

pkg/primitive/map/transcoding.go

+78
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,13 @@ func (m *transcodingMap[K, V]) Events(ctx context.Context, opts ...EventsOption)
205205
}), nil
206206
}
207207

208+
func (m *transcodingMap[K, V]) Transaction(ctx context.Context) Transaction[K, V] {
209+
return &transcodingTransaction[K, V]{
210+
transcodingMap: m,
211+
Transaction: m.Map.Transaction(ctx),
212+
}
213+
}
214+
208215
func (m *transcodingMap[K, V]) decode(entry *Entry[string, []byte]) (*Entry[K, V], error) {
209216
key, err := m.keyDecoder(entry.Key)
210217
if err != nil {
@@ -222,3 +229,74 @@ func (m *transcodingMap[K, V]) decode(entry *Entry[string, []byte]) (*Entry[K, V
222229
Key: key,
223230
}, nil
224231
}
232+
233+
type transcodingTransaction[K scalar.Scalar, V any] struct {
234+
*transcodingMap[K, V]
235+
Transaction[string, []byte]
236+
operations []func() error
237+
}
238+
239+
func (t *transcodingTransaction[K, V]) Put(key K, value V, opts ...PutOption) Transaction[K, V] {
240+
t.operations = append(t.operations, func() error {
241+
valueBytes, err := t.valueCodec.Encode(value)
242+
if err != nil {
243+
return errors.NewInvalid("value encoding failed", err)
244+
}
245+
t.Transaction.Put(t.keyEncoder(key), valueBytes, opts...)
246+
return nil
247+
})
248+
return t
249+
}
250+
251+
func (t *transcodingTransaction[K, V]) Insert(key K, value V, opts ...InsertOption) Transaction[K, V] {
252+
t.operations = append(t.operations, func() error {
253+
valueBytes, err := t.valueCodec.Encode(value)
254+
if err != nil {
255+
return errors.NewInvalid("value encoding failed", err)
256+
}
257+
t.Transaction.Insert(t.keyEncoder(key), valueBytes, opts...)
258+
return nil
259+
})
260+
return t
261+
}
262+
263+
func (t *transcodingTransaction[K, V]) Update(key K, value V, opts ...UpdateOption) Transaction[K, V] {
264+
t.operations = append(t.operations, func() error {
265+
valueBytes, err := t.valueCodec.Encode(value)
266+
if err != nil {
267+
return errors.NewInvalid("value encoding failed", err)
268+
}
269+
t.Transaction.Update(t.keyEncoder(key), valueBytes, opts...)
270+
return nil
271+
})
272+
return t
273+
}
274+
275+
func (t *transcodingTransaction[K, V]) Remove(key K, opts ...RemoveOption) Transaction[K, V] {
276+
t.operations = append(t.operations, func() error {
277+
t.Transaction.Remove(t.keyEncoder(key), opts...)
278+
return nil
279+
})
280+
return t
281+
}
282+
283+
func (t *transcodingTransaction[K, V]) Commit() ([]*Entry[K, V], error) {
284+
for _, operation := range t.operations {
285+
if err := operation(); err != nil {
286+
return nil, err
287+
}
288+
}
289+
results, err := t.Transaction.Commit()
290+
if err != nil {
291+
return nil, err
292+
}
293+
entries := make([]*Entry[K, V], len(results))
294+
for i, result := range results {
295+
entry, err := t.decode(result)
296+
if err != nil {
297+
return nil, err
298+
}
299+
entries[i] = entry
300+
}
301+
return entries, nil
302+
}

0 commit comments

Comments
 (0)