Node.js 如何高效处理 PostgreSQL 大数据集,告别内存爆炸?
2025-03-24 14:51:28
如何在 Node.js 中高效处理 PostgreSQL 大数据集,避免内存爆炸?
问题来了:一次加载几十万用户数据,内存扛不住啊!
写 Node.js 应用跟 PostgreSQL 打交道是家常便饭。但当你需要处理的数据量一大,比如从一个几百万用户的表里捞出十万个符合特定条件的用户(举个例子,配置了某个特定渠道 ID 追踪的用户),然后挨个处理(比如创建通知任务或者自动交易任务),麻烦就来了。
要是你像下面这样写,一次性把所有匹配的用户都抓到内存里,然后再分组处理:
async function processMessageForSubscribers(channelId, channelName, message, addresses) {
try {
// 一次性加载可能高达 10 万的用户数据到内存
const users = await getUsersByTrackedTelegramChannel(channelId);
const CHUNK_SIZE = 500;
const notifyTasks = [];
const autotradeTasks = [];
// 在内存中将用户分块,然后并行处理每个块
const processUserChunk = async (userChunk) => {
await Promise.all(
userChunk.map(async (user) => {
const config = user.trackingConfig[channelId];
const autotradeAmount = config?.autotradeAmount;
if (config.newPost === 'NOTIFY') {
// 创建通知任务
createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
}
// 这里可能还有创建 autotrade 任务的逻辑
// if (config.autoTrade === true && autotradeAmount > 0) {
// createAutoTradeTask(user, ..., autotradeTasks);
// }
})
);
};
// 遍历内存中的用户列表,分块处理
for (let i = 0; i < users.length; i += CHUNK_SIZE) {
const chunk = users.slice(i, i + CHUNK_SIZE);
await processUserChunk(chunk);
}
// 将任务推送到队列
await queueTasks(notifyTasks, autotradeTasks);
} catch (error) {
console.error('处理订阅者出错:', error);
throw error; // 向上抛出错误,让调用者知道出问题了
}
}
// 假设的数据库查询函数
async function getUsersByTrackedTelegramChannel(channelId) {
// 这里是伪代码,实际需要使用你的 ORM 或数据库驱动
// const query = 'SELECT * FROM users WHERE trackingConfig->>$1 IS NOT NULL';
// return await db.query(query, [channelId]);
// 注意:这里的实现是问题的根源,它会一次性返回所有匹配的用户
console.log(`模拟查询 channelId 为 ${channelId} 的所有用户...`);
// 实际应用中这里会执行数据库查询
// 为了演示,我们假设返回了一个非常大的数组
// !!!警告:生产环境中不要这样做,这只是为了模拟问题!!!
const users = Array.from({ length: 100000 }, (_, i) => ({
id: i + 1,
name: `User ${i + 1}`,
trackingConfig: {
[channelId]: { newPost: 'NOTIFY', autoTrade: false, autotradeAmount: 0 },
},
}));
console.log(`模拟查询完成,加载了 ${users.length} 个用户到内存`);
return users;
}
// 假设的任务创建和队列函数
async function createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks) {
// console.log(`为用户 ${user.id} 创建通知任务`);
notifyTasks.push({ userId: user.id, message: `New post in ${channelName}` });
}
async function queueTasks(notifyTasks, autotradeTasks) {
console.log(`将 ${notifyTasks.length} 个通知任务和 ${autotradeTasks.length} 个自动交易任务加入队列`);
// 实际的队列逻辑,例如推送到 Redis, RabbitMQ 等
}
这么搞,十万用户数据瞬间塞满内存,应用分分钟可能因为内存不足(OOM)挂掉,或者就算没挂,频繁的垃圾回收(GC)也会让应用性能急剧下降,卡得不行。
那咋办?有没有什么好法子,能在 Node.js 里处理这么多数据又不让内存爆炸?
刨根问底:为啥会内存爆炸?
简单说,Node.js(或者说 V8 引擎)能使用的内存不是无限的。虽然可以调整,但总有个上限。当你执行 await getUsersByTrackedTelegramChannel(channelId)
时,数据库驱动(比如 node-postgres
)会兢兢业业地把 PostgreSQL 返回的所有匹配行都读出来,然后在 Node.js 这边转换成 JavaScript 对象,最后组成一个巨大的数组 users
存在内存里。
数据库一股脑把所有用户数据全发给 Node.js。Node.js 呢?照单全收,塞进内存。几十万用户,每个用户再带点配置信息,内存噌噌往上涨,不炸才怪!即便你后面用了 CHUNK_SIZE
来分块处理,那也只是处理逻辑上的分块,数据本身早就在内存里了,治标不治本。
几招搞定:告别内存焦虑
要解决这个问题,核心思路就是:别一次把所有数据都读进内存 。让数据细水长流,用多少,取多少。具体来说,有下面几种常见的靠谱做法:
- 数据库游标(Cursor)与流式处理(Streaming): 这是最优雅、内存效率最高的方式。
- 基于 OFFSET/LIMIT 的分页查询: 实现简单,但数据量特别大时可能有性能坑。
- 基于 Keyset 的分页查询(也叫游标分页): 性能比 OFFSET/LIMIT 好,实现稍微复杂点。
下面咱们挨个唠唠。
方案一:拥抱流式处理 (Stream/Cursor) - 王道!
原理
数据库游标(Cursor)允许你打开一个指向查询结果集的指针,然后一次只从这个结果集里取一行(或一小批)数据,而不是一次把整个结果集都加载过来。
Node.js 里,很多数据库驱动都把这个机制封装成了 Stream(流)。你可以像处理文件流或者网络流一样,监听 'data'
事件来处理每一条数据,处理完了再去取下一条。这样一来,无论结果集有多大,Node.js 应用在同一时间点只需要在内存中保留正在处理的那一小部分数据,内存占用极低。
node-postgres
(pg) 库配合 pg-query-stream
这个包就能轻松实现流式查询。
实战
先把 pg-query-stream
装上:
npm install pg-query-stream
# 或者
yarn add pg-query-stream
然后改造咱们之前的 processMessageForSubscribers
函数:
const QueryStream = require('pg-query-stream');
const { Pool } = require('pg'); // 假设你用的是 pg Pool
// 假设你已经配置好了 pg Pool
// const pool = new Pool({ connectionString: 'your_db_connection_string' });
async function processMessageForSubscribersWithStream(channelId, channelName, message, addresses) {
const pool = new Pool({ connectionString: 'postgresql://user:password@host:port/database' }); // 替换成你的连接配置
const client = await pool.connect();
const batchSize = 500; // 可以调整每次批量处理的大小
let userBatch = [];
let notifyTasks = [];
let autotradeTasks = [];
// 注意:这里的 SQL 查询需要根据你的实际表结构调整
// 使用 JSONB 操作符 ->> 来提取文本值进行比较
const queryText = 'SELECT id, trackingConfig FROM users WHERE trackingConfig->>$1 IS NOT NULL';
const queryParams = [channelId]; // 使用参数化查询防止 SQL 注入
const query = new QueryStream(queryText, queryParams);
try {
const stream = client.query(query); // 获取查询流
// 监听 'data' 事件,每来一条用户数据就处理
stream.on('data', (user) => {
// 这里拿到的 user 就是数据库里的一行数据
// user 对象可能类似 { id: 123, trackingconfig: { 'channel123': {...} } }
// 注意字段名可能因数据库驱动或查询方式而小写
const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined; // 注意字段名小写
const autotradeAmount = config?.autotradeAmount;
if (config && config.newPost === 'NOTIFY') {
// 先收集,达到 batchSize 后统一处理
userBatch.push(user);
if (userBatch.length >= batchSize) {
// 拷贝当前批次,清空 userBatch 以收集下一批
const currentBatch = [...userBatch];
userBatch = [];
// 异步处理这一批,但不阻塞流的读取(fire and forget 或用 Promise.all 管理并发)
processUserBatch(currentBatch, channelId, channelName, message, addresses, notifyTasks, autotradeTasks)
.catch(err => console.error('处理批次时出错:', err)); // 单独捕获批次处理错误
}
}
});
// 监听 'end' 事件,表示所有数据都接收完了
await new Promise((resolve, reject) => {
stream.on('end', () => {
// 处理最后一批不足 batchSize 的用户
if (userBatch.length > 0) {
processUserBatch(userBatch, channelId, channelName, message, addresses, notifyTasks, autotradeTasks)
.then(resolve) // 确保最后一批处理完再结束
.catch(reject);
} else {
resolve();
}
});
// 监听 'error' 事件,处理流本身可能发生的错误
stream.on('error', (err) => {
console.error('数据库流查询出错:', err);
reject(err);
});
});
// 等待所有可能在后台运行的 processUserBatch 完成
// 注意:上面 processUserBatch 是异步启动的,这里需要一种机制等待它们全部完成
// 一个简单的(但不完全精确)方法是稍作等待,或者使用更复杂的 Promise 管理
// 为了简化示例,假设 queueTasks 能处理异步添加的任务,或者 processUserBatch 直接推送到外部队列
await new Promise(resolve => setTimeout(resolve, 2000)); // !!这是一个简化的等待,生产环境需要更可靠的机制
await queueTasks(notifyTasks, autotradeTasks); // 将收集到的任务入队
} catch (error) {
console.error('处理订阅者流式查询时出错:', error);
throw error; // 向上抛出
} finally {
client.release(); // 别忘了释放数据库连接
await pool.end(); // 如果是一次性脚本,记得关闭连接池
}
}
// 辅助函数:处理一个批次的用户
async function processUserBatch(userBatch, channelId, channelName, message, addresses, notifyTasks, autotradeTasks) {
// console.log(`处理 ${userBatch.length} 个用户批次...`);
// 注意:这里可以像原先一样用 Promise.all 并行处理批次内的用户
await Promise.all(
userBatch.map(async (user) => {
// 注意,从 stream 过来的数据字段名可能是小写
const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined;
const autotradeAmount = config?.autotradeAmount;
if (config && config.newPost === 'NOTIFY') {
// 直接修改传入的数组(或返回新任务让调用者合并)
createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
}
// ... 其他任务处理 ...
})
);
}
// 其他辅助函数保持不变...
这样改动后,Node.js 不再需要一次性把所有用户数据加载到内存。数据像水流一样,一条条过来,处理完就扔掉(等待 GC),内存占用稳定且很低。
进阶使用技巧
- 背压处理(Backpressure): 如果你的处理逻辑(比如
createNotificationTask
里的操作)比数据流过来的速度慢,内存还是可能会慢慢堆积(虽然是处理中的数据,不是整个结果集)。pg-query-stream
遵循 Node.js Stream 的标准接口,你可以通过stream.pause()
和stream.resume()
来控制数据流的速度,实现背压管理。例如,当积压的任务达到一定数量时暂停流,处理完成后再恢复。 - 错误处理: 流处理中的错误需要仔细考虑。
stream.on('error', ...)
捕获流本身的错误(比如数据库连接断开)。在stream.on('data', ...)
里的处理逻辑如果出错,需要用try...catch
包裹,或者在异步操作(如processUserBatch
)的catch
中处理,防止单个用户的处理失败导致整个流中断。 - 事务: 如果你的处理逻辑需要更新数据库,并且要求整个大批量操作要么全成功要么全失败,流式处理不太适合直接包裹在一个大事务里(因为事务会持有锁,长时间运行的流处理可能导致锁争用或超时)。你可能需要考虑将处理逻辑设计成幂等的,或者分批次进行事务提交。对于只读然后创建外部任务(如推送到消息队列)的场景,一般不需要事务。
安全建议
- 参数化查询: 看到代码里的
queryParams
了吗?这是必须的!永远使用参数化查询来防止 SQL 注入攻击。别自己手动拼 SQL 字符串。
方案二:分页查询 - OFFSET/LIMIT
原理
这是最直观的分页方法。通过 SQL 的 LIMIT
指定每页拿多少条数据,OFFSET
指定从哪里开始拿。
比如,LIMIT 500 OFFSET 0
拿前 500 条,LIMIT 500 OFFSET 500
拿第 501 到 1000 条,以此类推。
实战
你需要写一个循环,每次查询一页数据,处理完再查下一页,直到所有数据处理完毕。
async function processMessageForSubscribersWithOffsetLimit(channelId, channelName, message, addresses) {
const pool = new Pool({ connectionString: 'postgresql://user:password@host:port/database' }); // 替换配置
const CHUNK_SIZE = 500; // 每页大小
let offset = 0;
let keepFetching = true;
let notifyTasks = [];
let autotradeTasks = [];
try {
while (keepFetching) {
const client = await pool.connect();
try {
// 使用 LIMIT 和 OFFSET 进行分页查询
const queryText = `
SELECT id, trackingConfig
FROM users
WHERE trackingConfig->>$1 IS NOT NULL
ORDER BY id -- OFFSET/LIMIT 需要配合 ORDER BY 保证分页稳定
LIMIT $2 OFFSET $3
`;
const queryParams = [channelId, CHUNK_SIZE, offset];
const { rows: userChunk } = await client.query(queryText, queryParams);
if (userChunk.length > 0) {
console.log(`获取到 ${userChunk.length} 条用户数据,偏移量: ${offset}`);
// 处理当前页的数据(可以并行处理)
await Promise.all(
userChunk.map(async (user) => {
const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined;
const autotradeAmount = config?.autotradeAmount;
if (config && config.newPost === 'NOTIFY') {
createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
}
// ... 其他任务处理 ...
})
);
// 更新 offset,准备获取下一页
offset += userChunk.length;
// 如果返回的数据量小于请求的 CHUNK_SIZE,说明已经是最后一页了
if (userChunk.length < CHUNK_SIZE) {
keepFetching = false;
}
} else {
// 没有数据返回了,结束循环
keepFetching = false;
}
} finally {
client.release(); // 每次查询后释放连接
}
}
await queueTasks(notifyTasks, autotradeTasks); // 所有分页处理完后,任务入队
} catch (error) {
console.error('处理订阅者分页查询(OFFSET/LIMIT)时出错:', error);
throw error;
} finally {
await pool.end(); // 结束时关闭连接池
}
}
缺点与注意
- 性能问题:
OFFSET
越大,查询性能通常越差。因为数据库需要扫描并跳过OFFSET
指定数量的行才能找到你想要的那一页数据。当OFFSET
达到几万甚至几十万时,查询可能变得非常慢。 - 数据一致性: 如果在分页查询的过程中,有新的符合条件的数据插入或者旧数据被修改/删除,可能会导致某些数据被重复处理或遗漏。使用稳定的
ORDER BY
(比如按主键id
排序) 可以缓解一部分问题,但不能完全避免。 - 需要
ORDER BY
: 为了保证分页的顺序稳定,必须配合ORDER BY
子句,最好是按唯一键(如id
)排序。
安全建议
- 同样,必须使用参数化查询 防止 SQL 注入。
方案三:分页查询 - Keyset Pagination (游标分页)
原理
Keyset Pagination (有时也叫 Cursor-based Pagination,但这里的 Cursor 不是指数据库服务器游标,而是指上一页最后一条记录的标识) 是一种更高效的分页方式,它避免了 OFFSET
的性能陷阱。
它的思路是:你记住上一页最后一条记录的排序键(比如 id
),然后在下一页查询时,使用 WHERE
条件来获取排序键大于(或小于,取决于排序方向)该值的数据。
例如,如果按 id
升序排列,第一页查 SELECT ... WHERE trackingConfig->>'...' IS NOT NULL ORDER BY id LIMIT 500
,得到最后一条记录 id
是 1234
。下一页就查 SELECT ... WHERE trackingConfig->>'...' IS NOT NULL AND id > 1234 ORDER BY id LIMIT 500
。
实战
async function processMessageForSubscribersWithKeyset(channelId, channelName, message, addresses) {
const pool = new Pool({ connectionString: 'postgresql://user:password@host:port/database' }); // 替换配置
const PAGE_SIZE = 500; // 每页大小
let lastId = 0; // 假设 id 是数字类型且从大于 0 开始
let keepFetching = true;
let notifyTasks = [];
let autotradeTasks = [];
try {
while (keepFetching) {
const client = await pool.connect();
try {
// 使用 Keyset Pagination 进行分页查询
// 确保有一个可靠的、有序的列(通常是主键 id)
const queryText = `
SELECT id, trackingConfig
FROM users
WHERE trackingConfig->>$1 IS NOT NULL AND id > $2
ORDER BY id ASC -- 必须按 id 排序
LIMIT $3
`;
const queryParams = [channelId, lastId, PAGE_SIZE];
const { rows: userChunk } = await client.query(queryText, queryParams);
if (userChunk.length > 0) {
console.log(`获取到 ${userChunk.length} 条用户数据,上一页最后 ID: ${lastId}`);
// 处理当前页的数据(可以并行处理)
await Promise.all(
userChunk.map(async (user) => {
const config = user.trackingconfig ? user.trackingconfig[channelId] : undefined;
const autotradeAmount = config?.autotradeAmount;
if (config && config.newPost === 'NOTIFY') {
createNotificationTask(user, addresses, message, channelId, channelName, autotradeAmount, notifyTasks);
}
// ... 其他任务处理 ...
})
);
// 更新 lastId 为当前页的最后一条记录的 id
lastId = userChunk[userChunk.length - 1].id;
// 如果返回的数据量小于请求的 PAGE_SIZE,说明已经是最后一页了
// Keyset 分页也可以通过判断返回数量是否小于页面大小来决定是否结束
if (userChunk.length < PAGE_SIZE) {
keepFetching = false;
}
} else {
// 没有更多数据了,结束循环
keepFetching = false;
}
} finally {
client.release(); // 释放连接
}
}
await queueTasks(notifyTasks, autotradeTasks); // 任务入队
} catch (error) {
console.error('处理订阅者分页查询(Keyset)时出错:', error);
throw error;
} finally {
await pool.end(); // 关闭连接池
}
}
优点
- 性能好: 查询性能通常比较稳定,不会像
OFFSET
那样随着页码增大而显著下降。数据库可以高效地利用索引直接定位到id > lastId
的位置开始查找。 - 数据一致性相对较好: 对于新增数据,只要它们落在当前查询范围之后,就不会影响分页结果。但如果在分页过程中修改了排序键或者删除了数据,仍可能出现跳过或重复(尽管比
OFFSET
好些)。
前提条件
- 需要一个(或一组)稳定且带有索引的排序字段,通常是主键
id
或created_at
+id
(如果created_at
可能重复)。查询语句需要包含这个字段的WHERE
条件和ORDER BY
。
安全建议
- 参数化查询!参数化查询!参数化查询!重要的事情说三遍。
选哪个?看场景!
好了,咱们介绍了三种处理大数据的姿势:
-
流式处理 (Stream/Cursor):
- 优点: 内存占用最低,实时性好(数据一来就开始处理),适合需要顺序处理所有匹配数据的场景。
- 缺点: 实现稍微复杂一点(需要处理流事件和背压),不太适合需要随机访问数据的场景。
- 推荐场景: 像你的这种需要遍历处理大量匹配记录的后台任务,流式处理是首选。
-
OFFSET/LIMIT 分页:
- 优点: 实现简单,直观易懂。
- 缺点: 数据量大时性能差,可能存在数据不一致问题。
- 推荐场景: 数据量不大(比如几千几万条),或者对性能要求不高,或者只是做个简单的后台管理界面的分页展示。对于几十万级别的数据处理任务,慎用。
-
Keyset Pagination:
- 优点: 性能比 OFFSET/LIMIT 好得多,相对稳定。
- 缺点: 实现比 OFFSET/LIMIT 稍复杂,需要有合适的排序键。
- 推荐场景: 需要分页加载大量数据,并且性能是重要考量。比如实现高性能的“加载更多”功能,或者作为流式处理不可行时的替代方案。
对于你的问题——处理十万级别的用户数据来创建任务——强烈推荐使用方案一:流式处理 (Stream/Cursor) 。它是解决这类问题最地道、内存效率最高的方法。如果因为某些原因(比如老旧的库不支持流或者特殊业务逻辑限制)不能用流,那么 Keyset Pagination 是次优选择。尽量避免在大数据集上使用 OFFSET/LIMIT。