Skip to content

Commit bab6576

Browse files
author
Jude
committed
Add functionality for Kinesis subscriptions
1 parent 1909ec8 commit bab6576

File tree

6 files changed

+152
-2
lines changed

6 files changed

+152
-2
lines changed

lambda_uploader/config.py

+20-1
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
u'region': basestring, u'handler': basestring,
2626
u'role': basestring, u'timeout': int, u'memory': int}
2727
REQUIRED_VPC_PARAMS = {u'subnets': list, u'security_groups': list}
28+
REQUIRED_KINESIS_SUBSCRIPTION_PARAMS = {u'stream': basestring,
29+
u'batch_size': int}
2830

2931
DEFAULT_PARAMS = {u'requirements': [], u'publish': False,
3032
u'alias': None, u'alias_description': None,
3133
u'ignore': [], u'extra_files': [], u'vpc': None,
3234
u's3_bucket': None, u's3_key': None, u'runtime': 'python2.7',
33-
u'variables': {}}
35+
u'variables': {}, u'subscription': {}}
3436

3537

3638
class Config(object):
@@ -45,6 +47,8 @@ def __init__(self, pth, config_file=None, role=None, variables=None):
4547
self._set_defaults()
4648
if self._config['vpc']:
4749
self._validate_vpc()
50+
if self._config['subscription']:
51+
self._validate_subscription()
4852

4953
for param, clss in REQUIRED_PARAMS.items():
5054
self._validate(param, cls=clss)
@@ -120,6 +124,21 @@ def _validate_vpc(self):
120124
" strings. '%s' contains something else"
121125
% param)
122126

127+
'''Validate the subscription configuration.
128+
All kinds of subscription will be validated here'''
129+
def _validate_subscription(self):
130+
def validate_kinesis():
131+
ksub = self._config['subscription']['kinesis']
132+
for param, clss in REQUIRED_KINESIS_SUBSCRIPTION_PARAMS.items():
133+
self._compare(param, clss, ksub.get(param))
134+
135+
if ksub['batch_size'] <= 0:
136+
raise TypeError("Batch size in Kinesis subscription must"
137+
" be greater than 0")
138+
139+
if 'kinesis' in self._config['subscription']:
140+
validate_kinesis()
141+
123142
'''Compare if a string is a certain type'''
124143
def _compare(self, key, cls, value):
125144
if cls:

lambda_uploader/shell.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import lambda_uploader
2424

2525
from os import getcwd, path, getenv
26-
from lambda_uploader import package, config, uploader
26+
from lambda_uploader import package, config, uploader, subscribers
2727
from boto3 import __version__ as boto3_version
2828
from botocore import __version__ as botocore_version
2929

@@ -104,6 +104,10 @@ def _execute(args):
104104
if create_alias:
105105
upldr.alias()
106106

107+
if cfg.subscription:
108+
_print('Creating subscription')
109+
subscribers.create_subscriptions(cfg, args.profile)
110+
107111
pkg.clean_zipfile()
108112

109113
_print('Fin')

lambda_uploader/subscribers.py

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Copyright 2015-2016 Rackspace US, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import time
16+
import boto3
17+
import botocore
18+
import logging
19+
20+
LOG = logging.getLogger(__name__)
21+
22+
23+
class KinesisSubscriber(object):
24+
''' Invokes the lambda function on events from the Kinesis streams '''
25+
def __init__(self, config, profile_name,
26+
function_name, stream_name, batch_size):
27+
self._aws_session = boto3.session.Session(region_name=config.region,
28+
profile_name=profile_name)
29+
self._lambda_client = self._aws_session.client('lambda')
30+
self.function_name = function_name
31+
self.stream_name = stream_name
32+
self.batch_size = batch_size
33+
34+
def subscribe(self):
35+
''' Subscribes the lambda to the Kinesis stream '''
36+
try:
37+
LOG.debug('Creating Kinesis subscription')
38+
self._lambda_client \
39+
.create_event_source_mapping(EventSourceArn=self.stream_name,
40+
FunctionName=self.function_name,
41+
BatchSize=self.batch_size,
42+
StartingPosition='TRIM_HORIZON')
43+
LOG.debug('Subscription created')
44+
except botocore.exceptions.ClientError as ex:
45+
response_code = ex.response['Error']['Code']
46+
if response_code == 'InvalidParameterValueException':
47+
LOG.debug('Retrying subscription')
48+
time.sleep(3)
49+
elif response_code == 'ResourceConflictException':
50+
LOG.debug('Subscription exists')
51+
else:
52+
LOG.error('Subscription failed, error=%s' % str(ex))
53+
raise ex
54+
55+
56+
def create_subscriptions(config, profile_name):
57+
''' Adds supported subscriptions '''
58+
if 'kinesis' in config.subscription.keys():
59+
data = config.subscription['kinesis']
60+
function_name = config.name
61+
stream_name = data['stream']
62+
batch_size = data['batch_size']
63+
s = KinesisSubscriber(config, profile_name,
64+
function_name, stream_name, batch_size)
65+
s.subscribe()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10
19+
}
20+
},
21+
"vpc": {
22+
"subnets": [
23+
"subnet-00000000"
24+
],
25+
"security_groups": [
26+
"sg-00000000"
27+
]
28+
}
29+
}

tests/test_config.py

+10
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ def test_no_vpc():
4848
assert cfg.raw['vpc'] is None
4949

5050

51+
def test_kinesis_subscription():
52+
ksub = {
53+
'stream': 'arn:aws:kinesis:eu-west-1:000000000000:stream/services',
54+
'batch_size': 10
55+
}
56+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription.json')
57+
assert cfg.raw['subscription']['kinesis']['stream'] == ksub['stream']
58+
assert cfg.raw['subscription']['kinesis']['batch_size'] == ksub['batch_size']
59+
60+
5161
def test_set_publish():
5262
cfg = config.Config(EX_CONFIG)
5363
# Check that we default to false

tests/test_subscribers.py

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from os import path
2+
from mock import patch, Mock
3+
import nose.tools as nt
4+
5+
from lambda_uploader import subscribers, config
6+
7+
8+
EX_CONFIG = path.normpath(path.join(path.dirname(__file__),
9+
'../tests/configs'))
10+
11+
class TestKinesisSubscriber(object):
12+
13+
@patch('lambda_uploader.subscribers.boto3.session.Session')
14+
def test_successfully_adds_kinesis_subscription(self, mocked_session):
15+
_mocked_lambda = Mock()
16+
_mocked_session = Mock()
17+
_mocked_session.client = Mock()
18+
_mocked_session.client.return_value = _mocked_lambda
19+
mocked_session.return_value = _mocked_session
20+
conf = config.Config(path.dirname(__file__),
21+
config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))
22+
subscribers.create_subscriptions(conf, None)
23+
nt.assert_equals(True, _mocked_lambda.create_event_source_mapping.called)

0 commit comments

Comments
 (0)