返回

加速 Pandas 合并小 CSV:3 种高效方法

python

告别列表追加:优化 Python 小 CSV 文件拼接性能

处理大量小文件时,效率往往是个头疼的问题。你可能遇到过这样的场景:需要读取一堆 CSV 文件,把它们整合成一个大文件或者一个大的数据结构(比如 Pandas DataFrame)做后续分析。如果文件数量巨大,一个常见的做法——先把每个文件读成 DataFrame,然后把这些 DataFrame 添加到一个 Python 列表里,最后用 pd.concat() 合并——可能会变得非常慢,还特别吃内存。

就拿这个例子来说:

# 伪代码示意
import pandas as pd
import os

all_files = [] # 假设通过 os.walk 找到了很多 csv 文件路径
data_frames_list = []
output_batch_size = 8
output_counter = 0

for i, file_path in enumerate(all_files):
    # 1. 读取单个小 CSV 到 DataFrame
    df_single = pd.read_csv(file_path, sep='\t') # 假设分隔符是 Tab

    # 2. 追加 DataFrame 到列表
    data_frames_list.append(df_single)

    # 3. 每 8 个文件处理一次
    if (i + 1) % output_batch_size == 0:
        # 4. 拼接列表中的所有 DataFrame
        combined_df = pd.concat(data_frames_list, ignore_index=True)

        # 5. 输出或进一步处理 combined_df
        output_filename = f"combined_batch_{output_counter}.csv"
        combined_df.to_csv(output_filename, index=False)
        print(f"已生成 {output_filename}")

        # 6. 清空列表,准备下一批
        data_frames_list = []
        output_counter += 1

# 处理可能剩余的不足 batch_size 的文件
if data_frames_list:
    combined_df = pd.concat(data_frames_list, ignore_index=True)
    output_filename = f"combined_batch_{output_counter}.csv"
    combined_df.to_csv(output_filename, index=False)
    print(f"已生成 {output_filename}")

这里的痛点在于,如果文件非常多(比如成千上万个),即使每个文件只有几行几列,data_frames_list 也会包含大量的 DataFrame 对象。这不仅占用了大量内存来存储这些临时的、独立的 DataFrame 结构,而且 pd.concat() 在合并这么多小对象时,内部也有不小的开销。

揪出性能瓶颈:为什么追加 DataFrame 到列表有点慢?

  1. 对象创建开销 : 每调用一次 pd.read_csv(),Pandas 都要创建一个完整的 DataFrame 对象。这包括数据本身的存储、索引对象、列名、数据类型推断等一系列操作。对于只有几行的小文件,这些固定开销占比可能相当高。
  2. 内存占用 : 把成百上千个 DataFrame 对象都塞进一个 Python 列表里,会实实在在地消耗大量内存。想象一下,每个 DataFrame 都有自己的内存空间,即使数据量不大,对象本身的结构也占地方。如果文件总数特别多,内存可能先撑不住。
  3. 列表追加 : Python 列表在动态增长时,底层可能需要重新分配更大的内存空间并复制旧元素,这也会带来一定的性能损耗,虽然通常没有前两点显著。
  4. pd.concat 开销 : 当 pd.concat() 接收一个包含大量 DataFrame 的列表时,它需要遍历列表,检查每个 DataFrame 的结构(列名、类型等),然后进行数据块的复制和拼接。处理的对象越多,内部协调和数据移动的成本就越高。

简单说,就是为每个小文件都“大动干戈”地创建 DataFrame,再把它们囤积起来,最后一把梭哈交给 pd.concat,这个流程本身不够“经济”。

三种提速方案,总有一款适合你

针对上面的问题,我们可以换个思路,尽量避免在内存中长时间持有大量的 DataFrame 对象。

方案一:惰性读取,优化 Pandas 用法

