Skip to content

Commit

Permalink
refactor: Rename QueueSDKClient to Client, update constructor name, a…
Browse files Browse the repository at this point in the history
…nd modify function options for Client and Consumer
  • Loading branch information
vvatanabe committed Oct 26, 2023
1 parent 5f07c28 commit fe72151
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 98 deletions.
2 changes: 1 addition & 1 deletion cmd/dynamomq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func main() {
fmt.Printf("endpoint is: [%s]\n", *endpoint)
fmt.Println("")

client, err := dynamomq.NewQueueSDKClient[any](context.Background(),
client, err := dynamomq.NewFromConfig[any](context.Background(),
dynamomq.WithAWSRegion(*region),
dynamomq.WithAWSCredentialsProfileName(*credentialsProfile),
dynamomq.WithTableName(*tableName),
Expand Down
26 changes: 12 additions & 14 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,33 +15,31 @@ const (
defaultMaximumReceives = 0 // unlimited
)

type ConsumerOption func(o *Options)

func WithPollingInterval(pollingInterval time.Duration) ConsumerOption {
return func(o *Options) {
func WithPollingInterval(pollingInterval time.Duration) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.PollingInterval = pollingInterval
}
}

func WithMaximumReceives(maximumReceives int) ConsumerOption {
return func(o *Options) {
func WithMaximumReceives(maximumReceives int) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.MaximumReceives = maximumReceives
}
}

func WithErrorLog(errorLog *log.Logger) ConsumerOption {
return func(o *Options) {
func WithErrorLog(errorLog *log.Logger) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.ErrorLog = errorLog
}
}

func WithOnShutdown(onShutdown []func()) ConsumerOption {
return func(o *Options) {
func WithOnShutdown(onShutdown []func()) func(o *ConsumerOptions) {
return func(o *ConsumerOptions) {
o.OnShutdown = onShutdown
}
}

type Options struct {
type ConsumerOptions struct {
PollingInterval time.Duration
MaximumReceives int
// errorLog specifies an optional logger for errors accepting
Expand All @@ -52,8 +50,8 @@ type Options struct {
OnShutdown []func()
}

func NewConsumer[T any](client QueueSDKClient[T], processor MessageProcessor[T], opts ...ConsumerOption) *Consumer[T] {
o := &Options{
func NewConsumer[T any](client Client[T], processor MessageProcessor[T], opts ...func(o *ConsumerOptions)) *Consumer[T] {
o := &ConsumerOptions{
PollingInterval: defaultPollingInterval,
MaximumReceives: defaultMaximumReceives,
}
Expand All @@ -80,7 +78,7 @@ type MessageProcessor[T any] interface {
}

type Consumer[T any] struct {
client QueueSDKClient[T]
client Client[T]
messageProcessor MessageProcessor[T]

pollingInterval time.Duration
Expand Down
4 changes: 2 additions & 2 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type CLI struct {
CredentialsProfile string
TableName string

Client dynamomq.QueueSDKClient[any]
Client dynamomq.Client[any]
Message *dynamomq.Message[any]
}

Expand Down Expand Up @@ -107,7 +107,7 @@ func (c *CLI) aws(ctx context.Context, params []string) {
if endpoint != "" {
c.BaseEndpoint = endpoint
}
client, err := dynamomq.NewQueueSDKClient[any](ctx,
client, err := dynamomq.NewFromConfig[any](ctx,
dynamomq.WithAWSRegion(c.Region),
dynamomq.WithAWSCredentialsProfileName(profile),
dynamomq.WithTableName(c.TableName),
Expand Down
88 changes: 43 additions & 45 deletions sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ const (
DefaultVisibilityTimeoutInMinutes = 1
)

type QueueSDKClient[T any] interface {
type Client[T any] interface {
Enqueue(ctx context.Context, id string, data T) (*EnqueueResult[T], error)
Peek(ctx context.Context) (*PeekResult[T], error)
Retry(ctx context.Context, id string) (*RetryResult[T], error)
Expand All @@ -46,7 +46,7 @@ type QueueSDKClient[T any] interface {
GetDynamodbClient() *dynamodb.Client
}

type queueSDKClient[T any] struct {
type client[T any] struct {
dynamoDB *dynamodb.Client

tableName string
Expand All @@ -63,7 +63,7 @@ type queueSDKClient[T any] struct {
clock clock.Clock
}

type options struct {
type ClientOptions struct {
tableName string
awsRegion string
awsCredentialsProfileName string
Expand All @@ -77,64 +77,62 @@ type options struct {
clock clock.Clock
}

type Option func(*options)

func WithTableName(tableName string) Option {
return func(s *options) {
func WithTableName(tableName string) func(*ClientOptions) {
return func(s *ClientOptions) {
s.tableName = tableName
}
}

func WithAWSRegion(awsRegion string) Option {
return func(s *options) {
func WithAWSRegion(awsRegion string) func(*ClientOptions) {
return func(s *ClientOptions) {
s.awsRegion = awsRegion
}
}

func WithAWSCredentialsProfileName(awsCredentialsProfileName string) Option {
return func(s *options) {
func WithAWSCredentialsProfileName(awsCredentialsProfileName string) func(*ClientOptions) {
return func(s *ClientOptions) {
s.awsCredentialsProfileName = awsCredentialsProfileName
}
}

func WithAWSCredentialsProvider(credentialsProvider aws.CredentialsProvider) Option {
return func(s *options) {
func WithAWSCredentialsProvider(credentialsProvider aws.CredentialsProvider) func(*ClientOptions) {
return func(s *ClientOptions) {
s.credentialsProvider = credentialsProvider
}
}

func WithAWSBaseEndpoint(baseEndpoint string) Option {
return func(s *options) {
func WithAWSBaseEndpoint(baseEndpoint string) func(*ClientOptions) {
return func(s *ClientOptions) {
s.baseEndpoint = baseEndpoint
}
}

func WithAWSRetryMaxAttempts(retryMaxAttempts int) Option {
return func(s *options) {
func WithAWSRetryMaxAttempts(retryMaxAttempts int) func(*ClientOptions) {
return func(s *ClientOptions) {
s.retryMaxAttempts = retryMaxAttempts
}
}

func WithAWSVisibilityTimeout(minutes int) Option {
return func(s *options) {
func WithAWSVisibilityTimeout(minutes int) func(*ClientOptions) {
return func(s *ClientOptions) {
s.visibilityTimeoutInMinutes = minutes
}
}

func WithUseFIFO(useFIFO bool) Option {
return func(s *options) {
func WithUseFIFO(useFIFO bool) func(*ClientOptions) {
return func(s *ClientOptions) {
s.useFIFO = useFIFO
}
}

func WithAWSDynamoDBClient(client *dynamodb.Client) Option {
return func(s *options) {
func WithAWSDynamoDBClient(client *dynamodb.Client) func(*ClientOptions) {
return func(s *ClientOptions) {
s.dynamoDB = client
}
}

func NewQueueSDKClient[T any](ctx context.Context, opts ...Option) (QueueSDKClient[T], error) {
o := &options{
func NewFromConfig[T any](ctx context.Context, optFns ...func(*ClientOptions)) (Client[T], error) {
o := &ClientOptions{
tableName: DefaultTableName,
awsRegion: AwsRegionDefault,
awsCredentialsProfileName: AwsProfileDefault,
Expand All @@ -143,10 +141,10 @@ func NewQueueSDKClient[T any](ctx context.Context, opts ...Option) (QueueSDKClie
useFIFO: false,
clock: &clock.RealClock{},
}
for _, opt := range opts {
for _, opt := range optFns {
opt(o)
}
c := &queueSDKClient[T]{
c := &client[T]{
tableName: o.tableName,
awsRegion: o.awsRegion,
awsCredentialsProfileName: o.awsCredentialsProfileName,
Expand Down Expand Up @@ -187,7 +185,7 @@ func NewQueueSDKClient[T any](ctx context.Context, opts ...Option) (QueueSDKClie
return c, nil
}

func (c *queueSDKClient[T]) Enqueue(ctx context.Context, id string, data T) (*EnqueueResult[T], error) {
func (c *client[T]) Enqueue(ctx context.Context, id string, data T) (*EnqueueResult[T], error) {
retrieved, err := c.Get(ctx, id)
if err != nil {
return nil, err
Expand All @@ -211,7 +209,7 @@ func (c *queueSDKClient[T]) Enqueue(ctx context.Context, id string, data T) (*En
}, nil
}

func (c *queueSDKClient[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
func (c *client[T]) Peek(ctx context.Context) (*PeekResult[T], error) {
expr, err := expression.NewBuilder().
WithKeyCondition(expression.Key("queue_type").Equal(expression.Value(QueueTypeStandard))). // FIXME
Build()
Expand Down Expand Up @@ -297,7 +295,7 @@ ExitLoop:
}, nil
}

func (c *queueSDKClient[T]) Retry(ctx context.Context, id string) (*RetryResult[T], error) {
func (c *client[T]) Retry(ctx context.Context, id string) (*RetryResult[T], error) {
message, err := c.Get(ctx, id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -331,7 +329,7 @@ func (c *queueSDKClient[T]) Retry(ctx context.Context, id string) (*RetryResult[
}, nil
}

func (c *queueSDKClient[T]) Delete(ctx context.Context, id string) error {
func (c *client[T]) Delete(ctx context.Context, id string) error {
if id == "" {
return &IDNotProvidedError{}
}
Expand All @@ -349,7 +347,7 @@ func (c *queueSDKClient[T]) Delete(ctx context.Context, id string) error {
return nil
}

func (c *queueSDKClient[T]) SendToDLQ(ctx context.Context, id string) (*Result, error) {
func (c *client[T]) SendToDLQ(ctx context.Context, id string) (*Result, error) {
message, err := c.Get(ctx, id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -392,7 +390,7 @@ func (c *queueSDKClient[T]) SendToDLQ(ctx context.Context, id string) (*Result,
}, nil
}

func (c *queueSDKClient[T]) Redrive(ctx context.Context, id string) (*Result, error) {
func (c *client[T]) Redrive(ctx context.Context, id string) (*Result, error) {
retrieved, err := c.Get(ctx, id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -460,7 +458,7 @@ func (c *queueSDKClient[T]) Redrive(ctx context.Context, id string) (*Result, er
//
// Note: The function uses pagination to query the DynamoDB table and will continue querying
// until all records have been fetched or an error occurs.
func (c *queueSDKClient[T]) GetQueueStats(ctx context.Context) (*QueueStats, error) {
func (c *client[T]) GetQueueStats(ctx context.Context) (*QueueStats, error) {
expr, err := expression.NewBuilder().
WithKeyCondition(expression.KeyEqual(expression.Key("queue_type"), expression.Value(QueueTypeStandard))).
Build()
Expand Down Expand Up @@ -532,7 +530,7 @@ func (c *queueSDKClient[T]) GetQueueStats(ctx context.Context) (*QueueStats, err
//
// Note: The function uses pagination to query the DynamoDB table and will continue querying
// until all records have been fetched or an error occurs.
func (c *queueSDKClient[T]) GetDLQStats(ctx context.Context) (*DLQStats, error) {
func (c *client[T]) GetDLQStats(ctx context.Context) (*DLQStats, error) {
expr, err := expression.NewBuilder().
WithKeyCondition(expression.KeyEqual(expression.Key("queue_type"), expression.Value(QueueTypeDLQ))).
Build()
Expand Down Expand Up @@ -592,7 +590,7 @@ func (c *queueSDKClient[T]) GetDLQStats(ctx context.Context) (*DLQStats, error)
// - (error): An error if any occurred during the retrieval process, including
// if the 'id' is empty, the database query fails, or unmarshaling the response
// fails.
func (c *queueSDKClient[T]) Get(ctx context.Context, id string) (*Message[T], error) {
func (c *client[T]) Get(ctx context.Context, id string) (*Message[T], error) {
if id == "" {
return nil, &IDNotProvidedError{}
}
Expand Down Expand Up @@ -627,7 +625,7 @@ func (c *queueSDKClient[T]) Get(ctx context.Context, id string) (*Message[T], er
//
// Returns:
// - error: Returns an error if one occurs, otherwise, it returns nil on successful storage.
func (c *queueSDKClient[T]) Put(ctx context.Context, message *Message[T]) error {
func (c *client[T]) Put(ctx context.Context, message *Message[T]) error {
retrieved, err := c.Get(ctx, message.ID)
if err != nil {
return err
Expand Down Expand Up @@ -656,7 +654,7 @@ func (c *queueSDKClient[T]) Put(ctx context.Context, message *Message[T]) error
//
// Returns:
// - error: Returns an error if one occurs, otherwise, it returns nil on successful upsert.
func (c *queueSDKClient[T]) Upsert(ctx context.Context, message *Message[T]) error {
func (c *client[T]) Upsert(ctx context.Context, message *Message[T]) error {
retrieved, err := c.Get(ctx, message.ID)
if err != nil {
return err
Expand All @@ -668,7 +666,7 @@ func (c *queueSDKClient[T]) Upsert(ctx context.Context, message *Message[T]) err
return c.put(ctx, message)
}

func (c *queueSDKClient[T]) put(ctx context.Context, message *Message[T]) error {
func (c *client[T]) put(ctx context.Context, message *Message[T]) error {
item, err := message.MarshalMap()
if err != nil {
return err
Expand Down Expand Up @@ -701,7 +699,7 @@ func (c *queueSDKClient[T]) put(ctx context.Context, message *Message[T]) error
// - If there's an error while building the DynamoDB expression, this error is returned.
// - If there's an error unmarshalling the DynamoDB response, this error is returned.
// Otherwise, if the operation succeeds, the error will be 'nil'.
func (c *queueSDKClient[T]) Touch(ctx context.Context, id string) (*Result, error) {
func (c *client[T]) Touch(ctx context.Context, id string) (*Result, error) {
message, err := c.Get(ctx, id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -742,7 +740,7 @@ func (c *queueSDKClient[T]) Touch(ctx context.Context, id string) (*Result, erro
// Returns:
// - A slice of pointers to Message if found.
// - error if there's any issue in the operation.
func (c *queueSDKClient[T]) List(ctx context.Context, size int32) ([]*Message[T], error) {
func (c *client[T]) List(ctx context.Context, size int32) ([]*Message[T], error) {
output, err := c.dynamoDB.Scan(ctx, &dynamodb.ScanInput{
TableName: &c.tableName,
Limit: aws.Int32(size),
Expand All @@ -769,7 +767,7 @@ func (c *queueSDKClient[T]) List(ctx context.Context, size int32) ([]*Message[T]
// Returns:
// - A slice of string IDs if found.
// - error if there's any issue in the operation.
func (c *queueSDKClient[T]) ListIDs(ctx context.Context, size int32) ([]string, error) {
func (c *client[T]) ListIDs(ctx context.Context, size int32) ([]string, error) {
messages, err := c.List(ctx, size)
if err != nil {
return nil, err
Expand All @@ -795,7 +793,7 @@ func (c *queueSDKClient[T]) ListIDs(ctx context.Context, size int32) ([]string,
// Returns:
// - A slice of extended ID strings if found.
// - error if there's any issue in the operation.
func (c *queueSDKClient[T]) ListExtendedIDs(ctx context.Context, size int32) ([]string, error) {
func (c *client[T]) ListExtendedIDs(ctx context.Context, size int32) ([]string, error) {
messages, err := c.List(ctx, size)
if err != nil {
return nil, err
Expand All @@ -809,11 +807,11 @@ func (c *queueSDKClient[T]) ListExtendedIDs(ctx context.Context, size int32) ([]
return extendedIDs, nil
}

func (c *queueSDKClient[T]) GetDynamodbClient() *dynamodb.Client {
func (c *client[T]) GetDynamodbClient() *dynamodb.Client {
return c.dynamoDB
}

func (c *queueSDKClient[T]) updateDynamoDBItem(ctx context.Context,
func (c *client[T]) updateDynamoDBItem(ctx context.Context,
id string, expr *expression.Expression) (*Message[T], error) {
outcome, err := c.dynamoDB.UpdateItem(ctx, &dynamodb.UpdateItemInput{
Key: map[string]types.AttributeValue{
Expand Down
Loading

0 comments on commit fe72151

Please sign in to comment.