Skip to content

Commit

Permalink
Merge pull request #20 from infn-datacloud/fix-no-kafka-brokers
Browse files Browse the repository at this point in the history
Fix no kafka brokers
  • Loading branch information
giosava94 authored Sep 24, 2024
2 parents 97dd8b8 + 035d393 commit 1d1f427
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
14 changes: 14 additions & 0 deletions src/kafka_conn.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import json
from logging import Logger
from typing import Any

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable


class Producer:
Expand All @@ -28,3 +30,15 @@ def __init__(
def send(self, data: dict[str, Any]):
"""Send message to kafka"""
self.producer.send(self.topic, data)


def get_kafka_prod(
*, url: str | None, topic: str | None, logger: Logger
) -> Producer | None:
"""Return kafka producer if broker exists."""
if not (url is None or topic is None):
try:
return Producer(server_url=url, topic=topic)
except NoBrokersAvailable:
logger.warning("No brokers available at %s", url)
return None
10 changes: 4 additions & 6 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from fed_reg.provider.schemas_extended import ProviderCreateExtended

from src.config import get_settings
from src.kafka_conn import Producer
from src.kafka_conn import get_kafka_prod
from src.logger import create_logger
from src.parser import parser
from src.providers.core import ProviderThread
Expand All @@ -27,11 +27,9 @@ def main(log_level: str) -> None:
settings = get_settings()

# Create kafka producer if needed
kafka_prod = None
if not (settings.KAFKA_SERVER_URL is None or settings.KAFKA_TOPIC is None):
kafka_prod = Producer(
server_url=settings.KAFKA_SERVER_URL, topic=settings.KAFKA_TOPIC
)
kafka_prod = get_kafka_prod(
url=settings.KAFKA_SERVER_URL, topic=settings.KAFKA_TOPIC, logger=logger
)

# Read all yaml files containing providers configurations.
yaml_files = get_conf_files(settings=settings, logger=logger)
Expand Down

0 comments on commit 1d1f427

Please sign in to comment.