返回

Node.js+Socket.IO 实现 MySQL 数据实时更新 (告别刷新)

mysql

Node.js + Socket.IO + MySQL 实现数据实时显示

遇到啥问题了?

刚接触 Node.js,想做个网页实时显示数据的功能。具体是这样:我通过一个 URL (比如 http://localhost:4000/api/motor_details?deviceId=145&a=sdfa3&b=us&c=uds) 往 MySQL 数据库里插数据。数据确实能成功插进去,但是网页上只有手动刷新页面才能看到新数据。我想要的效果是,数据插进去后,网页能自动更新,不用我手动刷新。

看看现在的代码 (index.js):

var express = require('express');
var socket = require('socket.io');
var mysql = require('mysql');
var http = require("http");
var app = express();
// 注意:创建 server 实例是为了给 socket.io 使用
var server = http.createServer(app); 
var io = socket(server); // 将 server 传递给 socket.io

server.listen(4000,function(){ // 使用 server 实例监听端口
    console.log('Listening 4000');
});

app.use(express.static('public'));

var connectSql = mysql.createConnection({
    host: "localhost",
    user: "root",
    password: "root", // 生产环境千万别用弱密码
    database: "grid_component",
    socketPath: '/Applications/MAMP/tmp/mysql/mysql.sock' // 这个路径可能因环境而异
});

connectSql.connect(function(err) { // 增加错误处理
  if (err) {
    console.error('Database connection failed: ' + err.stack);
    return;
  }
  console.log('Connected to database.');
});

app.get('/api/motor_details', function(req, res) {
    // 注意: req.param 已弃用, 对于 URL 查询参数,应该用 req.query
    var device_id = req.query.deviceId; 
    var a = req.query.a;
    var b = req.query.b;  
    var c = req.query.c;  
    
    // 基本的验证 (后面会细说)
    if (!device_id) {
       return res.status(400).send('Missing deviceId parameter');
    }

    var entries = {
        device_id: device_id,
        a: a,
        b: b,
        c: c,
        date: new Date() // 使用当前日期时间更合适
    }
    
    var query = connectSql.query('insert into entries set ?', entries, function (err, result) {
        if(err){
            console.error('Error inserting data:', err);
            // 向请求方返回错误信息
            return res.status(500).send('Database insert error'); 
        }
        console.log('Data inserted successfully, ID:', result.insertId);
        // 数据插入成功后,需要通知客户端!这是缺失的关键步骤
        
        // ---- 这里是需要添加的关键代码 ----
        io.emit('new_entry', entries); // 向所有连接的客户端广播新数据
        // -------- 结束关键代码 --------

        res.status(200).send('Data inserted successfully'); // 给API调用者一个响应
    });
});

io.on('connection',function(socket){
    console.log('A user connected:', socket.id);
    
    // 当有新用户连接时,发送当前所有数据给他
    connectSql.query('SELECT * FROM entries ORDER BY id DESC LIMIT 50', function(err, rows){ // 可以加排序和限制数量
        if(err) {
           console.error('Error fetching initial data:', err);
           // 可以考虑给这个连接发送一个错误消息
           socket.emit('error_message', 'Failed to fetch initial data.');
           return; // 发生错误时终止后续操作
        }
        console.log('Sending initial data to:', socket.id);
        socket.emit('showrows', rows); // 只发送给当前连接的客户端
    });

    // 处理断开连接
    socket.on('disconnect', function(){
        console.log('User disconnected:', socket.id);
    });
});

前端代码 (public/index.html 或类似文件中的 <script>):

// 确保在 DOM 加载完成后执行
document.addEventListener('DOMContentLoaded', (event) => {
    const displayDiv = document.getElementById("display");
    var socket = io.connect('http://localhost:4000'); // 建立连接

    // 监听 'showrows' 事件,用于加载初始数据
    socket.on('showrows', function(rows) {
        console.log('Received initial data:', rows);
        var html = '';
        // 从最新到最旧显示
        for (var i = 0; i < rows.length; i++) { 
            html += `<div>ID: ${rows[i].device_id}, A: ${rows[i].a}, B: ${rows[i].b}, C: ${rows[i].c}, Date: ${rows[i].date}</div>`;
        }
        displayDiv.innerHTML = html; // 覆盖显示初始数据
    });

    // ---- 这里是需要添加的关键代码 ----
    // 监听 'new_entry' 事件,用于接收新插入的数据
    socket.on('new_entry', function(entry) {
        console.log('Received new entry:', entry);
        // 创建一个新的 div 元素来显示新数据
        const newEntryDiv = document.createElement('div');
        newEntryDiv.innerHTML = `<div>ID: ${entry.device_id}, A: ${entry.a}, B: ${entry.b}, C: ${entry.c}, Date: ${entry.date}</div>`;
        
        // 将新数据添加到显示区域的顶部 (或者底部,根据需求调整)
        displayDiv.insertBefore(newEntryDiv, displayDiv.firstChild); 

        // (可选) 如果数据量很大,可能需要限制显示的条目数量
        // 例如,只保留最新的 50 条
        while (displayDiv.children.length > 50) {
            displayDiv.removeChild(displayDiv.lastChild);
        }
    });
    // -------- 结束关键代码 --------

    // 处理连接错误
    socket.on('connect_error', (err) => {
      console.error('Connection Failed:', err.message);
      displayDiv.innerHTML = 'Failed to connect to server...';
    });
    
    // (可选) 监听服务器发送的错误消息
    socket.on('error_message', (message) => {
        console.error('Server error:', message);
        // 可以在页面上显示错误提示
        // 例如: alert(message);
    });
});

看截图貌似没啥明显错误,但功能不符合预期。

为啥会这样?问题根源分析

问题出在 数据插入数据广播 这两个环节没有关联起来。

看看现在的逻辑:

  1. 用户连接时 (io.on('connection', ...)):

    • 当一个新的浏览器窗口打开,或者刷新页面时,会触发 connection 事件。
    • 这时,服务器会从 MySQL 查询 entries 表里的 所有 数据。
    • 然后通过 socket.emit('showrows', rows) 把这些查到的数据 发送给 当前这个刚连接 的客户端。
    • 所以,新连接或者刷新页面时,能看到当时数据库里的所有数据。
  2. API 插入数据时 (app.get('/api/motor_details', ...)):

    • 当访问那个 URL 时,Express 路由接收请求。
    • 数据被提取出来 (req.query)。
    • 数据被插入到 MySQL 数据库。
    • 但是! 插入成功后,代码仅仅是在控制台打印了日志 (console.log),并给 API 调用者返回了一个 200 OK 响应。
    • 完全没有 通知任何已经连接的 Socket.IO 客户端:“嘿!有新数据来了!”

所以说, 数据库里的数据确实更新了,但没有任何机制去主动告诉那些已经打开网页等着看数据的用户。他们只能通过刷新页面(重新建立 Socket.IO 连接,触发 io.on('connection'))来获取最新的全量数据。这当然不是“实时”的。

动手解决:让数据“飞”起来

要实现实时更新,关键在于:当数据成功插入数据库后,立刻通过 Socket.IO 把这条新数据广播给所有在线的客户端。

方案一:改造 API 接口,插入即广播

这是最直接也最常用的方法。思路很简单:在数据库插入操作成功的回调函数里,加上 Socket.IO 的广播代码。

原理:

利用 mysql 库提供的回调函数。当 connectSql.queryinsert 操作成功执行后,回调函数会被触发。在这个回调函数里,我们不仅知道了数据插入成功,还能拿到插入的数据(或者至少是插入操作成功的信息)。此时,正是通过 io 对象向所有连接的客户端发送消息的最佳时机。

我们会使用 io.emit() 而不是 socket.emit()。区别在于:

  • socket.emit(): 只向触发某个事件(比如 connection)的 那一个 客户端发送消息。
  • io.emit(): 向 所有 当前连接到服务器的客户端广播消息。这正是我们想要的,让所有人看到更新。

代码改造 (后端 index.js):

找到 app.get('/api/motor_details', ...) 这部分,修改如下:

// ... (前面的代码不变) ...

app.get('/api/motor_details', function(req, res) {
    // 使用 req.query 获取 GET 请求的查询参数
    var device_id = req.query.deviceId; 
    var a = req.query.a;
    var b = req.query.b;  
    var c = req.query.c;  
    
    // === 增加更健壮的输入验证 ===
    if (!device_id) {
       // 不仅要返回错误,也应该记录日志
       console.warn('Missing deviceId parameter from IP:', req.ip); 
       return res.status(400).json({ error: 'Missing deviceId parameter' }); // 使用 JSON 返回错误更规范
    }
    // 可以根据需要添加对 a, b, c 的验证,比如类型、长度等
    // if (typeof a !== 'string' || a.length > 50) { ... }

    var entries = {
        device_id: device_id,
        // 对可能为空的值提供默认值或做处理
        a: a || '', // 如果 a 不存在,设为空字符串
        b: b || '', 
        c: c || '',
        // 使用数据库服务器的时间通常更可靠,但这里用 Node.js 时间也行
        date: new Date() 
    };
    
    var query = connectSql.query('INSERT INTO entries SET ?', entries, function (err, result) {
        if (err) {
            console.error('Error inserting data:', err);
            // 避免泄露过多数据库细节给客户端
            return res.status(500).json({ error: 'Database operation failed' }); 
        }  
        
        console.log('Data inserted successfully, ID:', result.insertId);

        // !!!!! 关键改动 !!!!!
        // 准备要广播的数据,可以稍微处理下,比如确保日期格式一致
        const broadcastData = {
            ...entries, // 包含 device_id, a, b, c
            id: result.insertId, // 把数据库生成的 ID 也加上
            date: entries.date.toISOString() // 统一为 ISO 格式字符串
        };

        // 使用 io.emit 向所有连接的客户端广播 'new_entry' 事件,并携带新插入的数据
        io.emit('new_entry', broadcastData); 
        console.log('Broadcasted new entry:', broadcastData);
        // !!!!! 结束关键改动 !!!!!

        // 给 API 调用者一个成功的响应
        // 返回插入的数据和ID通常是好习惯
        res.status(201).json({ message: 'Data inserted successfully', data: broadcastData }); // 201 Created 更合适
    });
});

// ... (io.on('connection', ...) 部分基本可以保持原样,它负责新用户的初始数据加载) ...
// 优化 io.on('connection') 部分,增加错误处理和日志
io.on('connection', function(socket){
    console.log(`User connected: ${socket.id} from ${socket.handshake.address}`);
    
    // 发送初始数据
    connectSql.query('SELECT * FROM entries ORDER BY id DESC LIMIT 50', function(err, rows){ 
        if(err) {
           console.error(`Error fetching initial data for ${socket.id}:`, err);
           socket.emit('error_message', 'Failed to load initial data. Please refresh.'); // 给客户端明确提示
           return;
        }
        console.log(`Sending initial ${rows.length} rows to ${socket.id}`);
        // 可以在发送前处理一下数据格式,例如日期
        const formattedRows = rows.map(row => ({
            ...row,
            date: row.date ? new Date(row.date).toISOString() : null // 统一日期格式
        }));
        socket.emit('showrows', formattedRows); // 发送处理后的数据
    });

    // 监听断开连接
    socket.on('disconnect', function(reason){
        console.log(`User disconnected: ${socket.id}, reason: ${reason}`);
    });

    // 可以增加其他事件监听,例如客户端的心跳或特定请求
});

代码改造 (前端 JavaScript):

前端现在不仅要处理初始加载的 showrows 事件,还需要监听我们新定义的 new_entry 事件,并把收到的新数据追加到页面上。

// 确保在 DOM 加载完成后执行
document.addEventListener('DOMContentLoaded', (event) => {
    const displayDiv = document.getElementById("display");
    const statusDiv = document.getElementById("status"); // (可选) 加一个状态显示区域
    var socket = io.connect('http://localhost:4000'); 

    setStatus('Connecting...');

    // 处理连接成功
    socket.on('connect', () => {
        setStatus('Connected. Loading initial data...');
        console.log('Socket connected:', socket.id);
    });

    // 监听 'showrows' 事件,用于加载初始数据
    socket.on('showrows', function(rows) {
        console.log('Received initial data:', rows);
        setStatus(`Loaded ${rows.length} initial entries.`);
        var html = '';
        // 从最新到最旧显示
        for (var i = 0; i < rows.length; i++) { 
            html += createEntryHtml(rows[i]); // 使用函数创建 HTML,更清晰
        }
        displayDiv.innerHTML = html; // 覆盖显示初始数据
    });

    // !!!!! 关键改动 !!!!!
    // 监听 'new_entry' 事件,用于接收新插入的数据
    socket.on('new_entry', function(entry) {
        console.log('Received new entry:', entry);
        setStatus('New entry received!');

        const newEntryHtml = createEntryHtml(entry);
        // 创建一个临时的父元素来解析 HTML 字符串
        const tempDiv = document.createElement('div');
        tempDiv.innerHTML = newEntryHtml;
        const newEntryElement = tempDiv.firstChild;

        // 将新数据添加到显示区域的顶部
        displayDiv.insertBefore(newEntryElement, displayDiv.firstChild); 

        // (可选) 平滑滚动或高亮新条目效果
        newEntryElement.style.backgroundColor = '#ffff99'; // 临时高亮
        setTimeout(() => {
            newEntryElement.style.backgroundColor = ''; 
        }, 2000); // 2秒后移除高亮

        // (可选) 限制显示的条目数量,防止页面无限增长
        const maxEntries = 50; // 最多显示 50 条
        while (displayDiv.children.length > maxEntries) {
            displayDiv.removeChild(displayDiv.lastChild); // 移除最旧的条目 (底部的)
        }
        
        // 短暂显示状态后恢复默认
        setTimeout(() => setStatus('Connected. Waiting for data...'), 1500);
    });
    // !!!!! 结束关键改动 !!!!!

    // 处理连接错误
    socket.on('connect_error', (err) => {
      console.error('Connection Failed:', err);
      setStatus(`Connection Failed: ${err.message}. Retrying...`);
    });
    
    // 处理断开连接
    socket.on('disconnect', (reason) => {
        console.warn('Socket disconnected:', reason);
        setStatus(`Disconnected: ${reason}. Attempting to reconnect...`);
    });

    // 监听服务器发送的错误消息
    socket.on('error_message', (message) => {
        console.error('Server error:', message);
        // 可以在页面上显示更明显的错误提示
        const errorDiv = document.createElement('div');
        errorDiv.style.color = 'red';
        errorDiv.textContent = `Error: ${message}`;
        displayDiv.insertBefore(errorDiv, displayDiv.firstChild);
        setStatus('Error received from server.');
    });

    // ---- Helper Functions ----
    function setStatus(message) {
        if (statusDiv) {
            statusDiv.textContent = message;
        }
    }

    function createEntryHtml(entry) {
        // 使用模板字符串和进行适当的 HTML 转义 (虽然这里示例简单,实际应注意 XSS)
        const deviceId = entry.device_id || 'N/A';
        const a = entry.a || 'N/A';
        const b = entry.b || 'N/A';
        const c = c || 'N/A';
        // 格式化日期,如果存在的话
        const dateStr = entry.date ? new Date(entry.date).toLocaleString() : 'N/A'; 
        
        // 注意:简单显示,未做 XSS 防护!生产环境需要对来自数据库的数据进行转义
        return `<div class="entry">
                    <span>Device ID: ${deviceId}</span> | 
                    <span>A: ${a}</span> | 
                    <span>B: ${b}</span> | 
                    <span>C: ${c}</span> | 
                    <span>Time: ${dateStr}</span>
                </div>`;
    }
});

// 别忘了在 HTML 中添加 <div id="status"></div> (如果用了的话)
// <div id="status"></div>
// <div id="display">Loading...</div>

安全建议:

  1. 输入验证 (非常重要): API 接口 (/api/motor_details) 必须严格验证所有传入的参数 (deviceId, a, b, c)。检查它们是否存在、类型是否正确、长度是否符合预期、是否包含恶意字符等。防止 SQL 注入(虽然 mysql 库的 ? 占位符能防御基本注入,但验证输入总是好的)、数据损坏或意外行为。
  2. SQL 注入防护: 坚持使用参数化查询(就像代码中 connectSql.query('insert into entries set ?', entries, ...) 这样),绝对不要 手动拼接 SQL 字符串!
  3. 最小权限原则: 数据库连接用户 (root绝对 不行的) 应该只有执行 INSERTSELECT 的权限,避免给它过高的权限(如 DELETE, DROP, ALTER 等)。
  4. 错误处理: 不要在生产环境中把详细的数据库错误信息直接发送给客户端(API 响应或 Socket.IO 消息),这可能泄露敏感信息。记录详细错误到服务器日志,给客户端返回通用的错误提示。
  5. 广播内容: 确保你通过 io.emit 广播出去的数据 (entries 对象) 不包含任何敏感信息。例如,如果表里有用户的密码哈希或其他私密数据,千万不要直接整个对象广播出去。只选择需要公开显示的字段进行广播。
  6. 速率限制: 如果这个 API 可能被频繁调用,考虑添加速率限制(rate limiting),防止恶意用户通过大量请求耗尽服务器或数据库资源。可以使用 express-rate-limit 这类中间件。

进阶使用技巧:

  1. 房间 (Rooms): 如果不是所有用户都需要看到所有数据更新(比如,用户只关心特定 deviceId 的数据),可以使用 Socket.IO 的 "房间" 功能。
    • 当用户连接时,根据他的需求(可能通过 URL 参数或登录信息判断)将他的 socket 加入到一个或多个房间,例如 socket.join('device_145');
    • 在插入数据后,不再使用 io.emit() 广播给所有人,而是用 io.to('device_' + entries.device_id).emit('new_entry', entries); 只广播给订阅了这个设备 ID 的房间里的客户端。这样更高效,也更相关。
  2. 确认 (Acknowledgements): 有时你想知道客户端是否收到了你发送的消息。emit 函数可以接受一个回调函数作为最后一个参数。客户端收到消息后,可以执行这个回调,向服务器发送一个确认。这对于需要保证消息送达的场景很有用。
    • 服务器: socket.emit('request_data', someData, (response) => { console.log('Client responded:', response); });
    • 客户端: socket.on('request_data', (data, callback) => { console.log('Received data request:', data); // 处理 data... callback({ status: 'received' }); });
  3. 前端性能优化: 如果数据更新非常频繁,直接操作 DOM (像 insertBefore) 可能会有点慢。可以考虑:
    • 批量更新: 不要每来一条数据就更新 DOM,可以攒一小批(比如 100ms 内的),然后一次性更新。
    • 虚拟 DOM 库: 使用像 React, Vue, Svelte 这类库,它们会更高效地处理 DOM 更新。
    • 数据限制: 页面上只保留一定数量(比如最新的 50-100 条)的数据,旧数据自动移除,避免 DOM 元素无限增长。

方案二:数据库侦听 (更高级,可能有点 overkill)

这种方法不直接在 API 插入逻辑里加广播代码,而是让后端应用去“监听”数据库本身的变化。

原理:

  • 数据库触发器 (Triggers):entries 表上创建一个 AFTER INSERT 触发器。当有新行插入时,触发器可以执行一些操作,比如调用一个外部程序、向某个消息队列(如 Redis Pub/Sub, RabbitMQ)发送通知,或者(比较 hacky 的方式)写入一个特定的日志文件。
  • 变更数据捕获 (Change Data Capture - CDC): 一些数据库或中间件支持 CDC。它们可以捕获数据库的底层变更日志(如 MySQL 的 binlog),然后将这些变更事件流式传输出来。
  • 轮询 (Polling - 不推荐): 后端服务定期去查询数据库,看有没有比上次查询时间戳更新的数据。效率较低,不是真·实时。

实现:

  • 你的 Node.js 应用需要一个机制来接收这些来自数据库的变更信号(比如订阅消息队列、读取 binlog 流、监控日志文件)。
  • 一旦检测到 INSERT 事件,就通过 io.emit() 将数据广播出去。

优缺点:

  • 优点: 将数据插入逻辑和实时通知逻辑解耦。API 接口只负责插入,不关心后续的实时推送。推送服务可以独立扩展。
  • 缺点: 实现更复杂,需要对数据库触发器、消息队列或 CDC 技术有了解。增加了系统的组件和维护成本。对于当前问题来说,可能有点杀鸡用牛刀。

鉴于你刚接触 Node.js,方案一(改造 API 接口) 是最直接、最容易理解和实现的。

关于 req.param 的小插曲

你在问题里提到试过 req.param, req.params, req.body, req.query。这里简单说明一下在 Express 中它们的区别,以及为什么你的场景应该用 req.query

  • req.query: 用于获取 URL 中 ? 后面的查询字符串参数。你的 URL 是 http://.../motor_details?deviceId=145&a=sdfa3...,所以 deviceId, a, b, c 都在 req.query 对象里。访问方式是 req.query.deviceIdreq.query.a 等。这是 正确 的方式。
  • req.params: 用于获取路由路径中定义的参数。比如你的路由是 /api/motor/:deviceId,那么访问 /api/motor/145 时,req.params.deviceId 的值就是 145。这不适用于你当前的 URL 结构。
  • req.body: 用于获取 POST、PUT 等请求体中的数据。通常需要配合 body-parser 或 Express 内置的 express.json(), express.urlencoded() 中间件来解析请求体。你的请求是 GET 请求,数据在 URL 里,不在请求体里,所以 req.body 是空的 (undefined)。
  • req.param(): 这是一个 已弃用 (deprecated) 的方法,它曾试图按特定顺序查找 req.paramsreq.bodyreq.query 来获取参数。不要使用它 ,不仅因为它被弃用了,而且它的查找顺序和行为可能不直观,容易出错。

所以,对于你的 URL http://localhost:4000/api/motor_details?deviceId=145&a=sdfa3&b=us&c=uds,获取参数的正确方式就是 req.query.deviceIdreq.query.a 等。代码示例中已经改用了 req.query