diff --git a/tests/IotServiceTest.cpp b/tests/IotServiceTest.cpp index 7f3f5c8c1..b9638d314 100644 --- a/tests/IotServiceTest.cpp +++ b/tests/IotServiceTest.cpp @@ -157,6 +157,7 @@ static int s_TestIotPublishSubscribe(Aws::Crt::Allocator *allocator, void *ctx) std::mutex mutex; std::condition_variable cv; + bool connectionAttemptComplete = false; bool connected = false; bool subscribed = false; bool published = false; @@ -172,7 +173,11 @@ static int s_TestIotPublishSubscribe(Aws::Crt::Allocator *allocator, void *ctx) (int)sessionPresent); { std::lock_guard lock(mutex); - connected = true; + connectionAttemptComplete = true; + if (errorCode == AWS_ERROR_SUCCESS && returnCode == AWS_MQTT_CONNECT_ACCEPTED) + { + connected = true; + } } cv.notify_one(); }; @@ -229,11 +234,25 @@ static int s_TestIotPublishSubscribe(Aws::Crt::Allocator *allocator, void *ctx) mqttConnection->OnConnectionClosed = onConnectionClosed; Aws::Crt::UUID Uuid; Aws::Crt::String uuidStr = Uuid.ToString(); - mqttConnection->Connect(uuidStr.c_str(), true); + bool done = false; + while (!done) { - std::unique_lock lock(mutex); - cv.wait(lock, [&]() { return connected; }); + mqttConnection->Connect(uuidStr.c_str(), true); + + { + std::unique_lock lock(mutex); + cv.wait(lock, [&]() { return connectionAttemptComplete; }); + if (connected) + { + done = true; + } + else + { + connectionAttemptComplete = false; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } } mqttConnection->Subscribe("/publish/me/senpai", QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, onTest, onSubAck); @@ -243,6 +262,9 @@ static int s_TestIotPublishSubscribe(Aws::Crt::Allocator *allocator, void *ctx) cv.wait(lock, [&]() { return subscribed; }); } + // try to settle any eventual consistency issues server-side + std::this_thread::sleep_for(std::chrono::seconds(2)); + Aws::Crt::ByteBuf payload = Aws::Crt::ByteBufFromCString("notice me pls"); mqttConnection->Publish("/publish/me/senpai", QOS::AWS_MQTT_QOS_AT_LEAST_ONCE, false, payload, onPubAck);