加速 Pandas 合并小 CSV:3 种高效方法
2025-04-27 11:30:56
告别列表追加:优化 Python 小 CSV 文件拼接性能
处理大量小文件时,效率往往是个头疼的问题。你可能遇到过这样的场景:需要读取一堆 CSV 文件,把它们整合成一个大文件或者一个大的数据结构(比如 Pandas DataFrame)做后续分析。如果文件数量巨大,一个常见的做法——先把每个文件读成 DataFrame,然后把这些 DataFrame 添加到一个 Python 列表里,最后用 pd.concat()
合并——可能会变得非常慢,还特别吃内存。
就拿这个例子来说:
# 伪代码示意
import pandas as pd
import os
all_files = [] # 假设通过 os.walk 找到了很多 csv 文件路径
data_frames_list = []
output_batch_size = 8
output_counter = 0
for i, file_path in enumerate(all_files):
# 1. 读取单个小 CSV 到 DataFrame
df_single = pd.read_csv(file_path, sep='\t') # 假设分隔符是 Tab
# 2. 追加 DataFrame 到列表
data_frames_list.append(df_single)
# 3. 每 8 个文件处理一次
if (i + 1) % output_batch_size == 0:
# 4. 拼接列表中的所有 DataFrame
combined_df = pd.concat(data_frames_list, ignore_index=True)
# 5. 输出或进一步处理 combined_df
output_filename = f"combined_batch_{output_counter}.csv"
combined_df.to_csv(output_filename, index=False)
print(f"已生成 {output_filename}")
# 6. 清空列表,准备下一批
data_frames_list = []
output_counter += 1
# 处理可能剩余的不足 batch_size 的文件
if data_frames_list:
combined_df = pd.concat(data_frames_list, ignore_index=True)
output_filename = f"combined_batch_{output_counter}.csv"
combined_df.to_csv(output_filename, index=False)
print(f"已生成 {output_filename}")
这里的痛点在于,如果文件非常多(比如成千上万个),即使每个文件只有几行几列,data_frames_list
也会包含大量的 DataFrame 对象。这不仅占用了大量内存来存储这些临时的、独立的 DataFrame 结构,而且 pd.concat()
在合并这么多小对象时,内部也有不小的开销。
揪出性能瓶颈:为什么追加 DataFrame 到列表有点慢?
- 对象创建开销 : 每调用一次
pd.read_csv()
,Pandas 都要创建一个完整的 DataFrame 对象。这包括数据本身的存储、索引对象、列名、数据类型推断等一系列操作。对于只有几行的小文件,这些固定开销占比可能相当高。 - 内存占用 : 把成百上千个 DataFrame 对象都塞进一个 Python 列表里,会实实在在地消耗大量内存。想象一下,每个 DataFrame 都有自己的内存空间,即使数据量不大,对象本身的结构也占地方。如果文件总数特别多,内存可能先撑不住。
- 列表追加 : Python 列表在动态增长时,底层可能需要重新分配更大的内存空间并复制旧元素,这也会带来一定的性能损耗,虽然通常没有前两点显著。
pd.concat
开销 : 当pd.concat()
接收一个包含大量 DataFrame 的列表时,它需要遍历列表,检查每个 DataFrame 的结构(列名、类型等),然后进行数据块的复制和拼接。处理的对象越多,内部协调和数据移动的成本就越高。
简单说,就是为每个小文件都“大动干戈”地创建 DataFrame,再把它们囤积起来,最后一把梭哈交给 pd.concat
,这个流程本身不够“经济”。
三种提速方案,总有一款适合你
针对上面的问题,我们可以换个思路,尽量避免在内存中长时间持有大量的 DataFrame 对象。
方案一:惰性读取,优化 Pandas 用法
这是对现有流程的小改进,不完全抛弃 DataFrame,而是让 pd.concat
更聪明地工作,避免显式地构建一个巨大的 DataFrame 列表。我们可以使用生成器表达式(Generator Expression)。
原理和作用
生成器表达式 (expression for item in iterable)
不会立刻计算并存储所有结果,而是返回一个生成器对象。只有在迭代它的时候(比如 pd.concat
内部处理时),它才会按需计算并产生下一个值。这样,内存中同一时间只需要存放少量(甚至只有一个)正在处理的 DataFrame,大大减少了内存峰值。
代码示例
import pandas as pd
import os
from pathlib import Path # 使用 pathlib 让路径处理更方便
def find_csv_files(root_dir):
"""使用生成器查找所有CSV文件"""
for root, _, files in os.walk(root_dir):
for file in files:
if file.lower().endswith('.csv'):
yield Path(root) / file # 返回 Path 对象
def process_batches_generator(root_directory, batch_size=8, output_dir="output_batches"):
"""使用生成器表达式优化拼接过程"""
Path(output_dir).mkdir(parents=True, exist_ok=True) # 创建输出目录
file_iterator = find_csv_files(root_directory)
batch_counter = 0
while True:
# 尝试获取一批文件路径
current_batch_paths = [next(file_iterator, None) for _ in range(batch_size)]
# 去掉末尾可能出现的 None (文件总数不是 batch_size 的整数倍时)
current_batch_paths = [p for p in current_batch_paths if p is not None]
if not current_batch_paths:
break # 没有更多文件了
# 核心:使用生成器表达式传递给 pd.concat
# 注意 sep='\t' 要根据实际情况修改
combined_df = pd.concat(
(pd.read_csv(filepath, sep='\t') for filepath in current_batch_paths),
ignore_index=True
)
output_filename = Path(output_dir) / f"combined_batch_{batch_counter}.csv"
combined_df.to_csv(output_filename, index=False, encoding='utf-8') # 明确指定编码
print(f"已生成 {output_filename}")
batch_counter += 1
# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
process_batches_generator(source_directory, batch_size=8)
说明
find_csv_files
函数现在返回一个生成器,逐个产生文件路径。- 主循环通过
next(file_iterator, None)
按需从生成器获取路径,组成一个小批次current_batch_paths
。 - 关键在于
pd.concat((pd.read_csv(filepath, sep='\t') for filepath in current_batch_paths), ...)
这一行。括号里的(pd.read_csv(...) for ...)
就是生成器表达式。pd.concat
会迭代这个生成器,每次只读取和处理一个文件的 DataFrame,然后立即进行合并操作的一部分,而不是先把所有 DataFrame 都读到内存里。 - 增加了输出目录的创建和
encoding='utf-8'
的好习惯。
优点 :内存效率显著提升,代码改动相对较小。
缺点 :对于 CPU 密集型的读取和解析(虽然这里文件小,可能不明显),总耗时可能没有本质飞跃,只是内存表现更好。
方案二:釜底抽薪,直接操作文件内容
如果你的最终目的只是把这些小 CSV 的内容(数据行)合并到一个大 CSV 文件里,并且在合并过程中不需要复杂的 DataFrame 操作,那完全可以绕开 Pandas,直接处理文件内容。
原理和作用
我们只读取每个 CSV 文件的数据行(跳过标题行,只保留第一个文件的标题行),然后直接将这些行写入到一个目标 CSV 文件中。这避免了创建和存储大量 DataFrame 对象的开销,直接进行 I/O 操作,通常非常快。
方法 A: 直接写入目标文件
代码示例
import os
import csv
from pathlib import Path
def combine_csv_directly(root_directory, output_base_filename="combined_batch", batch_size=8, output_dir="output_batches"):
Path(output_dir).mkdir(parents=True, exist_ok=True)
file_iterator = find_csv_files(root_directory) # 复用上面的查找函数
batch_counter = 0
files_processed_in_batch = 0
output_file_path = None
csv_writer = None
output_f = None
first_file_in_batch = True
while True:
file_path = next(file_iterator, None)
if file_path is None and files_processed_in_batch == 0:
# 没有新文件,且当前批次是空的,结束
break
elif file_path is None and files_processed_in_batch > 0:
# 所有文件处理完,但当前批次还有内容,关闭最后的输出文件
if output_f:
output_f.close()
print(f"完成最后一批: {output_file_path}")
break # 结束主循环
# ---- 开始或切换到新的批次 ----
if files_processed_in_batch == 0:
# 开始一个新批次
output_file_path = Path(output_dir) / f"{output_base_filename}_{batch_counter}.csv"
try:
# 使用 'w' 模式打开新文件,确保是 utf-8 编码,newline='' 防止空行
output_f = open(output_file_path, 'w', newline='', encoding='utf-8')
csv_writer = csv.writer(output_f, delimiter='\t') # 注意分隔符
first_file_in_batch = True # 标记这是新批次的第一个文件
print(f"开始新批次,写入到: {output_file_path}")
except IOError as e:
print(f"错误:无法打开输出文件 {output_file_path}: {e}")
# 可以选择跳过此批次或终止程序
if output_f: output_f.close() # 确保关闭
continue # 尝试处理下一个文件(可能会开始新的批次)
# ---- 处理当前文件 ----
try:
with open(file_path, 'r', encoding='utf-8') as input_f: # 读取文件,同样指定编码
csv_reader = csv.reader(input_f, delimiter='\t') # 注意分隔符
header = next(csv_reader) # 读取表头
if first_file_in_batch:
# 如果是批次的第一个文件,写入表头
csv_writer.writerow(header)
first_file_in_batch = False # 后续文件不再写表头
# 写入数据行
for row in csv_reader:
csv_writer.writerow(row)
files_processed_in_batch += 1
except FileNotFoundError:
print(f"警告:文件未找到 {file_path},已跳过。")
except StopIteration:
# csv.reader 在空文件或只有一行(标题)时 next(csv_reader) 会报错 StopIteration
print(f"警告:文件 {file_path} 为空或只有标题行,已跳过内容。")
# 决定是否也算作处理了一个文件
files_processed_in_batch += 1 # 如果空文件也算一个的话
# 如果第一个文件就是空文件且需要标题,需要特殊处理
if first_file_in_batch:
csv_writer.writerow(header) # 写入它的(可能是空的)表头
first_file_in_batch = False
except Exception as e:
print(f"错误:处理文件 {file_path} 时出错: {e},已跳过。")
# 异常情况也增加计数器,避免死循环
files_processed_in_batch += 1
# ---- 批次结束判断 ----
if files_processed_in_batch == batch_size:
# 当前批次已满
if output_f:
output_f.close() # 关闭当前批次的输出文件
print(f"完成批次 {batch_counter} 到 {output_file_path}")
files_processed_in_batch = 0 # 重置计数器
batch_counter += 1 # 增加批次号
output_file_path = None # 清理路径,确保下次重新生成
csv_writer = None
output_f = None
# 循环结束后的清理(上面 break 时可能需要)
if output_f and not output_f.closed:
output_f.close()
print(f"已关闭最终文件: {output_file_path}")
# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
combine_csv_directly(source_directory, batch_size=8)
说明
- 核心逻辑是打开一个输出文件 (
output_f
),然后遍历输入文件 (file_path
)。 - 使用 Python 内置的
csv
模块来读写,这比手动分割字符串更健壮,能正确处理包含引号、逗号等特殊字符的数据。确保读写时指定了相同的分隔符(delimiter='\t'
)。 - 通过
first_file_in_batch
标志来控制只在每个批次的第一个输入文件被处理时才写入表头。 - 读取输入文件的每一行数据(跳过表头),直接写入到输出文件中。
- 按
batch_size
控制何时关闭当前输出文件并开始下一个。 - 添加了基本的错误处理(
try...except
),比如文件找不到或者读写错误。 - 重要 : 必须使用
newline=''
参数打开 CSV 文件进行写入,否则在 Windows 上可能会出现多余的空行。 - 重要 : 读写文件时,明确指定
encoding='utf-8'
(或其他正确编码) 是个好习惯,避免乱码问题。
优点 : 速度通常最快,内存占用极低,因为它几乎是流式处理。
缺点 : 不生成中间的 DataFrame 对象。如果你在合并后 立即 需要对整个批次的数据进行复杂的 Pandas 分析,这种方法就不太方便,你需要再用 pd.read_csv()
读取刚生成的大文件。
方法 B: 借助 io.StringIO
内存缓冲 (如果最终需要 DataFrame)
如果你最终还是需要一个合并后的 DataFrame 对象(比如你的“进一步数据分析”),但又想避免内存里堆积大量小 DataFrame 对象,可以结合方案二的思路和内存缓冲。
原理和作用
先把一个批次内所有文件的内容(除了多余的表头)读取并拼接成一个大的字符串,存放在内存中的一个文本流对象 io.StringIO
里。当一个批次处理完毕后,再用 pd.read_csv()
一次性从这个内存流对象中读取数据,生成最终的那个大的 DataFrame。
代码示例
import pandas as pd
import os
import io
from pathlib import Path
# 复用 find_csv_files 函数
def process_batches_stringio(root_directory, batch_size=8, output_dir="output_pandas"):
Path(output_dir).mkdir(parents=True, exist_ok=True)
file_iterator = find_csv_files(root_directory)
batch_counter = 0
while True:
current_batch_paths = [next(file_iterator, None) for _ in range(batch_size)]
current_batch_paths = [p for p in current_batch_paths if p is not None]
if not current_batch_paths:
break
# 使用 io.StringIO 作为内存中的文件缓冲区
string_buffer = io.StringIO()
first_file = True
for file_path in current_batch_paths:
try:
with open(file_path, 'r', encoding='utf-8') as input_f:
header = input_f.readline() # 读取表头行
if first_file:
string_buffer.write(header) # 第一个文件写入表头
first_file = False
# 写入剩余的数据行
content = input_f.read()
string_buffer.write(content)
# 可选:确保每个文件内容后有换行符,如果源文件末尾可能没有的话
# if not content.endswith('\n'):
# string_buffer.write('\n')
except FileNotFoundError:
print(f"警告:文件未找到 {file_path},已跳过。")
except Exception as e:
print(f"错误:处理文件 {file_path} 时出错: {e},已跳过。")
# 将缓冲区的指针移到开头,准备读取
string_buffer.seek(0)
if string_buffer.getvalue(): # 确保缓冲区有内容
try:
# 从内存缓冲区直接读取为 DataFrame
# 注意 sep='\t'
combined_df = pd.read_csv(string_buffer, sep='\t')
# 在这里可以对 combined_df 进行进一步的分析
# --- 如果需要保存 ---
output_filename = Path(output_dir) / f"combined_batch_{batch_counter}_df.csv"
combined_df.to_csv(output_filename, index=False, encoding='utf-8')
print(f"已生成DataFrame并保存到 {output_filename}")
except pd.errors.EmptyDataError:
print(f"警告:批次 {batch_counter} 处理后得到空数据,可能所有文件都为空或出错。")
except Exception as e:
print(f"错误:从内存缓冲区创建DataFrame时出错: {e}")
else:
print(f"警告:批次 {batch_counter} 未能收集到任何有效数据。")
string_buffer.close() # 关闭 StringIO 对象释放内存
batch_counter += 1
# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
process_batches_stringio(source_directory, batch_size=8)
说明
- 为每个批次创建一个
io.StringIO()
对象string_buffer
。 - 遍历批次内的文件,将第一个文件的完整内容(包括表头)写入 buffer,后续文件只写入数据部分(跳过表头)。这里简化为直接读写,注意原始文件末尾是否有换行符可能影响拼接。更稳妥的做法还是用
csv
模块读写到 buffer。 - 批次结束后,
string_buffer.seek(0)
把读写位置重置到开头。 pd.read_csv(string_buffer, ...)
直接从内存读取,生成一个大的 DataFramecombined_df
。- 然后你可以用
combined_df
做分析,或者像示例中那样保存它。 - 记得处理完后
string_buffer.close()
。
优点 : 相比方案一,进一步减少了中间 DataFrame 对象的数量,内存效率较高。相比方法 A,最终直接得到需要的 DataFrame,方便后续处理。
缺点 : 字符串拼接本身也有开销,如果批次内的总数据量非常大(比如单个批次就达到几个 GB),内存缓冲仍然可能成为瓶颈。速度上可能略慢于直接写文件的方法 A。
进阶技巧 (方案二中使用 csv
模块)
上面方法 A 和 B 的代码示例中,对文件内容的处理相对简单。如果 CSV 文件内的数据可能包含特殊字符(比如引号内的逗号/制表符、换行符),直接进行字符串拼接或按行读写可能出错。使用内置 csv
模块能更可靠地处理这些情况。
# 在方案二 (方法 A 或 B) 中集成 csv 模块的读写片段
# ---- 读取部分 (替代 input_f.read() 或 readline/read) ----
# (需要先 import csv)
csv_reader = csv.reader(input_f, delimiter='\t')
header = next(csv_reader) # 读取表头
# 对于方法 A (直接写文件):
# if first_file_in_batch:
# csv_writer.writerow(header)
# first_file_in_batch = False
# for row in csv_reader: # csv_reader 是一个迭代器
# csv_writer.writerow(row)
# 对于方法 B (写 StringIO):
# (需要先 from io import StringIO; import csv)
# temp_buffer = StringIO()
# csv_writer_mem = csv.writer(temp_buffer, delimiter='\t')
# if first_file:
# csv_writer_mem.writerow(header)
# first_file = False
# for row in csv_reader:
# csv_writer_mem.writerow(row)
# # ... 处理完一个文件后 ...
# string_buffer.write(temp_buffer.getvalue()) # 将内存 csv 内容写入主 buffer
# temp_buffer.close()
# ---- 读取 StringIO 时 (方法 B 的最后一步 pd.read_csv) ----
# string_buffer.seek(0)
# combined_df = pd.read_csv(string_buffer, sep='\t') # 使用 pd.read_csv 时,它内部会处理 CSV 解析
使用 csv.reader
和 csv.writer
可以确保即使单元格内有复杂内容也能正确处理。
方案三:拥抱 Dask,处理更大规模的数据(进阶)
如果你的文件数量真的达到了海量级别,或者单个文件也很大,总数据量超出了单机内存承受范围,可以考虑使用 Dask。
原理和作用
Dask 是一个用于并行计算的 Python 库。dask.dataframe
模块提供了一个类似 Pandas DataFrame 的接口,但它可以操作比内存更大的数据集,并能在多核甚至多台机器上并行处理任务。dask.dataframe.read_csv
可以直接处理文件路径的模式匹配(如 *.csv
),并在后台惰性地、并行地读取和组织数据。
代码示例
import dask.dataframe as dd
import os
from pathlib import Path
def process_with_dask(root_directory, output_base_filename="dask_combined", output_dir="output_dask"):
Path(output_dir).mkdir(parents=True, exist_ok=True)
# Dask 可以直接接受包含通配符的路径列表或单个路径
# 让 Dask 处理所有子目录下的 CSV 文件
all_csv_path_pattern = os.path.join(root_directory, '**', '*.csv') # '** ' 递归查找
print(f"让 Dask 从 {all_csv_path_pattern} 读取数据...")
# sep='\t' 要根据实际情况指定
# blocksize 控制 Dask 内部处理块的大小,可以调整以平衡内存和并行度
ddf = dd.read_csv(all_csv_path_pattern, sep='\t', blocksize="64MB") # 示例块大小
# Dask 的操作是惰性的,这里只是构建了计算图谱
# ddf 包含多个 Pandas DataFrame 分区
print(f"数据已由 Dask 加载(惰性)。总分区数: {ddf.npartitions}")
# --- 如果需要进行一些 Dask 支持的分析 ---
# 例如: mean_value = ddf['some_column'].mean()
# 注意:需要调用 .compute() 来触发实际计算
# result = mean_value.compute()
# print(f"计算得到平均值: {result}")
# --- 如果需要将所有数据合并并保存到一个(或多个)CSV 文件 ---
# to_csv 会将每个分区保存为单独的文件,或尝试合并
output_path = Path(output_dir) / f"{output_base_filename}_*.csv" # Dask 会生成带编号的文件
print(f"开始将 Dask DataFrame 写入 CSV 文件(可能产生多个文件)到: {output_dir}")
# single_file=True 会尝试合并,但如果数据太大可能失败或很慢
# 不用 single_file=True 则会输出 npartitions 个文件
ddf.to_csv(str(output_path.parent / output_path.name), index=False, single_file=False, encoding='utf-8')
# 如果你想强制合并成一个大文件(内存可能成为瓶颈):
# print("尝试计算并合并为 Pandas DataFrame...")
# combined_pandas_df = ddf.compute() # 这会将所有数据加载到内存
# output_single_path = Path(output_dir) / f"{output_base_filename}_single.csv"
# print(f"保存为单个文件: {output_single_path}")
# combined_pandas_df.to_csv(output_single_path, index=False, encoding='utf-8')
print("Dask 处理完成。")
# --- 使用示例 ---
source_directory = "/path/to/your/csv/folders" # !替换成你的 CSV 根目录
process_with_dask(source_directory)
说明
- Dask 使用
dd.read_csv
可以非常方便地读取匹配模式的文件。 - 它内部会将数据划分为多个分区(Pandas DataFrame),并行处理。
- 所有操作默认是惰性的,只有调用
.compute()
或to_csv()
等触发计算的方法时才会真正执行。 to_csv
默认会为每个分区生成一个文件,可以通过single_file=True
尝试合并(小心内存)。- 批处理逻辑 :Dask 本身就按分区处理数据,天然地包含了某种形式的“批处理”。如果你的每 8 个文件逻辑非常特殊且不能简单地由 Dask 的分区策略替代,可能需要更复杂的 Dask 图构建,或者先用方案二的方法预处理成较大的中间文件再交给 Dask。但对于简单的拼接和后续分析,Dask 的直接读取通常更优。
优点 : 擅长处理大数据量和并行计算,接口与 Pandas 相似,学习曲线相对平缓。
缺点 : 引入了新的库依赖,对于你的小文件(6列5行)场景,其本身的调度开销可能比收益还大,有点“杀鸡用牛刀”。但如果未来数据规模扩展,它是很好的备选方案。
根据你的具体情况——小文件,但数量可能很大,且需要按批次(每8个)合并——方案一(生成器优化)和方案二(直接文件操作或 StringIO)看起来是最直接、最合适的改进方向。如果内存是主要瓶颈,优先考虑它们。如果目的是纯粹合并成大 CSV,方案二方法 A 可能最快。如果合并后马上要做 DataFrame 分析,方案二方法 B 或方案一是不错的选择。