Node.js+Socket.IO 实现 MySQL 数据实时更新 (告别刷新)
2025-03-25 20:29:51
Node.js + Socket.IO + MySQL 实现数据实时显示
遇到啥问题了?
刚接触 Node.js,想做个网页实时显示数据的功能。具体是这样:我通过一个 URL (比如 http://localhost:4000/api/motor_details?deviceId=145&a=sdfa3&b=us&c=uds
) 往 MySQL 数据库里插数据。数据确实能成功插进去,但是网页上只有手动刷新页面才能看到新数据。我想要的效果是,数据插进去后,网页能自动更新,不用我手动刷新。
看看现在的代码 (index.js
):
var express = require('express');
var socket = require('socket.io');
var mysql = require('mysql');
var http = require("http");
var app = express();
// 注意:创建 server 实例是为了给 socket.io 使用
var server = http.createServer(app);
var io = socket(server); // 将 server 传递给 socket.io
server.listen(4000,function(){ // 使用 server 实例监听端口
console.log('Listening 4000');
});
app.use(express.static('public'));
var connectSql = mysql.createConnection({
host: "localhost",
user: "root",
password: "root", // 生产环境千万别用弱密码
database: "grid_component",
socketPath: '/Applications/MAMP/tmp/mysql/mysql.sock' // 这个路径可能因环境而异
});
connectSql.connect(function(err) { // 增加错误处理
if (err) {
console.error('Database connection failed: ' + err.stack);
return;
}
console.log('Connected to database.');
});
app.get('/api/motor_details', function(req, res) {
// 注意: req.param 已弃用, 对于 URL 查询参数,应该用 req.query
var device_id = req.query.deviceId;
var a = req.query.a;
var b = req.query.b;
var c = req.query.c;
// 基本的验证 (后面会细说)
if (!device_id) {
return res.status(400).send('Missing deviceId parameter');
}
var entries = {
device_id: device_id,
a: a,
b: b,
c: c,
date: new Date() // 使用当前日期时间更合适
}
var query = connectSql.query('insert into entries set ?', entries, function (err, result) {
if(err){
console.error('Error inserting data:', err);
// 向请求方返回错误信息
return res.status(500).send('Database insert error');
}
console.log('Data inserted successfully, ID:', result.insertId);
// 数据插入成功后,需要通知客户端!这是缺失的关键步骤
// ---- 这里是需要添加的关键代码 ----
io.emit('new_entry', entries); // 向所有连接的客户端广播新数据
// -------- 结束关键代码 --------
res.status(200).send('Data inserted successfully'); // 给API调用者一个响应
});
});
io.on('connection',function(socket){
console.log('A user connected:', socket.id);
// 当有新用户连接时,发送当前所有数据给他
connectSql.query('SELECT * FROM entries ORDER BY id DESC LIMIT 50', function(err, rows){ // 可以加排序和限制数量
if(err) {
console.error('Error fetching initial data:', err);
// 可以考虑给这个连接发送一个错误消息
socket.emit('error_message', 'Failed to fetch initial data.');
return; // 发生错误时终止后续操作
}
console.log('Sending initial data to:', socket.id);
socket.emit('showrows', rows); // 只发送给当前连接的客户端
});
// 处理断开连接
socket.on('disconnect', function(){
console.log('User disconnected:', socket.id);
});
});
前端代码 (public/index.html
或类似文件中的 <script>
):
// 确保在 DOM 加载完成后执行
document.addEventListener('DOMContentLoaded', (event) => {
const displayDiv = document.getElementById("display");
var socket = io.connect('http://localhost:4000'); // 建立连接
// 监听 'showrows' 事件,用于加载初始数据
socket.on('showrows', function(rows) {
console.log('Received initial data:', rows);
var html = '';
// 从最新到最旧显示
for (var i = 0; i < rows.length; i++) {
html += `<div>ID: ${rows[i].device_id}, A: ${rows[i].a}, B: ${rows[i].b}, C: ${rows[i].c}, Date: ${rows[i].date}</div>`;
}
displayDiv.innerHTML = html; // 覆盖显示初始数据
});
// ---- 这里是需要添加的关键代码 ----
// 监听 'new_entry' 事件,用于接收新插入的数据
socket.on('new_entry', function(entry) {
console.log('Received new entry:', entry);
// 创建一个新的 div 元素来显示新数据
const newEntryDiv = document.createElement('div');
newEntryDiv.innerHTML = `<div>ID: ${entry.device_id}, A: ${entry.a}, B: ${entry.b}, C: ${entry.c}, Date: ${entry.date}</div>`;
// 将新数据添加到显示区域的顶部 (或者底部,根据需求调整)
displayDiv.insertBefore(newEntryDiv, displayDiv.firstChild);
// (可选) 如果数据量很大,可能需要限制显示的条目数量
// 例如,只保留最新的 50 条
while (displayDiv.children.length > 50) {
displayDiv.removeChild(displayDiv.lastChild);
}
});
// -------- 结束关键代码 --------
// 处理连接错误
socket.on('connect_error', (err) => {
console.error('Connection Failed:', err.message);
displayDiv.innerHTML = 'Failed to connect to server...';
});
// (可选) 监听服务器发送的错误消息
socket.on('error_message', (message) => {
console.error('Server error:', message);
// 可以在页面上显示错误提示
// 例如: alert(message);
});
});
看截图貌似没啥明显错误,但功能不符合预期。
为啥会这样?问题根源分析
问题出在 数据插入 和 数据广播 这两个环节没有关联起来。
看看现在的逻辑:
-
用户连接时 (
io.on('connection', ...)
):- 当一个新的浏览器窗口打开,或者刷新页面时,会触发
connection
事件。 - 这时,服务器会从 MySQL 查询
entries
表里的 所有 数据。 - 然后通过
socket.emit('showrows', rows)
把这些查到的数据 只 发送给 当前这个刚连接 的客户端。 - 所以,新连接或者刷新页面时,能看到当时数据库里的所有数据。
- 当一个新的浏览器窗口打开,或者刷新页面时,会触发
-
API 插入数据时 (
app.get('/api/motor_details', ...)
):- 当访问那个 URL 时,Express 路由接收请求。
- 数据被提取出来 (
req.query
)。 - 数据被插入到 MySQL 数据库。
- 但是! 插入成功后,代码仅仅是在控制台打印了日志 (
console.log
),并给 API 调用者返回了一个200 OK
响应。 - 它 完全没有 通知任何已经连接的 Socket.IO 客户端:“嘿!有新数据来了!”
所以说, 数据库里的数据确实更新了,但没有任何机制去主动告诉那些已经打开网页等着看数据的用户。他们只能通过刷新页面(重新建立 Socket.IO 连接,触发 io.on('connection')
)来获取最新的全量数据。这当然不是“实时”的。
动手解决:让数据“飞”起来
要实现实时更新,关键在于:当数据成功插入数据库后,立刻通过 Socket.IO 把这条新数据广播给所有在线的客户端。
方案一:改造 API 接口,插入即广播
这是最直接也最常用的方法。思路很简单:在数据库插入操作成功的回调函数里,加上 Socket.IO 的广播代码。
原理:
利用 mysql
库提供的回调函数。当 connectSql.query
的 insert
操作成功执行后,回调函数会被触发。在这个回调函数里,我们不仅知道了数据插入成功,还能拿到插入的数据(或者至少是插入操作成功的信息)。此时,正是通过 io
对象向所有连接的客户端发送消息的最佳时机。
我们会使用 io.emit()
而不是 socket.emit()
。区别在于:
socket.emit()
: 只向触发某个事件(比如connection
)的 那一个 客户端发送消息。io.emit()
: 向 所有 当前连接到服务器的客户端广播消息。这正是我们想要的,让所有人看到更新。
代码改造 (后端 index.js
):
找到 app.get('/api/motor_details', ...)
这部分,修改如下:
// ... (前面的代码不变) ...
app.get('/api/motor_details', function(req, res) {
// 使用 req.query 获取 GET 请求的查询参数
var device_id = req.query.deviceId;
var a = req.query.a;
var b = req.query.b;
var c = req.query.c;
// === 增加更健壮的输入验证 ===
if (!device_id) {
// 不仅要返回错误,也应该记录日志
console.warn('Missing deviceId parameter from IP:', req.ip);
return res.status(400).json({ error: 'Missing deviceId parameter' }); // 使用 JSON 返回错误更规范
}
// 可以根据需要添加对 a, b, c 的验证,比如类型、长度等
// if (typeof a !== 'string' || a.length > 50) { ... }
var entries = {
device_id: device_id,
// 对可能为空的值提供默认值或做处理
a: a || '', // 如果 a 不存在,设为空字符串
b: b || '',
c: c || '',
// 使用数据库服务器的时间通常更可靠,但这里用 Node.js 时间也行
date: new Date()
};
var query = connectSql.query('INSERT INTO entries SET ?', entries, function (err, result) {
if (err) {
console.error('Error inserting data:', err);
// 避免泄露过多数据库细节给客户端
return res.status(500).json({ error: 'Database operation failed' });
}
console.log('Data inserted successfully, ID:', result.insertId);
// !!!!! 关键改动 !!!!!
// 准备要广播的数据,可以稍微处理下,比如确保日期格式一致
const broadcastData = {
...entries, // 包含 device_id, a, b, c
id: result.insertId, // 把数据库生成的 ID 也加上
date: entries.date.toISOString() // 统一为 ISO 格式字符串
};
// 使用 io.emit 向所有连接的客户端广播 'new_entry' 事件,并携带新插入的数据
io.emit('new_entry', broadcastData);
console.log('Broadcasted new entry:', broadcastData);
// !!!!! 结束关键改动 !!!!!
// 给 API 调用者一个成功的响应
// 返回插入的数据和ID通常是好习惯
res.status(201).json({ message: 'Data inserted successfully', data: broadcastData }); // 201 Created 更合适
});
});
// ... (io.on('connection', ...) 部分基本可以保持原样,它负责新用户的初始数据加载) ...
// 优化 io.on('connection') 部分,增加错误处理和日志
io.on('connection', function(socket){
console.log(`User connected: ${socket.id} from ${socket.handshake.address}`);
// 发送初始数据
connectSql.query('SELECT * FROM entries ORDER BY id DESC LIMIT 50', function(err, rows){
if(err) {
console.error(`Error fetching initial data for ${socket.id}:`, err);
socket.emit('error_message', 'Failed to load initial data. Please refresh.'); // 给客户端明确提示
return;
}
console.log(`Sending initial ${rows.length} rows to ${socket.id}`);
// 可以在发送前处理一下数据格式,例如日期
const formattedRows = rows.map(row => ({
...row,
date: row.date ? new Date(row.date).toISOString() : null // 统一日期格式
}));
socket.emit('showrows', formattedRows); // 发送处理后的数据
});
// 监听断开连接
socket.on('disconnect', function(reason){
console.log(`User disconnected: ${socket.id}, reason: ${reason}`);
});
// 可以增加其他事件监听,例如客户端的心跳或特定请求
});
代码改造 (前端 JavaScript):
前端现在不仅要处理初始加载的 showrows
事件,还需要监听我们新定义的 new_entry
事件,并把收到的新数据追加到页面上。
// 确保在 DOM 加载完成后执行
document.addEventListener('DOMContentLoaded', (event) => {
const displayDiv = document.getElementById("display");
const statusDiv = document.getElementById("status"); // (可选) 加一个状态显示区域
var socket = io.connect('http://localhost:4000');
setStatus('Connecting...');
// 处理连接成功
socket.on('connect', () => {
setStatus('Connected. Loading initial data...');
console.log('Socket connected:', socket.id);
});
// 监听 'showrows' 事件,用于加载初始数据
socket.on('showrows', function(rows) {
console.log('Received initial data:', rows);
setStatus(`Loaded ${rows.length} initial entries.`);
var html = '';
// 从最新到最旧显示
for (var i = 0; i < rows.length; i++) {
html += createEntryHtml(rows[i]); // 使用函数创建 HTML,更清晰
}
displayDiv.innerHTML = html; // 覆盖显示初始数据
});
// !!!!! 关键改动 !!!!!
// 监听 'new_entry' 事件,用于接收新插入的数据
socket.on('new_entry', function(entry) {
console.log('Received new entry:', entry);
setStatus('New entry received!');
const newEntryHtml = createEntryHtml(entry);
// 创建一个临时的父元素来解析 HTML 字符串
const tempDiv = document.createElement('div');
tempDiv.innerHTML = newEntryHtml;
const newEntryElement = tempDiv.firstChild;
// 将新数据添加到显示区域的顶部
displayDiv.insertBefore(newEntryElement, displayDiv.firstChild);
// (可选) 平滑滚动或高亮新条目效果
newEntryElement.style.backgroundColor = '#ffff99'; // 临时高亮
setTimeout(() => {
newEntryElement.style.backgroundColor = '';
}, 2000); // 2秒后移除高亮
// (可选) 限制显示的条目数量,防止页面无限增长
const maxEntries = 50; // 最多显示 50 条
while (displayDiv.children.length > maxEntries) {
displayDiv.removeChild(displayDiv.lastChild); // 移除最旧的条目 (底部的)
}
// 短暂显示状态后恢复默认
setTimeout(() => setStatus('Connected. Waiting for data...'), 1500);
});
// !!!!! 结束关键改动 !!!!!
// 处理连接错误
socket.on('connect_error', (err) => {
console.error('Connection Failed:', err);
setStatus(`Connection Failed: ${err.message}. Retrying...`);
});
// 处理断开连接
socket.on('disconnect', (reason) => {
console.warn('Socket disconnected:', reason);
setStatus(`Disconnected: ${reason}. Attempting to reconnect...`);
});
// 监听服务器发送的错误消息
socket.on('error_message', (message) => {
console.error('Server error:', message);
// 可以在页面上显示更明显的错误提示
const errorDiv = document.createElement('div');
errorDiv.style.color = 'red';
errorDiv.textContent = `Error: ${message}`;
displayDiv.insertBefore(errorDiv, displayDiv.firstChild);
setStatus('Error received from server.');
});
// ---- Helper Functions ----
function setStatus(message) {
if (statusDiv) {
statusDiv.textContent = message;
}
}
function createEntryHtml(entry) {
// 使用模板字符串和进行适当的 HTML 转义 (虽然这里示例简单,实际应注意 XSS)
const deviceId = entry.device_id || 'N/A';
const a = entry.a || 'N/A';
const b = entry.b || 'N/A';
const c = c || 'N/A';
// 格式化日期,如果存在的话
const dateStr = entry.date ? new Date(entry.date).toLocaleString() : 'N/A';
// 注意:简单显示,未做 XSS 防护!生产环境需要对来自数据库的数据进行转义
return `<div class="entry">
<span>Device ID: ${deviceId}</span> |
<span>A: ${a}</span> |
<span>B: ${b}</span> |
<span>C: ${c}</span> |
<span>Time: ${dateStr}</span>
</div>`;
}
});
// 别忘了在 HTML 中添加 <div id="status"></div> (如果用了的话)
// <div id="status"></div>
// <div id="display">Loading...</div>
安全建议:
- 输入验证 (非常重要): API 接口 (
/api/motor_details
) 必须严格验证所有传入的参数 (deviceId
,a
,b
,c
)。检查它们是否存在、类型是否正确、长度是否符合预期、是否包含恶意字符等。防止 SQL 注入(虽然mysql
库的?
占位符能防御基本注入,但验证输入总是好的)、数据损坏或意外行为。 - SQL 注入防护: 坚持使用参数化查询(就像代码中
connectSql.query('insert into entries set ?', entries, ...)
这样),绝对不要 手动拼接 SQL 字符串! - 最小权限原则: 数据库连接用户 (
root
是 绝对 不行的) 应该只有执行INSERT
和SELECT
的权限,避免给它过高的权限(如DELETE
,DROP
,ALTER
等)。 - 错误处理: 不要在生产环境中把详细的数据库错误信息直接发送给客户端(API 响应或 Socket.IO 消息),这可能泄露敏感信息。记录详细错误到服务器日志,给客户端返回通用的错误提示。
- 广播内容: 确保你通过
io.emit
广播出去的数据 (entries
对象) 不包含任何敏感信息。例如,如果表里有用户的密码哈希或其他私密数据,千万不要直接整个对象广播出去。只选择需要公开显示的字段进行广播。 - 速率限制: 如果这个 API 可能被频繁调用,考虑添加速率限制(rate limiting),防止恶意用户通过大量请求耗尽服务器或数据库资源。可以使用
express-rate-limit
这类中间件。
进阶使用技巧:
- 房间 (Rooms): 如果不是所有用户都需要看到所有数据更新(比如,用户只关心特定
deviceId
的数据),可以使用 Socket.IO 的 "房间" 功能。- 当用户连接时,根据他的需求(可能通过 URL 参数或登录信息判断)将他的
socket
加入到一个或多个房间,例如socket.join('device_145');
。 - 在插入数据后,不再使用
io.emit()
广播给所有人,而是用io.to('device_' + entries.device_id).emit('new_entry', entries);
只广播给订阅了这个设备 ID 的房间里的客户端。这样更高效,也更相关。
- 当用户连接时,根据他的需求(可能通过 URL 参数或登录信息判断)将他的
- 确认 (Acknowledgements): 有时你想知道客户端是否收到了你发送的消息。
emit
函数可以接受一个回调函数作为最后一个参数。客户端收到消息后,可以执行这个回调,向服务器发送一个确认。这对于需要保证消息送达的场景很有用。- 服务器:
socket.emit('request_data', someData, (response) => { console.log('Client responded:', response); });
- 客户端:
socket.on('request_data', (data, callback) => { console.log('Received data request:', data); // 处理 data... callback({ status: 'received' }); });
- 服务器:
- 前端性能优化: 如果数据更新非常频繁,直接操作 DOM (像
insertBefore
) 可能会有点慢。可以考虑:- 批量更新: 不要每来一条数据就更新 DOM,可以攒一小批(比如 100ms 内的),然后一次性更新。
- 虚拟 DOM 库: 使用像 React, Vue, Svelte 这类库,它们会更高效地处理 DOM 更新。
- 数据限制: 页面上只保留一定数量(比如最新的 50-100 条)的数据,旧数据自动移除,避免 DOM 元素无限增长。
方案二:数据库侦听 (更高级,可能有点 overkill)
这种方法不直接在 API 插入逻辑里加广播代码,而是让后端应用去“监听”数据库本身的变化。
原理:
- 数据库触发器 (Triggers): 在
entries
表上创建一个AFTER INSERT
触发器。当有新行插入时,触发器可以执行一些操作,比如调用一个外部程序、向某个消息队列(如 Redis Pub/Sub, RabbitMQ)发送通知,或者(比较 hacky 的方式)写入一个特定的日志文件。 - 变更数据捕获 (Change Data Capture - CDC): 一些数据库或中间件支持 CDC。它们可以捕获数据库的底层变更日志(如 MySQL 的 binlog),然后将这些变更事件流式传输出来。
- 轮询 (Polling - 不推荐): 后端服务定期去查询数据库,看有没有比上次查询时间戳更新的数据。效率较低,不是真·实时。
实现:
- 你的 Node.js 应用需要一个机制来接收这些来自数据库的变更信号(比如订阅消息队列、读取 binlog 流、监控日志文件)。
- 一旦检测到
INSERT
事件,就通过io.emit()
将数据广播出去。
优缺点:
- 优点: 将数据插入逻辑和实时通知逻辑解耦。API 接口只负责插入,不关心后续的实时推送。推送服务可以独立扩展。
- 缺点: 实现更复杂,需要对数据库触发器、消息队列或 CDC 技术有了解。增加了系统的组件和维护成本。对于当前问题来说,可能有点杀鸡用牛刀。
鉴于你刚接触 Node.js,方案一(改造 API 接口) 是最直接、最容易理解和实现的。
关于 req.param
的小插曲
你在问题里提到试过 req.param
, req.params
, req.body
, req.query
。这里简单说明一下在 Express 中它们的区别,以及为什么你的场景应该用 req.query
:
req.query
: 用于获取 URL 中?
后面的查询字符串参数。你的 URL 是http://.../motor_details?deviceId=145&a=sdfa3...
,所以deviceId
,a
,b
,c
都在req.query
对象里。访问方式是req.query.deviceId
,req.query.a
等。这是 正确 的方式。req.params
: 用于获取路由路径中定义的参数。比如你的路由是/api/motor/:deviceId
,那么访问/api/motor/145
时,req.params.deviceId
的值就是145
。这不适用于你当前的 URL 结构。req.body
: 用于获取 POST、PUT 等请求体中的数据。通常需要配合body-parser
或 Express 内置的express.json()
,express.urlencoded()
中间件来解析请求体。你的请求是 GET 请求,数据在 URL 里,不在请求体里,所以req.body
是空的 (undefined
)。req.param()
: 这是一个 已弃用 (deprecated) 的方法,它曾试图按特定顺序查找req.params
、req.body
、req.query
来获取参数。不要使用它 ,不仅因为它被弃用了,而且它的查找顺序和行为可能不直观,容易出错。
所以,对于你的 URL http://localhost:4000/api/motor_details?deviceId=145&a=sdfa3&b=us&c=uds
,获取参数的正确方式就是 req.query.deviceId
、req.query.a
等。代码示例中已经改用了 req.query
。