Skip to content

Commit d2c4278

Browse files
authored
BAEL-6731: Introduction to JeroMQ (eugenp#14412)
1 parent 0b18a9f commit d2c4278

File tree

5 files changed

+511
-0
lines changed

5 files changed

+511
-0
lines changed

jeromq/pom.xml

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<artifactId>jeromq</artifactId>
7+
<version>0.0.1-SNAPSHOT</version>
8+
<name>jeromq</name>
9+
10+
<parent>
11+
<groupId>com.baeldung</groupId>
12+
<artifactId>parent-modules</artifactId>
13+
<version>1.0.0-SNAPSHOT</version>
14+
</parent>
15+
16+
<dependencies>
17+
<dependency>
18+
<groupId>org.zeromq</groupId>
19+
<artifactId>jeromq</artifactId>
20+
<version>0.5.3</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>org.junit.platform</groupId>
24+
<artifactId>junit-platform-engine</artifactId>
25+
<version>${junit-platform.version}</version>
26+
<scope>test</scope>
27+
</dependency>
28+
<dependency>
29+
<groupId>org.junit.platform</groupId>
30+
<artifactId>junit-platform-console-standalone</artifactId>
31+
<version>${junit-platform.version}</version>
32+
<scope>test</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>org.junit.jupiter</groupId>
36+
<artifactId>junit-jupiter-migrationsupport</artifactId>
37+
<version>${junit-jupiter.version}</version>
38+
<scope>test</scope>
39+
</dependency>
40+
</dependencies>
41+
42+
<build>
43+
<resources>
44+
<resource>
45+
<directory>src/main/resources</directory>
46+
<filtering>true</filtering>
47+
</resource>
48+
<resource>
49+
<directory>src/test/resources</directory>
50+
<filtering>true</filtering>
51+
</resource>
52+
</resources>
53+
</build>
54+
55+
</project>
56+
57+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package com.baeldung.jeromq;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.zeromq.SocketType;
5+
import org.zeromq.ZContext;
6+
import org.zeromq.ZMQ;
7+
8+
import java.util.Random;
9+
import java.util.Set;
10+
import java.util.concurrent.ExecutorService;
11+
import java.util.concurrent.Executors;
12+
import java.util.stream.Collectors;
13+
import java.util.stream.IntStream;
14+
15+
public class DealerRouterLiveTest {
16+
@Test
17+
public void single() throws Exception {
18+
Thread brokerThread = new Thread(() -> {
19+
try (ZContext context = new ZContext()) {
20+
21+
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
22+
broker.bind("tcp://*:5555");
23+
24+
String identity = broker.recvStr();
25+
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
26+
27+
broker.recv(0); // Envelope delimiter
28+
System.out.println(Thread.currentThread().getName() + " - Received envelope");
29+
String message = broker.recvStr(0); // Response from worker
30+
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
31+
32+
broker.sendMore(identity);
33+
broker.sendMore("xxx");
34+
broker.send("Hello back");
35+
}
36+
});
37+
brokerThread.setName("broker");
38+
39+
Thread workerThread = new Thread(() -> {
40+
try (ZContext context = new ZContext()) {
41+
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
42+
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
43+
44+
worker.connect("tcp://localhost:5555");
45+
System.out.println(Thread.currentThread().getName() + " - Connected");
46+
47+
worker.sendMore("");
48+
worker.send("Hello " + Thread.currentThread().getName());
49+
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
50+
51+
worker.recvStr(); // Envelope delimiter
52+
System.out.println(Thread.currentThread().getName() + " - Received Envelope");
53+
String workload = worker.recvStr();
54+
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
55+
}
56+
});
57+
workerThread.setName("worker");
58+
59+
brokerThread.start();
60+
workerThread.start();
61+
62+
workerThread.join();
63+
brokerThread.join();
64+
}
65+
66+
@Test
67+
public void asynchronous() throws Exception {
68+
Thread brokerThread = new Thread(() -> {
69+
try (ZContext context = new ZContext()) {
70+
71+
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
72+
broker.bind("tcp://*:5555");
73+
74+
while (true) {
75+
String identity = broker.recvStr(ZMQ.DONTWAIT);
76+
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
77+
78+
if (identity == null) {
79+
try {
80+
Thread.sleep(100);
81+
} catch (InterruptedException e) {}
82+
} else {
83+
84+
broker.recv(0); // Envelope delimiter
85+
System.out.println(Thread.currentThread().getName() + " - Received envelope");
86+
String message = broker.recvStr(0); // Response from worker
87+
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
88+
89+
broker.sendMore(identity);
90+
broker.sendMore("xxx");
91+
broker.send("Hello back");
92+
93+
break;
94+
}
95+
}
96+
}
97+
});
98+
brokerThread.setName("broker");
99+
100+
Thread workerThread = new Thread(() -> {
101+
try (ZContext context = new ZContext()) {
102+
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
103+
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
104+
105+
worker.connect("tcp://localhost:5555");
106+
System.out.println(Thread.currentThread().getName() + " - Connected");
107+
108+
worker.sendMore("");
109+
worker.send("Hello " + Thread.currentThread().getName());
110+
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
111+
112+
worker.recvStr(); // Envelope delimiter
113+
System.out.println(Thread.currentThread().getName() + " - Received Envelope");
114+
String workload = worker.recvStr();
115+
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
116+
}
117+
});
118+
workerThread.setName("worker");
119+
120+
brokerThread.start();
121+
workerThread.start();
122+
123+
workerThread.join();
124+
brokerThread.join();
125+
}
126+
127+
128+
@Test
129+
public void many() throws Exception {
130+
Thread brokerThread = new Thread(() -> {
131+
try (ZContext context = new ZContext()) {
132+
133+
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
134+
broker.bind("tcp://*:5555");
135+
136+
while (!Thread.currentThread().isInterrupted()) {
137+
String identity = broker.recvStr();
138+
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
139+
140+
broker.recv(0); // Envelope delimiter
141+
String message = broker.recvStr(0); // Response from worker
142+
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
143+
144+
broker.sendMore(identity);
145+
broker.sendMore("");
146+
broker.send("Hello back to " + identity);
147+
}
148+
}
149+
});
150+
brokerThread.setName("broker");
151+
152+
Set<Thread> workers = IntStream.range(0, 10)
153+
.mapToObj(index -> {
154+
Thread workerThread = new Thread(() -> {
155+
try (ZContext context = new ZContext()) {
156+
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
157+
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
158+
159+
worker.connect("tcp://localhost:5555");
160+
System.out.println(Thread.currentThread().getName() + " - Connected");
161+
162+
worker.sendMore("");
163+
worker.send("Hello " + Thread.currentThread().getName());
164+
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
165+
166+
worker.recvStr(); // Envelope delimiter
167+
String workload = worker.recvStr();
168+
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
169+
}
170+
});
171+
workerThread.setName("worker-" + index);
172+
173+
return workerThread;
174+
})
175+
.collect(Collectors.toSet());
176+
177+
brokerThread.start();
178+
workers.forEach(Thread::start);
179+
180+
for (Thread worker : workers) {
181+
worker.join();
182+
}
183+
brokerThread.interrupt();
184+
}
185+
186+
@Test
187+
public void threaded() throws Exception {
188+
Thread brokerThread = new Thread(() -> {
189+
try (ZContext context = new ZContext()) {
190+
191+
ZMQ.Socket broker = context.createSocket(SocketType.ROUTER);
192+
broker.bind("tcp://*:5555");
193+
194+
ExecutorService threadPool = Executors.newFixedThreadPool(5);
195+
Random rng = new Random();
196+
197+
while (!Thread.currentThread().isInterrupted()) {
198+
String identity = broker.recvStr();
199+
System.out.println(Thread.currentThread().getName() + " - Received identity " + identity);
200+
201+
broker.recv(0); // Envelope delimiter
202+
String message = broker.recvStr(0); // Response from worker
203+
System.out.println(Thread.currentThread().getName() + " - Received message " + message);
204+
205+
threadPool.submit(() -> {
206+
try {
207+
Thread.sleep(rng.nextInt(1000) + 1000 );
208+
} catch (Exception e) {}
209+
210+
synchronized(broker) {
211+
broker.sendMore(identity);
212+
broker.sendMore("");
213+
broker.send("Hello back to " + identity + " from " + Thread.currentThread().getName());
214+
}
215+
});
216+
}
217+
218+
threadPool.shutdown();
219+
}
220+
});
221+
brokerThread.setName("broker");
222+
223+
Set<Thread> workers = IntStream.range(0, 10)
224+
.mapToObj(index -> {
225+
Thread workerThread = new Thread(() -> {
226+
try (ZContext context = new ZContext()) {
227+
ZMQ.Socket worker = context.createSocket(SocketType.DEALER);
228+
worker.setIdentity(Thread.currentThread().getName().getBytes(ZMQ.CHARSET));
229+
230+
worker.connect("tcp://localhost:5555");
231+
System.out.println(Thread.currentThread().getName() + " - Connected");
232+
233+
worker.sendMore("");
234+
worker.send("Hello " + Thread.currentThread().getName());
235+
System.out.println(Thread.currentThread().getName() + " - Sent Hello");
236+
237+
worker.recvStr(); // Envelope delimiter
238+
String workload = worker.recvStr();
239+
System.out.println(Thread.currentThread().getName() + " - Received " + workload);
240+
}
241+
});
242+
workerThread.setName("worker-" + index);
243+
244+
return workerThread;
245+
})
246+
.collect(Collectors.toSet());
247+
248+
brokerThread.start();
249+
workers.forEach(Thread::start);
250+
251+
for (Thread worker : workers) {
252+
worker.join();
253+
}
254+
brokerThread.interrupt();
255+
}
256+
}

0 commit comments

Comments
 (0)