/* kinesis_lambda_es.js */
/*
* Sample node.js code for AWS Lambda to upload the JSON documents
* pushed from Kinesis to Amazon Elasticsearch.
*
*
* Copyright 2015- Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at http://aws.amazon.com/asl/
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
/* == Imports == */
var AWS = require('aws-sdk');
var path = require('path');
/* == Globals == */
/*
 * Settings for the target Amazon Elasticsearch domain. The values below are
 * placeholders; replace them with the details of your own domain.
 */
var esDomain = {
    region: 'us-east-1',                    /* region set on each outgoing request */
    endpoint: 'my-domain-search-endpoint',  /* host of the domain endpoint */
    index: 'myindex',                       /* first segment of the request path */
    doctype: 'mytype'                       /* second segment of the request path */
};

/* Endpoint object shared by every request sent to the domain. */
var endpoint = new AWS.Endpoint(esDomain.endpoint);

/*
 * AWS credentials are read from the environment, where they belong to
 * the IAM role assigned to the Lambda function.
 * The Elasticsearch requests are signed with these credentials, so the
 * role needs a policy that allows ES domain operations.
 */
var creds = new AWS.EnvironmentCredentials('AWS');
/* Lambda "main": Execution begins here */
exports.handler = function(event, context) {
console.log(JSON.stringify(event, null, ' '));
event.Records.forEach(function(record) {
var jsonDoc = new Buffer(record.kinesis.data, 'base64');
postToES(jsonDoc.toString(), context);
});
}
/*
 * Post the given document to Elasticsearch.
 *
 * Builds a POST request for /<index>/<doctype> on the configured domain,
 * signs it with the module-level credentials and sends it. The outcome is
 * reported through the supplied context.
 *
 * doc     - document to index; sent verbatim as the request body.
 * context - object exposing succeed(message) and fail(message), e.g. the
 *           Lambda context. succeed is called when the response status is
 *           2xx; fail is called on a transport error or any other status.
 */
function postToES(doc, context) {
    var req = new AWS.HttpRequest(endpoint);

    req.method = 'POST';
    /*
     * URL paths always use forward slashes, so join with the POSIX rules
     * regardless of the platform this code runs on.
     */
    req.path = path.posix.join('/', esDomain.index, esDomain.doctype);
    req.region = esDomain.region;
    /* NOTE(review): presumably marks the request as not presigned - confirm against the SDK signer. */
    req.headers['presigned-expires'] = false;
    req.headers['Host'] = endpoint.host;
    req.headers['Content-Type'] = "application/json";
    req.body = doc;

    var signer = new AWS.Signers.V4(req , 'es');  // es: service code
    signer.addAuthorization(creds, new Date());

    var send = new AWS.NodeHttpClient();
    send.handleRequest(req, null, function(httpResp) {
        var respBody = '';
        httpResp.on('data', function (chunk) {
            respBody += chunk;
        });
        httpResp.on('end', function () {
            console.log('Response: ' + respBody);
            var status = httpResp.statusCode;
            if (status >= 200 && status < 300) {
                context.succeed('Lambda added document ' + doc);
            } else {
                /*
                 * A completed HTTP exchange is not proof the document was
                 * stored: a non-2xx status must be reported as a failure.
                 */
                context.fail('Lambda failed with error HTTP ' + status +
                    ' from Elasticsearch: ' + respBody);
            }
        });
    }, function(err) {
        console.log('Error: ' + err);
        context.fail('Lambda failed with error ' + err);
    });
}