This issue was moved to a discussion.

The uWS::Loop::get()->defer method will block the reception of subsequent messages. How should this be handled? #1844

Closed
LetAmericaGreatAgain opened this issue Feb 10, 2025 · 3 comments

Comments

@LetAmericaGreatAgain

play is a time-consuming task, and calling it from a deferred callback blocks the reception of subsequent messages. How should this be handled?

uWS::Loop::get()->defer([this, ws]() { 
    this->play(ws); 
});
app->ws<PerSocketData>(api, {
            .compression = uWS::DISABLED,
            .maxPayloadLength = 1024 * 1024 * 1024,
            .idleTimeout = 60,
            .maxBackpressure = 1024 * 1024 * 1024,
            .closeOnBackpressureLimit = false,
            .resetIdleTimeoutOnSend = true,
            .sendPingsAutomatically = true,
            .upgrade = nullptr,
            .open = [](uWS::WebSocket<false, true, PerSocketData> *ws) {
                std::cout << "open..." << std::endl;
            },
            .message = [this](uWS::WebSocket<false, true, PerSocketData> *ws, std::string_view message,
                              uWS::OpCode opCode) {
                try {
                    IMMsg msg = json::parse(message);
                    if(msg.getType() == "Video") {
                        std::string cmd = msg.getCmd();
                        if(cmd == "play") {
                            uWS::Loop::get()->defer([this, ws]() {
                                this->play(ws);
                            });
                        } else if(cmd == "stop") {
                            this->stop();
                        }
                    }
                } catch (json::exception &e) {
                    LOGI << "Parsing message exception..." << e.what();
                }

            },
@Jacob-Burckhardt

Make the message handler acquire a mutex, add the message to a queue, and release the mutex. That will be a fast-running handler that will block the uWebSockets run method for a very short time.

Make another thread acquire the same mutex, pop the message into a local variable, and release the mutex. After releasing the mutex, make that thread parse the message. If it is a play message, make it run your code that computes the data that needs to be sent to the client. Then make it call

defer([this, ws, message]() { ws->send(message); });

Notice that this code does not call the time-consuming play function. The snippet above runs fast, meaning it blocks the uWebSockets run method for only a very short time.
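The queue-and-worker arrangement described above can be sketched without uWebSockets. This is only a minimal model under stated assumptions: MiniLoop is a hypothetical stand-in for the event loop (its defer stores a callback and its drain runs the stored callbacks), Worker is a hypothetical helper, and the "frame for ..." string stands in for whatever play would compute.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the event loop (NOT a uWebSockets class).
// defer() only stores the callback; drain() runs stored callbacks on the caller's thread.
class MiniLoop {
public:
    void defer(std::function<void()> callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        deferred_.push_back(std::move(callback));
    }
    void drain() {
        std::vector<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            batch.swap(deferred_);
        }
        for (auto &callback : batch) callback();
    }
private:
    std::mutex mutex_;
    std::vector<std::function<void()>> deferred_;
};

// Hypothetical helper: one long-lived thread that pops messages and does the slow work.
class Worker {
public:
    using Send = std::function<void(const std::string &)>;

    Worker(MiniLoop &loop, Send send)
        : loop_(loop), send_(std::move(send)), thread_([this] { run(); }) {}

    ~Worker() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_one();
        thread_.join();  // the worker finishes queued messages before exiting
    }

    // Called from the message handler: lock, enqueue, unlock. Nothing slow happens here.
    void push(std::string message) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(message));
        }
        wake_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::string message;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !queue_.empty(); });
                if (queue_.empty()) return;  // stop requested and nothing left to do
                message = std::move(queue_.front());
                queue_.pop();
            }
            // The slow work runs here, outside the lock and off the loop thread.
            std::string result = "frame for " + message;
            // Only the cheap send is handed back to the loop.
            loop_.defer([send = send_, result] { send(result); });
        }
    }

    MiniLoop &loop_;
    Send send_;
    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::string> queue_;
    bool stopping_ = false;
    std::thread thread_;  // declared last so the other members exist before it starts
};
```

In this model the handler would call worker.push(...) and return immediately, and the send callback only runs when the loop thread calls drain(). A single long-lived worker is a design choice of the sketch; the comment above does not say how many worker threads to use.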

@LetAmericaGreatAgain
Author

Is it like this? The program runs fine, but the browser doesn't receive any data.

app->ws<PerSocketData>(api, {
            .compression = uWS::DISABLED,
            .maxPayloadLength = 1024 * 1024 * 1024,
            .idleTimeout = 60,
            .maxBackpressure = 1024 * 1024 * 1024,
            .closeOnBackpressureLimit = false,
            .resetIdleTimeoutOnSend = true,
            .sendPingsAutomatically = true,
            .upgrade = nullptr,
            .open = [](uWS::WebSocket<false, true, PerSocketData> *ws) {
                std::cout << "open..." << std::endl;
            },
            .message = [this](uWS::WebSocket<false, true, PerSocketData> *ws, std::string_view message,
                              uWS::OpCode opCode) {
                try {
                    IMMsg msg = json::parse(message);
                    if(msg.getType() == "Camera") {
                        std::string cmd = msg.getCmd();
                        if(cmd == "play") {
                            std::lock_guard<std::mutex> lock(this->messageMutex);
                            this->messageQueue.push(msg);

                            std::thread([this, ws]() {
                                std::lock_guard<std::mutex> lock(this->messageMutex);

                                if (!this->messageQueue.empty()) {
                                    IMMsg msg = this->messageQueue.front();
                                    this->messageQueue.pop();

                                    if (msg.getType() == "Camera" && msg.getCmd() == "play") {
                                        this->play(ws);
                                    }
                                }
                            }).detach();
                        } else if(cmd == "stop") {
                            this->stop();
                        }
                    }
                } catch (json::exception &e) {
                    LOGI << "Parsing message exception..." << e.what();
                }
void play(uWS::WebSocket<false, true, PerSocketData> *ws) {
    try {
        video::start([ws](std::string_view frame_data) -> void {
            uWS::Loop::get()->defer([frame_data, ws]() {
                ws->send(frame_data, uWS::OpCode::BINARY);
            });
        });
    }

@Jacob-Burckhardt

Add a cout line right before the send call. I suspect you won't see the output from cout, which would explain why the browser doesn't receive any data.

If I am right, then I think you have the same problem as in #1118. That is, your run method is called in one thread, which in turn calls your message callback, which then creates a new thread. If video::start directly calls the lambda you pass it, then it is the new thread that calls Loop::get. Loop::get returns the loop that belongs to the calling thread, so the new thread gets a different loop than the one run is processing, and callbacks deferred onto that other loop are not executed by run.

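When your message handler creates a new thread, call Loop::get in the handler itself (which runs on the thread that called run) and pass the resulting loop to the new thread in the lambda capture section. Then pass that loop on to play, and again to the lambda in play, so that lambda can call defer on that loop.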
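The per-thread behaviour and the loop-passing fix described above can be illustrated with a small model that does not depend on uWebSockets. Everything here is hypothetical: ModelLoop only imitates a loop whose get() returns one instance per thread, worker_sees_same_loop is an illustrative helper, and play is a simplified stand-in for the function in the question.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical model of a per-thread event loop; NOT the uWebSockets class.
struct ModelLoop {
    // Returns the loop that belongs to the calling thread, one instance per thread.
    static ModelLoop *get() {
        thread_local ModelLoop instance;
        return &instance;
    }
    // Stores a callback to be run later by whichever thread calls run_pending().
    void defer(std::function<void()> callback) {
        std::lock_guard<std::mutex> lock(mutex);
        pending.push_back(std::move(callback));
    }
    // Runs the stored callbacks and returns how many were run.
    std::size_t run_pending() {
        std::vector<std::function<void()>> batch;
        {
            std::lock_guard<std::mutex> lock(mutex);
            batch.swap(pending);
        }
        for (auto &callback : batch) callback();
        return batch.size();
    }
    std::mutex mutex;
    std::vector<std::function<void()>> pending;
};

// Reports whether get() called on a fresh thread returns the loop passed in.
bool worker_sees_same_loop(ModelLoop *loop_thread_loop) {
    bool same = true;
    std::thread worker([&same, loop_thread_loop] {
        same = (ModelLoop::get() == loop_thread_loop);
    });
    worker.join();
    return same;
}

// Simplified stand-in for play: it defers onto the loop it was handed
// instead of calling get() on the worker thread.
void play(ModelLoop *loop, std::vector<std::string> *sent) {
    // Owned copy of the frame, so the deferred callback does not refer to a
    // buffer that may be gone by the time the callback runs.
    std::string frame = "frame-1";
    loop->defer([sent, frame] { sent->push_back(frame); });
}
```

In the code from this thread, the equivalent would presumably be to call uWS::Loop::get() inside the message handler, capture the returned pointer in the thread's lambda, and call defer on that pointer from play. The owned std::string copy rests on a further assumption, namely that the std::string_view handed over by video::start is only valid during that callback.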

BTW, your code creates a new thread every time you receive a message. If you receive messages often, that creates a lot of overhead. Since you said that play is long-running, each thread will take a long time to end. If you get another message before the previous thread has ended, you now have two new threads running. If you receive messages often, you could end up with a very large number of threads running at the same time, which might use up too much CPU time.

@uNetworking uNetworking locked and limited conversation to collaborators Feb 20, 2025
@uNetworkingAB uNetworkingAB converted this issue into discussion #1847 Feb 20, 2025