这是对现有流程的小改进,不完全抛弃 DataFrame,而是让 pd.concat 更聪明地工作,避免显式地构建一个巨大的 DataFrame 列表。我们可以使用生成器表达式(Generator Expression)。

原理和作用

生成器表达式 (expression for item in iterable) 不会立刻计算并存储所有结果,而是返回一个生成器对象。只有在迭代它的时候(比如 pd.concat 内部处理时),它才会按需计算并产生下一个值。这样,内存中同一时间只需要存放少量(甚至只有一个)正在处理的 DataFrame,大大减少了内存峰值。

代码示例

import pandas as pd
import os
from pathlib import Path # 使用 pathlib 让路径处理更方便

def find_csv_files(root_dir):
    """使用生成器查找所有CSV文件"""
    for root, _, files in os.walk(root_dir):
        for file in files:
            if file.lower().endswith('.csv'):
                yield Path(root) / file # 返回 Path 对象

def process_batches_generator(root_directory, batch_size=8, output_dir="output_batches"):
    """使用生成器表达式优化拼接过程"""
    Path(output_dir).mkdir(parents=True, exist_ok=True) # 创建输出目录
    file_iterator = find_csv_files(root_directory)
    batch_counter = 0
    while True:
        # 尝试获取一批文件路径
        current_batch_paths = [next(file_iterator, None) for _ in range(batch_size)]
        # 去掉末尾可能出现的 None (文件总数不是 batch_size 的整数倍时)
        current_batch_paths = [p for p in current_batch_paths if p is not None]

        if not current_batch_paths:
            break # 没有更多文件了

        # 核心:使用生成器表达式传递给 pd.concat
        # 注意 sep='\t' 要根据实际情况修改
        combined_df = pd.concat(
            (pd.read_csv(filepath, sep='\t') for filepath in current_batch_paths),
            ignore_index=True
        )

        output_filename = Path(output_dir) / f"combined_batch_{batch_counter}.csv"
        combined_df.to_csv(output_filename, index=False, encoding='utf-8') # 明确指定编码
        print(f"已生成 {output_filename}")
        batch_counter += 1

# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
process_batches_generator(source_directory, batch_size=8)

说明

  • find_csv_files 函数现在返回一个生成器,逐个产生文件路径。
  • 主循环通过 next(file_iterator, None) 按需从生成器获取路径,组成一个小批次 current_batch_paths
  • 关键在于 pd.concat((pd.read_csv(filepath, sep='\t') for filepath in current_batch_paths), ...) 这一行。括号里的 (pd.read_csv(...) for ...) 就是生成器表达式。pd.concat 会迭代这个生成器,每次只读取和处理一个文件的 DataFrame,然后立即进行合并操作的一部分,而不是先把所有 DataFrame 都读到内存里。
  • 增加了输出目录的创建和 encoding='utf-8' 的好习惯。

优点 :内存效率显著提升,代码改动相对较小。
缺点 :对于 CPU 密集型的读取和解析(虽然这里文件小,可能不明显),总耗时可能没有本质飞跃,只是内存表现更好。

方案二:釜底抽薪,直接操作文件内容

如果你的最终目的只是把这些小 CSV 的内容(数据行)合并到一个大 CSV 文件里,并且在合并过程中不需要复杂的 DataFrame 操作,那完全可以绕开 Pandas,直接处理文件内容。

原理和作用

我们只读取每个 CSV 文件的数据行(跳过标题行,只保留第一个文件的标题行),然后直接将这些行写入到一个目标 CSV 文件中。这避免了创建和存储大量 DataFrame 对象的开销,直接进行 I/O 操作,通常非常快。

方法 A: 直接写入目标文件

代码示例

import os
import csv
from pathlib import Path

