返回

uWebSockets:轻量级 WebSocket 服务器与客户端搭建 (C++)

windows

uWebSockets 库:打造轻量级 WebSocket 服务器与客户端

在使用 C++ 开发需要实时双向通信的应用时,WebSocket 协议是个不错的选择。uWebSockets 是一个高性能的 WebSocket 和 HTTP 库,简单好用。这篇博客讲讲怎么用 uWebSockets 库搭建一个最基本的 WebSocket 服务器和客户端,并处理客户端重连的问题。

一、 遇到的问题

老哥你贴的代码里,服务器能启动,显示 "Server listening on port 9001",但客户端连不上,客户端的回调函数一个都没触发。 另外, 还想问问服务器重启后,客户端该怎么自动重连。

二、 问题原因分析

先看看代码可能出了什么问题:

  1. connect 的时机 : 客户端的 connect 操作是在主线程里,而且紧跟在服务器启动之后。服务器的启动需要一点时间,客户端的 connect 可能在服务器完全准备好之前就执行了,导致连接失败。
  2. 事件循环 : uWebSockets 基于事件循环工作。run() 方法会启动事件循环。在你的客户端代码中,.connect("ws://localhost:9001", nullptr).run() 这样直接调用 run() 会阻塞主线程。导致客户端没有进入事件循环,无法处理网络事件。
  3. 缺乏重连机制 : 代码里没有处理客户端连接断开后自动重连的逻辑。

三、 解决方案

下面给出几个解决方案,一步步解决这些问题。

1. 确保服务器完全启动

为确保客户端连接时,服务器已完全就绪, 我们使用 std::promisestd::future, 来实现线程同步.

原理:

  • std::promise 用来设置一个值(或异常),这个值可以在将来通过 std::future 获取。
  • 服务器线程完全启动后,通过 promise.set_value() 设置一个值。
  • 主线程通过 future.wait() 等待,直到服务器线程设置了这个值。

代码示例:

#include <uwebsockets/App.h>
#include <thread>
#include <iostream>
#include <future>

class Server {
public:
    std::promise<void> server_ready_promise;

    void runServer() {
        uWS::App().ws<void>("/*", {
            .open = [](auto* ws) {
                std::cout << "Server: Client connected" << std::endl;
            },
            .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
                std::cout << "Server: Received and broadcast message: " << message << std::endl;
             ws->publish("broadcast", message, opCode);
            },
            .close = [](auto* ws, int code, std::string_view message) {
                std::cout << "Server: Client disconnected" << std::endl;
            }
        }).listen(9001, [this](auto* listen_socket) {
            if (listen_socket) {
                std::cout << "Server listening on port 9001" << std::endl;
                server_ready_promise.set_value(); // 服务器已准备好
            }
        }).run();
    }
};

// Client 类和 main 函数保持不变(暂时)

main 函数里:

int main() {
    Server server;
    std::future<void> server_ready_future = server.server_ready_promise.get_future();
    std::thread server_thread([&server]() { server.runServer(); });

    server_ready_future.wait(); // 等待服务器启动

     Client client;
     std::thread client_thread([&client](){
        client.runClient();
     });
      

    server_thread.join();
      client_thread.join();
    return 0;
}

2. 客户端异步连接

将客户端 connect 放入它自己的事件循环, 避免阻塞主线程. 并且要创建一个新的线程给 Client, 使它拥有自己的事件循环.

原理:

  • uWS::App::connect 放到客户端线程执行。
  • 使用 uWS::App::connect 的回调处理连接成功与否。

代码示例:
修改 Client 部分代码.

class Client {
public:
    uWS::App* app = nullptr;
    uWS::WebSocket<false, true>* client_ws = nullptr;

    void runClient() {
      app = new uWS::App();
      app->ws<void>("/*", {
            .open = [this](auto* ws) {
                client_ws = ws; // 存一下ws指针,以后发消息用
                std::cout << "Client: Connected to server" << std::endl;
                ws->send("Hello from client!", uWS::OpCode::TEXT);
            },
            .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
                std::cout << "Client: Received message: " << message << std::endl;
            },
            .close = [this](auto* ws, int code, std::string_view message) {
                 client_ws = nullptr;
                std::cout << "Client: Disconnected from server" << std::endl;
            }
        }).connect("ws://localhost:9001",  nullptr).run();

    }

     void SendMessage(std::string_view message){
            if (client_ws)
            {
                client_ws->send(message,uWS::OpCode::TEXT);
            }  
     }
};

3. 客户端自动重连

.close 回调里,设置一个定时器,过一段时间后尝试重新连接。

原理:

  • 利用 uWebSockets 的 setTimer 设置定时任务。
  • .close 回调中启动定时器,几秒后再次调用 connect

代码示例:
在 Client 代码修改:

class Client
{
public:
  uWS::App* app = nullptr;
  uWS::WebSocket<false, true>* client_ws = nullptr;
  
  void reconnect() {
    std::cout << "Client: Reconnecting..." << std::endl;

      app->connect("ws://localhost:9001", nullptr);
  }

