返回

Node.js 如何高效处理 PostgreSQL 大数据集,告别内存爆炸?

javascript

如何在 Node.js 中高效处理 PostgreSQL 大数据集,避免内存爆炸?

问题来了:一次加载几十万用户数据,内存扛不住啊!

写 Node.js 应用跟 PostgreSQL 打交道是家常便饭。但当你需要处理的数据量一大,比如从一个几百万用户的表里捞出十万个符合特定条件的用户(举个例子,配置了某个特定渠道 ID 追踪的用户),然后挨个处理(比如创建通知任务或者自动交易任务),麻烦就来了。

要是你像下面这样写,一次性把所有匹配的用户都抓到内存里,然后再分组处理:

async function processMessageForSubscribers(channelId, channelName, message, addresses) {
  try {
    // 一次性加载可能高达 10 万的用户数据到内存
    const users = await getUsersByTrackedTelegramChannel(channelId);
    const CHUNK_SIZE = 500;
    const notifyTasks = [];
    const autotradeTasks = [];

    // 在内存中将用户分块,然后并行处理每个块
    const processUserChunk = async (userChunk) => {
      await Promise.all(
        userChunk.map(async (user) => {
          const config = user.trackingConfig[channelId];
          const autotradeAmount = config?.autotradeAmount;

          if (config.newPost === 'NOTIFY') {
            // 创建通知任务
            createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
          }
          // 这里可能还有创建 autotrade 任务的逻辑
          // if (config.autoTrade === true && autotradeAmount > 0) {
          //   createAutoTradeTask(user, ..., autotradeTasks);
          // }
        })
      );
    };

    // 遍历内存中的用户列表,分块处理
    for (let i = 0; i < users.length; i += CHUNK_SIZE) {
      const chunk = users.slice(i, i + CHUNK_SIZE);
      await processUserChunk(chunk);
    }

    // 将任务推送到队列
    await queueTasks(notifyTasks, autotradeTasks);
  } catch (error) {
    console.error('处理订阅者出错:', error);
    throw error; // 向上抛出错误,让调用者知道出问题了
  }
}

// 假设的数据库查询函数
async function getUsersByTrackedTelegramChannel(channelId) {
  // 这里是伪代码,实际需要使用你的 ORM 或数据库驱动
  // const query = 'SELECT * FROM users WHERE trackingConfig->>$1 IS NOT NULL';
  // return await db.query(query, [channelId]);
  // 注意:这里的实现是问题的根源,它会一次性返回所有匹配的用户
  console.log(`模拟查询 channelId 为 ${channelId} 的所有用户...`);
  // 实际应用中这里会执行数据库查询
  // 为了演示,我们假设返回了一个非常大的数组
  // !!!警告:生产环境中不要这样做,这只是为了模拟问题!!!
  const users = Array.from({ length: 100000 }, (_, i) => ({
    id: i + 1,
    name: `User ${i + 1}`,
    trackingConfig: {
      [channelId]: { newPost: 'NOTIFY', autoTrade: false, autotradeAmount: 0 },
    },
  }));
  console.log(`模拟查询完成,加载了 ${users.length} 个用户到内存`);
  return users;
}

// 假设的任务创建和队列函数
async function createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks) {
  // console.log(`为用户 ${user.id} 创建通知任务`);
  notifyTasks.push({ userId: user.id, message: `New post in ${channelName}` });
}
async function queueTasks(notifyTasks, autotradeTasks) {
  console.log(`将 ${notifyTasks.length} 个通知任务和 ${autotradeTasks.length} 个自动交易任务加入队列`);
  // 实际的队列逻辑,例如推送到 Redis, RabbitMQ 等
}

这么搞,十万用户数据瞬间塞满内存,应用分分钟可能因为内存不足(OOM)挂掉,或者就算没挂,频繁的垃圾回收(GC)也会让应用性能急剧下降,卡得不行。

那咋办?有没有什么好法子,能在 Node.js 里处理这么多数据又不让内存爆炸?

刨根问底:为啥会内存爆炸?

简单说,Node.js(或者说 V8 引擎)能使用的内存不是无限的。虽然可以调整,但总有个上限。当你执行 await getUsersByTrackedTelegramChannel(channelId) 时,数据库驱动(比如 node-postgres)会兢兢业业地把 PostgreSQL 返回的所有匹配行都读出来,然后在 Node.js 这边转换成 JavaScript 对象,最后组成一个巨大的数组 users 存在内存里。

