Race when closing in setReadyState() #539

Open · awelzel opened this issue Jan 27, 2025 · 1 comment · May be fixed by #540
awelzel commented Jan 27, 2025

I'm working on a multi-threaded server application that streams data to clients from dedicated sender threads. This has been working fine except for an occasional segfault, std::bad_function_call, or a plain hang. I've worked out a condensed reproducer below.

The reproducer essentially launches a new thread for each client and streams data to it as fast as possible. Judging from the comments and the locking done by the library, this seems to be valid usage (and it's unclear how else one would stream data to clients independently of the receiving path).

An example client is attached as client.py.txt: it receives a few messages, then disconnects and immediately reconnects to do the same, in a tight loop.
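
For reference, a rough C++ equivalent of that client's behavior (the actual reproducer uses the attached Python script; the URL, message count, and loop structure here are assumptions):

#include "ixwebsocket/IXWebSocket.h"
#include "ixwebsocket/IXWebSocketMessage.h"
#include "ixwebsocket/IXWebSocketMessageType.h"
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

int main()
{
    while (true)
    {
        std::atomic<int> received {0};

        ix::WebSocket ws;
        ws.setUrl("ws://127.0.0.1:1234"); // assumed host/port
        ws.disableAutomaticReconnection();
        ws.setOnMessageCallback(
            [&received](const ix::WebSocketMessagePtr& msg)
            {
                if (msg->type == ix::WebSocketMessageType::Message) ++received;
            });

        ws.start();

        // Receive a handful of messages, then disconnect immediately.
        while (received < 5) std::this_thread::sleep_for(std::chrono::milliseconds {1});

        ws.stop();
        std::cerr << "Reconnecting after " << received << " messages" << std::endl;
    }
}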

Running the server with 4 to 8 such clients results in the following crash after a few seconds:

$ ./server
...
Thread[139624607368896] error after 395 sends (312)
Thread[139624607368896] exit (312)
Close 312
Reaped 1
Close 313
terminate called after throwing an instance of 'std::bad_function_call'
  what():  bad_function_call
Aborted (core dumped)

Sometimes it results in just a hang or a segfault, but most of the time it's the std::bad_function_call.

I have traced it to the client's sendText() detecting an error while racing with the server-side thread doing the same. Both invoke setReadyState(CLOSED), possibly invoking _onCloseCallback() twice. Worse, the server-side thread may unset _onMessageCallback while the sending thread is still executing setReadyState(), which then accesses the unset callback member.

I have a local fix that adds a mutex to setReadyState() to avoid this; I will open a PR shortly.
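
To illustrate the pattern, here is a self-contained sketch (not the library's code and not the actual patch; the member names only mirror the ones from the report) of how two threads racing to CLOSED can double-fire the close callback, and how a mutex in setReadyState() serializes the transition:

#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

// Toy stand-in for the transport: two threads may both detect an error and
// drive the state to CLOSED. Without a guard, the close callback can fire
// twice, or fire while another thread is clearing the callbacks.
struct Transport
{
    enum class ReadyState { OPEN, CLOSED };

    ReadyState _readyState = ReadyState::OPEN;
    std::function<void()> _onCloseCallback = [] { std::cout << "closed\n"; };
    std::mutex _readyStateMutex; // hypothetical guard the proposed fix adds

    void setReadyState(ReadyState readyState)
    {
        std::lock_guard<std::mutex> lock(_readyStateMutex);
        if (_readyState == readyState) return; // losing thread becomes a no-op
        if (readyState == ReadyState::CLOSED && _onCloseCallback)
        {
            _onCloseCallback(); // now runs at most once per transition
        }
        _readyState = readyState;
    }
};

int main()
{
    Transport t;
    // Both the sender thread's error path and the server-side close path
    // race to move the state to CLOSED.
    std::thread a([&] { t.setReadyState(Transport::ReadyState::CLOSED); });
    std::thread b([&] { t.setReadyState(Transport::ReadyState::CLOSED); });
    a.join();
    b.join();
}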


#include "ixwebsocket/IXConnectionState.h"
#include "ixwebsocket/IXWebSocket.h"
#include "ixwebsocket/IXWebSocketMessage.h"
#include "ixwebsocket/IXWebSocketMessageType.h"
#include "ixwebsocket/IXWebSocketServer.h"
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>


struct Client
{
    std::thread thread;
    std::shared_ptr<ix::ConnectionState> cs;
    bool closed = false;
};

// Global clients table.
std::mutex clients_mtx;
std::map<std::string, Client> clients;


void reaper();

void sender(std::shared_ptr<ix::WebSocket> ws, std::shared_ptr<ix::ConnectionState> cs)
{
    size_t sends = 0;

    auto make_json = [](size_t v) -> std::string
    {
        std::string s = R"({"value": )";
        s += std::to_string(v);
        s += "}";
        return s;
    };

    while (!cs->isTerminated())
    {
        auto info = ws->sendText(make_json(sends));
        if (!info.success)
        {
            std::cerr << "Thread[" << std::this_thread::get_id() << "] error after " << sends
                      << " sends (" << cs->getId() << ")" << std::endl;
            break;
        }
        ++sends;
    }

    std::cerr << "Thread[" << std::this_thread::get_id() << "] exit " << "(" << cs->getId() << ")"
              << std::endl;
}

/* Main entry */
int main()
{
    int port = 1234;
    const char* port_str = getenv("SERVER_PORT");
    if (port_str) port = atoi(port_str);

    auto server = std::make_shared<ix::WebSocketServer>(port);

    server->setOnConnectionCallback(
        [](std::weak_ptr<ix::WebSocket> wws, std::shared_ptr<ix::ConnectionState> cs)
        {
            auto ws = wws.lock();
            if (!ws)
            {
                std::cerr << "Could not acquire WS client!" << std::endl;
                return;
            }

            ws->setOnMessageCallback(
                [ws = ws, cs = cs](const ix::WebSocketMessagePtr& msg)
                {
                    std::lock_guard<std::mutex> lk {clients_mtx};

                    if (msg->type == ix::WebSocketMessageType::Open)
                    {
                        std::cerr << "Open " << cs->getId() << std::endl;

                        // Spawn a thread streaming data to the client.
                        clients[cs->getId()] = {};
                        clients[cs->getId()].cs = cs;
                        clients[cs->getId()].thread = std::thread {sender, ws, cs};
                    }
                    else if (msg->type == ix::WebSocketMessageType::Close)
                    {
                        std::cerr << "Close " << cs->getId() << std::endl;
                        clients[cs->getId()].closed = true;
                    }
                });
        });

    // Run reaper thread in background.
    std::thread r {reaper};

    if (!server->listenAndStart())
    {
        std::cerr << "Failed ot listen and start" << std::endl;
        exit(1);
    }

    server->wait();

    server->stop();
    return 0;
}

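// Join and remove sender threads once their connection has seen a Close
// message and the connection state reports terminated.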
void reaper()
{
    while (true)
    {
        int reaped = 0;
        {
            std::lock_guard<std::mutex> lk {clients_mtx};
            for (auto it = clients.begin(); it != clients.end(); /**/)
            {
                auto& [thread, cs, closed] = it->second;
                if (closed && cs->isTerminated())
                {
                    thread.join();
                    it = clients.erase(it);
                    ++reaped;
                }
                else
                    ++it;
            }
        }

        if (reaped) std::cerr << "Reaped " << reaped << std::endl;

        std::this_thread::sleep_for(std::chrono::microseconds {3});
    }
}

Backtrace of the crash:

(gdb) bt
#0  __pthread_kill_implementation (no_tid=0, signo=6, threadid=<optimized out>) at ./nptl/pthread_kill.c:44
#1  __pthread_kill_internal (signo=6, threadid=<optimized out>) at ./nptl/pthread_kill.c:78
#2  __GI___pthread_kill (threadid=<optimized out>, signo=signo@entry=6) at ./nptl/pthread_kill.c:89
#3  0x00007fa2e644526e in __GI_raise (sig=sig@entry=6) at ../sysdeps/posix/raise.c:26
#4  0x00007fa2e64288ff in __GI_abort () at ./stdlib/abort.c:79
#5  0x00007fa2e68a5ff5 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#6  0x00007fa2e68bb0da in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#7  0x00007fa2e68a5a55 in std::terminate() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#8  0x00007fa2e68bb391 in __cxa_throw () from /lib/x86_64-linux-gnu/libstdc++.so.6
#9  0x00007fa2e68a9670 in std::__throw_bad_function_call() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#10 0x00005558aa4bd8de in std::function<void (std::unique_ptr<ix::WebSocketMessage, std::default_delete<ix::WebSocketMessage> > const&)>::operator()(std::unique_ptr<ix::WebSocketMessage, std::default_delete<ix::WebSocketMessage> > const&) const (__args#0=std::unique_ptr<ix::WebSocketMessage> = {...}, this=<optimized out>) at /usr/include/c++/13/bits/std_function.h:590
#11 operator() (code=<optimized out>, reason=..., wireSize=<optimized out>, remote=<optimized out>, __closure=<optimized out>) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:49
#12 0x00005558aa4c7d80 in std::function<void (unsigned short, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unsigned long, bool)>::operator()(unsigned short, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unsigned long, bool) const (__args#3=<optimized out>, __args#2=<optimized out>, __args#1=..., __args#0=<optimized out>, this=0x7fa2c4000c98) at /usr/include/c++/13/bits/std_function.h:591
#13 ix::WebSocketTransport::setReadyState (this=this@entry=0x7fa2c4000b80, readyState=readyState@entry=ix::WebSocketTransport::ReadyState::CLOSED) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:217
#14 0x00005558aa4c802d in ix::WebSocketTransport::flushSendBuffer (this=this@entry=0x7fa2c4000b80) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:1234
#15 0x00005558aa4c8977 in ix::WebSocketTransport::sendData(ix::WebSocketTransport::wsheader_type::opcode_type, ix::IXWebSocketSendData const&, bool, std::function<bool (int, int)> const&) (this=this@entry=0x7fa2c4000b80, type=type@entry=ix::WebSocketTransport::wsheader_type::TEXT_FRAME, message=..., compress=<optimized out>, onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:930
#16 0x00005558aa4c8c4c in ix::WebSocketTransport::sendText(ix::IXWebSocketSendData const&, std::function<bool (int, int)> const&) (this=this@entry=0x7fa2c4000b80, message=..., onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:1054
#17 0x00005558aa4bf525 in ix::WebSocket::sendMessage(ix::IXWebSocketSendData const&, ix::SendMessageKind, std::function<bool (int, int)> const&) (this=0x7fa2c4000b80, this@entry=0x8, message=..., sendMessageKind=sendMessageKind@entry=ix::SendMessageKind::Text, onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:557
#18 0x00005558aa4bf8ad in ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) (text=..., onProgressCallback=..., this=0x8) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:523
#19 ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) (this=this@entry=0x7fa2c4000b80, text="{\"value\": 200}", onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:514
#20 0x00005558aa4b344f in sender (ws=std::shared_ptr<ix::WebSocket> (use count 1, weak count 0) = {...}, cs=std::shared_ptr<ix::ConnectionState> (use count 2, weak count 0) = {...}) at /home/awelzel/corelight-oss/projects/IXWebSocket/stress/run-threaded.cpp:38
#21 0x00005558aa4b5316 in std::__invoke_impl<void, void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > (__f=<optimized out>) at /usr/include/c++/13/bits/invoke.h:61
#22 std::__invoke<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > (__fn=<optimized out>) at /usr/include/c++/13/bits/invoke.h:96
#23 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul> (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:292
#24 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::operator() (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:299
#25 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > > >::_M_run (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:244
#26 0x00007fa2e68ecdb4 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#27 0x00007fa2e649ca94 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:447
#28 0x00007fa2e6529c3c in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:78
awelzel linked a pull request (#540) on Jan 27, 2025 that will close this issue.
bsergean (Collaborator) commented:
Thanks for the report and the fix. I've left a comment in the PR.
