返回

Python高效处理大型文件:缓冲写入与生成器实践

python

高效 Python 处理大型文件

大型文件处理是编程中常见的挑战,尤其是当内存资源受限时。 直接读写可能导致程序运行缓慢,甚至内存溢出。 本文探讨如何高效地在 Python 中处理大型文件的写入操作, 重点关注逐行处理并批量写入数据,以此避免内存瓶颈并加快写入速度。

逐行处理与直接写入的性能问题

从示例来看,你希望将大型 XML 文件转换成 JSON Lines 格式。 最直接的办法是逐行读取 XML 文件,将其转换成 JSON 对象,然后立即写入目标 JSON Lines 文件。 初看上去代码简洁,但直接写入的效率会很低。 这是因为每次 op.write() 操作都需要执行磁盘写入,这种频繁的 I/O 操作会成为性能瓶颈,特别是处理大规模数据时。 如同每一次写一句话就去找打印机打印一样,浪费了很多时间在寻址和文件流的处理上,真正写内容的时间占比反而很低。

with open('input.xml', 'r') as ip, open('output.jsonl', 'w') as op:
    for line in ip:
        op.write(process_line(line))

利用缓冲区提升写入效率

一个可行的优化方法是利用缓冲区。 系统层面上,数据写入硬盘其实并不是逐字节进行,而是一批批的进行。因此,我们可以把需要写入的内容,暂存在内存的一个列表中,累积到一定程度时再一次性写入硬盘,这样可以大幅减少磁盘 I/O 次数,显著提升效率。 可以通过io.BufferedWriter手动设置缓存区大小,也可以使用内置的缓冲文件写入操作。

以下展示了 io.BufferedWriter 的使用:

import io

BUFFER_SIZE = 65536  # 设置缓冲区大小,单位:字节

with open('input.xml', 'r') as ip, open('output.jsonl', 'wb') as op:
    buffer = io.BufferedWriter(op, buffer_size=BUFFER_SIZE)
    for line in ip:
        json_line = process_line(line).encode('utf-8') # 将处理后的字符串转为bytes对象
        buffer.write(json_line)
    buffer.flush()  # 确保所有数据都写入磁盘
  • 操作步骤
    1. 打开源 XML 文件和目标 JSONL 文件。
    2. 使用 io.BufferedWriter 创建一个缓冲写入器,并设置合适的缓冲区大小。 缓冲区大小需要依据具体情况来调整,一般情况下 65536 (64KB) 及其倍数是个不错的起点。
    3. 逐行读取 XML 文件,通过process_line()对每一行进行处理后,把结果写入缓冲区。 因为是二进制写入,记得encode一下。
    4. 最后使用flush() 强制将缓冲区中剩余的数据写入磁盘。

  • 原理与作用: io.BufferedWriter 会将数据缓存在内存中,直到缓存满或者遇到 flush() 调用时才进行实际的写入操作。 这大大减少了对磁盘的直接访问,从而提高了效率。

  • 额外安全建议

    • 编码: 为了兼容性, 通常推荐将字符串转为UTF-8的 bytes 类型进行存储。
    • 缓冲区大小:过小的缓冲区会导致频繁写入,失去缓存的意义;过大的缓冲区则可能占用过多的内存。

利用 yield 生成器处理大型数据集

除利用缓冲写入以外,利用生成器(generator)可以减少数据处理的内存消耗。 在生成器函数中,我们可以使用 yield 按需产生数据。 生成器函数配合批量写入, 可以在保持处理大型文件的同时, 避免全部加载数据到内存中。

import json

def process_xml_lines(ip_file):
    for line in ip_file:
       #  处理 XML 并转为 JSON 格式
        json_obj = json.dumps(process_line(line)) + '\n'
        yield json_obj

def write_jsonl(input_path,output_path, batch_size=1000):
    with open(input_path, 'r') as ip,open(output_path, 'w') as op:
       line_generator = process_xml_lines(ip)
       batch_lines = []
       for line in line_generator:
        batch_lines.append(line)
        if len(batch_lines) >= batch_size:
            op.writelines(batch_lines)
            batch_lines=[]

       if batch_lines: #写入剩余批次
         op.writelines(batch_lines)

input_file = 'input.xml'
output_file = 'output.jsonl'

write_jsonl(input_file,output_file, batch_size=10000) # 可以根据机器性能调整 batch size 大小

  • 操作步骤

    1. 创建一个生成器函数process_xml_lines 用于读取文件中的行,并生成处理后的JSON对象字符串。
    2. 定义写入函数 write_jsonl, 接收输入文件,输出文件和批次大小参数。函数会使用process_xml_lines生成器读取数据,将处理好的结果分批存入列表, 达到指定批次大小后一次性写入硬盘。
    3. if batch_lines: 确保处理完最后不到一批的数据写入文件。
    4. 调用write_jsonl 传入相应参数启动转换程序
  • 原理与作用

  • 使用 yield 返回 JSON 数据,生成器可以一次处理一个 JSON 对象, 而不是一次将整个数据集加载到内存中,从而节约内存,同时也可以避免内存溢出。

  • 批次写入将多个写入操作组合在一起,减少了实际的I/O次数, 加快了整体写入的速度。

  • 额外安全建议

    • 错误处理: 处理异常可能出现的问题,例如 XML 解析错误、文件读写错误等。使用try ... except语句捕获这些错误,保证程序稳定。
    • 批次大小:调整合适的batch_size, 在性能和内存消耗间取得平衡。

    这些技巧,可以显著提高 Python 处理大型文件的效率, 结合具体场景进行合适的选择和调整。