Skip to content

Commit 04fb369

Browse files
Merge pull request #338 from michellammertink/simplify-RPC-examples
Simplified RPC examples
2 parents e4fee05 + a04cfcc commit 04fb369

File tree

2 files changed

+36
-55
lines changed

2 files changed

+36
-55
lines changed

java/RPCClient.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@
55

66
import java.io.IOException;
77
import java.util.UUID;
8-
import java.util.concurrent.ArrayBlockingQueue;
9-
import java.util.concurrent.BlockingQueue;
10-
import java.util.concurrent.TimeoutException;
8+
import java.util.concurrent.*;
119

1210
public class RPCClient implements AutoCloseable {
1311

@@ -31,12 +29,12 @@ public static void main(String[] argv) {
3129
String response = fibonacciRpc.call(i_str);
3230
System.out.println(" [.] Got '" + response + "'");
3331
}
34-
} catch (IOException | TimeoutException | InterruptedException e) {
32+
} catch (IOException | TimeoutException | InterruptedException | ExecutionException e) {
3533
e.printStackTrace();
3634
}
3735
}
3836

39-
public String call(String message) throws IOException, InterruptedException {
37+
public String call(String message) throws IOException, InterruptedException, ExecutionException {
4038
final String corrId = UUID.randomUUID().toString();
4139

4240
String replyQueueName = channel.queueDeclare().getQueue();
@@ -48,16 +46,16 @@ public String call(String message) throws IOException, InterruptedException {
4846

4947
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
5048

51-
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
49+
final CompletableFuture<String> response = new CompletableFuture<>();
5250

5351
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
5452
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
55-
response.offer(new String(delivery.getBody(), "UTF-8"));
53+
response.complete(new String(delivery.getBody(), "UTF-8"));
5654
}
5755
}, consumerTag -> {
5856
});
5957

60-
String result = response.take();
58+
String result = response.get();
6159
channel.basicCancel(ctag);
6260
return result;
6361
}

java/RPCServer.java

+30-47
Original file line numberDiff line numberDiff line change
@@ -14,53 +14,36 @@ public static void main(String[] argv) throws Exception {
1414
ConnectionFactory factory = new ConnectionFactory();
1515
factory.setHost("localhost");
1616

17-
try (Connection connection = factory.newConnection();
18-
Channel channel = connection.createChannel()) {
19-
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
20-
channel.queuePurge(RPC_QUEUE_NAME);
21-
22-
channel.basicQos(1);
23-
24-
System.out.println(" [x] Awaiting RPC requests");
25-
26-
Object monitor = new Object();
27-
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
28-
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
29-
.Builder()
30-
.correlationId(delivery.getProperties().getCorrelationId())
31-
.build();
32-
33-
String response = "";
34-
35-
try {
36-
String message = new String(delivery.getBody(), "UTF-8");
37-
int n = Integer.parseInt(message);
38-
39-
System.out.println(" [.] fib(" + message + ")");
40-
response += fib(n);
41-
} catch (RuntimeException e) {
42-
System.out.println(" [.] " + e.toString());
43-
} finally {
44-
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
45-
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
46-
// RabbitMq consumer worker thread notifies the RPC server owner thread
47-
synchronized (monitor) {
48-
monitor.notify();
49-
}
50-
}
51-
};
52-
53-
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
54-
// Wait and be prepared to consume the message from RPC client.
55-
while (true) {
56-
synchronized (monitor) {
57-
try {
58-
monitor.wait();
59-
} catch (InterruptedException e) {
60-
e.printStackTrace();
61-
}
62-
}
17+
Connection connection = factory.newConnection();
18+
Channel channel = connection.createChannel();
19+
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
20+
channel.queuePurge(RPC_QUEUE_NAME);
21+
22+
channel.basicQos(1);
23+
24+
System.out.println(" [x] Awaiting RPC requests");
25+
26+
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
27+
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
28+
.Builder()
29+
.correlationId(delivery.getProperties().getCorrelationId())
30+
.build();
31+
32+
String response = "";
33+
try {
34+
String message = new String(delivery.getBody(), "UTF-8");
35+
int n = Integer.parseInt(message);
36+
37+
System.out.println(" [.] fib(" + message + ")");
38+
response += fib(n);
39+
} catch (RuntimeException e) {
40+
System.out.println(" [.] " + e);
41+
} finally {
42+
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
43+
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
6344
}
64-
}
45+
};
46+
47+
channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> {}));
6548
}
6649
}

0 commit comments

Comments
 (0)