Skip to content

Commit e709e5d

Browse files
TienHuyIoTpre-commit-ci-lite[bot]mathieucarbou
authored
Fix WebSocket (#153)
* Fix WebSocket - Handle client disconnection event properly even if cleanupClients is not called within the main loop. - Avoid self-deadlock by using recursive mutex. * ci(pre-commit): Apply automatic fixes * Update example * ci(pre-commit): Apply automatic fixes --------- Co-authored-by: pre-commit-ci-lite[bot] <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com> Co-authored-by: Mathieu Carbou <[email protected]>
1 parent a71d06e commit e709e5d

File tree

3 files changed

+41
-11
lines changed

3 files changed

+41
-11
lines changed

examples/WebSocket/WebSocket.ino

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,10 @@ void loop() {
102102
}
103103

104104
if (now - lastHeap >= 2000) {
105-
// cleanup disconnected clients or too many clients
106-
ws.cleanupClients();
105+
Serial.printf("Connected clients: %u / %u total\n", ws.count(), ws.getClients().size());
106+
107+
// this can be called to also set a soft limit on the number of connected clients
108+
ws.cleanupClients(2); // no more than 2 clients
107109

108110
#ifdef ESP32
109111
Serial.printf("Free heap: %" PRIu32 "\n", ESP.getFreeHeap());

src/AsyncWebSocket.cpp

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ AsyncWebSocketClient::AsyncWebSocketClient(AsyncWebServerRequest *request, Async
333333
AsyncWebSocketClient::~AsyncWebSocketClient() {
334334
{
335335
#ifdef ESP32
336-
std::lock_guard<std::mutex> lock(_lock);
336+
std::lock_guard<std::recursive_mutex> lock(_lock);
337337
#endif
338338
_messageQueue.clear();
339339
_controlQueue.clear();
@@ -351,7 +351,7 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
351351
_lastMessageTime = millis();
352352

353353
#ifdef ESP32
354-
std::lock_guard<std::mutex> lock(_lock);
354+
std::unique_lock<std::recursive_mutex> lock(_lock);
355355
#endif
356356

357357
if (!_controlQueue.empty()) {
@@ -362,6 +362,14 @@ void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
362362
_controlQueue.pop_front();
363363
_status = WS_DISCONNECTED;
364364
if (_client) {
365+
#ifdef ESP32
366+
/*
367+
Unlocking has to be called before return execution otherwise std::unique_lock ::~unique_lock() will get an exception pthread_mutex_unlock.
368+
Due to _client->close(true) shall call the callback function _onDisconnect()
369+
The calling flow _onDisconnect() --> _handleDisconnect() --> ~AsyncWebSocketClient()
370+
*/
371+
lock.unlock();
372+
#endif
365373
_client->close(true);
366374
}
367375
return;
@@ -385,7 +393,7 @@ void AsyncWebSocketClient::_onPoll() {
385393
}
386394

387395
#ifdef ESP32
388-
std::unique_lock<std::mutex> lock(_lock);
396+
std::unique_lock<std::recursive_mutex> lock(_lock);
389397
#endif
390398
if (_client && _client->canSend() && (!_controlQueue.empty() || !_messageQueue.empty())) {
391399
_runQueue();
@@ -415,21 +423,21 @@ void AsyncWebSocketClient::_runQueue() {
415423

416424
bool AsyncWebSocketClient::queueIsFull() const {
417425
#ifdef ESP32
418-
std::lock_guard<std::mutex> lock(_lock);
426+
std::lock_guard<std::recursive_mutex> lock(_lock);
419427
#endif
420428
return (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) || (_status != WS_CONNECTED);
421429
}
422430

423431
size_t AsyncWebSocketClient::queueLen() const {
424432
#ifdef ESP32
425-
std::lock_guard<std::mutex> lock(_lock);
433+
std::lock_guard<std::recursive_mutex> lock(_lock);
426434
#endif
427435
return _messageQueue.size();
428436
}
429437

430438
bool AsyncWebSocketClient::canSend() const {
431439
#ifdef ESP32
432-
std::lock_guard<std::mutex> lock(_lock);
440+
std::lock_guard<std::recursive_mutex> lock(_lock);
433441
#endif
434442
return _messageQueue.size() < WS_MAX_QUEUED_MESSAGES;
435443
}
@@ -440,7 +448,7 @@ bool AsyncWebSocketClient::_queueControl(uint8_t opcode, const uint8_t *data, si
440448
}
441449

442450
#ifdef ESP32
443-
std::lock_guard<std::mutex> lock(_lock);
451+
std::lock_guard<std::recursive_mutex> lock(_lock);
444452
#endif
445453

446454
_controlQueue.emplace_back(opcode, data, len, mask);
@@ -458,14 +466,22 @@ bool AsyncWebSocketClient::_queueMessage(AsyncWebSocketSharedBuffer buffer, uint
458466
}
459467

460468
#ifdef ESP32
461-
std::lock_guard<std::mutex> lock(_lock);
469+
std::unique_lock<std::recursive_mutex> lock(_lock);
462470
#endif
463471

464472
if (_messageQueue.size() >= WS_MAX_QUEUED_MESSAGES) {
465473
if (closeWhenFull) {
466474
_status = WS_DISCONNECTED;
467475

468476
if (_client) {
477+
#ifdef ESP32
478+
/*
479+
Unlocking has to be called before return execution otherwise std::unique_lock ::~unique_lock() will get an exception pthread_mutex_unlock.
480+
Due to _client->close(true) shall call the callback function _onDisconnect()
481+
The calling flow _onDisconnect() --> _handleDisconnect() --> ~AsyncWebSocketClient()
482+
*/
483+
lock.unlock();
484+
#endif
469485
_client->close(true);
470486
}
471487

@@ -551,6 +567,7 @@ void AsyncWebSocketClient::_onTimeout(uint32_t time) {
551567
void AsyncWebSocketClient::_onDisconnect() {
552568
// Serial.println("onDis");
553569
_client = nullptr;
570+
_server->_handleDisconnect(this);
554571
}
555572

556573
void AsyncWebSocketClient::_onData(void *pbuf, size_t plen) {
@@ -857,6 +874,16 @@ AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request)
857874
return &_clients.back();
858875
}
859876

877+
void AsyncWebSocket::_handleDisconnect(AsyncWebSocketClient *client) {
878+
const auto client_id = client->id();
879+
const auto iter = std::find_if(std::begin(_clients), std::end(_clients), [client_id](const AsyncWebSocketClient &c) {
880+
return c.id() == client_id;
881+
});
882+
if (iter != std::end(_clients)) {
883+
_clients.erase(iter);
884+
}
885+
}
886+
860887
bool AsyncWebSocket::availableForWriteAll() {
861888
return std::none_of(std::begin(_clients), std::end(_clients), [](const AsyncWebSocketClient &c) {
862889
return c.queueIsFull();

src/AsyncWebSocket.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ class AsyncWebSocketClient {
152152
uint32_t _clientId;
153153
AwsClientStatus _status;
154154
#ifdef ESP32
155-
mutable std::mutex _lock;
155+
mutable std::recursive_mutex _lock;
156156
#endif
157157
std::deque<AsyncWebSocketControl> _controlQueue;
158158
std::deque<AsyncWebSocketMessage> _messageQueue;
@@ -385,6 +385,7 @@ class AsyncWebSocket : public AsyncWebHandler {
385385
return _cNextId++;
386386
}
387387
AsyncWebSocketClient *_newClient(AsyncWebServerRequest *request);
388+
void _handleDisconnect(AsyncWebSocketClient *client);
388389
void _handleEvent(AsyncWebSocketClient *client, AwsEventType type, void *arg, uint8_t *data, size_t len);
389390
bool canHandle(AsyncWebServerRequest *request) const override final;
390391
void handleRequest(AsyncWebServerRequest *request) override final;

0 commit comments

Comments
 (0)