Skip to content

Commit d9dff35

Browse files
committed
Ensure options are passed through transcoding interfaces
1 parent 5d4b42d commit d9dff35

File tree

7 files changed

+59
-35
lines changed

7 files changed

+59
-35
lines changed

pkg/primitive/indexedmap/caching.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (m *cachingIndexedMap) remove(entry *Entry[string, []byte]) {
213213
m.mu.RLock()
214214
stored, ok := m.indexes.Load(entry.Index)
215215
m.mu.RUnlock()
216-
if !ok || (entry.Version != 0 && stored.Version >= entry.Version) {
216+
if !ok || (entry.Version != 0 && stored.Version > entry.Version) {
217217
return
218218
}
219219

pkg/primitive/indexedmap/transcoding.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (m *transcodingIndexedMap[K, V]) Watch(ctx context.Context) (EntryStream[K,
134134
}
135135

136136
func (m *transcodingIndexedMap[K, V]) Events(ctx context.Context, opts ...EventsOption) (EventStream[K, V], error) {
137-
events, err := m.IndexedMap.Events(ctx)
137+
events, err := m.IndexedMap.Events(ctx, opts...)
138138
if err != nil {
139139
return nil, err
140140
}

pkg/primitive/list/transcoding.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ func (l *transcodingList[E]) Watch(ctx context.Context) (ItemStream[E], error) {
8383
}
8484

8585
func (l *transcodingList[E]) Events(ctx context.Context, opts ...EventsOption) (EventStream[E], error) {
86-
events, err := l.List.Events(ctx)
86+
events, err := l.List.Events(ctx, opts...)
8787
if err != nil {
8888
return nil, err
8989
}

pkg/primitive/map/transcoding.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ func (m *transcodingMap[K, V]) Watch(ctx context.Context) (EntryStream[K, V], er
163163
}
164164

165165
func (m *transcodingMap[K, V]) Events(ctx context.Context, opts ...EventsOption) (EventStream[K, V], error) {
166-
events, err := m.Map.Events(ctx)
166+
events, err := m.Map.Events(ctx, opts...)
167167
if err != nil {
168168
return nil, err
169169
}

pkg/primitive/value/caching.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"context"
99
"github.com/atomix/atomix/api/errors"
1010
"github.com/atomix/go-sdk/pkg/primitive"
11+
"github.com/atomix/go-sdk/pkg/stream"
1112
"github.com/atomix/go-sdk/pkg/util/cache"
1213
"io"
1314
)
@@ -119,6 +120,29 @@ func (v *cachingValue) Delete(ctx context.Context, opts ...DeleteOption) error {
119120
return nil
120121
}
121122

123+
func (v *cachingValue) Events(ctx context.Context, opts ...EventsOption) (EventStream[[]byte], error) {
124+
events, err := v.Value.Events(ctx, opts...)
125+
if err != nil {
126+
return nil, err
127+
}
128+
return stream.NewInterceptingStream[Event[[]byte]](events, func(event Event[[]byte]) {
129+
switch e := event.(type) {
130+
case *Created[[]byte]:
131+
v.cache.Store(e.Value, func(stored primitive.Versioned[[]byte]) bool {
132+
return e.Value.Version == 0 || e.Value.Version > stored.Version
133+
})
134+
case *Updated[[]byte]:
135+
v.cache.Store(e.Value, func(stored primitive.Versioned[[]byte]) bool {
136+
return e.Value.Version == 0 || e.Value.Version > stored.Version
137+
})
138+
case *Deleted[[]byte]:
139+
v.cache.Delete(func(stored primitive.Versioned[[]byte]) bool {
140+
return e.Value.Version == 0 || e.Value.Version > stored.Version
141+
})
142+
}
143+
}), nil
144+
}
145+
122146
func (v *cachingValue) Close(ctx context.Context) error {
123147
defer close(v.closeCh)
124148
return v.Value.Close(ctx)

pkg/primitive/value/client.go

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,17 @@ type valueClient struct {
5454
client valuev1.ValueClient
5555
}
5656

57-
func (m *valueClient) Set(ctx context.Context, value []byte, opts ...SetOption) (primitive.Versioned[[]byte], error) {
57+
func (v *valueClient) Set(ctx context.Context, value []byte, opts ...SetOption) (primitive.Versioned[[]byte], error) {
5858
request := &valuev1.SetRequest{
5959
ID: runtimev1.PrimitiveID{
60-
Name: m.Name(),
60+
Name: v.Name(),
6161
},
6262
Value: value,
6363
}
6464
for i := range opts {
6565
opts[i].beforeSet(request)
6666
}
67-
response, err := m.client.Set(ctx, request)
67+
response, err := v.client.Set(ctx, request)
6868
if err != nil {
6969
return primitive.Versioned[[]byte]{}, err
7070
}
@@ -77,17 +77,17 @@ func (m *valueClient) Set(ctx context.Context, value []byte, opts ...SetOption)
7777
}, nil
7878
}
7979

80-
func (m *valueClient) Update(ctx context.Context, value []byte, opts ...UpdateOption) (primitive.Versioned[[]byte], error) {
80+
func (v *valueClient) Update(ctx context.Context, value []byte, opts ...UpdateOption) (primitive.Versioned[[]byte], error) {
8181
request := &valuev1.UpdateRequest{
8282
ID: runtimev1.PrimitiveID{
83-
Name: m.Name(),
83+
Name: v.Name(),
8484
},
8585
Value: value,
8686
}
8787
for i := range opts {
8888
opts[i].beforeUpdate(request)
8989
}
90-
response, err := m.client.Update(ctx, request)
90+
response, err := v.client.Update(ctx, request)
9191
if err != nil {
9292
return primitive.Versioned[[]byte]{}, err
9393
}
@@ -100,16 +100,16 @@ func (m *valueClient) Update(ctx context.Context, value []byte, opts ...UpdateOp
100100
}, nil
101101
}
102102

103-
func (m *valueClient) Get(ctx context.Context, opts ...GetOption) (primitive.Versioned[[]byte], error) {
103+
func (v *valueClient) Get(ctx context.Context, opts ...GetOption) (primitive.Versioned[[]byte], error) {
104104
request := &valuev1.GetRequest{
105105
ID: runtimev1.PrimitiveID{
106-
Name: m.Name(),
106+
Name: v.Name(),
107107
},
108108
}
109109
for i := range opts {
110110
opts[i].beforeGet(request)
111111
}
112-
response, err := m.client.Get(ctx, request)
112+
response, err := v.client.Get(ctx, request)
113113
if err != nil {
114114
return primitive.Versioned[[]byte]{}, err
115115
}
@@ -122,16 +122,16 @@ func (m *valueClient) Get(ctx context.Context, opts ...GetOption) (primitive.Ver
122122
}, nil
123123
}
124124

125-
func (m *valueClient) Delete(ctx context.Context, opts ...DeleteOption) error {
125+
func (v *valueClient) Delete(ctx context.Context, opts ...DeleteOption) error {
126126
request := &valuev1.DeleteRequest{
127127
ID: runtimev1.PrimitiveID{
128-
Name: m.Name(),
128+
Name: v.Name(),
129129
},
130130
}
131131
for i := range opts {
132132
opts[i].beforeDelete(request)
133133
}
134-
response, err := m.client.Delete(ctx, request)
134+
response, err := v.client.Delete(ctx, request)
135135
if err != nil {
136136
return err
137137
}
@@ -141,13 +141,13 @@ func (m *valueClient) Delete(ctx context.Context, opts ...DeleteOption) error {
141141
return nil
142142
}
143143

144-
func (m *valueClient) Watch(ctx context.Context) (ValueStream[[]byte], error) {
144+
func (v *valueClient) Watch(ctx context.Context) (ValueStream[[]byte], error) {
145145
request := &valuev1.WatchRequest{
146146
ID: runtimev1.PrimitiveID{
147-
Name: m.Name(),
147+
Name: v.Name(),
148148
},
149149
}
150-
client, err := m.client.Watch(ctx, request)
150+
client, err := v.client.Watch(ctx, request)
151151
if err != nil {
152152
return nil, err
153153
}
@@ -178,17 +178,17 @@ func (m *valueClient) Watch(ctx context.Context) (ValueStream[[]byte], error) {
178178
return stream.NewChannelStream[primitive.Versioned[[]byte]](ch), nil
179179
}
180180

181-
func (m *valueClient) Events(ctx context.Context, opts ...EventsOption) (EventStream[[]byte], error) {
181+
func (v *valueClient) Events(ctx context.Context, opts ...EventsOption) (EventStream[[]byte], error) {
182182
request := &valuev1.EventsRequest{
183183
ID: runtimev1.PrimitiveID{
184-
Name: m.Name(),
184+
Name: v.Name(),
185185
},
186186
}
187187
for i := range opts {
188188
opts[i].beforeEvents(request)
189189
}
190190

191-
client, err := m.client.Events(ctx, request)
191+
client, err := v.client.Events(ctx, request)
192192
if err != nil {
193193
return nil, err
194194
}

pkg/primitive/value/transcoding.go

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ type transcodingValue[V any] struct {
2424
codec types.Codec[V]
2525
}
2626

27-
func (m *transcodingValue[V]) Set(ctx context.Context, value V, opts ...SetOption) (primitive.Versioned[V], error) {
28-
bytes, err := m.codec.Encode(value)
27+
func (v *transcodingValue[V]) Set(ctx context.Context, value V, opts ...SetOption) (primitive.Versioned[V], error) {
28+
bytes, err := v.codec.Encode(value)
2929
if err != nil {
3030
return primitive.Versioned[V]{}, errors.NewInvalid("value encoding failed", err)
3131
}
32-
versioned, err := m.Value.Set(ctx, bytes, opts...)
32+
versioned, err := v.Value.Set(ctx, bytes, opts...)
3333
if err != nil {
3434
return primitive.Versioned[V]{}, err
3535
}
@@ -39,12 +39,12 @@ func (m *transcodingValue[V]) Set(ctx context.Context, value V, opts ...SetOptio
3939
}, nil
4040
}
4141

42-
func (m *transcodingValue[V]) Update(ctx context.Context, value V, opts ...UpdateOption) (primitive.Versioned[V], error) {
43-
bytes, err := m.codec.Encode(value)
42+
func (v *transcodingValue[V]) Update(ctx context.Context, value V, opts ...UpdateOption) (primitive.Versioned[V], error) {
43+
bytes, err := v.codec.Encode(value)
4444
if err != nil {
4545
return primitive.Versioned[V]{}, errors.NewInvalid("value encoding failed", err)
4646
}
47-
versioned, err := m.Value.Update(ctx, bytes, opts...)
47+
versioned, err := v.Value.Update(ctx, bytes, opts...)
4848
if err != nil {
4949
return primitive.Versioned[V]{}, err
5050
}
@@ -54,12 +54,12 @@ func (m *transcodingValue[V]) Update(ctx context.Context, value V, opts ...Updat
5454
}, nil
5555
}
5656

57-
func (m *transcodingValue[V]) Get(ctx context.Context, opts ...GetOption) (primitive.Versioned[V], error) {
58-
versioned, err := m.Value.Get(ctx, opts...)
57+
func (v *transcodingValue[V]) Get(ctx context.Context, opts ...GetOption) (primitive.Versioned[V], error) {
58+
versioned, err := v.Value.Get(ctx, opts...)
5959
if err != nil {
6060
return primitive.Versioned[V]{}, err
6161
}
62-
value, err := m.codec.Decode(versioned.Value)
62+
value, err := v.codec.Decode(versioned.Value)
6363
if err != nil {
6464
return primitive.Versioned[V]{}, errors.NewInvalid("value decoding failed", err)
6565
}
@@ -69,13 +69,13 @@ func (m *transcodingValue[V]) Get(ctx context.Context, opts ...GetOption) (primi
6969
}, nil
7070
}
7171

72-
func (m *transcodingValue[V]) Watch(ctx context.Context) (ValueStream[V], error) {
73-
elements, err := m.Value.Watch(ctx)
72+
func (v *transcodingValue[V]) Watch(ctx context.Context) (ValueStream[V], error) {
73+
elements, err := v.Value.Watch(ctx)
7474
if err != nil {
7575
return nil, err
7676
}
7777
return stream.NewTranscodingStream[primitive.Versioned[[]byte], primitive.Versioned[V]](elements, func(versioned primitive.Versioned[[]byte]) (primitive.Versioned[V], error) {
78-
value, err := m.codec.Decode(versioned.Value)
78+
value, err := v.codec.Decode(versioned.Value)
7979
if err != nil {
8080
return primitive.Versioned[V]{}, errors.NewInvalid("value decoding failed", err)
8181
}
@@ -87,7 +87,7 @@ func (m *transcodingValue[V]) Watch(ctx context.Context) (ValueStream[V], error)
8787
}
8888

8989
func (v *transcodingValue[V]) Events(ctx context.Context, opts ...EventsOption) (EventStream[V], error) {
90-
events, err := v.Value.Events(ctx)
90+
events, err := v.Value.Events(ctx, opts...)
9191
if err != nil {
9292
return nil, err
9393
}

0 commit comments

Comments
 (0)