Skip to content

Commit e889d9c

Browse files
authored
reuse rabbitMQ connection and channel (#27)
1 parent 8ebf513 commit e889d9c

File tree

2 files changed

+73
-49
lines changed

2 files changed

+73
-49
lines changed

src/main/java/com/github/aznamier/keycloak/event/provider/RabbitMqEventListenerProvider.java

+6-30
Original file line numberDiff line numberDiff line change
@@ -15,37 +15,18 @@
1515
import com.rabbitmq.client.AMQP.BasicProperties;
1616
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
1717
import com.rabbitmq.client.Channel;
18-
import com.rabbitmq.client.Connection;
19-
import com.rabbitmq.client.ConnectionFactory;
2018

2119
public class RabbitMqEventListenerProvider implements EventListenerProvider {
2220

2321
private static final Logger log = Logger.getLogger(RabbitMqEventListenerProvider.class);
24-
22+
2523
private final RabbitMqConfig cfg;
26-
private final ConnectionFactory factory;
24+
private Channel channel;
2725

2826
private final EventListenerTransaction tx = new EventListenerTransaction(this::publishAdminEvent, this::publishEvent);
2927

30-
public RabbitMqEventListenerProvider(RabbitMqConfig cfg, KeycloakSession session) {
28+
public RabbitMqEventListenerProvider(Channel channel, KeycloakSession session, RabbitMqConfig cfg) {
3129
this.cfg = cfg;
32-
33-
this.factory = new ConnectionFactory();
34-
35-
this.factory.setUsername(cfg.getUsername());
36-
this.factory.setPassword(cfg.getPassword());
37-
this.factory.setVirtualHost(cfg.getVhost());
38-
this.factory.setHost(cfg.getHostUrl());
39-
this.factory.setPort(cfg.getPort());
40-
41-
if(cfg.getUseTls()) {
42-
try {
43-
this.factory.useSslProtocol();
44-
} catch (Exception e) {
45-
log.error("Could not use SSL protocol", e);
46-
}
47-
}
48-
4930
session.getTransactionManager().enlistAfterCompletion(tx);
5031
}
5132

@@ -97,16 +78,11 @@ private static BasicProperties getMessageProps(String className) {
9778

9879
private void publishNotification(String messageString, BasicProperties props, String routingKey) {
9980
try {
100-
Connection conn = factory.newConnection();
101-
Channel channel = conn.createChannel();
102-
10381
channel.basicPublish(cfg.getExchange(), routingKey, props, messageString.getBytes(StandardCharsets.UTF_8));
104-
log.infof("keycloak-to-rabbitmq SUCCESS sending message: %s%n", routingKey);
105-
channel.close();
106-
conn.close();
107-
82+
log.tracef("keycloak-to-rabbitmq SUCCESS sending message: %s%n", routingKey);
10883
} catch (Exception ex) {
10984
log.errorf(ex, "keycloak-to-rabbitmq ERROR sending message: %s%n", routingKey);
11085
}
11186
}
112-
}
87+
88+
}
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
package com.github.aznamier.keycloak.event.provider;
22

3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
5+
import com.rabbitmq.client.ConnectionFactory;
6+
import java.io.IOException;
7+
import java.util.concurrent.TimeoutException;
8+
import org.jboss.logging.Logger;
39
import org.keycloak.Config.Scope;
410
import org.keycloak.events.EventListenerProvider;
511
import org.keycloak.events.EventListenerProviderFactory;
@@ -8,31 +14,73 @@
814

915
public class RabbitMqEventListenerProviderFactory implements EventListenerProviderFactory {
1016

11-
private RabbitMqConfig cfg;
17+
private static final Logger log = Logger.getLogger(RabbitMqEventListenerProviderFactory.class);
18+
private RabbitMqConfig cfg;
19+
private ConnectionFactory connectionFactory;
20+
private Connection connection;
21+
private Channel channel;
1222

13-
@Override
14-
public EventListenerProvider create(KeycloakSession session) {
15-
return new RabbitMqEventListenerProvider(cfg, session);
16-
}
23+
@Override
24+
public EventListenerProvider create(KeycloakSession session) {
25+
checkConnectionAndChannel();
26+
return new RabbitMqEventListenerProvider(channel, session, cfg);
27+
}
1728

18-
@Override
19-
public void init(Scope config) {
20-
cfg = RabbitMqConfig.createFromScope(config);
21-
}
29+
private synchronized void checkConnectionAndChannel() {
30+
try {
31+
if (connection == null || !connection.isOpen()) {
32+
this.connection = connectionFactory.newConnection();
33+
}
34+
if (channel == null || !channel.isOpen()) {
35+
channel = connection.createChannel();
36+
}
37+
}
38+
catch (IOException | TimeoutException e) {
39+
log.error("keycloak-to-rabbitmq ERROR on connection to rabbitmq", e);
40+
}
41+
}
2242

23-
@Override
24-
public void postInit(KeycloakSessionFactory factory) {
43+
@Override
44+
public void init(Scope config) {
45+
cfg = RabbitMqConfig.createFromScope(config);
46+
this.connectionFactory = new ConnectionFactory();
2547

26-
}
48+
this.connectionFactory.setUsername(cfg.getUsername());
49+
this.connectionFactory.setPassword(cfg.getPassword());
50+
this.connectionFactory.setVirtualHost(cfg.getVhost());
51+
this.connectionFactory.setHost(cfg.getHostUrl());
52+
this.connectionFactory.setPort(cfg.getPort());
53+
this.connectionFactory.setAutomaticRecoveryEnabled(true);
2754

28-
@Override
29-
public void close() {
55+
if (cfg.getUseTls()) {
56+
try {
57+
this.connectionFactory.useSslProtocol();
58+
}
59+
catch (Exception e) {
60+
log.error("Could not use SSL protocol", e);
61+
}
62+
}
63+
}
3064

31-
}
65+
@Override
66+
public void postInit(KeycloakSessionFactory factory) {
3267

33-
@Override
34-
public String getId() {
35-
return "keycloak-to-rabbitmq";
36-
}
68+
}
69+
70+
@Override
71+
public void close() {
72+
try {
73+
channel.close();
74+
connection.close();
75+
}
76+
catch (IOException | TimeoutException e) {
77+
log.error("keycloak-to-rabbitmq ERROR on close", e);
78+
}
79+
}
80+
81+
@Override
82+
public String getId() {
83+
return "keycloak-to-rabbitmq";
84+
}
3785

3886
}

0 commit comments

Comments
 (0)