From 443716d608896c8cf607292c41148badfe0a0282 Mon Sep 17 00:00:00 2001 From: vvatanabe Date: Fri, 20 Oct 2023 11:15:45 +0900 Subject: [PATCH] feat: implement FIFO queues --- sdk/message.go | 8 +- sdk/sdk.go | 23 +++- sdk/sdk_test.go | 281 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 303 insertions(+), 9 deletions(-) diff --git a/sdk/message.go b/sdk/message.go index 6a724aa..21e6383 100644 --- a/sdk/message.go +++ b/sdk/message.go @@ -141,7 +141,6 @@ func (m *Message[T]) MarkAsEnqueued(now time.Time) { m.Queued = 1 m.DLQ = 0 m.LastUpdatedTimestamp = ts - m.LastUpdatedTimestamp = ts m.AddToQueueTimestamp = ts m.Status = StatusReady } @@ -149,8 +148,9 @@ func (m *Message[T]) MarkAsEnqueued(now time.Time) { func (m *Message[T]) MarkAsPeeked(now time.Time) { ts := clock.FormatRFC3339(now) m.Queued = 1 - m.LastUpdatedTimestamp = ts - m.LastUpdatedTimestamp = ts + // IMPORTANT + // please note, we are not updating top-level attribute `last_updated_timestamp` in order to avoid re-indexing the order + // m.LastUpdatedTimestamp = ts m.PeekFromQueueTimestamp = ts m.Status = StatusProcessing } @@ -167,7 +167,6 @@ func (m *Message[T]) MarkAsDone(now time.Time) { ts := clock.FormatRFC3339(now) m.Queued = 0 m.DLQ = 0 - m.LastUpdatedTimestamp = ts m.Status = StatusCompleted m.LastUpdatedTimestamp = ts m.CompleteFromQueueTimestamp = ts @@ -178,7 +177,6 @@ func (m *Message[T]) MarkAsDLQ(now time.Time) { m.Queued = 0 m.DLQ = 1 m.LastUpdatedTimestamp = ts - m.LastUpdatedTimestamp = ts m.AddToDLQTimestamp = ts m.Status = StatusInDLQ } diff --git a/sdk/sdk.go b/sdk/sdk.go index 41e661c..359010d 100644 --- a/sdk/sdk.go +++ b/sdk/sdk.go @@ -61,6 +61,7 @@ type queueSDKClient[T any] struct { retryMaxAttempts int visibilityTimeoutInMinutes int maximumReceives int + useFIFO bool clock clock.Clock } @@ -74,6 +75,7 @@ type options struct { retryMaxAttempts int visibilityTimeoutInMinutes int maximumReceives int + useFIFO bool dynamoDB *dynamodb.Client clock clock.Clock } @@ -122,6 +124,12 @@ func WithAWSVisibilityTimeout(minutes int) Option { } } +func WithUseFIFO(useFIFO bool) Option { + return func(s *options) { + s.useFIFO = useFIFO + } +} + func WithAWSDynamoDBClient(client *dynamodb.Client) Option { return func(s *options) { s.dynamoDB = client @@ -135,6 +143,7 @@ func NewQueueSDKClient[T any](ctx context.Context, opts ...Option) (QueueSDKClie awsCredentialsProfileName: AwsProfileDefault, retryMaxAttempts: DefaultRetryMaxAttempts, visibilityTimeoutInMinutes: DefaultVisibilityTimeoutInMinutes, + useFIFO: false, clock: &clock.RealClock{}, } for _, opt := range opts { @@ -149,6 +158,7 @@ func NewQueueSDKClient[T any](ctx context.Context, opts ...Option) (QueueSDKClie retryMaxAttempts: o.retryMaxAttempts, visibilityTimeoutInMinutes: o.visibilityTimeoutInMinutes, maximumReceives: o.maximumReceives, + useFIFO: o.useFIFO, dynamoDB: o.dynamoDB, clock: o.clock, } @@ -605,7 +615,11 @@ func (c *queueSDKClient[T]) Peek(ctx context.Context) (*PeekResult[T], error) { return nil, &UnmarshalingAttributeError{Cause: err} } visibilityTimeout := time.Duration(c.visibilityTimeoutInMinutes) * time.Minute - if !item.IsQueueSelected(c.clock.Now(), visibilityTimeout) { + isQueueSelected := item.IsQueueSelected(c.clock.Now(), visibilityTimeout) + if c.useFIFO && isQueueSelected { + goto ExitLoop + } + if !isQueueSelected { selectedID = item.ID selectedVersion = item.Version recordForPeekIsFound = true @@ -616,6 +630,7 @@ func (c *queueSDKClient[T]) Peek(ctx context.Context) (*PeekResult[T], error) { break } } +ExitLoop: if selectedID == "" { return nil, &EmptyQueueError{} } @@ -624,13 +639,13 @@ func (c *queueSDKClient[T]) Peek(ctx context.Context) (*PeekResult[T], error) { return nil, err } message.MarkAsPeeked(c.clock.Now()) - // IMPORTANT - // please note, we are not updating top-level attribute `last_updated_timestamp` in order to avoid re-indexing the order expr, err = expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). Add(expression.Name("receive_count"), expression.Value(1)). - Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)). + // IMPORTANT + // please note, we are not updating top-level attribute `last_updated_timestamp` in order to avoid re-indexing the order + // Set(expression.Name("last_updated_timestamp"), expression.Value(message.LastUpdatedTimestamp)). Set(expression.Name("queue_peek_timestamp"), expression.Value(message.PeekFromQueueTimestamp)). Set(expression.Name("status"), expression.Value(message.Status))). WithCondition(expression.Name("version").Equal(expression.Value(selectedVersion))). diff --git a/sdk/sdk_test.go b/sdk/sdk_test.go index f847401..5fa5465 100644 --- a/sdk/sdk_test.go +++ b/sdk/sdk_test.go @@ -2,6 +2,8 @@ package sdk import ( "context" + "encoding/json" + "errors" "fmt" "reflect" "sort" @@ -1017,6 +1019,285 @@ func TestQueueSDKClientPeek(t *testing.T) { } } +func TestQueueSDKClientPeekUseFIFO(t *testing.T) { + + raw, clean := setupDynamoDB(t, + &types.PutRequest{ + Item: newTestMessageItemAsEnqueued("A-101", time.Date(2023, 12, 1, 0, 0, 3, 0, time.UTC)).MarshalMapUnsafe(), + }, + &types.PutRequest{ + Item: newTestMessageItemAsEnqueued("A-202", time.Date(2023, 12, 1, 0, 0, 2, 0, time.UTC)).MarshalMapUnsafe(), + }, + &types.PutRequest{ + Item: newTestMessageItemAsEnqueued("A-303", time.Date(2023, 12, 1, 0, 0, 1, 0, time.UTC)).MarshalMapUnsafe(), + }, + ) + defer clean() + + now := time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC) + + ctx := context.Background() + client, err := NewQueueSDKClient[test.MessageData](ctx, + WithAWSDynamoDBClient(raw), + withClock(mockClock{ + t: now, + }), + WithAWSVisibilityTimeout(1), + WithUseFIFO(true)) + if err != nil { + t.Fatalf("NewQueueSDKClient() error = %v", err) + return + } + + want1 := func() *PeekResult[test.MessageData] { + s := newTestMessageItemAsEnqueued("A-303", time.Date(2023, 12, 1, 0, 0, 1, 0, time.UTC)) + s.MarkAsPeeked(now) + s.Version = 2 + s.ReceiveCount = 1 + r := &PeekResult[test.MessageData]{ + Result: &Result{ + ID: s.ID, + Status: s.Status, + LastUpdatedTimestamp: s.LastUpdatedTimestamp, + Version: s.Version, + }, + PeekFromQueueTimestamp: s.PeekFromQueueTimestamp, + PeekedMessageObject: s, + } + return r + }() + + result, err := client.Peek(ctx) + if err != nil { + t.Errorf("Peek() 1 error = %v", err) + return + } + if !reflect.DeepEqual(result, want1) { + v1, _ := json.Marshal(result) + v2, _ := json.Marshal(want1) + t.Errorf("Peek() 1 got = %v, want %v", string(v1), string(v2)) + } + _, err = client.Peek(ctx) + if !errors.Is(err, &EmptyQueueError{}) { + t.Errorf("Peek() 2 error = %v, wantErr %v", err, &EmptyQueueError{}) + return + } + _, err = client.Done(ctx, result.ID) + if err != nil { + t.Errorf("Done() 1 error = %v", err) + return + } + + want2 := func() *PeekResult[test.MessageData] { + s := newTestMessageItemAsEnqueued("A-202", time.Date(2023, 12, 1, 0, 0, 2, 0, time.UTC)) + s.MarkAsPeeked(now) + s.Version = 2 + s.ReceiveCount = 1 + r := &PeekResult[test.MessageData]{ + Result: &Result{ + ID: s.ID, + Status: s.Status, + LastUpdatedTimestamp: s.LastUpdatedTimestamp, + Version: s.Version, + }, + PeekFromQueueTimestamp: s.PeekFromQueueTimestamp, + PeekedMessageObject: s, + } + return r + }() + + result, err = client.Peek(ctx) + if err != nil { + t.Errorf("Peek() 3 error = %v", err) + return + } + if !reflect.DeepEqual(result, want2) { + v1, _ := json.Marshal(result) + v2, _ := json.Marshal(want2) + t.Errorf("Peek() 3 got = %v, want %v", string(v1), string(v2)) + } + + _, err = client.Peek(ctx) + if !errors.Is(err, &EmptyQueueError{}) { + t.Errorf("Peek() 4 error = %v, wantErr %v", err, &EmptyQueueError{}) + return + } + _, err = client.Done(ctx, result.ID) + if err != nil { + t.Errorf("Done() 2 error = %v", err) + return + } + + want3 := func() *PeekResult[test.MessageData] { + s := newTestMessageItemAsEnqueued("A-101", time.Date(2023, 12, 1, 0, 0, 3, 0, time.UTC)) + s.MarkAsPeeked(now) + s.Version = 2 + s.ReceiveCount = 1 + r := &PeekResult[test.MessageData]{ + Result: &Result{ + ID: s.ID, + Status: s.Status, + LastUpdatedTimestamp: s.LastUpdatedTimestamp, + Version: s.Version, + }, + PeekFromQueueTimestamp: s.PeekFromQueueTimestamp, + PeekedMessageObject: s, + } + return r + }() + + result, err = client.Peek(ctx) + if err != nil { + t.Errorf("Peek() 5 error = %v", err) + return + } + if !reflect.DeepEqual(result, want3) { + v1, _ := json.Marshal(result) + v2, _ := json.Marshal(want3) + t.Errorf("Peek() 5 got = %v, want %v", string(v1), string(v2)) + } + + _, err = client.Peek(ctx) + if !errors.Is(err, &EmptyQueueError{}) { + t.Errorf("Peek() 6 error = %v, wantErr %v", err, &EmptyQueueError{}) + return + } + _, err = client.Done(ctx, result.ID) + if err != nil { + t.Errorf("Done() 3 error = %v", err) + return + } + _, err = client.Peek(ctx) + if !errors.Is(err, &EmptyQueueError{}) { + t.Errorf("Peek() 7 error = %v, wantErr %v", err, &EmptyQueueError{}) + return + } +} + +func TestQueueSDKClientPeekNotUseFIFO(t *testing.T) { + + raw, clean := setupDynamoDB(t, + &types.PutRequest{ + Item: newTestMessageItemAsEnqueued("A-101", time.Date(2023, 12, 1, 0, 0, 3, 0, time.UTC)).MarshalMapUnsafe(), + }, + &types.PutRequest{ + Item: newTestMessageItemAsEnqueued("A-202", time.Date(2023, 12, 1, 0, 0, 2, 0, time.UTC)).MarshalMapUnsafe(), + }, + &types.PutRequest{ + Item: newTestMessageItemAsEnqueued("A-303", time.Date(2023, 12, 1, 0, 0, 1, 0, time.UTC)).MarshalMapUnsafe(), + }, + ) + defer clean() + + now := time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC) + + ctx := context.Background() + client, err := NewQueueSDKClient[test.MessageData](ctx, + WithAWSDynamoDBClient(raw), + withClock(mockClock{ + t: now, + }), + WithAWSVisibilityTimeout(1)) + if err != nil { + t.Fatalf("NewQueueSDKClient() error = %v", err) + return + } + + want1 := func() *PeekResult[test.MessageData] { + s := newTestMessageItemAsEnqueued("A-303", time.Date(2023, 12, 1, 0, 0, 1, 0, time.UTC)) + s.MarkAsPeeked(now) + s.Version = 2 + s.ReceiveCount = 1 + r := &PeekResult[test.MessageData]{ + Result: &Result{ + ID: s.ID, + Status: s.Status, + LastUpdatedTimestamp: s.LastUpdatedTimestamp, + Version: s.Version, + }, + PeekFromQueueTimestamp: s.PeekFromQueueTimestamp, + PeekedMessageObject: s, + } + return r + }() + + result, err := client.Peek(ctx) + if err != nil { + t.Errorf("Peek() 1 error = %v", err) + return + } + if !reflect.DeepEqual(result, want1) { + v1, _ := json.Marshal(result) + v2, _ := json.Marshal(want1) + t.Errorf("Peek() 1 got = %v, want %v", string(v1), string(v2)) + } + + want2 := func() *PeekResult[test.MessageData] { + s := newTestMessageItemAsEnqueued("A-202", time.Date(2023, 12, 1, 0, 0, 2, 0, time.UTC)) + s.MarkAsPeeked(now) + s.Version = 2 + s.ReceiveCount = 1 + r := &PeekResult[test.MessageData]{ + Result: &Result{ + ID: s.ID, + Status: s.Status, + LastUpdatedTimestamp: s.LastUpdatedTimestamp, + Version: s.Version, + }, + PeekFromQueueTimestamp: s.PeekFromQueueTimestamp, + PeekedMessageObject: s, + } + return r + }() + + result, err = client.Peek(ctx) + if err != nil { + t.Errorf("Peek() 2 error = %v", err) + return + } + if !reflect.DeepEqual(result, want2) { + v1, _ := json.Marshal(result) + v2, _ := json.Marshal(want2) + t.Errorf("Peek() 2 got = %v, want %v", string(v1), string(v2)) + } + + want3 := func() *PeekResult[test.MessageData] { + s := newTestMessageItemAsEnqueued("A-101", time.Date(2023, 12, 1, 0, 0, 3, 0, time.UTC)) + s.MarkAsPeeked(now) + s.Version = 2 + s.ReceiveCount = 1 + r := &PeekResult[test.MessageData]{ + Result: &Result{ + ID: s.ID, + Status: s.Status, + LastUpdatedTimestamp: s.LastUpdatedTimestamp, + Version: s.Version, + }, + PeekFromQueueTimestamp: s.PeekFromQueueTimestamp, + PeekedMessageObject: s, + } + return r + }() + + result, err = client.Peek(ctx) + if err != nil { + t.Errorf("Peek() 3 error = %v", err) + return + } + if !reflect.DeepEqual(result, want3) { + v1, _ := json.Marshal(result) + v2, _ := json.Marshal(want3) + t.Errorf("Peek() 3 got = %v, want %v", string(v1), string(v2)) + } + + _, err = client.Peek(ctx) + if !errors.Is(err, &EmptyQueueError{}) { + t.Errorf("Peek() 4 error = %v, wantErr %v", err, &EmptyQueueError{}) + return + } + +} + func TestQueueSDKClientDequeue(t *testing.T) { tests := []struct { name string