def combine_csv_directly(root_directory, output_base_filename="combined_batch", batch_size=8, output_dir="output_batches"):
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    file_iterator = find_csv_files(root_directory) # 复用上面的查找函数
    batch_counter = 0
    files_processed_in_batch = 0
    output_file_path = None
    csv_writer = None
    output_f = None
    first_file_in_batch = True

    while True:
        file_path = next(file_iterator, None)
        if file_path is None and files_processed_in_batch == 0:
            # 没有新文件,且当前批次是空的,结束
            break
        elif file_path is None and files_processed_in_batch > 0:
            # 所有文件处理完,但当前批次还有内容,关闭最后的输出文件
            if output_f:
                output_f.close()
                print(f"完成最后一批: {output_file_path}")
            break # 结束主循环

        # ---- 开始或切换到新的批次 ----
        if files_processed_in_batch == 0:
            # 开始一个新批次
            output_file_path = Path(output_dir) / f"{output_base_filename}_{batch_counter}.csv"
            try:
                # 使用 'w' 模式打开新文件,确保是 utf-8 编码,newline='' 防止空行
                output_f = open(output_file_path, 'w', newline='', encoding='utf-8')
                csv_writer = csv.writer(output_f, delimiter='\t') # 注意分隔符
                first_file_in_batch = True # 标记这是新批次的第一个文件
                print(f"开始新批次,写入到: {output_file_path}")
            except IOError as e:
                print(f"错误:无法打开输出文件 {output_file_path}: {e}")
                # 可以选择跳过此批次或终止程序
                if output_f: output_f.close() # 确保关闭
                continue # 尝试处理下一个文件(可能会开始新的批次)


        # ---- 处理当前文件 ----
        try:
            with open(file_path, 'r', encoding='utf-8') as input_f: # 读取文件,同样指定编码
                csv_reader = csv.reader(input_f, delimiter='\t') # 注意分隔符
                header = next(csv_reader) # 读取表头

                if first_file_in_batch:
                    # 如果是批次的第一个文件,写入表头
                    csv_writer.writerow(header)
                    first_file_in_batch = False # 后续文件不再写表头

                # 写入数据行
                for row in csv_reader:
                    csv_writer.writerow(row)

            files_processed_in_batch += 1

        except FileNotFoundError:
            print(f"警告:文件未找到 {file_path},已跳过。")
        except StopIteration:
            # csv.reader 在空文件或只有一行(标题)时 next(csv_reader) 会报错 StopIteration
             print(f"警告:文件 {file_path} 为空或只有标题行,已跳过内容。")
             # 决定是否也算作处理了一个文件
             files_processed_in_batch += 1 # 如果空文件也算一个的话
             # 如果第一个文件就是空文件且需要标题,需要特殊处理
             if first_file_in_batch:
                  csv_writer.writerow(header) # 写入它的(可能是空的)表头
                  first_file_in_batch = False
        except Exception as e:
            print(f"错误:处理文件 {file_path} 时出错: {e},已跳过。")
            # 异常情况也增加计数器,避免死循环
            files_processed_in_batch += 1


        # ---- 批次结束判断 ----
        if files_processed_in_batch == batch_size:
            # 当前批次已满
            if output_f:
                output_f.close() # 关闭当前批次的输出文件
                print(f"完成批次 {batch_counter}{output_file_path}")
            files_processed_in_batch = 0 # 重置计数器
            batch_counter += 1       # 增加批次号
            output_file_path = None    # 清理路径,确保下次重新生成
            csv_writer = None
            output_f = None

    # 循环结束后的清理(上面 break 时可能需要)
    if output_f and not output_f.closed:
        output_f.close()
        print(f"已关闭最终文件: {output_file_path}")


# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
combine_csv_directly(source_directory, batch_size=8)

