diff --git a/src/main/java/net/whistlingfish/harmony/HarmonyClient.java b/src/main/java/net/whistlingfish/harmony/HarmonyClient.java index 85def03..d0e60cc 100644 --- a/src/main/java/net/whistlingfish/harmony/HarmonyClient.java +++ b/src/main/java/net/whistlingfish/harmony/HarmonyClient.java @@ -8,6 +8,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; import javax.inject.Inject; @@ -69,6 +70,12 @@ public class HarmonyClient { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture heartbeat; + /* + * To prevent timeouts when different threads send a message and expect a response, create a lock that only allows a + * single thread at a time to perform a send/receive action. + */ + private ReentrantLock messageLock = new ReentrantLock(); + @Inject private AuthService authService; @@ -189,12 +196,14 @@ private Packet sendOAPacket(XMPPTCPConnection authConnection, OAPacket packet) { private Packet sendOAPacket(XMPPTCPConnection authConnection, OAPacket packet, long replyTimeout) { PacketCollector collector = authConnection.createPacketCollector(new EmptyIncrementedIdReplyFilter(packet, authConnection)); + messageLock.lock(); try { authConnection.sendPacket(packet); return getNextPacketSkipContinues(collector, replyTimeout); } catch (SmackException | XMPPErrorException e) { throw new RuntimeException("Failed communicating with Harmony Hub", e); } finally { + messageLock.unlock(); collector.cancel(); } } @@ -206,18 +215,20 @@ private R sendOAPacket(XMPPTCPConnection authConnection, OA private R sendOAPacket(XMPPTCPConnection authConnection, OAPacket packet, Class replyClass, long replyTimeout) { PacketCollector collector = authConnection.createPacketCollector(new OAReplyFilter(packet, authConnection)); + messageLock.lock(); try { authConnection.sendPacket(packet); return replyClass.cast(getNextPacketSkipContinues(collector, replyTimeout)); } catch (SmackException | XMPPErrorException e) { throw new RuntimeException("Failed communicating with Harmony Hub", e); } finally { + messageLock.unlock(); collector.cancel(); } } - private Packet getNextPacketSkipContinues(PacketCollector collector, long replyTimeout) - throws NoResponseException, XMPPErrorException { + private Packet getNextPacketSkipContinues(PacketCollector collector, long replyTimeout) throws NoResponseException, + XMPPErrorException { while (true) { Packet reply = collector.nextResult(replyTimeout); if (reply == null) {