数据库一股脑把所有用户数据全发给 Node.js。Node.js 呢?照单全收,塞进内存。几十万用户,每个用户再带点配置信息,内存噌噌往上涨,不炸才怪!即便你后面用了 CHUNK_SIZE 来分块处理,那也只是处理逻辑上的分块,数据本身早就在内存里了,治标不治本。

几招搞定:告别内存焦虑

要解决这个问题,核心思路就是:别一次把所有数据都读进内存 。让数据细水长流,用多少,取多少。具体来说,有下面几种常见的靠谱做法:

  1. 数据库游标(Cursor)与流式处理(Streaming): 这是最优雅、内存效率最高的方式。
  2. 基于 OFFSET/LIMIT 的分页查询: 实现简单,但数据量特别大时可能有性能坑。
  3. 基于 Keyset 的分页查询(也叫游标分页): 性能比 OFFSET/LIMIT 好,实现稍微复杂点。

下面咱们挨个唠唠。

方案一:拥抱流式处理 (Stream/Cursor) - 王道!

原理

数据库游标(Cursor)允许你打开一个指向查询结果集的指针,然后一次只从这个结果集里取一行(或一小批)数据,而不是一次把整个结果集都加载过来。

Node.js 里,很多数据库驱动都把这个机制封装成了 Stream(流)。你可以像处理文件流或者网络流一样,监听 'data' 事件来处理每一条数据,处理完了再去取下一条。这样一来,无论结果集有多大,Node.js 应用在同一时间点只需要在内存中保留正在处理的那一小部分数据,内存占用极低。

node-postgres (pg) 库配合 pg-query-stream 这个包就能轻松实现流式查询。

实战

先把 pg-query-stream 装上:

npm install pg-query-stream
# 或者
yarn add pg-query-stream

然后改造咱们之前的 processMessageForSubscribers 函数:

const QueryStream = require('pg-query-stream');
const { Pool } = require('pg'); // 假设你用的是 pg Pool

// 假设你已经配置好了 pg Pool
// const pool = new Pool({ connectionString: 'your_db_connection_string' });

async function processMessageForSubscribersWithStream(channelId, channelName, message, addresses) {
  const pool = new Pool({ connectionString: 'postgresql://user:password@host:port/database' }); // 替换成你的连接配置
  const client = await pool.connect();
  const batchSize = 500; // 可以调整每次批量处理的大小
  let userBatch = [];
  let notifyTasks = [];
  let autotradeTasks = [];

  // 注意:这里的 SQL 查询需要根据你的实际表结构调整
  // 使用 JSONB 操作符 ->> 来提取文本值进行比较
  const queryText = 'SELECT id, trackingConfig FROM users WHERE trackingConfig->>$1 IS NOT NULL';
  const queryParams = [channelId]; // 使用参数化查询防止 SQL 注入

  const query = new QueryStream(queryText, queryParams);

  try {
    const stream = client.query(query); // 获取查询流

    // 监听 'data' 事件,每来一条用户数据就处理
    stream.on('data', (user) => {
      // 这里拿到的 user 就是数据库里的一行数据
      // user 对象可能类似 { id: 123, trackingconfig: { 'channel123': {...} } }
      // 注意字段名可能因数据库驱动或查询方式而小写

      const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined; // 注意字段名小写
      const autotradeAmount = config?.autotradeAmount;

      if (config && config.newPost === 'NOTIFY') {
         // 先收集,达到 batchSize 后统一处理
        userBatch.push(user);
        if (userBatch.length >= batchSize) {
           // 拷贝当前批次,清空 userBatch 以收集下一批
            const currentBatch = [...userBatch];
            userBatch = [];
             // 异步处理这一批,但不阻塞流的读取(fire and forget 或用 Promise.all 管理并发)
            processUserBatch(currentBatch, channelId, channelName, message, addresses, notifyTasks, autotradeTasks)
                .catch(err => console.error('处理批次时出错:', err)); // 单独捕获批次处理错误
        }
      }
    });

    // 监听 'end' 事件,表示所有数据都接收完了
    await new Promise((resolve, reject) => {
      stream.on('end', () => {
         // 处理最后一批不足 batchSize 的用户
         if (userBatch.length > 0) {
            processUserBatch(userBatch, channelId, channelName, message, addresses, notifyTasks, autotradeTasks)
            .then(resolve) // 确保最后一批处理完再结束
            .catch(reject);
         } else {
            resolve();
         }
      });

      // 监听 'error' 事件,处理流本身可能发生的错误
      stream.on('error', (err) => {
        console.error('数据库流查询出错:', err);
        reject(err);
      });
    });

    // 等待所有可能在后台运行的 processUserBatch 完成
    // 注意:上面 processUserBatch 是异步启动的,这里需要一种机制等待它们全部完成
    // 一个简单的(但不完全精确)方法是稍作等待,或者使用更复杂的 Promise 管理
    // 为了简化示例,假设 queueTasks 能处理异步添加的任务,或者 processUserBatch 直接推送到外部队列
    await new Promise(resolve => setTimeout(resolve, 2000)); // !!这是一个简化的等待,生产环境需要更可靠的机制

    await queueTasks(notifyTasks, autotradeTasks); // 将收集到的任务入队

  } catch (error) {
    console.error('处理订阅者流式查询时出错:', error);
    throw error; // 向上抛出
  } finally {
    client.release(); // 别忘了释放数据库连接
    await pool.end(); // 如果是一次性脚本,记得关闭连接池
  }
}


