返回

Egg.js 实现 Server-Sent Events (SSE) 流式数据传输

开发配置

在现代 web 开发中,实时数据传输已经成为一个重要的需求。对于许多应用来说,流式传输(Stream)是一种有效的解决方案,它能够在数据生成时实时推送给客户端,避免了等待整个响应数据生成完成再一次性返回的延迟问题。

本文将介绍如何在 Egg.js 中使用 Server-Sent Events (SSE)PassThrough 流 实现流式传输,向前端实时推送数据。


什么是 Server-Sent Events (SSE)?

Server-Sent Events (SSE) 是一种基于 HTTP 的单向通信协议,允许服务器推送实时事件到客户端。与 WebSocket 不同,SSE 只支持单向从服务器到客户端的通信,而 WebSocket 支持双向通信。SSE 的优势在于实现简单、轻量,适用于实时通知、聊天、流式数据等场景。

需求

假设我们需要在 Egg.js 后端实现一个接口,实时将数据推送到前端,前端通过 SSE 接收并展示数据。我们将使用 PassThrough 流和 hunyuanSDK.generateOpenStream 方法,模拟生成的数据并推送给客户端。


1. 创建 PassThrough 流

在 Node.js 中,stream 模块提供了 PassThrough 类,它是一个非常简单的流,允许你将数据从一个地方通过管道传输到另一个地方。PassThrough 流不会改变数据,它只是一个简单的管道。

Egg.js 中,我们可以利用 PassThrough 来实现流式传输。

2. 后端实现流式传输

首先,我们需要在后端设置流式响应头,创建一个 PassThrough 流,并通过流向前端发送数据。以下是具体的实现:

后端代码(Egg.js)

const { PassThrough } = require('stream');

async sseTest() {
  const { ctx, service } = this;
  
  // 创建一个 PassThrough 流
  const stream = new PassThrough();

  // 设置 SSE 响应头
  ctx.set({
    "Content-Type": "text/event-stream", // 设置 SSE 响应头
    "Cache-Control": "no-cache",         // 禁止缓存
    "Connection": "keep-alive",          // 保持连接
  });

  // 模拟从 hunyuanSDK 获取的数据流
  hunyuanSDK.generateOpenStream(Messages, "", (msg) => {
    if (msg === "[DONE]") {
      // 完成时发送 end 事件
      stream.write("event: end\n\n");
      stream.end();
      return;
    }
    // 将每个消息推送到流
    stream.write(`data: ${msg}\n\n`);
  });

  // 将 PassThrough 流作为响应的 body
  ctx.body = stream;
}

代码解析

  1. 创建 PassThrough 流const stream = new PassThrough(); 创建一个 PassThrough 流,它会将数据直接传输到客户端,而不进行任何处理。
  2. 设置响应头: 我们设置了正确的 SSE 响应头:
    • Content-Type: text/event-stream:指定响应为 SSE 数据流。
    • Cache-Control: no-cache:禁止缓存。
    • Connection: keep-alive:保持连接,使得流式传输能够持续进行。
  3. 流式生成数据: 我们使用了 hunyuanSDK.generateOpenStream 模拟从外部数据源获取流式数据,并通过回调函数将数据写入到 PassThrough 流中。每次收到新数据时,我们就使用 stream.write 将数据推送到客户端。
  4. 结束流: 当流式数据生成完成时,"[DONE]" 被发送,流将结束。此时我们发送 event: end 标识流已结束,并调用 stream.end() 关闭流。
  5. 返回流数据: 最后,我们将流设置为 ctx.body,这使得 PassThrough 流的数据将被传输到客户端。

3. 前端接收流式数据

前端可以使用 EventSource 来接收 SSE 数据流。EventSource 是浏览器提供的原生 API,用于从服务器接收流式事件。

前端代码

const eventSource = new EventSource('/your-endpoint'); // 用你实际的后端路径替换

eventSource.onmessage = function(event) {
  console.log("Received data:", event.data);
  // 可以在这里处理数据,比如更新 UI
};

eventSource.onerror = function(error) {
  console.error("Error receiving event:", error);
  // 处理连接错误或重新连接逻辑
};

代码解析

  • 创建 EventSource 实例:通过 /your-endpoint 指定你在后端定义的流式传输接口。
  • 接收数据:通过 onmessage 事件处理程序接收从后端发送的每一条数据。当服务器推送新的事件时,event.data 将包含流中的数据。
  • 错误处理:通过 onerror 事件处理程序捕获连接错误或其他问题。

4. 进一步优化

  1. 错误处理:确保在流式数据过程中捕获和处理可能的错误,比如 API 调用失败或数据中断等。
  2. 客户端重连机制:SSE 会在连接丢失时自动重新连接,但你可以通过设置 reconnect 参数来优化重连的行为。
  3. 性能监控:在高并发场景下,流式传输可能会消耗较多的资源。你可以通过定期监控和优化服务器性能,确保流式数据传输的稳定性和高效性。
  4. 心跳机制:为了防止连接超时,你可以定期向客户端发送一个空的 ping 数据包,以保持连接活跃。
setInterval(() => {
  stream.write("data: ping\n\n"); // 每隔 5 秒发送一次心跳
}, 5000);

5. 总结

本文介绍了如何在 Egg.js 中使用 SSEPassThrough 流 实现流式数据传输。通过设置正确的响应头和使用 PassThrough 流,我们能够将后端生成的数据实时推送到前端,减少等待时间,提高用户体验。前端使用 EventSource API 接收和处理流式数据。

流式传输在实时通知、消息推送、实时聊天等应用中非常有用,理解和掌握它的实现方式,可以帮助你更好地开发实时交互应用。