Skip to content

Commit b4b6ba6

Browse files
authored
Merge pull request #137 from dsouzajude/configure-kinesis-subscription
Configure and update kinesis subscription
2 parents a978aef + 5e0ee97 commit b4b6ba6

8 files changed

+192
-10
lines changed

Diff for: lambda_uploader/config.py

+17-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import json
16+
from datetime import datetime
1617
from os import path
1718

1819
# Python 2/3 compatability
@@ -26,7 +27,8 @@
2627
u'role': basestring, u'timeout': int, u'memory': int}
2728
REQUIRED_VPC_PARAMS = {u'subnets': list, u'security_groups': list}
2829
REQUIRED_KINESIS_SUBSCRIPTION_PARAMS = {u'stream': basestring,
29-
u'batch_size': int}
30+
u'batch_size': int,
31+
u'starting_position': basestring}
3032

3133
DEFAULT_PARAMS = {u'requirements': [], u'publish': False,
3234
u'alias': None, u'alias_description': None,
@@ -136,6 +138,20 @@ def validate_kinesis():
136138
raise TypeError("Batch size in Kinesis subscription must"
137139
" be greater than 0")
138140

141+
valid_starting_pos = ['TRIM_HORIZON', 'LATEST', 'AT_TIMESTAMP']
142+
if ksub['starting_position'] not in valid_starting_pos:
143+
raise TypeError("Starting position in Kinesis"
144+
" must be one of %s" % valid_starting_pos)
145+
146+
if ksub['starting_position'] == 'AT_TIMESTAMP':
147+
ts = ksub.get('starting_position_timestamp')
148+
try:
149+
datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ')
150+
except:
151+
raise TypeError("Starting position timestamp"
152+
" must have format "
153+
" YYYY-mm-ddTHH:MM:SSZ")
154+
139155
if 'kinesis' in self._config['subscription']:
140156
validate_kinesis()
141157

Diff for: lambda_uploader/subscribers.py

+40-8
Original file line numberDiff line numberDiff line change
@@ -15,35 +15,60 @@
1515
import boto3
1616
import botocore
1717
import logging
18+
from datetime import datetime
1819

1920
LOG = logging.getLogger(__name__)
2021

2122

2223
class KinesisSubscriber(object):
2324
''' Invokes the lambda function on events from the Kinesis streams '''
2425
def __init__(self, config, profile_name,
25-
function_name, stream_name, batch_size):
26+
function_name, stream_name, batch_size,
27+
starting_position, starting_position_ts=None):
2628
self._aws_session = boto3.session.Session(region_name=config.region,
2729
profile_name=profile_name)
2830
self._lambda_client = self._aws_session.client('lambda')
2931
self.function_name = function_name
3032
self.stream_name = stream_name
3133
self.batch_size = batch_size
34+
self.starting_position = starting_position
35+
self.starting_position_ts = starting_position_ts
3236

3337
def subscribe(self):
3438
''' Subscribes the lambda to the Kinesis stream '''
3539
try:
3640
LOG.debug('Creating Kinesis subscription')
37-
self._lambda_client \
38-
.create_event_source_mapping(EventSourceArn=self.stream_name,
39-
FunctionName=self.function_name,
40-
BatchSize=self.batch_size,
41-
StartingPosition='TRIM_HORIZON')
41+
if self.starting_position_ts:
42+
self._lambda_client \
43+
.create_event_source_mapping(
44+
EventSourceArn=self.stream_name,
45+
FunctionName=self.function_name,
46+
BatchSize=self.batch_size,
47+
StartingPosition=self.starting_position,
48+
StartingPositionTimestamp=self.starting_position_ts)
49+
else:
50+
self._lambda_client \
51+
.create_event_source_mapping(
52+
EventSourceArn=self.stream_name,
53+
FunctionName=self.function_name,
54+
BatchSize=self.batch_size,
55+
StartingPosition=self.starting_position)
4256
LOG.debug('Subscription created')
4357
except botocore.exceptions.ClientError as ex:
4458
response_code = ex.response['Error']['Code']
4559
if response_code == 'ResourceConflictException':
46-
LOG.debug('Subscription exists')
60+
LOG.debug('Subscription exists. Updating ...')
61+
resp = self._lambda_client\
62+
.list_event_source_mappings(
63+
FunctionName=self.function_name,
64+
EventSourceArn=self.stream_name)
65+
uuid = resp['EventSourceMappings'][0]['UUID']
66+
self._lambda_client \
67+
.update_event_source_mapping(
68+
UUID=uuid,
69+
FunctionName=self.function_name,
70+
Enabled=True,
71+
BatchSize=self.batch_size)
4772
else:
4873
LOG.error('Subscription failed, error=%s' % str(ex))
4974
raise ex
@@ -56,6 +81,13 @@ def create_subscriptions(config, profile_name):
5681
function_name = config.name
5782
stream_name = data['stream']
5883
batch_size = data['batch_size']
84+
starting_position = data['starting_position']
85+
starting_position_ts = None
86+
if starting_position == 'AT_TIMESTAMP':
87+
ts = data.get('starting_position_timestamp')
88+
starting_position_ts = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ')
5989
s = KinesisSubscriber(config, profile_name,
60-
function_name, stream_name, batch_size)
90+
function_name, stream_name, batch_size,
91+
starting_position,
92+
starting_position_ts=starting_position_ts)
6193
s.subscribe()