// 辅助函数:处理一个批次的用户
async function processUserBatch(userBatch, channelId, channelName, message, addresses, notifyTasks, autotradeTasks) {
  // console.log(`处理 ${userBatch.length} 个用户批次...`);
  // 注意:这里可以像原先一样用 Promise.all 并行处理批次内的用户
  await Promise.all(
    userBatch.map(async (user) => {
       // 注意,从 stream 过来的数据字段名可能是小写
      const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined;
      const autotradeAmount = config?.autotradeAmount;

      if (config && config.newPost === 'NOTIFY') {
        // 直接修改传入的数组(或返回新任务让调用者合并)
        createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
      }
      // ... 其他任务处理 ...
    })
  );
}


// 其他辅助函数保持不变...

这样改动后,Node.js 不再需要一次性把所有用户数据加载到内存。数据像水流一样,一条条过来,处理完就扔掉(等待 GC),内存占用稳定且很低。

进阶使用技巧

  • 背压处理(Backpressure): 如果你的处理逻辑(比如 createNotificationTask 里的操作)比数据流过来的速度慢,内存还是可能会慢慢堆积(虽然是处理中的数据,不是整个结果集)。pg-query-stream 遵循 Node.js Stream 的标准接口,你可以通过 stream.pause()stream.resume() 来控制数据流的速度,实现背压管理。例如,当积压的任务达到一定数量时暂停流,处理完成后再恢复。
  • 错误处理: 流处理中的错误需要仔细考虑。stream.on('error', ...) 捕获流本身的错误(比如数据库连接断开)。在 stream.on('data', ...) 里的处理逻辑如果出错,需要用 try...catch 包裹,或者在异步操作(如 processUserBatch)的 catch 中处理,防止单个用户的处理失败导致整个流中断。
  • 事务: 如果你的处理逻辑需要更新数据库,并且要求整个大批量操作要么全成功要么全失败,流式处理不太适合直接包裹在一个大事务里(因为事务会持有锁,长时间运行的流处理可能导致锁争用或超时)。你可能需要考虑将处理逻辑设计成幂等的,或者分批次进行事务提交。对于只读然后创建外部任务(如推送到消息队列)的场景,一般不需要事务。

安全建议

  • 参数化查询: 看到代码里的 queryParams 了吗?这是必须的!永远使用参数化查询来防止 SQL 注入攻击。别自己手动拼 SQL 字符串。

方案二:分页查询 - OFFSET/LIMIT

原理

这是最直观的分页方法。通过 SQL 的 LIMIT 指定每页拿多少条数据,OFFSET 指定从哪里开始拿。

比如,LIMIT 500 OFFSET 0 拿前 500 条,LIMIT 500 OFFSET 500 拿第 501 到 1000 条,以此类推。

实战

