并发请求处理:乐观锁与数据库锁的实践
2025-01-20 19:39:49
处理并发请求
在软件系统中,并发请求处理是常见且必须面对的问题。尤其是在处理涉及到数据变更的操作时,比如账户余额的变动,不正确的并发控制会导致数据不一致。以下探讨如何处理并发请求以及解决特定场景下的问题。
问题分析
在处理支付系统或涉及用户账户余额操作时,并发请求带来的数据竞争是一个关键问题。 多个并发请求同时读取用户当前的账户余额,并基于这个初始值进行操作(例如,减去提现金额),随后各自进行更新。 由于每个请求基于相同的初始余额进行操作,导致最终余额结果不正确,并且数据同步出现错误。SELECT ... FOR UPDATE
查询可以锁定记录,看似解决了问题,但是由于Prisma
transaction
内的锁与外层锁的机制,实际没有完全锁住行数据,最终问题依旧存在。
解决方案
为了有效应对并发请求,可以采用几种不同的策略。 关键在于保证操作的原子性与隔离性,即同一时间只有一个操作能影响数据。
乐观锁机制
乐观锁通过在更新数据时检查数据的版本号来实现并发控制。每个数据记录都有一个版本号字段。读取数据时,同时读取版本号,更新数据时,将版本号加一。如果更新期间,版本号没有发生变化,则说明没有其他请求更改过此数据,更新操作可以继续。如果版本号发生变化,说明有其他请求更改过此数据,更新操作需要重新读取,重复上述流程。乐观锁适用于并发冲突较少,读取远多于写入的场景。
代码示例 (使用Prisma
)
async initiatePayout(payload: PayoutInitiateRequestDto, req: IRequest): Promise<PayoutResponse> {
let updated = false;
while(!updated) {
const user = await this.prisma.user.findUnique({where: { id: userId }});
if (!user) {
throw new Error('User not found');
}
const availableBalance = user.totalPayout;
const payoutAmount = +payload.amount;
if (availableBalance < payoutAmount) {
throw new Error('Insufficient balance');
}
const apiResponse = await this.externalApi.payout(payload);
if(apiResponse.success) {
try {
await this.prisma.user.update({
where: {
id: userId,
version: user.version,
},
data:{
totalPayout: availableBalance - payoutAmount,
version: user.version + 1,
}
});
updated = true;
} catch(e) {
if(e.code === "P2017"){ //prisma更新失败代码,需要重新读取并更新数据
continue;
} else {
throw e; // 非并发失败,抛出其他异常
}
}
}
return apiResponse;
}
}
- 操作步骤:
- 每次获取用户数据时,包括版本号。
- 发起支付时,
update
语句增加where
字段判断当前版本号与获取数据时版本号是否一致。 - 如果版本号一致,更新成功,同时增加版本号;版本号不一致时,
Prisma
会抛出异常。update
更新失败,回到第一步重新执行,直至更新成功。 - 注意: 需要预先在数据库
User
表添加version
字段,初始值为 0 。
注意: 需要合理控制循环次数和重试逻辑,避免长时间的忙等待。可以在重试多次后抛出异常。
数据库层面的锁
可以利用数据库本身提供的锁机制来确保数据操作的隔离性,使用显式事务加行级锁。数据库事务保证事务内的操作要么全部执行,要么全部不执行,行级锁保证同一时间只有一个事务能修改某一行记录。 MySQL
使用SELECT ... FOR UPDATE
可以实现这一目标,但是必须在事务内执行。
代码示例 (使用Prisma raw query
)
async initiatePayout(payload: PayoutInitiateRequestDto, req: IRequest): Promise<PayoutResponse> {
return await this.prisma.$transaction(async (prisma) => {
// 使用 select for update 获取user, 使用mysql行级锁,同一时间只允许一个线程更新行数据。
const user = await prisma.$queryRaw<User[]>`SELECT * FROM User WHERE id = ${userId} FOR UPDATE;`
if (!user || user.length === 0) {
throw new Error('User not found');
}
const availableBalance = user[0]?.totalPayout;
const payoutAmount = +payload.amount;
if (availableBalance < payoutAmount) {
throw new Error('Insufficient balance');
}
const apiResponse = await this.externalApi.payout(payload);
if (apiResponse.success) {
// Deduct the balance after successful payout
await prisma.user.update({
where: { id: userId },
data: { totalPayout: availableBalance - payoutAmount },
});
}
return apiResponse;
});
}
- 操作步骤:
- 在
Prisma
事务内部, 使用$queryRaw
执行SELECT ... FOR UPDATE
查询,锁定目标用户的记录。 - 检查账户余额,如果余额足够进行支付,调用外部支付API。
- 当外部支付 API 返回成功时,在同一个事务中更新数据库用户账户余额。
Prisma
会自动处理事务的提交或回滚。
- 在
注意:
* SELECT ... FOR UPDATE
可以锁定行数据,需要搭配事务使用,才能保证锁的原子性。
* 在数据库层面使用锁要谨慎,如果锁持有时间过长,可能导致死锁或性能问题。尽可能缩小锁的范围。
分布式锁
在分布式系统中,单机数据库锁不能满足需要,此时需要使用分布式锁。分布式锁可以通过诸如 Redis
、ZooKeeper
等组件实现。基本原理是在外部组件存储锁的标识,需要加锁的时候,判断是否存在此锁。 此锁可以是 Redis setnx
key的操作,亦或者是 Zookeeper
临时顺序节点,通过这些方式保证一个请求同时只有一个能获取到锁。 分布式锁可以解决集群下的并发请求问题,通常适用于对性能要求不高,但数据一致性要求高的场景。
由于文章长度所限,暂不提供详细的分布式锁代码。
安全建议
- 输入验证: 永远要验证所有用户输入,防止非法输入。
- 授权: 确保只有授权用户才能执行相关的操作。
- 事务管理: 合理使用数据库事务来保持数据的一致性,正确处理事务的回滚。
- 错误处理: 对于可能出现的错误,务必进行充分的处理。记录异常信息以便问题诊断。
- 日志记录: 记录所有的相关操作,包括用户的请求信息,数据修改等等,以便日后审计和问题回溯。
在应对并发请求时,选择哪种方案,取决于系统的具体需求。针对上述问题场景,可以考虑采用数据库锁或者乐观锁解决数据竞争问题,对于更高并发场景可以考虑引入分布式锁解决问题。无论选择哪种方式,都需要做好详细的测试,从而确保数据的一致性和可靠性。