Skip to content

Commit a978aef

Browse files
authored
Merge pull request #136 from dsouzajude/kinesis-subscription
Add functionality for Kinesis subscriptions
2 parents 1909ec8 + b1d0d45 commit a978aef

File tree

8 files changed

+175
-3
lines changed

8 files changed

+175
-3
lines changed

README.md

+25-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ function inside a specific VPC, make sure you set up the role correctly to allow
2626

2727
Note also the `ignore` entry is an array of regular expression strings
2828
used to match against the relative paths - be careful to quote accordingly.
29-
For example, a traditional `*.txt` "glob" is matched by the JSON string:
29+
For example, a traditional `*.txt` "glob" is matched by the JSON string:
3030
`".*\\.txt$"` (or just `"\\.txt$"`).
3131

3232
Example `lambda.json` file:
@@ -57,6 +57,30 @@ Example `lambda.json` file:
5757
}
5858
```
5959

60+
You can also optionally setup a subscription to a Kinesis stream for your
61+
lambda using the `subscription` field as in the following sample configuration.
62+
63+
```json
64+
{
65+
"name": "myFunction",
66+
"description": "It does things",
67+
"region": "us-east-1",
68+
"runtime": "python2.7",
69+
"handler": "function.lambda_handler",
70+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
71+
"requirements": ["pygithub"],
72+
"timeout": 30,
73+
"memory": 512,
74+
"subscription": {
75+
"kinesis": {
76+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
77+
"batch_size": 10
78+
}
79+
}
80+
}
81+
```
82+
83+
6084
### Command Line Usage
6185
To package and upload simply run the command from within your lambda directory or
6286
with the directory as an option.

lambda_uploader/config.py

+20-1
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,14 @@
2525
u'region': basestring, u'handler': basestring,
2626
u'role': basestring, u'timeout': int, u'memory': int}
2727
REQUIRED_VPC_PARAMS = {u'subnets': list, u'security_groups': list}
28+
REQUIRED_KINESIS_SUBSCRIPTION_PARAMS = {u'stream': basestring,
29+
u'batch_size': int}
2830

2931
DEFAULT_PARAMS = {u'requirements': [], u'publish': False,
3032
u'alias': None, u'alias_description': None,
3133
u'ignore': [], u'extra_files': [], u'vpc': None,
3234
u's3_bucket': None, u's3_key': None, u'runtime': 'python2.7',
33-
u'variables': {}}
35+
u'variables': {}, u'subscription': {}}
3436

3537

3638
class Config(object):
@@ -45,6 +47,8 @@ def __init__(self, pth, config_file=None, role=None, variables=None):
4547
self._set_defaults()
4648
if self._config['vpc']:
4749
self._validate_vpc()
50+
if self._config['subscription']:
51+
self._validate_subscription()
4852

4953
for param, clss in REQUIRED_PARAMS.items():
5054
self._validate(param, cls=clss)
@@ -120,6 +124,21 @@ def _validate_vpc(self):
120124
" strings. '%s' contains something else"
121125
% param)
122126

127+
'''Validate the subscription configuration.
128+
All kinds of subscription will be validated here'''
129+
def _validate_subscription(self):
130+
def validate_kinesis():
131+
ksub = self._config['subscription']['kinesis']
132+
for param, clss in REQUIRED_KINESIS_SUBSCRIPTION_PARAMS.items():
133+
self._compare(param, clss, ksub.get(param))
134+
135+
if ksub['batch_size'] <= 0:
136+
raise TypeError("Batch size in Kinesis subscription must"
137+
" be greater than 0")
138+
139+
if 'kinesis' in self._config['subscription']:
140+
validate_kinesis()
141+
123142
'''Compare if a string is a certain type'''
124143
def _compare(self, key, cls, value):
125144
if cls:

lambda_uploader/shell.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import lambda_uploader
2424

2525
from os import getcwd, path, getenv
26-
from lambda_uploader import package, config, uploader
26+
from lambda_uploader import package, config, uploader, subscribers
2727
from boto3 import __version__ as boto3_version
2828
from botocore import __version__ as botocore_version
2929

@@ -104,6 +104,10 @@ def _execute(args):
104104
if create_alias:
105105
upldr.alias()
106106

107+
if cfg.subscription:
108+
_print('Creating subscription')
109+
subscribers.create_subscriptions(cfg, args.profile)
110+
107111
pkg.clean_zipfile()
108112

109113
_print('Fin')

lambda_uploader/subscribers.py

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Copyright 2015-2016 Rackspace US, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import boto3
16+
import botocore
17+
import logging
18+
19+
LOG = logging.getLogger(__name__)
20+
21+
22+
class KinesisSubscriber(object):
23+
''' Invokes the lambda function on events from the Kinesis streams '''
24+
def __init__(self, config, profile_name,
25+
function_name, stream_name, batch_size):
26+
self._aws_session = boto3.session.Session(region_name=config.region,
27+
profile_name=profile_name)
28+
self._lambda_client = self._aws_session.client('lambda')
29+
self.function_name = function_name
30+
self.stream_name = stream_name
31+
self.batch_size = batch_size
32+
33+
def subscribe(self):
34+
''' Subscribes the lambda to the Kinesis stream '''
35+
try:
36+
LOG.debug('Creating Kinesis subscription')
37+
self._lambda_client \
38+
.create_event_source_mapping(EventSourceArn=self.stream_name,
39+
FunctionName=self.function_name,
40+
BatchSize=self.batch_size,
41+
StartingPosition='TRIM_HORIZON')
42+
LOG.debug('Subscription created')
43+
except botocore.exceptions.ClientError as ex:
44+
response_code = ex.response['Error']['Code']
45+
if response_code == 'ResourceConflictException':
46+
LOG.debug('Subscription exists')
47+
else:
48+
LOG.error('Subscription failed, error=%s' % str(ex))
49+
raise ex
50+
51+
52+
def create_subscriptions(config, profile_name):
53+
''' Adds supported subscriptions '''
54+
if 'kinesis' in config.subscription.keys():
55+
data = config.subscription['kinesis']
56+
function_name = config.name
57+
stream_name = data['stream']
58+
batch_size = data['batch_size']
59+
s = KinesisSubscriber(config, profile_name,
60+
function_name, stream_name, batch_size)
61+
s.subscribe()

setup.py

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
'coverage>=4.0.3',
2121
'pytest>=2.9.1',
2222
'moto>=0.4.23',
23+
'mock',
24+
'nose',
2325
]
2426

2527

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
{
2+
"name": "myFunc",
3+
"description": "myfunc",
4+
"region": "us-east-1",
5+
"handler": "function.lambda_handler",
6+
"role": "arn:aws:iam::00000000000:role/lambda_basic_execution",
7+
"requirements": ["Jinja2==2.8"],
8+
"ignore": [
9+
"circle.yml",
10+
".git",
11+
"/*.pyc"
12+
],
13+
"timeout": 30,
14+
"memory": 512,
15+
"subscription": {
16+
"kinesis": {
17+
"stream": "arn:aws:kinesis:eu-west-1:000000000000:stream/services",
18+
"batch_size": 10
19+
}
20+
},
21+
"vpc": {
22+
"subnets": [
23+
"subnet-00000000"
24+
],
25+
"security_groups": [
26+
"sg-00000000"
27+
]
28+
}
29+
}

tests/test_config.py

+10
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ def test_no_vpc():
4848
assert cfg.raw['vpc'] is None
4949

5050

51+
def test_kinesis_subscription():
52+
ksub = {
53+
'stream': 'arn:aws:kinesis:eu-west-1:000000000000:stream/services',
54+
'batch_size': 10
55+
}
56+
cfg = config.Config(EX_CONFIG, EX_CONFIG + '/lambda-with-subscription.json')
57+
assert cfg.raw['subscription']['kinesis']['stream'] == ksub['stream']
58+
assert cfg.raw['subscription']['kinesis']['batch_size'] == ksub['batch_size']
59+
60+
5161
def test_set_publish():
5262
cfg = config.Config(EX_CONFIG)
5363
# Check that we default to false

tests/test_subscribers.py

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
from os import path
2+
from mock import patch, Mock
3+
import nose.tools as nt
4+
5+
from lambda_uploader import subscribers, config
6+
7+
8+
EX_CONFIG = path.normpath(path.join(path.dirname(__file__),
9+
'../tests/configs'))
10+
11+
class TestKinesisSubscriber(object):
12+
13+
@patch('lambda_uploader.subscribers.boto3.session.Session')
14+
def test_successfully_adds_kinesis_subscription(self, mocked_session):
15+
_mocked_lambda = Mock()
16+
_mocked_session = Mock()
17+
_mocked_session.client = Mock()
18+
_mocked_session.client.return_value = _mocked_lambda
19+
mocked_session.return_value = _mocked_session
20+
conf = config.Config(path.dirname(__file__),
21+
config_file=path.join(EX_CONFIG, 'lambda-with-subscription.json'))
22+
subscribers.create_subscriptions(conf, None)
23+
nt.assert_equals(True, _mocked_lambda.create_event_source_mapping.called)

0 commit comments

Comments
 (0)