你需要写一个循环,每次查询一页数据,处理完再查下一页,直到所有数据处理完毕。

async function processMessageForSubscribersWithOffsetLimit(channelId, channelName, message, addresses) {
  const pool = new Pool({ connectionString: 'postgresql://user:password@host:port/database' }); // 替换配置
  const CHUNK_SIZE = 500; // 每页大小
  let offset = 0;
  let keepFetching = true;
  let notifyTasks = [];
  let autotradeTasks = [];

  try {
    while (keepFetching) {
      const client = await pool.connect();
      try {
        // 使用 LIMIT 和 OFFSET 进行分页查询
        const queryText = `
          SELECT id, trackingConfig 
          FROM users 
          WHERE trackingConfig->>$1 IS NOT NULL 
          ORDER BY id -- OFFSET/LIMIT 需要配合 ORDER BY 保证分页稳定
          LIMIT $2 OFFSET $3
        `;
        const queryParams = [channelId, CHUNK_SIZE, offset];
        const { rows: userChunk } = await client.query(queryText, queryParams);

        if (userChunk.length > 0) {
          console.log(`获取到 ${userChunk.length} 条用户数据,偏移量: ${offset}`);

          // 处理当前页的数据(可以并行处理)
          await Promise.all(
            userChunk.map(async (user) => {
              const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined;
              const autotradeAmount = config?.autotradeAmount;

              if (config && config.newPost === 'NOTIFY') {
                createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
              }
              // ... 其他任务处理 ...
            })
          );

          // 更新 offset,准备获取下一页
          offset += userChunk.length;

          // 如果返回的数据量小于请求的 CHUNK_SIZE,说明已经是最后一页了
          if (userChunk.length < CHUNK_SIZE) {
            keepFetching = false;
          }
        } else {
          // 没有数据返回了,结束循环
          keepFetching = false;
        }
      } finally {
        client.release(); // 每次查询后释放连接
      }
    }

    await queueTasks(notifyTasks, autotradeTasks); // 所有分页处理完后,任务入队

  } catch (error) {
    console.error('处理订阅者分页查询(OFFSET/LIMIT)时出错:', error);
    throw error;
  } finally {
    await pool.end(); // 结束时关闭连接池
  }
}

缺点与注意

  • 性能问题: OFFSET 越大,查询性能通常越差。因为数据库需要扫描并跳过 OFFSET 指定数量的行才能找到你想要的那一页数据。当 OFFSET 达到几万甚至几十万时,查询可能变得非常慢。
  • 数据一致性: 如果在分页查询的过程中,有新的符合条件的数据插入或者旧数据被修改/删除,可能会导致某些数据被重复处理或遗漏。使用稳定的 ORDER BY (比如按主键 id 排序) 可以缓解一部分问题,但不能完全避免。
  • 需要 ORDER BY 为了保证分页的顺序稳定,必须配合 ORDER BY 子句,最好是按唯一键(如 id)排序。

安全建议

  • 同样,必须使用参数化查询 防止 SQL 注入。

方案三:分页查询 - Keyset Pagination (游标分页)

原理

Keyset Pagination (有时也叫 Cursor-based Pagination,但这里的 Cursor 不是指数据库服务器游标,而是指上一页最后一条记录的标识) 是一种更高效的分页方式,它避免了 OFFSET 的性能陷阱。

它的思路是:你记住上一页最后一条记录的排序键(比如 id),然后在下一页查询时,使用 WHERE 条件来获取排序键大于(或小于,取决于排序方向)该值的数据。

例如,如果按 id 升序排列,第一页查 SELECT ... WHERE trackingConfig->>'...' IS NOT NULL ORDER BY id LIMIT 500,得到最后一条记录 id1234。下一页就查 SELECT ... WHERE trackingConfig->>'...' IS NOT NULL AND id > 1234 ORDER BY id LIMIT 500

实战

