Skip to content

Commit babf5fa

Browse files
author
Jude
committed
Configure Kinesis subscription with StartingPosition
1 parent a978aef commit babf5fa

7 files changed

+161
-9
lines changed

lambda_uploader/config.py

+17-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
# limitations under the License.
1414

1515
import json
16+
from datetime import datetime
1617
from os import path
1718

1819
# Python 2/3 compatability
@@ -26,7 +27,8 @@
2627
u'role': basestring, u'timeout': int, u'memory': int}
2728
REQUIRED_VPC_PARAMS = {u'subnets': list, u'security_groups': list}
2829
REQUIRED_KINESIS_SUBSCRIPTION_PARAMS = {u'stream': basestring,
29-
u'batch_size': int}
30+
u'batch_size': int,
31+
u'starting_position': basestring}
3032

3133
DEFAULT_PARAMS = {u'requirements': [], u'publish': False,
3234
u'alias': None, u'alias_description': None,
@@ -136,6 +138,20 @@ def validate_kinesis():
136138
raise TypeError("Batch size in Kinesis subscription must"
137139
" be greater than 0")
138140

141+
valid_starting_pos = ['TRIM_HORIZON', 'LATEST', 'AT_TIMESTAMP']
142+
if ksub['starting_position'] not in valid_starting_pos:
143+
raise TypeError("Starting position in Kinesis"
144+
" must be one of %s" % valid_starting_pos)
145+
146+
if ksub['starting_position'] == 'AT_TIMESTAMP':
147+
ts = ksub.get('starting_position_timestamp')
148+
try:
149+
datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ')
150+
except:
151+
raise TypeError("Starting position timestamp"
152+
" must have format "
153+
" YYYY-mm-ddTHH:MM:SSZ")
154+
139155
if 'kinesis' in self._config['subscription']:
140156
validate_kinesis()
141157

lambda_uploader/subscribers.py

+28-7
Original file line numberDiff line numberDiff line change
@@ -15,30 +15,44 @@
1515
import boto3
1616
import botocore
1717
import logging
18+
from datetime import datetime
1819

1920
LOG = logging.getLogger(__name__)
2021

2122

2223
class KinesisSubscriber(object):
2324
''' Invokes the lambda function on events from the Kinesis streams '''
2425
def __init__(self, config, profile_name,
25-
function_name, stream_name, batch_size):
26+
function_name, stream_name, batch_size,
27+
starting_position, starting_position_ts=None):
2628
self._aws_session = boto3.session.Session(region_name=config.region,
2729
profile_name=profile_name)
2830
self._lambda_client = self._aws_session.client('lambda')
2931
self.function_name = function_name
3032
self.stream_name = stream_name
3133
self.batch_size = batch_size
34+
self.starting_position = starting_position
35+
self.starting_position_ts = starting_position_ts
3236

3337
def subscribe(self):
3438
''' Subscribes the lambda to the Kinesis stream '''
3539
try:
3640
LOG.debug('Creating Kinesis subscription')
37-
self._lambda_client \
38-
.create_event_source_mapping(EventSourceArn=self.stream_name,
39-
FunctionName=self.function_name,
40-
BatchSize=self.batch_size,
41-
StartingPosition='TRIM_HORIZON')
41+
if self.starting_position_ts:
42+
self._lambda_client \
43+
.create_event_source_mapping(
44+
EventSourceArn=self.stream_name,
45+
FunctionName=self.function_name,
46+
BatchSize=self.batch_size,
47+
StartingPosition=self.starting_position,
48+
StartingPositionTimestamp=self.starting_position_ts)
49+
else:
50+
self._lambda_client \
51+
.create_event_source_mapping(
52+
EventSourceArn=self.stream_name,
53+
FunctionName=self.function_name,
54+
BatchSize=self.batch_size,
55+
StartingPosition=self.starting_position)
4256
LOG.debug('Subscription created')
4357
except botocore.exceptions.ClientError as ex:
4458
response_code = ex.response['Error']['Code']
@@ -56,6 +70,13 @@ def create_subscriptions(config, profile_name):
5670
function_name = config.name
5771
stream_name = data['stream']
5872
batch_size = data['batch_size']
73+
starting_position = data['starting_position']
74+
starting_position_ts = None
75+
if starting_position == 'AT_TIMESTAMP':
76+
ts = data.get('starting_position_timestamp')
77+
starting_position_ts = datetime.strptime(ts, '%Y-%m-%dT%H:%M:%SZ')
5978
s = KinesisSubscriber(config, profile_name,
60-
function_name, stream_name, batch_size)
79+
function_name, stream_name, batch_size,
80+
starting_position,
81+
starting_position_ts=starting_position_ts)
6182
s.subscribe()

tests/configs/lambda-with-subscription.json

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
"subscription": {
1616
"kinesis": {
1717
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18-
"batch_size": 10
18+
"batch_size": 10,
19+
"starting_position": "TRIM_HORIZON"
1920
}
2021
},
2122
"vpc": {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10,
19+
"starting_position": "TRIM_HORIZON",
20+
"starting_position_timestamp": "2017-11-01T11:00:00Z"
21+
}
22+
},
23+
"vpc": {
24+
"subnets": [
25+
"subnet-00000000"
26+
],
27+
"security_groups": [
28+
"sg-00000000"
29+
]
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10,
19+
"starting_position": "AT_TIMESTAMP",
20+
"starting_position_timestamp": null
21+
}
22+
},
23+
"vpc": {
24+
"subnets": [
25+
"subnet-00000000"
26+
],
27+
"security_groups": [
28+
"sg-00000000"
29+
]
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10,
19+
"starting_position": "AT_TIMESTAMP"
20+
}
21+
},
22+
"vpc": {
23+
"subnets": [
24+
"subnet-00000000"
25+
],
26+
"security_groups": [
27+
"sg-00000000"
28+
]
29+
}
30+
}

tests/test_config.py

+22
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,28 @@ def test_kinesis_subscription():
5858
assert cfg.raw['subscription']['kinesis']['batch_size'] == ksub['batch_size']
5959

6060

61+
def test_kinesis_subscription_with_starting_position_at_timestamp():
62+
ksub = {
63+
'stream': 'arn:aws:kinesis:eu-west-1:000000000000:stream/services',
64+
'batch_size': 10,
65+
'starting_position_timestamp': '2017-11-01T11:00:00Z'
66+
}
67+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription_at_ts.json')
68+
assert cfg.raw['subscription']['kinesis']['stream'] == ksub['stream']
69+
assert cfg.raw['subscription']['kinesis']['batch_size'] == ksub['batch_size']
70+
assert cfg.raw['subscription']['kinesis']['starting_position_timestamp'] == ksub['starting_position_timestamp']
71+
72+
73+
def test_kinesis_subscription_with_starting_position_at_timestamp_fails_when_timestamp_is_invalid():
74+
with pytest.raises(Exception):
75+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription_at_ts_invalid_ts.json')
76+
77+
78+
def test_kinesis_subscription_with_starting_position_at_timestamp_fails_when_timestamp_not_provided():
79+
with pytest.raises(Exception):
80+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription_at_ts_not_provided.json')
81+
82+
6183
def test_set_publish():
6284
cfg = config.Config(EX_CONFIG)
6385
# Check that we default to false

0 commit comments

Comments
 (0)