Diff for: tests/configs/lambda-with-subscription.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
"subscription": {
1616
"kinesis": {
1717
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18-
"batch_size": 10
18+
"batch_size": 10,
19+
"starting_position": "TRIM_HORIZON"
1920
}
2021
},
2122
"vpc": {

Diff for: tests/configs/lambda-with-subscription_at_ts.json

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10,
19+
"starting_position": "TRIM_HORIZON",
20+
"starting_position_timestamp": "2017-11-01T11:00:00Z"
21+
}
22+
},
23+
"vpc": {
24+
"subnets": [
25+
"subnet-00000000"
26+
],
27+
"security_groups": [
28+
"sg-00000000"
29+
]
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10,
19+
"starting_position": "AT_TIMESTAMP",
20+
"starting_position_timestamp": null
21+
}
22+
},
23+
"vpc": {
24+
"subnets": [
25+
"subnet-00000000"
26+
],
27+
"security_groups": [
28+
"sg-00000000"
29+
]
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10,
19+
"starting_position": "AT_TIMESTAMP"
20+
}
21+
},
22+
"vpc": {
23+
"subnets": [
24+
"subnet-00000000"
25+
],
26+
"security_groups": [
27+
"sg-00000000"
28+
]
29+
}
30+
}

Diff for: tests/test_config.py

+22
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,28 @@ def test_kinesis_subscription():
5858
assert cfg.raw['subscription']['kinesis']['batch_size'] == ksub['batch_size']
5959

6060

61+
def test_kinesis_subscription_with_starting_position_at_timestamp():
62+
ksub = {
63+
'stream': 'arn:aws:kinesis:eu-west-1:000000000000:stream/services',
64+
'batch_size': 10,
65+
'starting_position_timestamp': '2017-11-01T11:00:00Z'
66+
}
67+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription_at_ts.json')
68+
assert cfg.raw['subscription']['kinesis']['stream'] == ksub['stream']
69+
assert cfg.raw['subscription']['kinesis']['batch_size'] == ksub['batch_size']
70+
assert cfg.raw['subscription']['kinesis']['starting_position_timestamp'] == ksub['starting_position_timestamp']
71+
72+
73+
def test_kinesis_subscription_with_starting_position_at_timestamp_fails_when_timestamp_is_invalid():
74+
with pytest.raises(Exception):
75+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription_at_ts_invalid_ts.json')
76+
77+
78+
def test_kinesis_subscription_with_starting_position_at_timestamp_fails_when_timestamp_not_provided():
79+
with pytest.raises(Exception):
80+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription_at_ts_not_provided.json')
81+
82+
6183
def test_set_publish():
6284
cfg = config.Config(EX_CONFIG)
6385
# Check that we default to false

Diff for: tests/test_subscribers.py

+19
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from os import path
22
from mock import patch, Mock
33
import nose.tools as nt
4+
import botocore
45

56
from lambda_uploader import subscribers, config
67

@@ -21,3 +22,21 @@ def test_successfully_adds_kinesis_subscription(self, mocked_session):
2122
config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))
2223
subscribers.create_subscriptions(conf, None)
2324
nt.assert_equals(True, _mocked_lambda.create_event_source_mapping.called)
25+
26+
@patch('lambda_uploader.subscribers.boto3.session.Session')
27+
def test_successfully_updates_kinesis_subscription(self, mocked_session):
28+
resonse = {"Error": {"Code": "ResourceConflictException", "Message": ""}}
29+
err = botocore.exceptions.ClientError(resonse, "create_event_source_mapping")
30+
_mocked_lambda = Mock()
31+
_mocked_lambda.create_event_source_mapping.side_effect = err
32+
_mocked_lambda.list_event_source_mappings.return_value = {
33+
'EventSourceMappings': [{'UUID': 'myuuid'}]
34+
}
35+
_mocked_session = Mock()
36+
_mocked_session.client = Mock()
37+
_mocked_session.client.return_value = _mocked_lambda
38+
mocked_session.return_value = _mocked_session
39+
conf = config.Config(path.dirname(__file__),
40+
config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))
41+
subscribers.create_subscriptions(conf, None)
42+
nt.assert_equals(True, _mocked_lambda.update_event_source_mapping.called)

0 commit comments

Comments
 (0)