#include "Group.h" #include "Hub.h" namespace uWS { template void Group::setUserData(void *user) { this->userData = user; } template void *Group::getUserData() { return userData; } template void Group::timerCallback(uS::Timer *timer) { Group *group = (Group *) timer->getData(); group->forEach([](uWS::WebSocket *webSocket) { if (webSocket->hasOutstandingPong) { webSocket->terminate(); } else { webSocket->hasOutstandingPong = true; } }); if (group->userPingMessage.length()) { group->broadcast(group->userPingMessage.data(), group->userPingMessage.length(), OpCode::TEXT); } else { group->broadcast(nullptr, 0, OpCode::PING); } } template void Group::startAutoPing(int intervalMs, std::string userMessage) { timer = new uS::Timer(loop); timer->setData(this); timer->start(timerCallback, intervalMs, intervalMs); userPingMessage = userMessage; } template void Group::addHttpSocket(HttpSocket *httpSocket) { if (httpSocketHead) { httpSocketHead->prev = httpSocket; httpSocket->next = httpSocketHead; } else { httpSocket->next = nullptr; // start timer httpTimer = new uS::Timer(hub->getLoop()); httpTimer->setData(this); httpTimer->start([](uS::Timer *httpTimer) { Group *group = (Group *) httpTimer->getData(); group->forEachHttpSocket([](HttpSocket *httpSocket) { if (httpSocket->missedDeadline) { httpSocket->terminate(); } else if (!httpSocket->outstandingResponsesHead) { httpSocket->missedDeadline = true; } }); }, 1000, 1000); } httpSocketHead = httpSocket; httpSocket->prev = nullptr; } template void Group::removeHttpSocket(HttpSocket *httpSocket) { if (iterators.size()) { iterators.top() = httpSocket->next; } if (httpSocket->prev == httpSocket->next) { httpSocketHead = nullptr; httpTimer->stop(); httpTimer->close(); } else { if (httpSocket->prev) { ((HttpSocket *) httpSocket->prev)->next = httpSocket->next; } else { httpSocketHead = (HttpSocket *) httpSocket->next; } if (httpSocket->next) { ((HttpSocket *) httpSocket->next)->prev = httpSocket->prev; } } } template void Group::addWebSocket(WebSocket *webSocket) { if (webSocketHead) { webSocketHead->prev = webSocket; webSocket->next = webSocketHead; } else { webSocket->next = nullptr; } webSocketHead = webSocket; webSocket->prev = nullptr; } template void Group::removeWebSocket(WebSocket *webSocket) { if (iterators.size()) { iterators.top() = webSocket->next; } if (webSocket->prev == webSocket->next) { webSocketHead = nullptr; } else { if (webSocket->prev) { ((WebSocket *) webSocket->prev)->next = webSocket->next; } else { webSocketHead = (WebSocket *) webSocket->next; } if (webSocket->next) { ((WebSocket *) webSocket->next)->prev = webSocket->prev; } } } template Group::Group(int extensionOptions, unsigned int maxPayload, Hub *hub, uS::NodeData *nodeData) : uS::NodeData(*nodeData), maxPayload(maxPayload), hub(hub), extensionOptions(extensionOptions) { connectionHandler = [](WebSocket *, HttpRequest) {}; transferHandler = [](WebSocket *) {}; messageHandler = [](WebSocket *, char *, size_t, OpCode) {}; disconnectionHandler = [](WebSocket *, int, char *, size_t) {}; pingHandler = pongHandler = [](WebSocket *, char *, size_t) {}; errorHandler = [](errorType) {}; httpRequestHandler = [](HttpResponse *, HttpRequest, char *, size_t, size_t) {}; httpConnectionHandler = [](HttpSocket *) {}; httpDisconnectionHandler = [](HttpSocket *) {}; httpCancelledRequestHandler = [](HttpResponse *) {}; httpDataHandler = [](HttpResponse *, char *, size_t, size_t) {}; this->extensionOptions |= CLIENT_NO_CONTEXT_TAKEOVER | SERVER_NO_CONTEXT_TAKEOVER; } template void Group::stopListening() { if (isServer) { if (user) { // todo: we should allow one group to listen to many ports! uS::ListenSocket *listenSocket = (uS::ListenSocket *) user; if (listenSocket->timer) { listenSocket->timer->stop(); listenSocket->timer->close(); } listenSocket->closeSocket(); // mark as stopped listening (extra care?) user = nullptr; } } if (async) { async->close(); } } template void Group::onConnection(std::function *, HttpRequest)> handler) { connectionHandler = handler; } template void Group::onTransfer(std::function *)> handler) { transferHandler = handler; } template void Group::onMessage(std::function *, char *, size_t, OpCode)> handler) { messageHandler = handler; } template void Group::onDisconnection(std::function *, int, char *, size_t)> handler) { disconnectionHandler = handler; } template void Group::onPing(std::function *, char *, size_t)> handler) { pingHandler = handler; } template void Group::onPong(std::function *, char *, size_t)> handler) { pongHandler = handler; } template void Group::onError(std::function handler) { errorHandler = handler; } template void Group::onHttpConnection(std::function *)> handler) { httpConnectionHandler = handler; } template void Group::onHttpRequest(std::function handler) { httpRequestHandler = handler; } template void Group::onHttpData(std::function handler) { httpDataHandler = handler; } template void Group::onHttpDisconnection(std::function *)> handler) { httpDisconnectionHandler = handler; } template void Group::onCancelledHttpRequest(std::function handler) { httpCancelledRequestHandler = handler; } template void Group::onHttpUpgrade(std::function *, HttpRequest)> handler) { httpUpgradeHandler = handler; } template void Group::broadcast(const char *message, size_t length, OpCode opCode) { #ifdef UWS_THREADSAFE std::lock_guard lockGuard(*asyncMutex); #endif typename WebSocket::PreparedMessage *preparedMessage = WebSocket::prepareMessage((char *) message, length, opCode, false); forEach([preparedMessage](uWS::WebSocket *ws) { ws->sendPrepared(preparedMessage); }); WebSocket::finalizeMessage(preparedMessage); } template void Group::terminate() { forEach([](uWS::WebSocket *ws) { ws->terminate(); }); stopListening(); } template void Group::close(int code, char *message, size_t length) { forEach([code, message, length](uWS::WebSocket *ws) { ws->close(code, message, length); }); stopListening(); if (timer) { timer->stop(); timer->close(); } } template struct Group; template struct Group; }