Skip to content

Commit 164013f

Browse files
authored
Merge pull request certtools#2533 from kamil-certat/amgp_timeout
Fix maintaining pipeline when using AMQP
2 parents 1a92ae8 + 8d71e85 commit 164013f

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
### Configuration
1313

1414
### Core
15+
- AMQP: Fix maintaining pipeline connection when during interrupted connections (PR#2533 by Kamil Mankowski).
1516
- Python 3.8 or newer is required (PR#2541 by Sebastian Wagner).
1617
- `intelmq.lib.utils.list_all_bots`/`intelmqctl check`: Fix check for bot executable in $PATH by using the bot name instead of the import path (fixes #2559, PR#2564 by Sebastian Wagner).
1718

intelmq/lib/pipeline.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ def __init__(self, logger, pipeline_args: dict = None, load_balance=False, is_mu
511511
if pika is None:
512512
raise ValueError("To use AMQP you must install the 'pika' library.")
513513
self.properties = pika.BasicProperties(delivery_mode=2) # message persistence
514+
self.heartbeat = 10
514515

515516
def load_configurations(self, queues_type):
516517
self.host = self.pipeline_args.get(f"{queues_type}_pipeline_host", "10.0.0.1")
@@ -533,9 +534,9 @@ def load_configurations(self, queues_type):
533534
self.kwargs['ssl_options'] = pika.SSLOptions(context=ssl.create_default_context(ssl.Purpose.SERVER_AUTH))
534535
pika_version = tuple(int(x) for x in pika.__version__.split('.'))
535536
if pika_version < (0, 11):
536-
self.kwargs['heartbeat_interval'] = 10
537+
self.kwargs['heartbeat_interval'] = self.heartbeat
537538
else:
538-
self.kwargs['heartbeat'] = 10
539+
self.kwargs['heartbeat'] = self.heartbeat
539540
if pika_version < (1, ):
540541
# https://groups.google.com/forum/#!topic/pika-python/gz7lZtPRq4Q
541542
self.publish_raises_nack = False
@@ -607,7 +608,7 @@ def _send(self, destination_queue, message, reconnect=True):
607608
mandatory=True,
608609
)
609610
except Exception as exc: # UnroutableError, NackError in 1.0.0
610-
if reconnect and isinstance(exc, pika.exceptions.ConnectionClosed):
611+
if reconnect and isinstance(exc, (pika.exceptions.ConnectionClosed, pika.exceptions.StreamLostError)):
611612
self.logger.debug('Error sending the message. '
612613
'Will re-connect and re-send.',
613614
exc_info=True)
@@ -645,9 +646,16 @@ def _receive(self) -> bytes:
645646
if self.source_queue is None:
646647
raise exceptions.ConfigurationError('pipeline', 'No source queue given.')
647648
try:
648-
method, header, body = next(self.channel.consume(self.source_queue))
649-
if method:
650-
self.delivery_tag = method.delivery_tag
649+
# self.channel.consume is blocking and with no incoming messages
650+
# can prevent heartbeat maintenance. This loop let the pika maintain
651+
# the channel at least once between expected heartbeats.
652+
method, body = None, None
653+
while not (method or body):
654+
method, _, body = next(
655+
self.channel.consume(self.source_queue, inactivity_timeout=self.heartbeat / 2)
656+
)
657+
if method:
658+
self.delivery_tag = method.delivery_tag
651659
except Exception as exc:
652660
raise exceptions.PipelineError(exc)
653661
else:

0 commit comments

Comments
 (0)