Egg.js 实现 Server-Sent Events (SSE) 流式数据传输
2024-12-09 18:34:12
在现代 web 开发中,实时数据传输已经成为一个重要的需求。对于许多应用来说,流式传输(Stream)是一种有效的解决方案,它能够在数据生成时实时推送给客户端,避免了等待整个响应数据生成完成再一次性返回的延迟问题。
本文将介绍如何在 Egg.js 中使用 Server-Sent Events (SSE) 和 PassThrough 流 实现流式传输,向前端实时推送数据。
什么是 Server-Sent Events (SSE)?
Server-Sent Events (SSE) 是一种基于 HTTP 的单向通信协议,允许服务器推送实时事件到客户端。与 WebSocket 不同,SSE 只支持单向从服务器到客户端的通信,而 WebSocket 支持双向通信。SSE 的优势在于实现简单、轻量,适用于实时通知、聊天、流式数据等场景。
需求
假设我们需要在 Egg.js 后端实现一个接口,实时将数据推送到前端,前端通过 SSE 接收并展示数据。我们将使用 PassThrough
流和 hunyuanSDK.generateOpenStream
方法,模拟生成的数据并推送给客户端。
1. 创建 PassThrough 流
在 Node.js 中,stream
模块提供了 PassThrough
类,它是一个非常简单的流,允许你将数据从一个地方通过管道传输到另一个地方。PassThrough
流不会改变数据,它只是一个简单的管道。
在 Egg.js 中,我们可以利用 PassThrough
来实现流式传输。
2. 后端实现流式传输
首先,我们需要在后端设置流式响应头,创建一个 PassThrough
流,并通过流向前端发送数据。以下是具体的实现:
后端代码(Egg.js)
const { PassThrough } = require('stream');
async sseTest() {
const { ctx, service } = this;
// 创建一个 PassThrough 流
const stream = new PassThrough();
// 设置 SSE 响应头
ctx.set({
"Content-Type": "text/event-stream", // 设置 SSE 响应头
"Cache-Control": "no-cache", // 禁止缓存
"Connection": "keep-alive", // 保持连接
});
// 模拟从 hunyuanSDK 获取的数据流
hunyuanSDK.generateOpenStream(Messages, "", (msg) => {
if (msg === "[DONE]") {
// 完成时发送 end 事件
stream.write("event: end\n\n");
stream.end();
return;
}
// 将每个消息推送到流
stream.write(`data: ${msg}\n\n`);
});
// 将 PassThrough 流作为响应的 body
ctx.body = stream;
}
代码解析
- 创建 PassThrough 流:
const stream = new PassThrough();
创建一个PassThrough
流,它会将数据直接传输到客户端,而不进行任何处理。 - 设置响应头: 我们设置了正确的 SSE 响应头:
Content-Type: text/event-stream
:指定响应为 SSE 数据流。Cache-Control: no-cache
:禁止缓存。Connection: keep-alive
:保持连接,使得流式传输能够持续进行。
- 流式生成数据: 我们使用了
hunyuanSDK.generateOpenStream
模拟从外部数据源获取流式数据,并通过回调函数将数据写入到PassThrough
流中。每次收到新数据时,我们就使用stream.write
将数据推送到客户端。 - 结束流: 当流式数据生成完成时,
"[DONE]"
被发送,流将结束。此时我们发送event: end
标识流已结束,并调用stream.end()
关闭流。 - 返回流数据: 最后,我们将流设置为
ctx.body
,这使得PassThrough
流的数据将被传输到客户端。
3. 前端接收流式数据
前端可以使用 EventSource
来接收 SSE 数据流。EventSource
是浏览器提供的原生 API,用于从服务器接收流式事件。
前端代码
const eventSource = new EventSource('/your-endpoint'); // 用你实际的后端路径替换
eventSource.onmessage = function(event) {
console.log("Received data:", event.data);
// 可以在这里处理数据,比如更新 UI
};
eventSource.onerror = function(error) {
console.error("Error receiving event:", error);
// 处理连接错误或重新连接逻辑
};
代码解析
- 创建
EventSource
实例:通过/your-endpoint
指定你在后端定义的流式传输接口。 - 接收数据:通过
onmessage
事件处理程序接收从后端发送的每一条数据。当服务器推送新的事件时,event.data
将包含流中的数据。 - 错误处理:通过
onerror
事件处理程序捕获连接错误或其他问题。
4. 进一步优化
- 错误处理:确保在流式数据过程中捕获和处理可能的错误,比如 API 调用失败或数据中断等。
- 客户端重连机制:SSE 会在连接丢失时自动重新连接,但你可以通过设置
reconnect
参数来优化重连的行为。 - 性能监控:在高并发场景下,流式传输可能会消耗较多的资源。你可以通过定期监控和优化服务器性能,确保流式数据传输的稳定性和高效性。
- 心跳机制:为了防止连接超时,你可以定期向客户端发送一个空的
ping
数据包,以保持连接活跃。
setInterval(() => {
stream.write("data: ping\n\n"); // 每隔 5 秒发送一次心跳
}, 5000);
5. 总结
本文介绍了如何在 Egg.js 中使用 SSE 和 PassThrough 流 实现流式数据传输。通过设置正确的响应头和使用 PassThrough
流,我们能够将后端生成的数据实时推送到前端,减少等待时间,提高用户体验。前端使用 EventSource
API 接收和处理流式数据。
流式传输在实时通知、消息推送、实时聊天等应用中非常有用,理解和掌握它的实现方式,可以帮助你更好地开发实时交互应用。