feat: Kafka connection management improvements #2
base: main
Conversation
✨ Key improvements:
- Lazy Kafka connection (no startup dependency on Kafka)
- Comprehensive health endpoint with Kafka status reporting
- Retry logic with configurable timeouts and backoff
- Graceful degradation with proper HTTP error responses
- Connection pooling and health monitoring
- Connection cooldown to prevent hammering unavailable Kafka
- Environment variables for all connection parameters

🧪 Testing:
- Updated test suite with comprehensive coverage
- Added tests for connection failures and health checks
- Fixed datetime deprecation warnings

📚 Documentation:
- Updated README with new features and configuration

🎯 Benefits:
- Service starts without Kafka dependency
- Clear error messages when Kafka is down
- Better observability of connection status
- More resilient in production environments
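The lazy-connection and cooldown behavior described above can be sketched roughly as follows. This is a minimal, stdlib-only illustration, not the PR's actual code: the class name, the injected `connect_fn`, and the cooldown default are assumptions.

```python
import os
import time


class LazyConnectionManager:
    """Connects on first use, and backs off for a cooldown period
    after a failed attempt instead of hammering the broker."""

    def __init__(self, connect_fn, clock=time.monotonic):
        self.connect_fn = connect_fn  # e.g. a callable that builds a KafkaProducer
        self.clock = clock
        self.cooldown = int(os.environ.get("KAFKA_CONNECTION_COOLDOWN", "30"))
        self.last_attempt = None
        self.connection = None

    def get(self):
        if self.connection is not None:
            return self.connection
        # Respect the cooldown window after a failed attempt.
        if self.last_attempt is not None and self.clock() - self.last_attempt < self.cooldown:
            raise RuntimeError("Kafka unavailable, in connection cooldown")
        self.last_attempt = self.clock()
        self.connection = self.connect_fn()  # may raise; nothing happens at startup
        return self.connection
```

Because the manager only calls `connect_fn` inside `get()`, the service can start (and report health) even when Kafka is unreachable.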
Force-pushed from ae34cd2 to 49adf6e.
/gemini review
Code Review
This pull request significantly improves the Kafka connection management by introducing a KafkaConnectionManager class. This is a great step towards making the service more resilient and robust. The implementation of lazy connections, retry logic, and health checks is well-done.
My review includes a few suggestions for improvement:
- Configuration Loading: the loading of environment variables should be made safer by handling potential ValueErrors, and the connection cooldown should be made configurable.
- Retry Logic: the retry mechanism should be updated to use exponential backoff as described in the README.
- Hardcoded Timeout: the message send timeout should be made configurable instead of being hardcoded.
The accompanying test updates are comprehensive and cover the new logic well. Overall, this is a strong improvement to the service.
```python
self.connection_timeout = int(os.environ.get("KAFKA_CONNECTION_TIMEOUT", "10"))
self.retry_attempts = int(os.environ.get("KAFKA_RETRY_ATTEMPTS", "3"))
self.retry_delay = int(os.environ.get("KAFKA_RETRY_DELAY", "2"))
```
The application will crash with a ValueError if environment variables like KAFKA_CONNECTION_TIMEOUT are set to non-integer values. This should be handled gracefully to prevent the service from failing to start due to misconfiguration.
Suggested change:

```python
try:
    self.connection_timeout = int(os.environ.get("KAFKA_CONNECTION_TIMEOUT", "10"))
    self.retry_attempts = int(os.environ.get("KAFKA_RETRY_ATTEMPTS", "3"))
    self.retry_delay = int(os.environ.get("KAFKA_RETRY_DELAY", "2"))
except ValueError:
    logger.critical("Kafka configuration values (timeout, attempts, delay) must be integers.")
    raise
```
```python
self.retry_attempts = int(os.environ.get("KAFKA_RETRY_ATTEMPTS", "3"))
self.retry_delay = int(os.environ.get("KAFKA_RETRY_DELAY", "2"))
self.last_connection_attempt = 0
self.connection_cooldown = 30  # seconds
```
The connection_cooldown is hardcoded. For better configurability across different environments, this should be defined as an environment variable, similar to other Kafka settings. This will also require adding it to a try-except block for integer conversion to handle potential ValueErrors.
Suggested change:

```python
self.connection_cooldown = int(os.environ.get("KAFKA_CONNECTION_COOLDOWN", "30"))  # seconds
```

```python
except (KafkaError, NoBrokersAvailable, Exception) as e:
    logger.warning(f"Failed to connect to Kafka (attempt {attempt + 1}/{self.retry_attempts}): {e}")
    if attempt < self.retry_attempts - 1:
        time.sleep(self.retry_delay)
```
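The fixed `time.sleep(self.retry_delay)` above is what the review's exponential-backoff point targets. One common shape of that fix, as a sketch rather than the PR's code (the doubling factor and the injected `sleep` hook are assumptions):

```python
import time


def retry_with_backoff(attempt_fn, attempts=3, base_delay=2, sleep=time.sleep):
    """Call attempt_fn up to `attempts` times, doubling the delay
    between tries: base_delay, 2*base_delay, 4*base_delay, ...
    Re-raises the last error once all attempts are exhausted."""
    for attempt in range(attempts):
        try:
            return attempt_fn()
        except Exception:
            if attempt == attempts - 1:
                raise
            sleep(base_delay * (2 ** attempt))
```

Injecting `sleep` keeps the backoff schedule testable without real delays.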
```python
try:
    producer = self.get_producer()
    future = producer.send(topic, message)
    record_metadata = future.get(timeout=10)
```
The timeout for waiting on a message send confirmation (future.get()) is hardcoded to 10 seconds. This should be configurable to allow tuning for different network conditions or Kafka performance characteristics. Using the existing self.connection_timeout would be a reasonable choice.
Suggested change:

```python
record_metadata = future.get(timeout=self.connection_timeout)
```

- Remove api_version=(0, 10, 1) from KafkaProducer configuration
- Allow kafka-python client to auto-negotiate API version with server
- Fixes UnsupportedVersionException with Kafka 7.0.1 (Confluent Platform)
- Resolves 500 errors in e2e tests caused by version mismatch
- Maintains compatibility with both older and newer Kafka deployments

🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <[email protected]>
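The wait-with-timeout pattern behind `future.get(timeout=...)` can be illustrated generically with the stdlib's `concurrent.futures`, standing in for kafka-python's send future (the helper `send_with_timeout` is hypothetical, not part of either library):

```python
import concurrent.futures
import time


def send_with_timeout(executor, send_fn, timeout):
    """Submit send_fn and wait up to `timeout` seconds for its result,
    mirroring future.get(timeout=self.connection_timeout).
    Raises concurrent.futures.TimeoutError if the send takes too long."""
    future = executor.submit(send_fn)
    return future.result(timeout=timeout)
```

With kafka-python, `future.get(timeout=...)` plays the role of `future.result(timeout=...)` here; pulling the timeout from configuration rather than a literal is the review's point.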
* Configure database for Spending Transaction Monitor
* Delete JIRA_USER_STORE.md
* Delete PR_DESCRIPTION.md
Testing Kafka connection improvements in fork CI before opening the upstream PR.
Implements: