返回

fs.writeFile异步循环写JSON错误?揭秘并发写入陷阱

javascript

好的,这是您要求的技术博客文章:

揪出 fs.writeFile 捣乱 JSON 的幕后黑手:异步循环中的写入陷阱

搞 Node.js 开发,跟文件打交道是家常便饭,特别是读写 JSON 配置文件或者状态文件。但有时候,一个看似简单的 fs.writeFile 操作,尤其是在异步循环里,可能会给你带来意想不到的“惊喜”——比如文件末尾多了些奇奇怪怪的括号 }} 或者数字 }32}}

遇到这种情况,你可能会像下面这位朋友一样,挠破头皮,尝试各种“复杂”的操作,结果问题依旧:

遇到的麻烦:

想在一个异步循环里,每次循环都更新一个 JSON 文件。文件内容是一个键值对对象,每次只更新整个文件(用包含一个新对象的数据)。

原始代码(经过一番折腾后变得有些复杂):

async function saveJsonFile(data, fileName = 'myFile') {
  try {
    const jsonData = JSON.stringify(data);

    // 尝试清缓存
    delete require.cache[require.resolve(`./${fileName}.json`)];

    // 尝试确保文件存在并清空内容?(这步操作很迷)
    await fs.readFile(`${fileName}.json`, 'utf8', (err) => {
      if (err) { /* 忽略错误? */ }
      else {
        // 清空文件
        fs.writeFile(`${fileName}.json`, "", "utf8", (err) => {
          if (err) { /* 继续忽略? */ }
          else {
            // 保存 JSON 数据 << 最开始可能只有这里
            fs.writeFile(`${fileName}.json`, jsonData, 'utf8', (err) => {
              if (err) { /* ... */ }
            });

            // 又尝试保存了一次?
            fs.writeFile(`${fileName}.json`, jsonData, 'utf8', (err) => {
              if (err) { /* ... */ }
            });
          }
        });
      }
    });
  } catch(e) {
    // try/catch 包裹了异步回调,可能无法捕捉回调内的错误
  }
}

// 在另一个 async 函数中调用
async function runTrading() {
  try {
    const data = [/* 一些交易数据 */];
    let lastBuyPrice = {}; // 假设从别处加载或初始化
    for (let i = 1; i <= data.length; i++) {
      // ... 一些逻辑计算 currentPrice 和 symbol
      let symbol = `prod${i}`;
      let currentPrice = Math.random() * 10000; // 示例价格
      lastBuyPrice[symbol] = currentPrice;
      console.log(`准备保存: ${JSON.stringify(lastBuyPrice)}`); // 日志看起来没问题
      await saveJsonFile(lastBuyPrice, 'lastPrices'); // 每次循环都保存
    }
  } catch (e) {
    console.error("运行出错:", e);
  }
}

// runTrading(); // 假设调用

结果得到的 lastPrices.json 文件内容却可能是:

{"prod1":32154.22}}

或者:

{"prod1":32154.22,"prod2":0,"prod3":0}32}}

明明 JSON.stringify 之后、写入之前打印日志看数据都是好的,为啥写到文件里就“画蛇添足”了呢?

问题根源在哪?刨根问底!

这个问题的核心,往往藏在 异步操作回调地狱(或者说,不恰当的异步处理方式) 的迷雾里。

咱们来仔细看看上面那段 saveJsonFile 代码:

  1. async/await 和回调混用: 函数体标记了 async,调用时用了 await,这本身没问题。但函数内部的核心逻辑——文件读写,却用了老式的回调风格 fs.readFilefs.writeFile
  2. await 等了个寂寞: await fs.readFile(...) 这一行,await 实际上只是等待 fs.readFile 这个 发起 读取操作的函数返回。因为 fs.readFile 是非阻塞的,它会立刻返回(或者说,几乎立刻),然后 Node.js 在后台处理文件读取,完成后再调用你提供的回调函数。await 并不会神奇地等待那个回调函数执行完毕!它等的是 fs.readFile 这个动作本身完成(启动异步操作)。同理,嵌套在里面的 fs.writeFile 也是一样。
  3. 提前返回的 saveJsonFile 由于 await 没有真正等待文件操作的回调执行完,saveJsonFile 函数会在文件可能还在被读取或写入的时候就 resolve 了(或者在出错前就执行完了 try 块)。调用它的 runTrading 函数里的 await saveJsonFile(...) 就以为“保存操作已完成”,然后立刻进入下一次循环。
  4. 并发写入大乱斗(Race Condition): 麻烦来了!runTrading 的循环跑得飞快,每次循环都调用 saveJsonFile。因为 saveJsonFile 返回得太快,下一次循环的 saveJsonFile 调用很可能在前一次调用的文件操作(特别是写入操作)还没完成时就开始执行了。想象一下,多个“工人”(异步回调)同时冲向同一个文件,手里拿着不同版本的数据(jsonData),都想把文件内容改成自己手里的样子。它们的操作会互相干扰、互相覆盖,导致最终文件内容被打乱,出现截断、混合、重复写入部分内容等情况。上面看到的 }}}32}} 就是这种混乱场面的“遗迹”。那个清空文件的 fs.writeFile("", ...) 操作更是加剧了这种混乱。
  5. 不必要的复杂性: 代码里又是清缓存,又是读文件再写空,又是写两次,这些都让问题更难定位,也增加了出错的概率。delete require.cache 对解决写入问题通常没什么帮助。

说白了,坑就在于 async/await 没有和基于回调的 fs 方法正确配合,导致对同一文件的并发写操作失去了控制,引发了竞态条件(Race Condition)

解决方案:让异步写入回归正轨

知道了问题所在,解决起来就思路清晰了。核心思想是:确保每次文件写入操作是原子性的、有序的,或者干脆减少不必要的写入次数。

方案一:拥抱 Promises,fs.promises 大显身手 (推荐)

Node.js 从 v10 开始内置了 fs.promises API,提供了基于 Promise 的文件系统操作方法。这是处理异步文件操作的现代化、更简洁也更不容易出错的方式。用它来改写 saveJsonFile

原理:
fs.promises 里的方法(如 writeFile)返回的是 Promise。当你对一个 Promise 使用 await 时,代码会 真正暂停 在那里,直到这个 Promise 被 resolve(操作成功完成)或 reject(操作失败)。这就完美解决了之前 await 等了个寂寞的问题,保证了每次写入操作完成后,循环才会继续。

改造后的 saveJsonFile

const fs = require('fs').promises; // 注意引入的是 .promises

async function saveJsonFileModern(data, fileName = 'myFile.json') { // 加个 .json 后缀更规范
  try {
    const jsonData = JSON.stringify(data, null, 2); // 添加格式化,方便阅读
    // 直接写入,覆盖旧内容。原子性由底层保证(通常情况下)
    await fs.writeFile(fileName, jsonData, 'utf8');
    // console.log(`文件 ${fileName} 保存成功。`); // 可以按需加日志
  } catch (err) {
    console.error(`写入文件 ${fileName} 出错:`, err);
    // 这里可以抛出错误,让调用者知道失败了
    throw err;
  }
}

// 调用方的 runTrading 函数保持不变,只需将调用改为:
// await saveJsonFileModern(lastBuyPrice, 'lastPrices.json');

优点:

  • 代码简洁清晰,移除了所有回调嵌套。
  • 逻辑正确,await 确实会等待写入完成。
  • 错误处理更符合 async/await 的模式(用 try...catch 捕获)。

安全建议:
虽然跟写入内容损坏关系不大,但实际应用中要注意文件路径的校验,防止路径遍历攻击。确保 fileName 不包含 ../ 等恶意字符,或者限定在一个安全的目录下。

方案二:加把锁!用队列或锁控制写入顺序

如果你确实需要在循环中频繁写入,并且不能丢失任何一次中间状态(比如,每次写入都是关键日志),但又担心并发问题,可以引入“锁”或“队列”机制。

原理:
确保同一时间只有一个“写文件”的任务在执行。后续的写入请求需要排队等待,直到当前的写入完成。

简单实现(基于 Promise 的信号量/锁):

const fs = require('fs').promises;

let isWriting = false; // 一个简单的锁标记
let writeQueue = Promise.resolve(); // 一个 Promise 链,用于串行化任务

async function saveJsonFileWithLock(data, fileName = 'myFile.json') {
  // 将写操作加入队列
  writeQueue = writeQueue.then(async () => {
    // 这里可以再加锁检查,但 Promise 链本身已保证串行
    // if (isWriting) { console.warn('逻辑错误:锁已被占用?'); return; }
    // isWriting = true; // 标记正在写入 (在 Promise 链模式下非必需)

    try {
      const jsonData = JSON.stringify(data, null, 2);
      await fs.writeFile(fileName, jsonData, 'utf8');
      // console.log(`文件 ${fileName} (队列) 保存成功。`);
    } catch (err) {
      console.error(`(队列) 写入文件 ${fileName} 出错:`, err);
      throw err; // 抛出错误,让 Promise 链断裂或被捕获
    } finally {
      // isWriting = false; // 释放锁 (在 Promise 链模式下非必需)
    }
  }).catch(err => {
    // 捕获链上的错误,防止未处理的 Promise rejection
    console.error("写入队列发生错误:", err);
    // 可以选择重置队列状态或采取其他恢复措施
    writeQueue = Promise.resolve(); // 重置队列,避免后续任务因错误阻塞
  });

  // 返回当前的队列尾部 Promise,以便外部可以等待这个特定的写操作完成(如果需要)
  return writeQueue;
}

// 调用方 runTrading 函数保持不变:
// await saveJsonFileWithLock(lastBuyPrice, 'lastPrices.json');
// 注意:虽然函数内部是串行的,但 runTrading 中的 await 仍然会等待这个特定的写入加入队列并(最终)完成。

进阶使用技巧:

  • 对于更复杂的场景,可以使用成熟的异步控制流库,如 async 库的 queuep-queue 库。它们提供了更强大的功能,比如设置并发数限制(如果你想允许一定程度的并行,但不是无限并发)、任务优先级等。
  • 使用 async.queue:
// 需要安装 async: npm install async
const async = require('async');
const fs = require('fs').promises;

// 创建一个每次只处理一个任务的队列
const saveQueue = async.queue(async (task, callback) => {
  try {
    const jsonData = JSON.stringify(task.data, null, 2);
    await fs.writeFile(task.fileName, jsonData, 'utf8');
    // console.log(`文件 ${task.fileName} (async.queue) 保存成功。`);
    callback(); // 告诉队列这个任务完成了
  } catch (err) {
    console.error(`(async.queue) 写入文件 ${task.fileName} 出错:`, err);
    callback(err); // 传递错误给队列
  }
}, 1); // 并发数为 1,保证串行

// 可选:处理队列排空事件
saveQueue.drain(() => {
  console.log('所有保存任务已完成。');
});

// 可选:处理任务错误
saveQueue.error((err, task) => {
  console.error(`任务 ${JSON.stringify(task)} 处理失败:`, err);
});

// 修改后的 saveJsonFile 函数
async function saveJsonFileAsyncQueue(data, fileName = 'myFile.json') {
  return new Promise((resolve, reject) => {
    saveQueue.push({ data, fileName }, (err) => { // 将任务推入队列
      if (err) {
        reject(err); // 如果任务失败,reject Promise
      } else {
        resolve(); // 如果任务成功,resolve Promise
      }
    });
  });
}

// 调用方 runTrading 不变:
// await saveJsonFileAsyncQueue(lastBuyPrice, 'lastPrices.json');

优点:

  • 严格保证了写入顺序,避免了竞态条件。
  • 适用于必须保留每次写入结果的场景。

缺点:

  • 实现相对复杂一些。
  • 如果写入非常频繁,后面的操作需要等待,可能会影响实时性(但这正是保证数据一致性的代价)。

方案三:优化策略,减少写入次数

退一步想,真的需要在每次循环迭代时都把数据写入磁盘吗?磁盘 I/O 相对较慢,频繁写入会影响性能。

原理:
如果业务逻辑允许,只在必要的时候写入。比如,在整个循环结束后,或者采用防抖(debounce)/节流(throttle)策略。

1. 循环结束后一次性写入:

如果只需要最终的 lastBuyPrice 状态,完全可以在循环结束后再保存。

改造 runTrading

// 假设 saveJsonFileModern 或 saveJsonFileWithLock 存在且功能正确

async function runTradingOptimized() {
  try {
    const data = [/* ... */];
    let lastBuyPrice = {}; // 初始化

    for (let i = 1; i <= data.length; i++) {
      let symbol = `prod${i}`;
      let currentPrice = Math.random() * 10000;
      lastBuyPrice[symbol] = currentPrice;
      // 循环内不再保存!只在内存中更新对象
      console.log(`更新内存中的价格: ${symbol} = ${currentPrice}`);
    }

    // 循环结束后,执行一次保存操作
    console.log(`循环结束,准备保存最终结果: ${JSON.stringify(lastBuyPrice, null, 2)}`);
    await saveJsonFileModern(lastBuyPrice, 'lastPrices-final.json'); // 或者用带锁的版本
    console.log('最终价格已保存到文件。');

  } catch (e) {
    console.error("优化运行出错:", e);
  }
}

// runTradingOptimized(); // 调用优化后的版本

优点:

  • 性能最高,极大减少了 I/O 操作。
  • 代码逻辑简单。
  • 自然避免了并发写入问题。

缺点:

  • 如果在循环过程中程序意外崩溃,内存中的 lastBuyPrice 更新会丢失,文件里保存的是上次成功运行的最终结果(或者初始状态)。不适用于需要过程持久化的场景。

2. 防抖(Debounce)或节流(Throttle):

如果既想减少写入次数,又想在操作间歇期保存一下进度,可以用防抖或节流。

  • 防抖 (Debounce): 在连续触发写入操作时,只有当触发停止一段时间(例如 500ms)后,才执行一次真正的写入。如果在这段时间内又有新的写入请求,则重新计时。适用于只需要最终状态,但希望在“平静”下来后自动保存的场景。
  • 节流 (Throttle): 保证在一个固定时间段内(例如 1 秒),最多只执行一次写入操作。期间收到的多次请求只认第一次(或最后一次,取决于实现)并执行,忽略其他。适用于需要限制写入频率,但仍想大致反映变化的场景。

可以使用 lodash 库的 debouncethrottle 方法,或者自己实现一个简单的版本。

代码示例(概念性,使用 lodash.debounce):

// 需要安装 lodash: npm install lodash
const _ = require('lodash');
const fs = require('fs').promises;

// 创建一个防抖函数,延迟 500ms 执行写入
const debouncedSave = _.debounce(async (data, fileName) => {
  try {
    const jsonData = JSON.stringify(data, null, 2);
    await fs.writeFile(fileName, jsonData, 'utf8');
    console.log(`文件 ${fileName} (防抖后) 保存成功。`);
  } catch (err) {
    console.error(`(防抖) 写入文件 ${fileName} 出错:`, err);
  }
}, 500); // 500毫秒延迟

async function runTradingWithDebounce() {
  try {
    const data = [/* ... */];
    let lastBuyPrice = {};

    for (let i = 1; i <= data.length; i++) {
      let symbol = `prod${i}`;
      let currentPrice = Math.random() * 10000;
      lastBuyPrice[symbol] = currentPrice;
      console.log(`准备防抖保存: ${symbol} = ${currentPrice}`);
      // 每次都调用 debouncedSave,但实际写入会被推迟和合并
      // 注意:_.debounce 返回的函数本身不返回 Promise,所以不能直接 await
      // 需要传递一个副本,因为对象是引用传递
      debouncedSave({ ...lastBuyPrice }, 'lastPrices-debounced.json');
    }

    // 循环结束后,可能需要确保最后一次更改被写入
    // 如果 debounce 设置了 leading: false (默认) 和 trailing: true (默认)
    // 它会在最后一次调用后的延迟时间结束时触发。
    // 但如果希望立即看到结果,可以调用 .flush()
    // await debouncedSave.flush(); // 如果 flush 返回 Promise 的话可以 await,取决于库实现

  } catch (e) {
    console.error("带防抖的运行出错:", e);
  }
}

// runTradingWithDebounce();

优点:

  • 平衡了性能和数据更新频率。

缺点:

  • 引入了外部库或需要自己实现,增加了复杂度。
  • 理解防抖/节流的原理和行为很重要。
  • 仍有数据丢失风险(在两次写入间隔期内崩溃)。

选择哪种方案取决于你的具体需求:数据一致性的重要程度、写入频率、性能要求以及代码复杂度的接受程度。对于大多数简单场景,优先推荐使用 fs.promises API (方案一) ,它简洁、符合 Node.js 的现代异步模型,并且能直接解决问题根源。