Skip to content

Commit 25e9a45

Browse files
committed
feat: create a kafka consumer
1 parent 8601d84 commit 25e9a45

17 files changed

+142
-170
lines changed

.github/workflows/trigger_partner_docs_update.yml

+4-4
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,16 @@ jobs:
1212
runs-on: ubuntu-latest
1313
steps:
1414
- name: Trigger partner docs update
15-
if: github.repository == 'pactflow/example-consumer-python-sns'
15+
if: github.repository == 'pactflow/example-consumer-python-kafka'
1616
uses: peter-evans/repository-dispatch@v1
1717
with:
1818
token: ${{ secrets.GHTOKENFORTRIGGERINGPACTDOCSUPDATE }}
1919
repository: pactflow/partners.pactflow.io
20-
event-type: pactflow-example-consumer-python-sns-updated
20+
event-type: pactflow-example-consumer-python-kafka-updated
2121
- name: Trigger docs update
22-
if: github.repository == 'pactflow/example-consumer-python-sns'
22+
if: github.repository == 'pactflow/example-consumer-python-kafka'
2323
uses: peter-evans/repository-dispatch@v1
2424
with:
2525
token: ${{ secrets.GHTOKENFORTRIGGERINGPACTDOCSUPDATE }}
2626
repository: pactflow/docs.pactflow.io
27-
event-type: pactflow-example-consumer-python-sns-updated
27+
event-type: pactflow-example-consumer-python-kafka-updated

.whitesource

-3
This file was deleted.

Makefile

+5-14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Default to the read only token - the read/write token will be present on Travis CI.
22
# It's set as a secure environment variable in the .travis.yml file
3-
PACTICIPANT := "pactflow-example-consumer-python-sns"
3+
PACTICIPANT := "pactflow-example-consumer-python-kafka"
44
GITHUB_WEBHOOK_UUID := "c76b601e-d66a-4eb1-88a4-6ebc50c0df8b"
55
PACT_CLI="docker run --rm -v ${PWD}:${PWD} -e PACT_BROKER_BASE_URL -e PACT_BROKER_TOKEN pactfoundation/pact-cli:latest"
66

@@ -84,7 +84,7 @@ create_github_token_secret:
8484
create_or_update_github_webhook:
8585
@"${PACT_CLI}" \
8686
broker create-or-update-webhook \
87-
'https://api.github.com/repos/pactflow/example-consumer-python-sns/statuses/$${pactbroker.consumerVersionNumber}' \
87+
'https://api.github.com/repos/pactflow/example-consumer-python-kafka/statuses/$${pactbroker.consumerVersionNumber}' \
8888
--header 'Content-Type: application/json' 'Accept: application/vnd.github.v3+json' 'Authorization: token $${user.githubCommitStatusToken}' \
8989
--request POST \
9090
--data @${PWD}/pactflow/github-commit-status-webhook.json \
@@ -106,7 +106,7 @@ test_github_webhook:
106106
## ======================
107107
## Python additions
108108
## ======================
109-
PROJECT := example-consumer-python-sns
109+
PROJECT := example-consumer-python-kafka
110110
PYTHON_MAJOR_VERSION := 3.11
111111

112112
sgr0 := $(shell tput sgr0)
@@ -116,9 +116,6 @@ green := $(shell tput setaf 2)
116116
deps:
117117
poetry install
118118

119-
integration:
120-
sam local invoke ProductEventHandler --event ./tests/resources/events/update.json
121-
122119
venv:
123120
@if [ -d "./.venv" ]; then echo "$(red).venv already exists, not continuing!$(sgr0)"; exit 1; fi
124121
@type pyenv >/dev/null 2>&1 || (echo "$(red)pyenv not found$(sgr0)"; exit 1)
@@ -139,11 +136,5 @@ venv:
139136
@echo "\n$(green)Use it! (populate .python-version)$(sgr0)"
140137
pyenv local ${PROJECT}
141138

