Skip to content

Commit 4f9ab32

Browse files
committed
dlq_purge: Show what queues messages came from
1 parent 0ef3e56 commit 4f9ab32

File tree

1 file changed

+28
-7
lines changed

1 file changed

+28
-7
lines changed

src/zocalo/cli/dlq_purge.py

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import time
1616
from datetime import datetime
1717
from functools import partial
18+
from typing import Literal
1819

1920
import workflows
2021

@@ -66,9 +67,11 @@ def run() -> None:
6667
transport = workflows.transport.lookup(args.transport)()
6768

6869
characterfilter = re.compile(r"[^a-zA-Z0-9._-]+", re.UNICODE)
69-
idlequeue: queue.Queue = queue.Queue()
70+
idlequeue: queue.Queue[Literal["start", "done"] | tuple[str, str]] = queue.Queue()
7071

71-
def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
72+
def receive_dlq_message(
73+
header: dict, message: dict, *, queue_name: str, rabbitmq=False
74+
) -> None:
7275
idlequeue.put_nowait("start")
7376
if rabbitmq:
7477
msg_time = int(datetime.timestamp(header["x-death"][0]["time"])) * 1000
@@ -99,8 +102,11 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
99102

100103
with filename.open("w") as fh:
101104
json.dump(dlqmsg, fh, indent=2, sort_keys=True)
102-
print(
103-
f"Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}"
105+
idlequeue.put_nowait(
106+
(
107+
queue_name,
108+
f" Message {header['message-id']} ({time.strftime('%Y-%m-%d %H:%M:%S', timestamp)}) exported:\n {filename}",
109+
)
104110
)
105111
transport.ack(header)
106112
idlequeue.put_nowait("done")
@@ -112,17 +118,32 @@ def receive_dlq_message(header: dict, message: dict, rabbitmq=False) -> None:
112118
elif args.transport == "PikaTransport":
113119
rmq = RabbitMQAPI.from_zocalo_configuration(zc)
114120
queues = [q.name for q in rmq.queues() if q.name.startswith("dlq.")]
121+
print(f"Looking for DLQ messages in {len(queues)} queues...")
115122
for queue_ in queues:
116-
print("Looking for DLQ messages in " + queue_)
117123
transport.subscribe(
118124
queue_,
119-
partial(receive_dlq_message, rabbitmq=args.transport == "PikaTransport"),
125+
partial(
126+
receive_dlq_message,
127+
rabbitmq=args.transport == "PikaTransport",
128+
queue_name=queue_,
129+
),
120130
acknowledgement=True,
121131
)
132+
messages: dict[str, list[str]] = {}
122133
try:
123134
idlequeue.get(True, args.wait or 3)
124135
while True:
125-
idlequeue.get(True, args.wait or 0.1)
136+
result = idlequeue.get(True, args.wait or 0.1)
137+
if isinstance(result, tuple):
138+
queuename, message = result
139+
messages.setdefault(queuename, []).append(message)
140+
126141
except queue.Empty:
142+
# Print out what we found, per queue
143+
for queuename, q_messages in messages.items():
144+
print(f"Found {len(q_messages)} DLQ messages in {queuename}")
145+
for message in q_messages:
146+
print(message)
147+
127148
print("Done.")
128149
transport.disconnect()

0 commit comments

Comments
 (0)