说明

  • 核心逻辑是打开一个输出文件 (output_f),然后遍历输入文件 (file_path)。
  • 使用 Python 内置的 csv 模块来读写,这比手动分割字符串更健壮,能正确处理包含引号、逗号等特殊字符的数据。确保读写时指定了相同的分隔符(delimiter='\t')。
  • 通过 first_file_in_batch 标志来控制只在每个批次的第一个输入文件被处理时才写入表头。
  • 读取输入文件的每一行数据(跳过表头),直接写入到输出文件中。
  • batch_size 控制何时关闭当前输出文件并开始下一个。
  • 添加了基本的错误处理(try...except),比如文件找不到或者读写错误。
  • 重要 : 必须使用 newline='' 参数打开 CSV 文件进行写入,否则在 Windows 上可能会出现多余的空行。
  • 重要 : 读写文件时,明确指定 encoding='utf-8' (或其他正确编码) 是个好习惯,避免乱码问题。

优点 : 速度通常最快,内存占用极低,因为它几乎是流式处理。
缺点 : 不生成中间的 DataFrame 对象。如果你在合并后 立即 需要对整个批次的数据进行复杂的 Pandas 分析,这种方法就不太方便,你需要再用 pd.read_csv() 读取刚生成的大文件。

方法 B: 借助 io.StringIO 内存缓冲 (如果最终需要 DataFrame)

如果你最终还是需要一个合并后的 DataFrame 对象(比如你的“进一步数据分析”),但又想避免内存里堆积大量小 DataFrame 对象,可以结合方案二的思路和内存缓冲。

原理和作用

先把一个批次内所有文件的内容(除了多余的表头)读取并拼接成一个大的字符串,存放在内存中的一个文本流对象 io.StringIO 里。当一个批次处理完毕后,再用 pd.read_csv() 一次性从这个内存流对象中读取数据,生成最终的那个大的 DataFrame。

代码示例

import pandas as pd
import os
import io
from pathlib import Path

# 复用 find_csv_files 函数

def process_batches_stringio(root_directory, batch_size=8, output_dir="output_pandas"):
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    file_iterator = find_csv_files(root_directory)
    batch_counter = 0
    while True:
        current_batch_paths = [next(file_iterator, None) for _ in range(batch_size)]
        current_batch_paths = [p for p in current_batch_paths if p is not None]

        if not current_batch_paths:
            break

        # 使用 io.StringIO 作为内存中的文件缓冲区
        string_buffer = io.StringIO()
        first_file = True

        for file_path in current_batch_paths:
            try:
                with open(file_path, 'r', encoding='utf-8') as input_f:
                    header = input_f.readline() # 读取表头行
                    if first_file:
                        string_buffer.write(header) # 第一个文件写入表头
                        first_file = False

                    # 写入剩余的数据行
                    content = input_f.read()
                    string_buffer.write(content)
                    # 可选:确保每个文件内容后有换行符,如果源文件末尾可能没有的话
                    # if not content.endswith('\n'):
                    #     string_buffer.write('\n')

            except FileNotFoundError:
                print(f"警告:文件未找到 {file_path},已跳过。")
            except Exception as e:
                 print(f"错误:处理文件 {file_path} 时出错: {e},已跳过。")

        # 将缓冲区的指针移到开头,准备读取
        string_buffer.seek(0)

        if string_buffer.getvalue(): # 确保缓冲区有内容
            try:
                # 从内存缓冲区直接读取为 DataFrame
                # 注意 sep='\t'
                combined_df = pd.read_csv(string_buffer, sep='\t')

                # 在这里可以对 combined_df 进行进一步的分析

                # --- 如果需要保存 ---
                output_filename = Path(output_dir) / f"combined_batch_{batch_counter}_df.csv"
                combined_df.to_csv(output_filename, index=False, encoding='utf-8')
                print(f"已生成DataFrame并保存到 {output_filename}")
            except pd.errors.EmptyDataError:
                 print(f"警告:批次 {batch_counter} 处理后得到空数据,可能所有文件都为空或出错。")
            except Exception as e:
                 print(f"错误:从内存缓冲区创建DataFrame时出错: {e}")

        else:
             print(f"警告:批次 {batch_counter} 未能收集到任何有效数据。")


        string_buffer.close() # 关闭 StringIO 对象释放内存
        batch_counter += 1

# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
process_batches_stringio(source_directory, batch_size=8)

说明

  • 为每个批次创建一个 io.StringIO() 对象 string_buffer
  • 遍历批次内的文件,将第一个文件的完整内容(包括表头)写入 buffer,后续文件只写入数据部分(跳过表头)。这里简化为直接读写,注意原始文件末尾是否有换行符可能影响拼接。更稳妥的做法还是用 csv 模块读写到 buffer。
  • 批次结束后,string_buffer.seek(0) 把读写位置重置到开头。
  • pd.read_csv(string_buffer, ...) 直接从内存读取,生成一个大的 DataFrame combined_df
  • 然后你可以用 combined_df 做分析,或者像示例中那样保存它。
  • 记得处理完后 string_buffer.close()

优点 : 相比方案一,进一步减少了中间 DataFrame 对象的数量,内存效率较高。相比方法 A,最终直接得到需要的 DataFrame,方便后续处理。
缺点 : 字符串拼接本身也有开销,如果批次内的总数据量非常大(比如单个批次就达到几个 GB),内存缓冲仍然可能成为瓶颈。速度上可能略慢于直接写文件的方法 A。

进阶技巧 (方案二中使用 csv 模块)

上面方法 A 和 B 的代码示例中,对文件内容的处理相对简单。如果 CSV 文件内的数据可能包含特殊字符(比如引号内的逗号/制表符、换行符),直接进行字符串拼接或按行读写可能出错。使用内置 csv 模块能更可靠地处理这些情况。

# 在方案二 (方法 A 或 B) 中集成 csv 模块的读写片段

# ---- 读取部分 (替代 input_f.read() 或 readline/read) ----
# (需要先 import csv)
csv_reader = csv.reader(input_f, delimiter='\t')
header = next(csv_reader) # 读取表头

# 对于方法 A (直接写文件):
# if first_file_in_batch:
#     csv_writer.writerow(header)
#     first_file_in_batch = False
# for row in csv_reader: # csv_reader 是一个迭代器
#     csv_writer.writerow(row)

# 对于方法 B (写 StringIO):
# (需要先 from io import StringIO; import csv)
# temp_buffer = StringIO()
# csv_writer_mem = csv.writer(temp_buffer, delimiter='\t')
# if first_file:
#     csv_writer_mem.writerow(header)
#     first_file = False
# for row in csv_reader:
#     csv_writer_mem.writerow(row)
# # ... 处理完一个文件后 ...
# string_buffer.write(temp_buffer.getvalue()) # 将内存 csv 内容写入主 buffer
# temp_buffer.close()


# ---- 读取 StringIO 时 (方法 B 的最后一步 pd.read_csv) ----
# string_buffer.seek(0)
# combined_df = pd.read_csv(string_buffer, sep='\t') # 使用 pd.read_csv 时,它内部会处理 CSV 解析

使用 csv.readercsv.writer 可以确保即使单元格内有复杂内容也能正确处理。

方案三:拥抱 Dask,处理更大规模的数据(进阶)

如果你的文件数量真的达到了海量级别,或者单个文件也很大,总数据量超出了单机内存承受范围,可以考虑使用 Dask。

原理和作用

Dask 是一个用于并行计算的 Python 库。dask.dataframe 模块提供了一个类似 Pandas DataFrame 的接口,但它可以操作比内存更大的数据集,并能在多核甚至多台机器上并行处理任务。dask.dataframe.read_csv 可以直接处理文件路径的模式匹配(如 *.csv),并在后台惰性地、并行地读取和组织数据。

代码示例

import dask.dataframe as dd
import os
from pathlib import Path

