Skip to content

Commit cefc43e

Browse files
simplify the Get()/GetMultiple() re-use GetRaw() for both (minio#179)
Remember GetMultiple() must be used if your target is calling PutMultiple(), without that the multiple events will not be replayed.
1 parent 25e34fd commit cefc43e

File tree

2 files changed

+75
-45
lines changed

2 files changed

+75
-45
lines changed

internal/store/queuestore.go

+5-45
Original file line numberDiff line numberDiff line change
@@ -251,61 +251,21 @@ func (store *QueueStore[I]) GetRaw(key Key) (raw []byte, err error) {
251251

252252
// Get - gets an item from the store.
253253
func (store *QueueStore[I]) Get(key Key) (item I, err error) {
254-
store.RLock()
255-
256-
defer func(store *QueueStore[I]) {
257-
store.RUnlock()
258-
if err != nil && !os.IsNotExist(err) {
259-
// Upon error we remove the entry.
260-
store.Del(key)
261-
}
262-
}(store)
263-
264-
var eventData []byte
265-
eventData, err = os.ReadFile(filepath.Join(store.directory, key.String()))
254+
items, err := store.GetMultiple(key)
266255
if err != nil {
267256
return item, err
268257
}
269-
270-
if len(eventData) == 0 {
271-
return item, os.ErrNotExist
272-
}
273-
274-
if err = json.Unmarshal(eventData, &item); err != nil {
275-
return item, err
276-
}
277-
278-
return item, nil
258+
return items[0], nil
279259
}
280260

281261
// GetMultiple will read the multi payload file and fetch the items
282262
func (store *QueueStore[I]) GetMultiple(key Key) (items []I, err error) {
283-
store.RLock()
284-
285-
defer func(store *QueueStore[I]) {
286-
store.RUnlock()
287-
if err != nil && !os.IsNotExist(err) {
288-
// Upon error we remove the entry.
289-
store.Del(key)
290-
}
291-
}(store)
292-
293-
raw, err := os.ReadFile(filepath.Join(store.directory, key.String()))
263+
raw, err := store.GetRaw(key)
294264
if err != nil {
295-
return
296-
}
297-
298-
var decoder *jsoniter.Decoder
299-
if key.Compress {
300-
decodedBytes, err := s2.Decode(nil, raw)
301-
if err != nil {
302-
return nil, err
303-
}
304-
decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(decodedBytes))
305-
} else {
306-
decoder = jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw))
265+
return nil, err
307266
}
308267

268+
decoder := jsoniter.ConfigCompatibleWithStandardLibrary.NewDecoder(bytes.NewReader(raw))
309269
for decoder.More() {
310270
var item I
311271
if err := decoder.Decode(&item); err != nil {

internal/store/queuestore_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818
package store
1919

2020
import (
21+
"bytes"
2122
"fmt"
2223
"os"
2324
"path/filepath"
2425
"reflect"
2526
"testing"
27+
28+
jsoniter "github.com/json-iterator/go"
29+
"github.com/valyala/bytebufferpool"
2630
)
2731

2832
type TestItem struct {
@@ -221,6 +225,72 @@ func TestQueueStoreListN(t *testing.T) {
221225
}
222226
}
223227

228+
func TestMultiplePutGetRaw(t *testing.T) {
229+
defer func() {
230+
if err := tearDownQueueStore(); err != nil {
231+
t.Fatalf("Failed to tear down store; %v", err)
232+
}
233+
}()
234+
store, err := setUpQueueStore(queueDir, 10)
235+
if err != nil {
236+
t.Fatalf("Failed to create a queue store; %v", err)
237+
}
238+
// TestItem{Name: "test-item", Property: "property"}
239+
var items []TestItem
240+
for i := 0; i < 10; i++ {
241+
items = append(items, TestItem{
242+
Name: fmt.Sprintf("test-item-%d", i),
243+
Property: "property",
244+
})
245+
}
246+
247+
buf := bytebufferpool.Get()
248+
defer bytebufferpool.Put(buf)
249+
250+
enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf)
251+
for i := range items {
252+
if err = enc.Encode(items[i]); err != nil {
253+
t.Fatal(err)
254+
}
255+
}
256+
257+
if _, err := store.PutMultiple(items); err != nil {
258+
t.Fatalf("failed to put multiple; %v", err)
259+
}
260+
261+
keys := store.List()
262+
if len(keys) != 1 {
263+
t.Fatalf("expected len(keys)=1, but found %d", len(keys))
264+
}
265+
266+
key := keys[0]
267+
if !key.Compress {
268+
t.Fatal("expected the item to be compressed")
269+
}
270+
if key.ItemCount != 10 {
271+
t.Fatalf("expected itemcount=10 but found %v", key.ItemCount)
272+
}
273+
274+
raw, err := store.GetRaw(key)
275+
if err != nil {
276+
t.Fatalf("unable to get multiple items; %v", err)
277+
}
278+
279+
if !bytes.Equal(buf.Bytes(), raw) {
280+
t.Fatalf("expected bytes: %d vs read bytes is wrong %d", len(buf.Bytes()), len(raw))
281+
}
282+
283+
if err := store.Del(key); err != nil {
284+
t.Fatalf("unable to Del; %v", err)
285+
}
286+
287+
// Re-list
288+
keys = store.List()
289+
if len(keys) > 0 || err != nil {
290+
t.Fatalf("Expected List() to return empty list and no error, got %v err: %v", keys, err)
291+
}
292+
}
293+
224294
func TestMultiplePutGets(t *testing.T) {
225295
defer func() {
226296
if err := tearDownQueueStore(); err != nil {

0 commit comments

Comments
 (0)