Skip to content

Commit 5e0ee97

Browse files
author
Jude
committed
Update kinesis subscription if exists
1 parent babf5fa commit 5e0ee97

File tree

2 files changed

+40
-10
lines changed

2 files changed

+40
-10
lines changed

lambda_uploader/subscribers.py

+21-10
Original file line numberDiff line numberDiff line change
@@ -41,23 +41,34 @@ def subscribe(self):
4141
if self.starting_position_ts:
4242
self._lambda_client \
4343
.create_event_source_mapping(
44-
EventSourceArn=self.stream_name,
45-
FunctionName=self.function_name,
46-
BatchSize=self.batch_size,
47-
StartingPosition=self.starting_position,
48-
StartingPositionTimestamp=self.starting_position_ts)
44+
EventSourceArn=self.stream_name,
45+
FunctionName=self.function_name,
46+
BatchSize=self.batch_size,
47+
StartingPosition=self.starting_position,
48+
StartingPositionTimestamp=self.starting_position_ts)
4949
else:
5050
self._lambda_client \
5151
.create_event_source_mapping(
52-
EventSourceArn=self.stream_name,
53-
FunctionName=self.function_name,
54-
BatchSize=self.batch_size,
55-
StartingPosition=self.starting_position)
52+
EventSourceArn=self.stream_name,
53+
FunctionName=self.function_name,
54+
BatchSize=self.batch_size,
55+
StartingPosition=self.starting_position)
5656
LOG.debug('Subscription created')
5757
except botocore.exceptions.ClientError as ex:
5858
response_code = ex.response['Error']['Code']
5959
if response_code == 'ResourceConflictException':
60-
LOG.debug('Subscription exists')
60+
LOG.debug('Subscription exists. Updating ...')
61+
resp = self._lambda_client\
62+
.list_event_source_mappings(
63+
FunctionName=self.function_name,
64+
EventSourceArn=self.stream_name)
65+
uuid = resp['EventSourceMappings'][0]['UUID']
66+
self._lambda_client \
67+
.update_event_source_mapping(
68+
UUID=uuid,
69+
FunctionName=self.function_name,
70+
Enabled=True,
71+
BatchSize=self.batch_size)
6172
else:
6273
LOG.error('Subscription failed, error=%s' % str(ex))
6374
raise ex

tests/test_subscribers.py

+19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from os import path
22
from mock import patch, Mock
33
import nose.tools as nt
4+
import botocore
45

56
from lambda_uploader import subscribers, config
67

@@ -21,3 +22,21 @@ def test_successfully_adds_kinesis_subscription(self, mocked_session):
2122
config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))
2223
subscribers.create_subscriptions(conf, None)
2324
nt.assert_equals(True, _mocked_lambda.create_event_source_mapping.called)
25+
26+
@patch('lambda_uploader.subscribers.boto3.session.Session')
27+
def test_successfully_updates_kinesis_subscription(self, mocked_session):
28+
resonse = {"Error": {"Code": "ResourceConflictException", "Message": ""}}
29+
err = botocore.exceptions.ClientError(resonse, "create_event_source_mapping")
30+
_mocked_lambda = Mock()
31+
_mocked_lambda.create_event_source_mapping.side_effect = err
32+
_mocked_lambda.list_event_source_mappings.return_value = {
33+
'EventSourceMappings': [{'UUID': 'myuuid'}]
34+
}
35+
_mocked_session = Mock()
36+
_mocked_session.client = Mock()
37+
_mocked_session.client.return_value = _mocked_lambda
38+
mocked_session.return_value = _mocked_session
39+
conf = config.Config(path.dirname(__file__),
40+
config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))
41+
subscribers.create_subscriptions(conf, None)
42+
nt.assert_equals(True, _mocked_lambda.update_event_source_mapping.called)

0 commit comments

Comments
 (0)