Skip to content
This repository was archived by the owner on Feb 14, 2024. It is now read-only.

Commit 95d395d

Browse files
author
Trevor Bernard
committed
Clean up proxy
1 parent 518cb38 commit 95d395d

File tree

1 file changed

+21
-19
lines changed

1 file changed

+21
-19
lines changed

src/zeromq/device.clj

+21-19
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,22 @@
44
(:import
55
[org.zeromq ZContext ZMQ$Socket]))
66

7+
(defn forward
8+
"Forward messages from the frontend socket to the backend and optionally to a
9+
capture socket"
10+
[^ZMQ$Socket frontend ^ZMQ$Socket backend ^ZMQ$Socket capture]
11+
(loop [part (zmq/receive frontend)
12+
more? (zmq/receive-more? frontend)]
13+
(zmq/send backend part (if more? zmq/send-more 0))
14+
(when capture
15+
(zmq/send capture part (if more? zmq/send-more 0)))
16+
(when more?
17+
(recur (zmq/receive frontend)
18+
(zmq/receive-more? frontend)))))
19+
720
(defn proxy
821
"The proxy function starts the built-in ØMQ proxy in the current application
9-
thread.
22+
thread.
1023
1124
The proxy connects a frontend socket to a backend socket. Conceptually, data
1225
flows from frontend to backend. Depending on the socket types, replies may
@@ -25,21 +38,10 @@
2538
(let [poller (zmq/poller context 2)]
2639
(zmq/register poller frontend :pollin)
2740
(zmq/register poller backend :pollin)
28-
(while true
29-
(zmq/poll poller)
30-
(when (zmq/check-poller poller 0 :pollin)
31-
(loop [part (zmq/receive frontend)]
32-
(let [more? (zmq/receive-more? frontend)]
33-
(zmq/send backend part (if more? zmq/send-more 0))
34-
(when capture
35-
(zmq/send capture part (if more? zmq/send-more 0)))
36-
(when more?
37-
(recur (zmq/receive frontend))))))
38-
(when (zmq/check-poller poller 1 :pollin)
39-
(loop [part (zmq/receive backend)]
40-
(let [more? (zmq/receive-more? backend)]
41-
(zmq/send frontend part (if more? zmq/send-more 0))
42-
(when capture
43-
(zmq/send capture part (if more? zmq/send-more 0)))
44-
(when more?
45-
(recur (zmq/receive backend))))))))))
41+
(while (not (.. Thread currentThread isInterrupted))
42+
(zmq/poll poller 200)
43+
(cond
44+
(zmq/check-poller poller 0 :pollin)
45+
(forward frontend backend capture)
46+
(zmq/check-poller poller 1 :pollin)
47+
(forward backend frontend capture))))))

0 commit comments

Comments
 (0)