From cd7a035ba80a97e5dfee821770de491f61894f1e Mon Sep 17 00:00:00 2001 From: Ayndpa Date: Wed, 19 Nov 2025 20:25:25 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84MultiplexManager=E5=92=8CTCPS?= =?UTF-8?q?erver=EF=BC=8C=E4=BC=98=E5=8C=96=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=92=8C=E8=BF=9E=E6=8E=A5=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E6=B7=BB=E5=8A=A0=E5=BC=82=E6=AD=A5=E8=AF=BB=E5=8F=96?= =?UTF-8?q?=E5=92=8C=E9=94=99=E8=AF=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- net/multiplex_manager.cpp | 131 +++++++++++++++++++++---------- net/tcp_server.cpp | 18 ++--- net/tcp_server.h | 2 - steam/steam_message_handler.cpp | 17 +++- steam/steam_message_handler.h | 3 + steam/steam_networking_manager.h | 1 + 6 files changed, 116 insertions(+), 56 deletions(-) diff --git a/net/multiplex_manager.cpp b/net/multiplex_manager.cpp index b8c2206..452aeff 100644 --- a/net/multiplex_manager.cpp +++ b/net/multiplex_manager.cpp @@ -3,119 +3,164 @@ #include #include -MultiplexManager::MultiplexManager(ISteamNetworkingSockets* steamInterface, HSteamNetConnection steamConn, - boost::asio::io_context& io_context, bool& isHost, int& localPort) +MultiplexManager::MultiplexManager(ISteamNetworkingSockets *steamInterface, HSteamNetConnection steamConn, + boost::asio::io_context &io_context, bool &isHost, int &localPort) : steamInterface_(steamInterface), steamConn_(steamConn), io_context_(io_context), isHost_(isHost), localPort_(localPort) {} -MultiplexManager::~MultiplexManager() { +MultiplexManager::~MultiplexManager() +{ // Close all sockets std::lock_guard lock(mapMutex_); - for (auto& pair : clientMap_) { + for (auto &pair : clientMap_) + { pair.second->close(); } clientMap_.clear(); } -std::string MultiplexManager::addClient(std::shared_ptr socket) { - std::lock_guard lock(mapMutex_); - std::string id = nanoid::generate(6); - clientMap_[id] = socket; - readBuffers_[id].resize(1024); +std::string MultiplexManager::addClient(std::shared_ptr socket) +{ + std::string id; + { + std::lock_guard lock(mapMutex_); + id = nanoid::generate(6); + clientMap_[id] = socket; + readBuffers_[id].resize(1024); + } startAsyncRead(id); + std::cout << "Added client with id " << id << std::endl; return id; } -void MultiplexManager::removeClient(const std::string& id) { +void MultiplexManager::removeClient(const std::string &id) +{ std::lock_guard lock(mapMutex_); auto it = clientMap_.find(id); - if (it != clientMap_.end()) { + if (it != clientMap_.end()) + { it->second->close(); clientMap_.erase(it); } readBuffers_.erase(id); + + std::cout << "Removed client with id " << id << std::endl; } -std::shared_ptr MultiplexManager::getClient(const std::string& id) { +std::shared_ptr MultiplexManager::getClient(const std::string &id) +{ std::lock_guard lock(mapMutex_); auto it = clientMap_.find(id); - if (it != clientMap_.end()) { + if (it != clientMap_.end()) + { return it->second; } return nullptr; } -void MultiplexManager::sendTunnelPacket(const std::string& id, const char* data, size_t len, int type) { +void MultiplexManager::sendTunnelPacket(const std::string &id, const char *data, size_t len, int type) +{ // Packet format: string id (6 chars + null), uint32_t type, then data if type==0 size_t idLen = id.size() + 1; // include null terminator size_t packetSize = idLen + sizeof(uint32_t) + (type == 0 ? len : 0); std::vector packet(packetSize); std::memcpy(&packet[0], id.c_str(), idLen); - uint32_t* pType = reinterpret_cast(&packet[idLen]); + uint32_t *pType = reinterpret_cast(&packet[idLen]); *pType = type; - if (type == 0 && data) { + if (type == 0 && data) + { std::memcpy(&packet[idLen + sizeof(uint32_t)], data, len); } steamInterface_->SendMessageToConnection(steamConn_, packet.data(), packet.size(), k_nSteamNetworkingSend_Reliable, nullptr); } -void MultiplexManager::handleTunnelPacket(const char* data, size_t len) { +void MultiplexManager::handleTunnelPacket(const char *data, size_t len) +{ size_t idLen = 7; // 6 + null - if (len < idLen + sizeof(uint32_t)) { + if (len < idLen + sizeof(uint32_t)) + { std::cerr << "Invalid tunnel packet size" << std::endl; return; } std::string id(data, 6); - uint32_t type = *reinterpret_cast(data + idLen); - if (type == 0) { + uint32_t type = *reinterpret_cast(data + idLen); + if (type == 0) + { // Data packet size_t dataLen = len - idLen - sizeof(uint32_t); - const char* packetData = data + idLen + sizeof(uint32_t); + const char *packetData = data + idLen + sizeof(uint32_t); auto socket = getClient(id); - if (!socket && isHost_ && localPort_ > 0) { + if (!socket && isHost_ && localPort_ > 0) + { // 如果是主持且没有对应的 TCP Client,创建一个连接到本地端口 std::cout << "Creating new TCP client for id " << id << " connecting to localhost:" << localPort_ << std::endl; - try { + try + { auto newSocket = std::make_shared(io_context_); tcp::resolver resolver(io_context_); auto endpoints = resolver.resolve("127.0.0.1", std::to_string(localPort_)); boost::asio::connect(*newSocket, endpoints); - - std::lock_guard lock(mapMutex_); - clientMap_[id] = newSocket; - readBuffers_[id].resize(1024); - socket = newSocket; + + std::string tempId = id; + { + std::lock_guard lock(mapMutex_); + clientMap_[id] = newSocket; + readBuffers_[id].resize(1024); + socket = newSocket; + } std::cout << "Successfully created TCP client for id " << id << std::endl; - startAsyncRead(id); - } catch (const std::exception& e) { + startAsyncRead(tempId); + } + catch (const std::exception &e) + { std::cerr << "Failed to create TCP client for id " << id << ": " << e.what() << std::endl; return; } } - if (socket) { - boost::asio::async_write(*socket, boost::asio::buffer(packetData, dataLen), [](const boost::system::error_code&, std::size_t) {}); - } else { + if (socket) + { + boost::asio::async_write(*socket, boost::asio::buffer(packetData, dataLen), [](const boost::system::error_code &, std::size_t) {}); + } + else + { std::cerr << "No client found for id " << id << std::endl; } - } else if (type == 1) { + } + else if (type == 1) + { // Disconnect packet removeClient(id); std::cout << "Client " << id << " disconnected" << std::endl; - } else { + } + else + { std::cerr << "Unknown packet type " << type << std::endl; } } -void MultiplexManager::startAsyncRead(const std::string& id) { +void MultiplexManager::startAsyncRead(const std::string &id) +{ auto socket = getClient(id); - if (!socket || readBuffers_.find(id) == readBuffers_.end()) return; + if (!socket) + { + std::cout << "Error: Socket is null for id " << id << std::endl; + return; + } socket->async_read_some(boost::asio::buffer(readBuffers_[id]), - [this, id](const boost::system::error_code& ec, std::size_t bytes_transferred) { - if (!ec && bytes_transferred > 0) { + [this, id](const boost::system::error_code &ec, std::size_t bytes_transferred) + { + if (!ec) + { + if (bytes_transferred > 0) + { sendTunnelPacket(id, readBuffers_[id].data(), bytes_transferred, 0); - startAsyncRead(id); - } else { - removeClient(id); } - }); + startAsyncRead(id); + } + else + { + std::cout << "Error reading from TCP client " << id << ": " << ec.message() << std::endl; + removeClient(id); + } + }); } \ No newline at end of file diff --git a/net/tcp_server.cpp b/net/tcp_server.cpp index 689849e..9ef2b6f 100644 --- a/net/tcp_server.cpp +++ b/net/tcp_server.cpp @@ -15,9 +15,6 @@ bool TCPServer::start() { acceptor_.bind(endpoint); acceptor_.listen(); - multiplexManager_ = std::make_unique(manager_->getInterface(), manager_->getConnection(), - io_context_, manager_->getIsHost(), *manager_->getLocalPort()); - running_ = true; serverThread_ = std::thread([this]() { std::cout << "Server thread started" << std::endl; @@ -40,7 +37,6 @@ void TCPServer::stop() { serverThread_.join(); } acceptor_.close(); - multiplexManager_.reset(); } void TCPServer::sendToAll(const std::string& message, std::shared_ptr excludeSocket) { @@ -66,7 +62,8 @@ void TCPServer::start_accept() { acceptor_.async_accept(*socket, [this, socket](const boost::system::error_code& error) { if (!error) { std::cout << "New client connected" << std::endl; - std::string id = multiplexManager_->addClient(socket); + auto multiplexManager = manager_->getMessageHandler()->getMultiplexManager(manager_->getConnection()); + std::string id = multiplexManager->addClient(socket); { std::lock_guard lock(clientsMutex_); clients_.push_back(socket); @@ -83,9 +80,9 @@ void TCPServer::start_read(std::shared_ptr socket, std::string id) auto buffer = std::make_shared>(1024); socket->async_read_some(boost::asio::buffer(*buffer), [this, socket, buffer, id](const boost::system::error_code& error, std::size_t bytes_transferred) { if (!error) { - std::cout << "Received " << bytes_transferred << " bytes from TCP client " << id << std::endl; if (manager_->isConnected()) { - multiplexManager_->sendTunnelPacket(id, buffer->data(), bytes_transferred, 0); + auto multiplexManager = manager_->getMessageHandler()->getMultiplexManager(manager_->getConnection()); + multiplexManager->sendTunnelPacket(id, buffer->data(), bytes_transferred, 0); } else { std::cout << "Not connected to Steam, skipping forward" << std::endl; } @@ -95,10 +92,11 @@ void TCPServer::start_read(std::shared_ptr socket, std::string id) std::cout << "TCP client " << id << " disconnected or error: " << error.message() << std::endl; // Send disconnect packet if (manager_->isConnected()) { - multiplexManager_->sendTunnelPacket(id, nullptr, 0, 1); + auto multiplexManager = manager_->getMessageHandler()->getMultiplexManager(manager_->getConnection()); + multiplexManager->sendTunnelPacket(id, nullptr, 0, 1); + // Remove client + multiplexManager->removeClient(id); } - // Remove client - multiplexManager_->removeClient(id); std::lock_guard lock(clientsMutex_); clients_.erase(std::remove(clients_.begin(), clients_.end(), socket), clients_.end()); } diff --git a/net/tcp_server.h b/net/tcp_server.h index 3b8077e..eaeb8b2 100644 --- a/net/tcp_server.h +++ b/net/tcp_server.h @@ -27,7 +27,6 @@ public: void sendToAll(const std::string& message, std::shared_ptr excludeSocket = nullptr); void sendToAll(const char* data, size_t size, std::shared_ptr excludeSocket = nullptr); int getClientCount(); - MultiplexManager* getMultiplexManager() { return multiplexManager_.get(); } private: void start_accept(); @@ -42,5 +41,4 @@ private: std::mutex clientsMutex_; std::thread serverThread_; SteamNetworkingManager* manager_; - std::unique_ptr multiplexManager_; }; \ No newline at end of file diff --git a/steam/steam_message_handler.cpp b/steam/steam_message_handler.cpp index 85122ec..c59801c 100644 --- a/steam/steam_message_handler.cpp +++ b/steam/steam_message_handler.cpp @@ -17,14 +17,30 @@ void SteamMessageHandler::start() { if (running_) return; running_ = true; thread_ = std::thread([this]() { run(); }); + // Start io_context in a separate thread + io_thread_ = std::thread([this]() { + auto work = boost::asio::make_work_guard(io_context_); + io_context_.run(); + }); } void SteamMessageHandler::stop() { if (!running_) return; running_ = false; + io_context_.stop(); if (thread_.joinable()) { thread_.join(); } + if (io_thread_.joinable()) { + io_thread_.join(); + } +} + +std::shared_ptr SteamMessageHandler::getMultiplexManager(HSteamNetConnection conn) { + if (multiplexManagers_.find(conn) == multiplexManagers_.end()) { + multiplexManagers_[conn] = std::make_shared(m_pInterface_, conn, io_context_, g_isHost_, localPort_); + } + return multiplexManagers_[conn]; } void SteamMessageHandler::run() { @@ -50,7 +66,6 @@ void SteamMessageHandler::pollMessages() { ISteamNetworkingMessage* pIncomingMsgs[10]; int numMsgs = m_pInterface_->ReceiveMessagesOnConnection(conn, pIncomingMsgs, 10); for (int i = 0; i < numMsgs; ++i) { - std::cout << "Received message on connection " << conn << std::endl; ISteamNetworkingMessage* pIncomingMsg = pIncomingMsgs[i]; const char* data = (const char*)pIncomingMsg->m_pData; size_t size = pIncomingMsg->m_cbSize; diff --git a/steam/steam_message_handler.h b/steam/steam_message_handler.h index 205318f..460ed84 100644 --- a/steam/steam_message_handler.h +++ b/steam/steam_message_handler.h @@ -19,6 +19,8 @@ public: void start(); void stop(); + std::shared_ptr getMultiplexManager(HSteamNetConnection conn); + private: void run(); void pollMessages(); @@ -33,6 +35,7 @@ private: std::map> multiplexManagers_; std::thread thread_; + std::thread io_thread_; bool running_; }; diff --git a/steam/steam_networking_manager.h b/steam/steam_networking_manager.h index 6e4c361..81125b3 100644 --- a/steam/steam_networking_manager.h +++ b/steam/steam_networking_manager.h @@ -58,6 +58,7 @@ public: // Message handler void startMessageHandler(); void stopMessageHandler(); + SteamMessageHandler* getMessageHandler() { return messageHandler_; } // Update user info (ping, relay status) void update();