def process_with_dask(root_directory, output_base_filename="dask_combined", output_dir="output_dask"):
    Path(output_dir).mkdir(parents=True, exist_ok=True)
    # Dask 可以直接接受包含通配符的路径列表或单个路径
    # 让 Dask 处理所有子目录下的 CSV 文件
    all_csv_path_pattern = os.path.join(root_directory, '**', '*.csv') # '** ' 递归查找

    print(f"让 Dask 从 {all_csv_path_pattern} 读取数据...")
    # sep='\t' 要根据实际情况指定
    # blocksize 控制 Dask 内部处理块的大小,可以调整以平衡内存和并行度
    ddf = dd.read_csv(all_csv_path_pattern, sep='\t', blocksize="64MB") # 示例块大小

    # Dask 的操作是惰性的,这里只是构建了计算图谱
    # ddf 包含多个 Pandas DataFrame 分区

    print(f"数据已由 Dask 加载(惰性)。总分区数: {ddf.npartitions}")

    # --- 如果需要进行一些 Dask 支持的分析 ---
    # 例如: mean_value = ddf['some_column'].mean()
    # 注意:需要调用 .compute() 来触发实际计算
    # result = mean_value.compute()
    # print(f"计算得到平均值: {result}")

    # --- 如果需要将所有数据合并并保存到一个(或多个)CSV 文件 ---
    # to_csv 会将每个分区保存为单独的文件,或尝试合并
    output_path = Path(output_dir) / f"{output_base_filename}_*.csv" # Dask 会生成带编号的文件
    print(f"开始将 Dask DataFrame 写入 CSV 文件(可能产生多个文件)到: {output_dir}")
    # single_file=True 会尝试合并,但如果数据太大可能失败或很慢
    # 不用 single_file=True 则会输出 npartitions 个文件
    ddf.to_csv(str(output_path.parent / output_path.name), index=False, single_file=False, encoding='utf-8')

    # 如果你想强制合并成一个大文件(内存可能成为瓶颈):
    # print("尝试计算并合并为 Pandas DataFrame...")
    # combined_pandas_df = ddf.compute() # 这会将所有数据加载到内存
    # output_single_path = Path(output_dir) / f"{output_base_filename}_single.csv"
    # print(f"保存为单个文件: {output_single_path}")
    # combined_pandas_df.to_csv(output_single_path, index=False, encoding='utf-8')

    print("Dask 处理完成。")


# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
process_with_dask(source_directory)

说明

  • Dask 使用 dd.read_csv 可以非常方便地读取匹配模式的文件。
  • 它内部会将数据划分为多个分区(Pandas DataFrame),并行处理。
  • 所有操作默认是惰性的,只有调用 .compute()to_csv() 等触发计算的方法时才会真正执行。
  • to_csv 默认会为每个分区生成一个文件,可以通过 single_file=True 尝试合并(小心内存)。
  • 批处理逻辑 :Dask 本身就按分区处理数据,天然地包含了某种形式的“批处理”。如果你的每 8 个文件逻辑非常特殊且不能简单地由 Dask 的分区策略替代,可能需要更复杂的 Dask 图构建,或者先用方案二的方法预处理成较大的中间文件再交给 Dask。但对于简单的拼接和后续分析,Dask 的直接读取通常更优。

优点 : 擅长处理大数据量和并行计算,接口与 Pandas 相似,学习曲线相对平缓。
缺点 : 引入了新的库依赖,对于你的小文件(6列5行)场景,其本身的调度开销可能比收益还大,有点“杀鸡用牛刀”。但如果未来数据规模扩展,它是很好的备选方案。

根据你的具体情况——小文件,但数量可能很大,且需要按批次(每8个)合并——方案一(生成器优化)和方案二(直接文件操作或 StringIO)看起来是最直接、最合适的改进方向。如果内存是主要瓶颈,优先考虑它们。如果目的是纯粹合并成大 CSV,方案二方法 A 可能最快。如果合并后马上要做 DataFrame 分析,方案二方法 B 或方案一是不错的选择。