Skip to content

Commit fb5670b

Browse files
added multiple checks (sqs src/trg, kinesis trg)
1 parent 4807ae4 commit fb5670b

File tree

7 files changed

+36
-39
lines changed

7 files changed

+36
-39
lines changed

Diff for: pkg/source/kinesis/kinesis_source_test.go

-4
Original file line numberDiff line numberDiff line change
@@ -290,10 +290,6 @@ func TestGetSource_ConfigErrorLeaderAction(t *testing.T) {
290290

291291
assert.Nil(source)
292292
assert.EqualError(err, `Failed to create Kinsumer client: leaderActionFrequency config value is mandatory and must be at least as long as ShardCheckFrequency`)
293-
294-
if testing.Short() {
295-
t.Skip("skipping integration test")
296-
}
297293
}
298294

299295
func TestGetSource_ConfigErrorMaxAge(t *testing.T) {

Diff for: pkg/source/sqs/sqs_source.go

+5
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ var ConfigPair = sourceconfig.ConfigPair{
115115
// newSQSSourceWithInterfaces allows you to provide an SQS client directly to allow
116116
// for mocking and localstack usage
117117
func newSQSSourceWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, concurrentWrites int, region string, queueName string) (*sqsSource, error) {
118+
_, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &queueName})
119+
if err != nil {
120+
return nil, errors.Wrap(err, `Could not connect to SQS`)
121+
}
122+
118123
return &sqsSource{
119124
client: client,
120125
queueName: queueName,

Diff for: pkg/source/sqs/sqs_source_test.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"os"
1212
"path/filepath"
1313
"reflect"
14+
"strings"
1415
"testing"
1516
"time"
1617

@@ -30,8 +31,8 @@ func TestMain(m *testing.M) {
3031
os.Exit(exitVal)
3132
}
3233

33-
// TestNewSqsSource_ConnectionCheck tests that the SQS source fails on start-up if the connection to SQS fails
34-
func TestNewSqsSource_ConnectionCheck(t *testing.T) {
34+
// TestNewSqsSource_AWSConnectionCheck tests that the SQS source fails on start-up if the connection to AWS fails
35+
func TestNewSqsSource_AWSConnectionCheck(t *testing.T) {
3536
assert := assert.New(t)
3637

3738
target, err := configFunction(&configuration{
@@ -66,20 +67,18 @@ func TestNewSQSSourceWithInterfaces_Success(t *testing.T) {
6667
}
6768

6869
// newSQSSourceWithInterfaces should fail if we can't reach SQS
69-
func TestNewSQSSourceWithInterfaces_Failure(t *testing.T) {
70+
func TestNewSQSSourceWithInterfaces_SQSConnectioNFailure(t *testing.T) {
7071
// Unlike the success test, we don't require anything to exist for this one
7172
assert := assert.New(t)
7273

7374
client := testutil.GetAWSLocalstackSQSClient()
7475

7576
source, err := newSQSSourceWithInterfaces(client, "00000000000", 10, testutil.AWSLocalstackRegion, "nonexistent-queue")
7677

77-
assert.NotNil(source)
78-
assert.Nil(err)
79-
80-
err = source.Read(nil)
78+
assert.Nil(source)
79+
assert.NotNil(err)
8180
if err != nil {
82-
assert.Equal(err.Error(), "Failed to get SQS queue URL: AWS.SimpleQueueService.NonExistentQueue: AWS.SimpleQueueService.NonExistentQueue; see the SQS docs.\n\tstatus code: 400, request id: 00000000-0000-0000-0000-000000000000")
81+
assert.True(strings.HasPrefix(err.Error(), `Could not connect to SQS: RequestError: send request failed`))
8382
}
8483
}
8584

Diff for: pkg/target/kinesis.go

+6
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,12 @@ func newKinesisTarget(region string, streamName string, roleARN string) (*Kinesi
6262
// newKinesisTargetWithInterfaces allows you to provide a Kinesis client directly to allow
6363
// for mocking and localstack usage
6464
func newKinesisTargetWithInterfaces(client kinesisiface.KinesisAPI, awsAccountID string, region string, streamName string) (*KinesisTarget, error) {
65+
// test the connection to kinesis by trying to make an API call
66+
_, err := client.DescribeStream(&kinesis.DescribeStreamInput{StreamName: &streamName})
67+
if err != nil {
68+
return nil, err
69+
}
70+
6571
return &KinesisTarget{
6672
client: client,
6773
streamName: streamName,

Diff for: pkg/target/kinesis_test.go

+4-17
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package target
88

99
import (
10+
"strings"
1011
"sync/atomic"
1112
"testing"
1213

@@ -28,7 +29,7 @@ func TestNewKinesisTarget_ConnectionCheck(t *testing.T) {
2829
}
2930
}
3031

31-
func TestKinesisTarget_WriteFailure(t *testing.T) {
32+
func TestKinesisTarget_KinesisConnectionFailure(t *testing.T) {
3233
if testing.Short() {
3334
t.Skip("skipping integration test")
3435
}
@@ -38,25 +39,11 @@ func TestKinesisTarget_WriteFailure(t *testing.T) {
3839
client := testutil.GetAWSLocalstackKinesisClient()
3940

4041
target, err := newKinesisTargetWithInterfaces(client, "00000000000", testutil.AWSLocalstackRegion, "not-exists")
41-
assert.Nil(err)
42-
assert.NotNil(target)
43-
assert.Equal("arn:aws:kinesis:us-east-1:00000000000:stream/not-exists", target.GetID())
44-
45-
defer target.Close()
46-
target.Open()
47-
48-
messages := testutil.GetTestMessages(1, "Hello Kinesis!!", nil)
49-
50-
writeRes, err := target.Write(messages)
5142
assert.NotNil(err)
43+
assert.Nil(target)
5244
if err != nil {
53-
assert.Equal("Error writing messages to Kinesis stream: 1 error occurred:\n\t* Failed to send message batch to Kinesis stream: ResourceNotFoundException: Stream not-exists under account 000000000000 not found.\n\n", err.Error())
45+
assert.True(strings.HasPrefix(err.Error(), `RequestError`))
5446
}
55-
assert.NotNil(writeRes)
56-
57-
// Check results
58-
assert.Equal(int64(0), writeRes.SentCount)
59-
assert.Equal(int64(1), writeRes.FailedCount)
6047
}
6148

6249
func TestKinesisTarget_WriteSuccess(t *testing.T) {

Diff for: pkg/target/sqs.go

+5
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@ func newSQSTarget(region string, queueName string, roleARN string) (*SQSTarget,
6464
// newSQSTargetWithInterfaces allows you to provide an SQS client directly to allow
6565
// for mocking and localstack usage
6666
func newSQSTargetWithInterfaces(client sqsiface.SQSAPI, awsAccountID string, region string, queueName string) (*SQSTarget, error) {
67+
_, err := client.GetQueueUrl(&sqs.GetQueueUrlInput{QueueName: &queueName})
68+
if err != nil {
69+
return nil, errors.Wrap(err, `Could not connect to SQS`)
70+
}
71+
6772
return &SQSTarget{
6873
client: client,
6974
queueName: queueName,

Diff for: pkg/target/sqs_test.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package target
88

99
import (
10+
"strings"
1011
"sync/atomic"
1112
"testing"
1213

@@ -15,16 +16,16 @@ import (
1516
"github.com/snowplow-devops/stream-replicator/pkg/testutil"
1617
)
1718

18-
// TestNewSqsTarget_ConnectionCheck tests that the SQS target fails on start-up if the connection to SQS fails
19-
func TestNewSqsTarget_ConnectionCheck(t *testing.T) {
19+
// TestNewSqsTarget_AWSConnectionCheck tests that the SQS target fails on start-up if the connection to AWS fails
20+
func TestNewSqsTarget_AWSConnectionCheck(t *testing.T) {
2021
assert := assert.New(t)
2122

2223
target, err := newSQSTarget(testutil.AWSLocalstackRegion, "not-exists", `arn:aws:sqs:us-east-1:00000000000:not-exists`)
2324
assert.Nil(target)
2425
assert.EqualError(err, "NoCredentialProviders: no valid providers in chain. Deprecated.\n\tFor verbose messaging see aws.Config.CredentialsChainVerboseErrors")
2526
}
2627

27-
func TestSQSTarget_WriteFailure(t *testing.T) {
28+
func TestSQSTarget_SQSConnectionFailure(t *testing.T) {
2829
if testing.Short() {
2930
t.Skip("skipping integration test")
3031
}
@@ -34,13 +35,11 @@ func TestSQSTarget_WriteFailure(t *testing.T) {
3435
client := testutil.GetAWSLocalstackSQSClient()
3536

3637
target, err := newSQSTargetWithInterfaces(client, "00000000000", testutil.AWSLocalstackRegion, "not-exists")
37-
assert.Nil(err)
38-
assert.NotNil(target)
39-
assert.Equal("arn:aws:sqs:us-east-1:00000000000:not-exists", target.GetID())
40-
41-
res, err := target.Write(nil)
42-
assert.Nil(err)
43-
assert.NotNil(res)
38+
assert.Nil(target)
39+
assert.NotNil(err)
40+
if err != nil {
41+
assert.True(strings.HasPrefix(err.Error(), `Could not connect to SQS: RequestError: send request failed`))
42+
}
4443
}
4544

4645
func TestSQSTarget_WriteSuccess(t *testing.T) {

0 commit comments

Comments
 (0)