Skip to content

Commit

Permalink
feat: implement validation of message state transitions
Browse files Browse the repository at this point in the history
  • Loading branch information
vvatanabe committed Oct 27, 2023
1 parent fe72151 commit 6308c9c
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 103 deletions.
22 changes: 10 additions & 12 deletions error.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,6 @@ func (e ConditionalCheckFailedError) Error() string {
return fmt.Sprintf("Condition on the 'version' attribute has failed: %v", e.Cause)
}

type RecordNotConstructedError struct{}

func (e RecordNotConstructedError) Error() string {
return "Message record not yet fully constructed .. cannot execute API."
}

type IllegalStateError struct{}

func (e IllegalStateError) Error() string {
return "Illegal state, cannot proceed."
}

type BuildingExpressionError struct {
Cause error
}
Expand Down Expand Up @@ -77,3 +65,13 @@ type EmptyQueueError struct{}
func (e EmptyQueueError) Error() string {
return "Cannot proceed, queue is empty."
}

type InvalidStateTransitionError struct {
Msg string
Operation string
Current Status
}

func (e *InvalidStateTransitionError) Error() string {
return fmt.Sprintf("operation %s failed for status %s: %s", e.Operation, e.Current, e.Msg)
}
101 changes: 70 additions & 31 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,37 +91,6 @@ func (m *Message[T]) IsDLQ() bool {
return m.QueueType == QueueTypeDLQ
}

func (m *Message[T]) MarkAsRetry(now time.Time) {
ts := clock.FormatRFC3339(now)
m.LastUpdatedTimestamp = ts
m.Status = StatusReady
}

func (m *Message[T]) MarkAsPeeked(now time.Time) {
ts := clock.FormatRFC3339(now)
m.LastUpdatedTimestamp = ts
m.PeekFromQueueTimestamp = ts
m.Status = StatusProcessing
}

func (m *Message[T]) MarkAsDLQ(now time.Time) {
ts := clock.FormatRFC3339(now)
m.QueueType = QueueTypeDLQ
m.Status = StatusReady
m.ReceiveCount = 0
m.LastUpdatedTimestamp = ts
m.AddToQueueTimestamp = ts
m.PeekFromQueueTimestamp = ""
}

func (m *Message[T]) MarkAsRedrive(now time.Time) {
ts := clock.FormatRFC3339(now)
m.LastUpdatedTimestamp = ts
m.AddToQueueTimestamp = ts
m.QueueType = QueueTypeStandard
m.Status = StatusReady
}

