You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I'm working on a multi-threaded server application that streams data to clients form dedicated sender threads. This has been working fine except for an occasional segfault, std::bad_function_callor a plain hang. I've worked out a condensed reproducer below.
The reproducer essentially launches a new thread for each client and streams data to it as fast as possible. From the comments and locking done by the library, it seems that should be valid usage (and unclear how else one would stream data to clients independently from the receiving path).
This is an example client code client.py.txt, receiving a few messages, then disconnecting and reconnecting immediately to do the same in a tight loop.
Running the server with 4 to 8 such clients, results in the following crash after a few seconds:
$ ./server
...
Thread[139624607368896] error after 395 sends (312)
Thread[139624607368896] exit (312)
Close 312
Reaped 1
Close 313
terminate called after throwing an instance of 'std::bad_function_call'
what(): bad_function_call
Aborted (core dumped)
Sometimes it results in just a hangs or segfaults, but most of the time it's the std::bad_function_call.
I have traced it to the client's sendText() detecting an error racing with the server-side thread doing the same. Both invoke setReadyState(CLOSED), possibly invoking _onCloseCallback() twice. Worse, the server-side thread unsetting_onMessageCallback while the sending thread is still executing setReadyState(), accessing the then unset callback member.
I have a local fix adding a mutex into setReadyState() to avoid this. Will open shortly.
#include "ixwebsocket/IXConnectionState.h"
#include "ixwebsocket/IXWebSocket.h"
#include "ixwebsocket/IXWebSocketMessage.h"
#include "ixwebsocket/IXWebSocketMessageType.h"
#include "ixwebsocket/IXWebSocketServer.h"
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
struct Client
{
std::thread thread;
std::shared_ptr<ix::ConnectionState> cs;
bool closed = false;
};
// Global clients table.
std::mutex clients_mtx;
std::map<std::string, Client> clients;
void reaper();
void sender(std::shared_ptr<ix::WebSocket> ws, std::shared_ptr<ix::ConnectionState> cs)
{
size_t sends = 0;
auto make_json = [](size_t v) -> std::string
{
std::string s = R"({"value": )";
s += std::to_string(v);
s += "}";
return s;
};
while (!cs->isTerminated())
{
auto info = ws->sendText(make_json(sends));
if (!info.success)
{
std::cerr << "Thread[" << std::this_thread::get_id() << "] error after " << sends
<< " sends (" << cs->getId() << ")" << std::endl;
break;
}
++sends;
}
std::cerr << "Thread[" << std::this_thread::get_id() << "] exit " << "(" << cs->getId() << ")"
<< std::endl;
}
/* Main entry */
int main()
{
int port = 1234;
const char* port_str = getenv("SERVER_PORT");
if (port_str) port = atoi(port_str);
auto server = std::make_shared<ix::WebSocketServer>(port);
server->setOnConnectionCallback(
[](std::weak_ptr<ix::WebSocket> wws, std::shared_ptr<ix::ConnectionState> cs)
{
auto ws = wws.lock();
if (!ws)
{
std::cerr << "Could not acquire WS client!" << std::endl;
return;
}
ws->setOnMessageCallback(
[ws = ws, cs = cs](const ix::WebSocketMessagePtr& msg)
{
std::lock_guard<std::mutex> lk {clients_mtx};
if (msg->type == ix::WebSocketMessageType::Open)
{
std::cerr << "Open " << cs->getId() << std::endl;
// Spawn a thread streaming data to the client.
clients[cs->getId()] = {};
clients[cs->getId()].cs = cs;
clients[cs->getId()].thread = std::thread {sender, ws, cs};
}
else if (msg->type == ix::WebSocketMessageType::Close)
{
std::cerr << "Close " << cs->getId() << std::endl;
clients[cs->getId()].closed = true;
}
});
});
// Run reaper thread in background.
std::thread r {reaper};
if (!server->listenAndStart())
{
std::cerr << "Failed ot listen and start" << std::endl;
exit(1);
}
server->wait();
server->stop();
return 0;
}
void reaper()
{
while (true)
{
int reaped = 0;
{
std::lock_guard<std::mutex> lk {clients_mtx};
for (auto it = clients.begin(); it != clients.end(); /**/)
{
auto& [thread, cs, closed] = it->second;
if (closed && cs->isTerminated())
{
thread.join();
it = clients.erase(it);
++reaped;
}
else
++it;
}
}
if (reaped) std::cerr << "Reaped " << reaped << std::endl;
std::this_thread::sleep_for(std::chrono::microseconds {3});
}
}
Backtrace of crash
(gdb) bt
#0 __pthread_kill_implementation (no_tid=0, signo=6, threadid=<optimized out>) at ./nptl/pthread_kill.c:44
#1 __pthread_kill_internal (signo=6, threadid=<optimized out>) at ./nptl/pthread_kill.c:78
#2 __GI___pthread_kill (threadid=<optimized out>, signo=signo@entry=6) at ./nptl/pthread_kill.c:89
#3 0x00007fa2e644526e in __GI_raise (sig=sig@entry=6) at ../sysdeps/posix/raise.c:26
#4 0x00007fa2e64288ff in __GI_abort () at ./stdlib/abort.c:79
#5 0x00007fa2e68a5ff5 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#6 0x00007fa2e68bb0da in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#7 0x00007fa2e68a5a55 in std::terminate() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#8 0x00007fa2e68bb391 in __cxa_throw () from /lib/x86_64-linux-gnu/libstdc++.so.6
#9 0x00007fa2e68a9670 in std::__throw_bad_function_call() () from /lib/x86_64-linux-gnu/libstdc++.so.6
#10 0x00005558aa4bd8de in std::function<void (std::unique_ptr<ix::WebSocketMessage, std::default_delete<ix::WebSocketMessage> > const&)>::operator()(std::unique_ptr<ix::WebSocketMessage, std::default_delete<ix::WebSocketMessage> > const&) const (__args#0=std::unique_ptr<ix::WebSocketMessage> = {...}, this=<optimized out>) at /usr/include/c++/13/bits/std_function.h:590
#11 operator() (code=<optimized out>, reason=..., wireSize=<optimized out>, remote=<optimized out>, __closure=<optimized out>) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:49
#12 0x00005558aa4c7d80 in std::function<void (unsigned short, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unsigned long, bool)>::operator()(unsigned short, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, unsigned long, bool) const (__args#3=<optimized out>, __args#2=<optimized out>, __args#1=..., __args#0=<optimized out>, this=0x7fa2c4000c98) at /usr/include/c++/13/bits/std_function.h:591
#13 ix::WebSocketTransport::setReadyState (this=this@entry=0x7fa2c4000b80, readyState=readyState@entry=ix::WebSocketTransport::ReadyState::CLOSED) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:217
#14 0x00005558aa4c802d in ix::WebSocketTransport::flushSendBuffer (this=this@entry=0x7fa2c4000b80) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:1234
#15 0x00005558aa4c8977 in ix::WebSocketTransport::sendData(ix::WebSocketTransport::wsheader_type::opcode_type, ix::IXWebSocketSendData const&, bool, std::function<bool (int, int)> const&) (this=this@entry=0x7fa2c4000b80, type=type@entry=ix::WebSocketTransport::wsheader_type::TEXT_FRAME, message=..., compress=<optimized out>, onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:930
#16 0x00005558aa4c8c4c in ix::WebSocketTransport::sendText(ix::IXWebSocketSendData const&, std::function<bool (int, int)> const&) (this=this@entry=0x7fa2c4000b80, message=..., onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocketTransport.cpp:1054
#17 0x00005558aa4bf525 in ix::WebSocket::sendMessage(ix::IXWebSocketSendData const&, ix::SendMessageKind, std::function<bool (int, int)> const&) (this=0x7fa2c4000b80, this@entry=0x8, message=..., sendMessageKind=sendMessageKind@entry=ix::SendMessageKind::Text, onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:557
#18 0x00005558aa4bf8ad in ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) (text=..., onProgressCallback=..., this=0x8) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:523
#19 ix::WebSocket::sendText(std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > const&, std::function<bool (int, int)> const&) (this=this@entry=0x7fa2c4000b80, text="{\"value\": 200}", onProgressCallback=...) at /home/awelzel/corelight-oss/projects/IXWebSocket/ixwebsocket/IXWebSocket.cpp:514
#20 0x00005558aa4b344f in sender (ws=std::shared_ptr<ix::WebSocket> (use count 1, weak count 0) = {...}, cs=std::shared_ptr<ix::ConnectionState> (use count 2, weak count 0) = {...}) at /home/awelzel/corelight-oss/projects/IXWebSocket/stress/run-threaded.cpp:38
#21 0x00005558aa4b5316 in std::__invoke_impl<void, void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > (__f=<optimized out>) at /usr/include/c++/13/bits/invoke.h:61
#22 std::__invoke<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > (__fn=<optimized out>) at /usr/include/c++/13/bits/invoke.h:96
#23 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::_M_invoke<0ul, 1ul, 2ul> (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:292
#24 std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > >::operator() (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:299
#25 std::thread::_State_impl<std::thread::_Invoker<std::tuple<void (*)(std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState>), std::shared_ptr<ix::WebSocket>, std::shared_ptr<ix::ConnectionState> > > >::_M_run (this=<optimized out>) at /usr/include/c++/13/bits/std_thread.h:244
#26 0x00007fa2e68ecdb4 in ?? () from /lib/x86_64-linux-gnu/libstdc++.so.6
#27 0x00007fa2e649ca94 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:447
#28 0x00007fa2e6529c3c in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:78
The text was updated successfully, but these errors were encountered:
I'm working on a multi-threaded server application that streams data to clients form dedicated sender threads. This has been working fine except for an occasional segfault,
std::bad_function_call
or a plain hang. I've worked out a condensed reproducer below.The reproducer essentially launches a new thread for each client and streams data to it as fast as possible. From the comments and locking done by the library, it seems that should be valid usage (and unclear how else one would stream data to clients independently from the receiving path).
This is an example client code client.py.txt, receiving a few messages, then disconnecting and reconnecting immediately to do the same in a tight loop.
Running the server with 4 to 8 such clients, results in the following crash after a few seconds:
Sometimes it results in just a hangs or segfaults, but most of the time it's the
std::bad_function_call
.I have traced it to the client's
sendText()
detecting an error racing with the server-side thread doing the same. Both invokesetReadyState(CLOSED)
, possibly invoking_onCloseCallback()
twice. Worse, the server-side thread unsetting_onMessageCallback
while the sending thread is still executingsetReadyState()
, accessing the then unset callback member.I have a local fix adding a mutex into setReadyState() to avoid this. Will open shortly.
Backtrace of crash
The text was updated successfully, but these errors were encountered: