From 6308c9ca5116eadf8ee73a1034ff7239a66579a5 Mon Sep 17 00:00:00 2001 From: vvatanabe Date: Fri, 27 Oct 2023 13:08:31 +0900 Subject: [PATCH] feat: implement validation of message state transitions --- error.go | 22 ++++++------ message.go | 101 ++++++++++++++++++++++++++++++++++++---------------- sdk.go | 30 +++++++++------- sdk_test.go | 99 +++++++++++++++++++++++++------------------------- 4 files changed, 149 insertions(+), 103 deletions(-) diff --git a/error.go b/error.go index e1c3339..7cb51dc 100644 --- a/error.go +++ b/error.go @@ -28,18 +28,6 @@ func (e ConditionalCheckFailedError) Error() string { return fmt.Sprintf("Condition on the 'version' attribute has failed: %v", e.Cause) } -type RecordNotConstructedError struct{} - -func (e RecordNotConstructedError) Error() string { - return "Message record not yet fully constructed .. cannot execute API." -} - -type IllegalStateError struct{} - -func (e IllegalStateError) Error() string { - return "Illegal state, cannot proceed." -} - type BuildingExpressionError struct { Cause error } @@ -77,3 +65,13 @@ type EmptyQueueError struct{} func (e EmptyQueueError) Error() string { return "Cannot proceed, queue is empty." } + +type InvalidStateTransitionError struct { + Msg string + Operation string + Current Status +} + +func (e *InvalidStateTransitionError) Error() string { + return fmt.Sprintf("operation %s failed for status %s: %s", e.Operation, e.Current, e.Msg) +} diff --git a/message.go b/message.go index 7a98cce..b460601 100644 --- a/message.go +++ b/message.go @@ -91,37 +91,6 @@ func (m *Message[T]) IsDLQ() bool { return m.QueueType == QueueTypeDLQ } -func (m *Message[T]) MarkAsRetry(now time.Time) { - ts := clock.FormatRFC3339(now) - m.LastUpdatedTimestamp = ts - m.Status = StatusReady -} - -func (m *Message[T]) MarkAsPeeked(now time.Time) { - ts := clock.FormatRFC3339(now) - m.LastUpdatedTimestamp = ts - m.PeekFromQueueTimestamp = ts - m.Status = StatusProcessing -} - -func (m *Message[T]) MarkAsDLQ(now time.Time) { - ts := clock.FormatRFC3339(now) - m.QueueType = QueueTypeDLQ - m.Status = StatusReady - m.ReceiveCount = 0 - m.LastUpdatedTimestamp = ts - m.AddToQueueTimestamp = ts - m.PeekFromQueueTimestamp = "" -} - -func (m *Message[T]) MarkAsRedrive(now time.Time) { - ts := clock.FormatRFC3339(now) - m.LastUpdatedTimestamp = ts - m.AddToQueueTimestamp = ts - m.QueueType = QueueTypeStandard - m.Status = StatusReady -} - func (m *Message[T]) ResetSystemInfo(now time.Time) { system := newDefaultSystemInfo(m.ID, now) m.Status = system.Status @@ -166,3 +135,73 @@ func (m *Message[T]) MarshalMapUnsafe() map[string]types.AttributeValue { item, _ := attributevalue.MarshalMap(m) return item } + +func (m *Message[T]) MarkAsRetry(now time.Time) error { + if m.Status != StatusProcessing { + return &InvalidStateTransitionError{ + Msg: "cannot mark as retry", + Operation: "MarkAsRetry", + Current: m.Status, + } + } + ts := clock.FormatRFC3339(now) + m.Status = StatusReady + m.LastUpdatedTimestamp = ts + return nil +} + +func (m *Message[T]) MarkAsPeeked(now time.Time, visibilityTimeout time.Duration) error { + if m.IsQueueSelected(now, visibilityTimeout) { + return &InvalidStateTransitionError{ + Msg: "message is currently being processed", + Operation: "MarkAsPeeked", + Current: m.Status, + } + } + ts := clock.FormatRFC3339(now) + m.Status = StatusProcessing + m.LastUpdatedTimestamp = ts + m.PeekFromQueueTimestamp = ts + return nil +} + +func (m *Message[T]) MarkAsDLQ(now time.Time) error { + if m.QueueType == QueueTypeDLQ { + return &InvalidStateTransitionError{ + Msg: "message is already in DLQ", + Operation: "MarkAsDLQ", + Current: m.Status, + } + } + ts := clock.FormatRFC3339(now) + m.QueueType = QueueTypeDLQ + m.Status = StatusReady + m.ReceiveCount = 0 + m.LastUpdatedTimestamp = ts + m.AddToQueueTimestamp = ts + m.PeekFromQueueTimestamp = "" + return nil +} + +func (m *Message[T]) MarkAsRedrive(now time.Time) error { + if m.QueueType != QueueTypeDLQ { + return &InvalidStateTransitionError{ + Msg: "can only redrive messages from DLQ", + Operation: "MarkAsRedrive", + Current: m.Status, + } + } + if m.Status != StatusReady { + return &InvalidStateTransitionError{ + Msg: "message status is not Ready", + Operation: "MarkAsRedrive", + Current: m.Status, + } + } + ts := clock.FormatRFC3339(now) + m.QueueType = QueueTypeStandard + m.Status = StatusReady + m.LastUpdatedTimestamp = ts + m.AddToQueueTimestamp = ts + return nil +} diff --git a/sdk.go b/sdk.go index bf7eca8..7c6c9bc 100644 --- a/sdk.go +++ b/sdk.go @@ -211,7 +211,7 @@ func (c *client[T]) Enqueue(ctx context.Context, id string, data T) (*EnqueueRes func (c *client[T]) Peek(ctx context.Context) (*PeekResult[T], error) { expr, err := expression.NewBuilder(). - WithKeyCondition(expression.Key("queue_type").Equal(expression.Value(QueueTypeStandard))). // FIXME + WithKeyCondition(expression.Key("queue_type").Equal(expression.Value(QueueTypeStandard))). // FIXME make DLQs peek-enabled. Build() if err != nil { return nil, &BuildingExpressionError{Cause: err} @@ -222,6 +222,7 @@ func (c *client[T]) Peek(ctx context.Context) (*PeekResult[T], error) { selectedVersion int recordForPeekIsFound bool ) + visibilityTimeout := time.Duration(c.visibilityTimeoutInMinutes) * time.Minute for { queryResult, err := c.dynamoDB.Query(ctx, &dynamodb.QueryInput{ IndexName: aws.String(QueueingIndexName), @@ -242,7 +243,6 @@ func (c *client[T]) Peek(ctx context.Context) (*PeekResult[T], error) { if err = attributevalue.UnmarshalMap(itemMap, &item); err != nil { return nil, &UnmarshalingAttributeError{Cause: err} } - visibilityTimeout := time.Duration(c.visibilityTimeoutInMinutes) * time.Minute isQueueSelected := item.IsQueueSelected(c.clock.Now(), visibilityTimeout) if c.useFIFO && isQueueSelected { goto ExitLoop @@ -262,11 +262,14 @@ ExitLoop: if selectedID == "" { return nil, &EmptyQueueError{} } - message, err := c.Get(ctx, selectedID) + message, err := c.Get(ctx, selectedID) // FIXME It may be more efficient to use items retrieved from the INDEX + if err != nil { + return nil, err + } + err = message.MarkAsPeeked(c.clock.Now(), visibilityTimeout) if err != nil { return nil, err } - message.MarkAsPeeked(c.clock.Now()) expr, err = expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). @@ -303,7 +306,10 @@ func (c *client[T]) Retry(ctx context.Context, id string) (*RetryResult[T], erro if message == nil { return nil, &IDNotFoundError{} } - message.MarkAsRetry(c.clock.Now()) + err = message.MarkAsRetry(c.clock.Now()) + if err != nil { + return nil, err + } expr, err := expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). @@ -363,7 +369,10 @@ func (c *client[T]) SendToDLQ(ctx context.Context, id string) (*Result, error) { Version: message.Version, }, nil } - message.MarkAsDLQ(c.clock.Now()) + err = message.MarkAsDLQ(c.clock.Now()) + if err != nil { + return nil, err + } expr, err := expression.NewBuilder(). WithUpdate(expression. Add(expression.Name("version"), expression.Value(1)). @@ -398,13 +407,10 @@ func (c *client[T]) Redrive(ctx context.Context, id string) (*Result, error) { if retrieved == nil { return nil, &IDNotFoundError{} } - if retrieved.QueueType == QueueTypeStandard { - return nil, &RecordNotConstructedError{} // FIXME Define more appropriately named errors. - } - if retrieved.Status == StatusProcessing { - return nil, &IllegalStateError{} // FIXME Define more appropriately named errors. + err = retrieved.MarkAsRedrive(c.clock.Now()) + if err != nil { + return nil, err } - retrieved.MarkAsRedrive(c.clock.Now()) expr, err := expression.NewBuilder(). WithUpdate(expression.Add( expression.Name("version"), diff --git a/sdk_test.go b/sdk_test.go index 62ce331..df8eb69 100644 --- a/sdk_test.go +++ b/sdk_test.go @@ -91,13 +91,19 @@ func newTestMessageItemAsReady(id string, now time.Time) *Message[test.MessageDa func newTestMessageItemAsPeeked(id string, now time.Time) *Message[test.MessageData] { message := NewDefaultMessage[test.MessageData](id, test.NewMessageData(id), now) - message.MarkAsPeeked(now) + err := message.MarkAsPeeked(now, 0) + if err != nil { + panic(err) + } return message } func newTestMessageItemAsDLQ(id string, now time.Time) *Message[test.MessageData] { message := NewDefaultMessage[test.MessageData](id, test.NewMessageData(id), now) - message.MarkAsDLQ(now) + err := message.MarkAsDLQ(now) + if err != nil { + panic(err) + } return message } @@ -242,7 +248,10 @@ func TestQueueSDKClientPeek(t *testing.T) { }, want: func() *PeekResult[test.MessageData] { s := newTestMessageItemAsReady("B-202", time.Date(2023, 12, 1, 0, 0, 0, 0, time.UTC)) - s.MarkAsPeeked(time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)) + err := s.MarkAsPeeked(time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC), 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -275,7 +284,10 @@ func TestQueueSDKClientPeek(t *testing.T) { }, want: func() *PeekResult[test.MessageData] { s := newTestMessageItemAsPeeked("B-202", time.Date(2023, 12, 1, 0, 0, 0, 0, time.UTC)) - s.MarkAsPeeked(time.Date(2023, 12, 1, 0, 1, 1, 0, time.UTC)) + err := s.MarkAsPeeked(time.Date(2023, 12, 1, 0, 1, 1, 0, time.UTC), 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -371,7 +383,10 @@ func TestQueueSDKClientPeekUseFIFO(t *testing.T) { want1 := func() *PeekResult[test.MessageData] { s := newTestMessageItemAsReady("A-303", time.Date(2023, 12, 1, 0, 0, 1, 0, time.UTC)) - s.MarkAsPeeked(now) + err := s.MarkAsPeeked(now, 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -410,7 +425,10 @@ func TestQueueSDKClientPeekUseFIFO(t *testing.T) { want2 := func() *PeekResult[test.MessageData] { s := newTestMessageItemAsReady("A-202", time.Date(2023, 12, 1, 0, 0, 2, 0, time.UTC)) - s.MarkAsPeeked(now) + err := s.MarkAsPeeked(now, 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -450,7 +468,10 @@ func TestQueueSDKClientPeekUseFIFO(t *testing.T) { want3 := func() *PeekResult[test.MessageData] { s := newTestMessageItemAsReady("A-101", time.Date(2023, 12, 1, 0, 0, 3, 0, time.UTC)) - s.MarkAsPeeked(now) + err := s.MarkAsPeeked(now, 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -524,7 +545,10 @@ func TestQueueSDKClientPeekNotUseFIFO(t *testing.T) { want1 := func() *PeekResult[test.MessageData] { s := newTestMessageItemAsReady("A-303", time.Date(2023, 12, 1, 0, 0, 1, 0, time.UTC)) - s.MarkAsPeeked(now) + err := s.MarkAsPeeked(now, 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -553,7 +577,10 @@ func TestQueueSDKClientPeekNotUseFIFO(t *testing.T) { want2 := func() *PeekResult[test.MessageData] { s := newTestMessageItemAsReady("A-202", time.Date(2023, 12, 1, 0, 0, 2, 0, time.UTC)) - s.MarkAsPeeked(now) + err := s.MarkAsPeeked(now, 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -582,7 +609,10 @@ func TestQueueSDKClientPeekNotUseFIFO(t *testing.T) { want3 := func() *PeekResult[test.MessageData] { s := newTestMessageItemAsReady("A-101", time.Date(2023, 12, 1, 0, 0, 3, 0, time.UTC)) - s.MarkAsPeeked(now) + err := s.MarkAsPeeked(now, 0) + if err != nil { + panic(err) + } s.Version = 2 s.ReceiveCount = 1 r := &PeekResult[test.MessageData]{ @@ -663,7 +693,7 @@ func TestQueueSDKClientRetry(t *testing.T) { setup: func(t *testing.T) (*dynamodb.Client, func()) { return setupDynamoDB(t, &types.PutRequest{ - Item: newTestMessageItemAsReady("A-101", time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)).MarshalMapUnsafe(), + Item: newTestMessageItemAsPeeked("A-101", time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)).MarshalMapUnsafe(), }, ) }, @@ -681,8 +711,11 @@ func TestQueueSDKClientRetry(t *testing.T) { Version: 2, }, Message: func() *Message[test.MessageData] { - message := newTestMessageItemAsReady("A-101", time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)) - message.MarkAsRetry(time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)) + message := newTestMessageItemAsPeeked("A-101", time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)) + err := message.MarkAsRetry(time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)) + if err != nil { + panic(err) + } message.Version = 2 return message }(), @@ -880,9 +913,12 @@ func TestQueueSDKClientSendToDLQ(t *testing.T) { id: "A-101", }, want: func() *Result { - s := newTestMessageItemAsDLQ("A-101", + s := newTestMessageItemAsReady("A-101", time.Date(2023, 12, 1, 0, 0, 0, 0, time.UTC)) - s.MarkAsDLQ(time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)) + err := s.MarkAsDLQ(time.Date(2023, 12, 1, 0, 0, 10, 0, time.UTC)) + if err != nil { + panic(err) + } s.Version = 2 r := &Result{ ID: s.ID, @@ -967,39 +1003,6 @@ func TestQueueSDKClientRedrive(t *testing.T) { want: nil, wantErr: &IDNotFoundError{}, }, - { - name: "should return RecordNotConstructedError", - setup: func(t *testing.T) (*dynamodb.Client, func()) { - return setupDynamoDB(t, - &types.PutRequest{ - Item: newTestMessageItemAsReady("A-101", clock.Now()).MarshalMapUnsafe(), - }, - ) - }, - args: args{ - id: "A-101", - }, - want: nil, - wantErr: &RecordNotConstructedError{}, - }, - { - name: "should return IllegalStateError", - setup: func(t *testing.T) (*dynamodb.Client, func()) { - return setupDynamoDB(t, func() *types.PutRequest { - msg := newTestMessageItemAsDLQ("A-101", clock.Now()) - msg.MarkAsPeeked(clock.Now()) - return &types.PutRequest{ - Item: msg.MarshalMapUnsafe(), - } - }()) - }, - args: args{ - id: "A-101", - }, - want: nil, - wantErr: &IllegalStateError{}, - }, - { name: "should redrive succeeds", setup: func(t *testing.T) (*dynamodb.Client, func()) {