func (m *Message[T]) ResetSystemInfo(now time.Time) {
system := newDefaultSystemInfo(m.ID, now)
m.Status = system.Status
Expand Down Expand Up @@ -166,3 +135,73 @@ func (m *Message[T]) MarshalMapUnsafe() map[string]types.AttributeValue {
item, _ := attributevalue.MarshalMap(m)
return item
}

func (m *Message[T]) MarkAsRetry(now time.Time) error {
if m.Status != StatusProcessing {
return &InvalidStateTransitionError{
Msg: "cannot mark as retry",
Operation: "MarkAsRetry",
Current: m.Status,
}
}
ts := clock.FormatRFC3339(now)
m.Status = StatusReady
m.LastUpdatedTimestamp = ts
return nil
}

func (m *Message[T]) MarkAsPeeked(now time.Time, visibilityTimeout time.Duration) error {
if m.IsQueueSelected(now, visibilityTimeout) {
return &InvalidStateTransitionError{
Msg: "message is currently being processed",
Operation: "MarkAsPeeked",
Current: m.Status,
}
}
ts := clock.FormatRFC3339(now)
m.Status = StatusProcessing
m.LastUpdatedTimestamp = ts
m.PeekFromQueueTimestamp = ts
return nil
}

func (m *Message[T]) MarkAsDLQ(now time.Time) error {
if m.QueueType == QueueTypeDLQ {
return &InvalidStateTransitionError{
Msg: "message is already in DLQ",
Operation: "MarkAsDLQ",
Current: m.Status,
}
}
ts := clock.FormatRFC3339(now)
m.QueueType = QueueTypeDLQ
m.Status = StatusReady
m.ReceiveCount = 0
m.LastUpdatedTimestamp = ts
m.AddToQueueTimestamp = ts
m.PeekFromQueueTimestamp = ""
return nil
}

func (m *Message[T]) MarkAsRedrive(now time.Time) error {
if m.QueueType != QueueTypeDLQ {
return &InvalidStateTransitionError{
Msg: "can only redrive messages from DLQ",
Operation: "MarkAsRedrive",
Current: m.Status,
}
}
if m.Status != StatusReady {
return &InvalidStateTransitionError{
Msg: "message status is not Ready",
Operation: "MarkAsRedrive",
Current: m.Status,
}
}
ts := clock.FormatRFC3339(now)
m.QueueType = QueueTypeStandard
m.Status = StatusReady
m.LastUpdatedTimestamp = ts
m.AddToQueueTimestamp = ts
return nil
}
30 changes: 18 additions & 12 deletions sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func (c *client[T]) Enqueue(ctx context.Context, id string, data T) (*EnqueueRes

func (c *client[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
expr, err := expression.NewBuilder().
WithKeyCondition(expression.Key("queue_type").Equal(expression.Value(QueueTypeStandard))). // FIXME
WithKeyCondition(expression.Key("queue_type").Equal(expression.Value(QueueTypeStandard))). // FIXME make DLQs peek-enabled.
Build()
if err != nil {
return nil, &BuildingExpressionError{Cause: err}
Expand All @@ -222,6 +222,7 @@ func (c *client[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
selectedVersion int
recordForPeekIsFound bool
)
visibilityTimeout := time.Duration(c.visibilityTimeoutInMinutes) * time.Minute
for {
queryResult, err := c.dynamoDB.Query(ctx, &dynamodb.QueryInput{
IndexName: aws.String(QueueingIndexName),
Expand All @@ -242,7 +243,6 @@ func (c *client[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
if err = attributevalue.UnmarshalMap(itemMap, &item); err != nil {
return nil, &UnmarshalingAttributeError{Cause: err}
}
visibilityTimeout := time.Duration(c.visibilityTimeoutInMinutes) * time.Minute
isQueueSelected := item.IsQueueSelected(c.clock.Now(), visibilityTimeout)
if c.useFIFO && isQueueSelected {
goto ExitLoop
Expand All @@ -262,11 +262,14 @@ ExitLoop:
if selectedID == "" {
return nil, &EmptyQueueError{}
}
message, err := c.Get(ctx, selectedID)
message, err := c.Get(ctx, selectedID) // FIXME It may be more efficient to use items retrieved from the INDEX
if err != nil {
return nil, err
}
err = message.MarkAsPeeked(c.clock.Now(), visibilityTimeout)
if err != nil {
return nil, err
}
message.MarkAsPeeked(c.clock.Now())
expr, err = expression.NewBuilder().
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Expand Down Expand Up @@ -303,7 +306,10 @@ func (c *client[T]) Retry(ctx context.Context, id string) (*RetryResult[T], erro
if message == nil {
return nil, &IDNotFoundError{}
}
message.MarkAsRetry(c.clock.Now())
err = message.MarkAsRetry(c.clock.Now())
if err != nil {
return nil, err
}
expr, err := expression.NewBuilder().
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Expand Down Expand Up @@ -363,7 +369,10 @@ func (c *client[T]) SendToDLQ(ctx context.Context, id string) (*Result, error) {
Version: message.Version,
}, nil
}
message.MarkAsDLQ(c.clock.Now())
err = message.MarkAsDLQ(c.clock.Now())
if err != nil {
return nil, err
}
expr, err := expression.NewBuilder().
WithUpdate(expression.
Add(expression.Name("version"), expression.Value(1)).
Expand Down Expand Up @@ -398,13 +407,10 @@ func (c *client[T]) Redrive(ctx context.Context, id string) (*Result, error) {
if retrieved == nil {
return nil, &IDNotFoundError{}
}
if retrieved.QueueType == QueueTypeStandard {
return nil, &RecordNotConstructedError{} // FIXME Define more appropriately named errors.
}
if retrieved.Status == StatusProcessing {
return nil, &IllegalStateError{} // FIXME Define more appropriately named errors.
err = retrieved.MarkAsRedrive(c.clock.Now())
if err != nil {
return nil, err
}
retrieved.MarkAsRedrive(c.clock.Now())
expr, err := expression.NewBuilder().
WithUpdate(expression.Add(
expression.Name("version"),
Expand Down
Loading

0 comments on commit 6308c9c

Please sign in to comment.