async function processMessageForSubscribersWithKeyset(channelId, channelName, message, addresses) {
  const pool = new Pool({ connectionString: 'postgresql://user:password@host:port/database' }); // 替换配置
  const PAGE_SIZE = 500; // 每页大小
  let lastId = 0; // 假设 id 是数字类型且从大于 0 开始
  let keepFetching = true;
  let notifyTasks = [];
  let autotradeTasks = [];

  try {
    while (keepFetching) {
      const client = await pool.connect();
      try {
        // 使用 Keyset Pagination 进行分页查询
        // 确保有一个可靠的、有序的列(通常是主键 id)
        const queryText = `
          SELECT id, trackingConfig 
          FROM users 
          WHERE trackingConfig->>$1 IS NOT NULL AND id > $2 
          ORDER BY id ASC -- 必须按 id 排序
          LIMIT $3
        `;
        const queryParams = [channelId, lastId, PAGE_SIZE];
        const { rows: userChunk } = await client.query(queryText, queryParams);

        if (userChunk.length > 0) {
          console.log(`获取到 ${userChunk.length} 条用户数据,上一页最后 ID: ${lastId}`);

          // 处理当前页的数据(可以并行处理)
          await Promise.all(
             userChunk.map(async (user) => {
               const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined;
               const autotradeAmount = config?.autotradeAmount;

               if (config && config.newPost === 'NOTIFY') {
                 createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
               }
               // ... 其他任务处理 ...
             })
          );


          // 更新 lastId 为当前页的最后一条记录的 id
          lastId = userChunk[userChunk.length - 1].id;

          // 如果返回的数据量小于请求的 PAGE_SIZE,说明已经是最后一页了
          // Keyset 分页也可以通过判断返回数量是否小于页面大小来决定是否结束
           if (userChunk.length < PAGE_SIZE) {
             keepFetching = false;
           }

        } else {
          // 没有更多数据了,结束循环
          keepFetching = false;
        }
      } finally {
        client.release(); // 释放连接
      }
    }

     await queueTasks(notifyTasks, autotradeTasks); // 任务入队

  } catch (error) {
    console.error('处理订阅者分页查询(Keyset)时出错:', error);
    throw error;
  } finally {
      await pool.end(); // 关闭连接池
  }
}

优点

  • 性能好: 查询性能通常比较稳定,不会像 OFFSET 那样随着页码增大而显著下降。数据库可以高效地利用索引直接定位到 id > lastId 的位置开始查找。
  • 数据一致性相对较好: 对于新增数据,只要它们落在当前查询范围之后,就不会影响分页结果。但如果在分页过程中修改了排序键或者删除了数据,仍可能出现跳过或重复(尽管比 OFFSET 好些)。

前提条件

  • 需要一个(或一组)稳定且带有索引的排序字段,通常是主键 idcreated_at + id (如果 created_at 可能重复)。查询语句需要包含这个字段的 WHERE 条件和 ORDER BY

安全建议

  • 参数化查询!参数化查询!参数化查询!重要的事情说三遍。

选哪个?看场景!

好了,咱们介绍了三种处理大数据的姿势:

  1. 流式处理 (Stream/Cursor):

    • 优点: 内存占用最低,实时性好(数据一来就开始处理),适合需要顺序处理所有匹配数据的场景。
    • 缺点: 实现稍微复杂一点(需要处理流事件和背压),不太适合需要随机访问数据的场景。
    • 推荐场景: 像你的这种需要遍历处理大量匹配记录的后台任务,流式处理是首选。
  2. OFFSET/LIMIT 分页:

    • 优点: 实现简单,直观易懂。
    • 缺点: 数据量大时性能差,可能存在数据不一致问题。
    • 推荐场景: 数据量不大(比如几千几万条),或者对性能要求不高,或者只是做个简单的后台管理界面的分页展示。对于几十万级别的数据处理任务,慎用。
  3. Keyset Pagination:

    • 优点: 性能比 OFFSET/LIMIT 好得多,相对稳定。
    • 缺点: 实现比 OFFSET/LIMIT 稍复杂,需要有合适的排序键。
    • 推荐场景: 需要分页加载大量数据,并且性能是重要考量。比如实现高性能的“加载更多”功能,或者作为流式处理不可行时的替代方案。

对于你的问题——处理十万级别的用户数据来创建任务——强烈推荐使用方案一:流式处理 (Stream/Cursor) 。它是解决这类问题最地道、内存效率最高的方法。如果因为某些原因(比如老旧的库不支持流或者特殊业务逻辑限制)不能用流,那么 Keyset Pagination 是次优选择。尽量避免在大数据集上使用 OFFSET/LIMIT。