142-
deploy_sam:
143-
scripts/deploy.sh
144-
145-
publish_sam:
146-
scripts/publish.sh
147-
148-
logs:
149-
sam logs -n ProductEventHandler --stack-name pactflow-example-consumer-python-sns -t
139+
run:
140+
python3 run.py

README.md

+9-10
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
1-
# Example Python AWS SNS Consumer
1+
# Example Python AWS Kafka Consumer
22

3-
[![Build Status](https://github.com/pactflow/example-consumer-python-sns/actions/workflows/build.yml/badge.svg)](https://github.com/pactflow/example-consumer-python-sns/actions)
3+
[![Build Status](https://github.com/pactflow/example-consumer-python-kafka/actions/workflows/build.yml/badge.svg)](https://github.com/pactflow/example-consumer-python-kafka/actions)
44

5-
[![Can I deploy Status](https://test.pactflow.io/pacticipants/pactflow-example-consumer-python-sns/branches/main/latest-version/can-i-deploy/to-environment/production/badge.svg)](https://test.pactflow.io/pacticipants/pactflow-example-consumer-python-sns/branches/main/latest-version/can-i-deploy/to-environment/production/badge)
5+
[![Can I deploy Status](https://test.pactflow.io/pacticipants/pactflow-example-consumer-python-kafka/branches/main/latest-version/can-i-deploy/to-environment/production/badge.svg)](https://test.pactflow.io/pacticipants/pactflow-example-consumer-python-kafka/branches/main/latest-version/can-i-deploy/to-environment/production/badge)
66

7-
[![Pact Status](https://test.pactflow.io/pacts/provider/pactflow-example-provider-python-sns/consumer/pactflow-example-consumer-python-sns/latest/badge.svg?label=consumer)](https://test.pactflow.io/pacts/provider/pactflow-example-provider-python-sns/consumer/pactflow-example-consumer-python-sns/latest) (latest pact)
7+
[![Pact Status](https://test.pactflow.io/pacts/provider/pactflow-example-provider-python-kafka/consumer/pactflow-example-consumer-python-kafka/latest/badge.svg?label=consumer)](https://test.pactflow.io/pacts/provider/pactflow-example-provider-python-kafka/consumer/pactflow-example-consumer-python-kafka/latest) (latest pact)
88

9-
[![Pact Status](https://test.pactflow.io/matrix/provider/pactflow-example-provider-python-sns/latest/main/consumer/pactflow-example-consumer-python-sns/latest/main/badge.svg?label=consumer)](https://test.pactflow.io/pacts/provider/pactflow-example-provider-python-sns/consumer/pactflow-example-consumer-python-sns/latest/prod) (prod/prod pact)
9+
[![Pact Status](https://test.pactflow.io/matrix/provider/pactflow-example-provider-python-kafka/latest/main/consumer/pactflow-example-consumer-python-kafka/latest/main/badge.svg?label=consumer)](https://test.pactflow.io/pacts/provider/pactflow-example-provider-python-kafka/consumer/pactflow-example-consumer-python-kafka/latest/prod) (prod/prod pact)
1010

11-
This is an example of a Python AWS SNS consumer that uses Pact, [Pactflow](https://pactflow.io) and GitHub Actions to ensure that it is compatible with the expectations its consumers have of it.
11+
This is an example of a Python Kafka consumer that uses Pact, [Pactflow](https://pactflow.io) and GitHub Actions to ensure that it is compatible with the expectations its consumers have of it.
1212

13-
All examples in the series `example-consumer-<language>-sns` provide the same functionality to be easily comparable across languages.
14-
As such, please refer to [https://docs.pactflow.io/docs/examples/aws/sns/consumer/](AWS SNS Consumer Examples) to avoid unnecessary duplication of details here.
13+
All examples in the series `example-consumer-<language>-kafka` provide the same functionality to be easily comparable across languages.
1514

1615
Language specific sections which differ from the canonical example only can be found below.
1716

@@ -25,7 +24,7 @@ You're probably familiar with layered architectures such as Ports and Adaptors (
2524

2625
This code base is setup with this modularity in mind:
2726

28-
- [Lambda Handler](src/_lambda/product.py)
27+
- [Kafka Handler](src/_kafka/product.py)
2928
- [Event Service](src/product/product_service.py)
3029
- Business Logic
3130
- [Product](src/product/product.py)
@@ -43,4 +42,4 @@ See also:
4342
### Testing
4443

4544
- Run the unit tests: `make test`
46-
- Run a (local) lambda integration test: `make integration`
45+
- Run the kafka consumer: `make run`

pactflow/github-commit-status-webhook.json

-6
This file was deleted.

poetry.lock

+57-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

+5-4
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
[tool.poetry]
2-
name = "example-consumer-python-sns"
2+
name = "example-consumer-python-kafka"
33
version = "0.0.0"
4-
description = "Example Python AWS SNS Consumer"
4+
description = "Example Python Kafka Consumer"
55
license = "Proprietary"
66
authors = ["Mike Geeves <[email protected]>"]
77
readme = 'README.md'
8-
repository = "https://github.com/pactflow/example-consumer-python-sns"
9-
homepage = "https://github.com/pactflow/example-consumer-python-sns"
8+
repository = "https://github.com/pactflow/example-consumer-python-kafka"
9+
homepage = "https://github.com/pactflow/example-consumer-python-kafka"
1010

1111
[build-system]
1212
requires = ["poetry-core>=1.0.0"]
1313
build-backend = "poetry.core.masonry.api"
1414

1515
[tool.poetry.dependencies]
1616
python = "^3.11"
17+
confluent-kafka = "^2.6.0"
1718

1819
[tool.poetry.dev-dependencies]
1920
pytest = "8.3.3"

run.py

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
from src._kafka.product import consume_messages
2+
3+
if __name__ == "__main__":
4+
consume_messages()

samconfig.toml

-10
This file was deleted.

scripts/deploy.sh

-5
This file was deleted.

scripts/publish.sh

-9
This file was deleted.

src/_kafka/__init__.py

Whitespace-only changes.

src/_kafka/product.py

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#!/usr/bin/env python
2+
import logging
3+
import asyncio
4+
import json
5+
from confluent_kafka import Consumer
6+
from typing import Dict
7+
from src.product.product_service import receive_product_update
8+
logger = logging.getLogger(__name__)
9+
10+
def message_handler(event):
11+
loop = asyncio.get_event_loop()
12+
return loop.run_until_complete(handler(event))
13+
14+
async def handler(event: Dict):
15+
16+
logger.info(f'{event=}')
17+
18+
# Read the SNS message and pass the contents to the actual message handler
19+
message: Dict = json.loads(event.value().decode('utf-8'))
20+
logger.info(message)
21+
results = await receive_product_update(message)
22+
logger.info(results)
23+
24+
# Return the current size of the repository
25+
return len(results.keys())
26+
27+
28+
def consume_messages():
29+
config = {
30+
'bootstrap.servers': 'localhost:9092',
31+
'group.id': 'pactflow-example-consumer-python-kafka',
32+
}
33+
34+
# Create Consumer instance
35+
consumer = Consumer(config)
36+
37+
# Subscribe to topic
38+
topic = "products"
39+
consumer.subscribe([topic])
40+
41+
# Poll for new messages from Kafka and print them.
42+
try:
43+
while True:
44+
msg = consumer.poll(1.0)
45+
if msg is None:
46+
print("Waiting...")
47+
elif msg.error():
48+
print("ERROR: %s".format(msg.error()))
49+
else:
50+
print("Consumed event from topic {topic}: value = {value:12}".format(
51+
topic=msg.topic(), value=msg.value().decode('utf-8')))
52+
message_handler(msg)
53+
except KeyboardInterrupt:
54+
pass
55+
finally:
56+
# Leave group and commit final offsets
57+
consumer.close()

template.yml

-42
This file was deleted.

0 commit comments

Comments
 (0)