Skip to content

Commit 28dbd19

Browse files
authored
Merge pull request #29 from Demetrios-loutsios/feat-kafka-event-source-support-provisioned-poller-config
Add ProvisionedPollerConfig to Kafka event source
2 parents 6e1a0d8 + 090842a commit 28dbd19

File tree

4 files changed

+87
-0
lines changed

4 files changed

+87
-0
lines changed

Diff for: docs/events/kafka.md

+21
Original file line numberDiff line numberDiff line change
@@ -182,3 +182,24 @@ functions:
182182
maximumBatchingWindow: 30
183183
startingPosition: LATEST
184184
```
185+
186+
## Setting ProvisionedPollerConfig
187+
188+
You can set the `provisionedPollerConfig` to configure your Kafka event source to be in provisioned mode. A `minimumPollers` must be configured from 1 to 200 and a `maximumPollers` from 1 to 2000.
189+
190+
```yml
191+
functions:
192+
compute:
193+
handler: handler.compute
194+
events:
195+
- kafka:
196+
accessConfigurations:
197+
saslScram512Auth: arn:aws:secretsmanager:us-east-1:01234567890:secret:MyBrokerSecretName
198+
topic: MySelfManagedKafkaTopic
199+
bootstrapServers:
200+
- abc3.xyz.com:9092
201+
- abc2.xyz.com:9092
202+
provisionedPollerConfig:
203+
minimumPollers: 1
204+
maximumPollers: 10
205+
```

Diff for: docs/guides/serverless.yml.md

+4
Original file line numberDiff line numberDiff line change
@@ -1044,6 +1044,10 @@ functions:
10441044
# Optional, specifies event pattern content filtering
10451045
filterPatterns:
10461046
- eventName: INSERT
1047+
# Optional, configures provisioned mode, must specify minimumPollers (1-200 range), minimumPollers (1-2000 range). Both or one can be specified.
1048+
provisionedPollerConfig:
1049+
minimumPollers: 1
1050+
maximumPollers: 10
10471051
```
10481052
10491053
### RabbitMQ

Diff for: lib/plugins/aws/package/compile/events/kafka.js

+24
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,22 @@ class AwsCompileKafkaEvents {
100100
pattern: '[a-zA-Z0-9-/*:_+=.@-]*',
101101
},
102102
filterPatterns: { $ref: '#/definitions/filterPatterns' },
103+
provisionedPollerConfig: {
104+
type: 'object',
105+
minProperties: 1,
106+
properties: {
107+
minimumPollers: {
108+
type: 'number',
109+
minimum: 1,
110+
maximum: 200,
111+
},
112+
maximumPollers: {
113+
type: 'number',
114+
minimum: 1,
115+
maximum: 2000,
116+
},
117+
},
118+
},
103119
},
104120
additionalProperties: false,
105121
required: ['accessConfigurations', 'bootstrapServers', 'topic'],
@@ -270,6 +286,14 @@ class AwsCompileKafkaEvents {
270286
};
271287
}
272288

289+
const provisionedPollerConfig = event.kafka.provisionedPollerConfig;
290+
if (provisionedPollerConfig) {
291+
kafkaResource.Properties.ProvisionedPollerConfig = {
292+
MinimumPollers: provisionedPollerConfig.minimumPollers,
293+
MaximumPollers: provisionedPollerConfig.maximumPollers,
294+
};
295+
}
296+
273297
cfTemplate.Resources[kafkaEventLogicalId] = kafkaResource;
274298
});
275299

Diff for: test/unit/lib/plugins/aws/package/compile/events/kafka.test.js

+38
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
2121
const batchSize = 5000;
2222
const maximumBatchingWindow = 20;
2323
const filterPatterns = [{ eventName: 'INSERT' }];
24+
const provisionedPollerConfig = { minimumPollers: 1, maximumPollers: 10 };
2425

2526
describe('when there are kafka events defined', () => {
2627
let minimalEventSourceMappingResource;
@@ -56,6 +57,7 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
5657
enabled,
5758
startingPosition,
5859
filterPatterns,
60+
provisionedPollerConfig,
5961
},
6062
},
6163
],
@@ -136,6 +138,10 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
136138
},
137139
],
138140
},
141+
ProvisionedPollerConfig: {
142+
MinimumPollers: 1,
143+
MaximumPollers: 10,
144+
},
139145
});
140146
});
141147
});
@@ -673,6 +679,38 @@ describe('test/unit/lib/plugins/aws/package/compile/events/kafka.test.js', () =>
673679
});
674680
});
675681

682+
it('should correctly compile EventSourceMapping resource properties for ProvisionedPollerConfig', async () => {
683+
const { awsNaming, cfTemplate } = await runServerless({
684+
fixture: 'function',
685+
configExt: {
686+
functions: {
687+
basic: {
688+
role: { 'Fn::ImportValue': 'MyImportedRole' },
689+
events: [
690+
{
691+
kafka: {
692+
topic,
693+
bootstrapServers: ['abc.xyz:9092'],
694+
accessConfigurations: { saslScram256Auth: saslScram256AuthArn },
695+
provisionedPollerConfig: { minimumPollers: 2, maximumPollers: 10 },
696+
},
697+
},
698+
],
699+
},
700+
},
701+
},
702+
command: 'package',
703+
});
704+
705+
const eventSourceMappingResource =
706+
cfTemplate.Resources[awsNaming.getKafkaEventLogicalId('basic', 'TestingTopic')];
707+
708+
expect(eventSourceMappingResource.Properties.ProvisionedPollerConfig).to.deep.equal({
709+
MinimumPollers: 2,
710+
MaximumPollers: 10,
711+
});
712+
});
713+
676714
describe('when no kafka events are defined', () => {
677715
it('should not modify the default IAM role', async () => {
678716
const { cfTemplate } = await runServerless({

0 commit comments

Comments
 (0)