    void runClient() {
      app = new uWS::App();

      app->ws<void>("/*", {
          .open = [this](auto *ws) {
            client_ws = ws;  //保存ws指针
              std::cout << "Client: Connected to server" << std::endl;
                ws->send("Hello from client!", uWS::OpCode::TEXT);
          },
          .message = [](auto *ws, std::string_view message, uWS::OpCode opCode) {
              std::cout << "Client: Received message: " << message << std::endl;
          },
          .close = [this](auto *ws, int code, std::string_view message) {
              std::cout << "Client: Disconnected from server" << std::endl;
                client_ws = nullptr;
                //尝试重连.
              app->getLoop()->setTimer([this]() {
                  reconnect();
              }, 5000); // 5秒后重连
          }
      }).connect("ws://localhost:9001",  nullptr).run();
    }
    
     void SendMessage(std::string_view message){
        if (client_ws)
        {
            client_ws->send(message,uWS::OpCode::TEXT);
        }  
     }
};

4.安全提示(非必须,按需)

  • 消息验证: 生产环境里,记得对收到的消息进行校验,防止恶意数据。
  • 限制连接数: 防止过多客户端连接耗尽服务器资源, uWebSocket 提供 maxPayloadLength 等选项限制连接。
  • 使用SSL/TLS :对于任何需要secure的连接都应该使用 uWS::SSLApp 替代 uWS::App, 将链接从"ws://" 更改成 "wss://".

5. 进阶技巧

  • 发布/订阅: 可以使用 ws->subscribe("topic")ws->publish("topic", message) 实现消息的发布/订阅。
  • Backpressure (背压): uWebSockets 会自动处理背压,发送速度过快会自动调整,可以放心用。
  • UserData: 每个WebSocket可以关联自定义的UserData, 用于自定义数据, 例如在 Server 的 open 中增加类型:
        struct MyData
        {
          /* data */
          std::string custom_data;
        };
    
        ...
         uWS::App().ws<MyData>("/*", {
          .open = [](auto* ws) {
                MyData *myData = (MyData *)ws->getUserData();
                myData->custom_data = "initial data";
                std::cout << "Server: Client connected" << std::endl;
            },
          ...
    

四、完整代码

综合上面几点,最终的完整代码:

Server.cpp

#include <uwebsockets/App.h>
#include <thread>
#include <iostream>
#include <future>

class Server {
public:
    std::promise<void> server_ready_promise;

      struct MyData
        {
          /* data */
          std::string custom_data;
        };

    void runServer() {
       uWS::App().ws<MyData>("/*", {
            .open = [](auto* ws) {
                MyData *myData = (MyData *)ws->getUserData();
                myData->custom_data = "initial data";
                std::cout << "Server: Client connected" << std::endl;
            },
            .message = [](auto* ws, std::string_view message, uWS::OpCode opCode) {
               std::cout << "Server: Received and broadcast message: " << message << std::endl;
                ws->publish("broadcast", message, opCode);
            },
            .close = [](auto* ws, int code, std::string_view message) {
                std::cout << "Server: Client disconnected" << std::endl;
            }
        }).listen(9001, [this](auto* listen_socket) {
            if (listen_socket) {
                std::cout << "Server listening on port 9001" << std::endl;
                server_ready_promise.set_value(); // 服务器已准备好
            }
        }).run();
    }
};

Client.cpp

#include <uwebsockets/App.h>
#include <thread>
#include <iostream>
class Client
{
public:
  uWS::App* app = nullptr;
  uWS::WebSocket<false, true>* client_ws = nullptr;
  
  void reconnect() {
    std::cout << "Client: Reconnecting..." << std::endl;

      app->connect("ws://localhost:9001", nullptr);
  }

    void runClient() {
      app = new uWS::App();

      app->ws<void>("/*", {
          .open = [this](auto *ws) {
            client_ws = ws;  //保存ws指针
              std::cout << "Client: Connected to server" << std::endl;
                ws->send("Hello from client!", uWS::OpCode::TEXT);
          },
          .message = [](auto *ws, std::string_view message, uWS::OpCode opCode) {
              std::cout << "Client: Received message: " << message << std::endl;
          },
          .close = [this](auto *ws, int code, std::string_view message) {
              std::cout << "Client: Disconnected from server" << std::endl;
                client_ws = nullptr;
                //尝试重连.
              app->getLoop()->setTimer([this]() {
                  reconnect();
              }, 5000); // 5秒后重连
          }
      }).connect("ws://localhost:9001",  nullptr).run();
    }
    
     void SendMessage(std::string_view message){
        if (client_ws)
        {
            client_ws->send(message,uWS::OpCode::TEXT);
        }  
     }
};

main.cpp

#include "Server.cpp"
#include "Client.cpp"
int main() {
    Server server;
    std::future<void> server_ready_future = server.server_ready_promise.get_future();
    std::thread server_thread([&server]() { server.runServer(); });

    server_ready_future.wait(); // 等待服务器启动

     Client client;
     std::thread client_thread([&client](){
        client.runClient();
     });

    server_thread.join();
      client_thread.join();
    return 0;
}

搞定! 这下你的 uWebSockets 服务器和客户端就能正常工作,而且客户端还能自动重连了。