Skip to content

Commit

Permalink
-Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
vinhkute1996 committed Jan 31, 2021
1 parent 2e83a97 commit b7e5cd5
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 8 deletions.
10 changes: 5 additions & 5 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewConsumer(client *sqs.SQS, queueURL string, ackOnConsume bool, visibility
return &Consumer{Client: client, QueueURL: &queueURL, AckOnConsume: ackOnConsume, VisibilityTimeout: visibilityTimeout, WaitTimeSeconds: waitTimeSeconds}
}

func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
func (c *Consumer) Consume(ctx context.Context, handle func(context.Context, *mq.Message, error) error) {
result, er1 := c.Client.ReceiveMessage(&sqs.ReceiveMessageInput{
AttributeNames: []*string{
aws.String(sqs.MessageSystemAttributeNameSentTimestamp),
Expand All @@ -42,7 +42,7 @@ func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
WaitTimeSeconds: aws.Int64(c.WaitTimeSeconds),
})
if er1 != nil {
caller.Call(ctx, nil, er1)
handle(ctx, nil, er1)
} else {
if len(result.Messages) > 0 {
m := result.Messages[0]
Expand All @@ -60,12 +60,12 @@ func (c *Consumer) Consume(ctx context.Context, caller mq.ConsumerCaller) {
ReceiptHandle: result.Messages[0].ReceiptHandle,
})
if er2 != nil {
caller.Call(ctx, nil, er2)
handle(ctx, nil, er2)
} else {
caller.Call(ctx, &message, nil)
handle(ctx, &message, nil)
}
} else {
caller.Call(ctx, &message, nil)
handle(ctx, &message, nil)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions func.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ func GetQueueUrl(client *sqs.SQS, queueName string) (string, error) {
return *result.QueueUrl, err
}

func MapToAttributes(messageAttributes *map[string]string) map[string]*sqs.MessageAttributeValue {
func MapToAttributes(messageAttributes map[string]string) map[string]*sqs.MessageAttributeValue {
attributes := make(map[string]*sqs.MessageAttributeValue)
if messageAttributes != nil {
for k, v := range *messageAttributes {
for k, v := range messageAttributes {
x := sqs.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(v),
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func NewProducer(client *sqs.SQS, queueURL string, delaySeconds *int64) *Produce
return &Producer{Client: client, QueueURL: &queueURL, DelaySeconds: delaySeconds}
}

func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes *map[string]string) (string, error) {
func (p *Producer) Produce(ctx context.Context, data []byte, messageAttributes map[string]string) (string, error) {
attributes := MapToAttributes(messageAttributes)
s := string(data)
result, err := p.Client.SendMessage(&sqs.SendMessageInput{
Expand Down

0 comments on commit b7e5cd5

Please sign in to comment.