返回

告别 OOM:用 Reactor 高效流式处理大 CSV 文件

java

告别内存溢出:使用 Reactor 高效流式处理 CSV 文件

处理文件,尤其是大型 CSV 文件时,内存效率是个绕不开的话题。我们常常需要把文件内容转换成程序更易于处理的数据结构,比如 Map 列表。一个常见的需求是,按批次处理这些数据,例如每 10 行一组。问题来了,怎样做才能既完成任务又不至于把整个文件一次性塞进内存,导致应用 OOM (Out of Memory) 呢?

假设你手头有类似下面的代码,尝试用 Project Reactor 的 Flux 来处理 CSV 文件,目标是返回一个 Flux<List<Map<String, String>>>,其中每个 List 包含 10 条记录(Map):

// 原始有问题的代码片段 (仅为说明,非完整可运行)
public Flux<List<Map<String, String>>> processCsvFile(File csvFile, String delimiter) {
    // ... 参数校验 ...

    return Flux.using(
            () -> Files.lines(csvFile.toPath()), // 尝试流式读取
            Flux::fromStream,
            stream -> stream.close()
        )
        .collectList() // 问题点:收集所有行到 List
        .flatMapMany(lines -> { // 在所有行收集完后才开始处理
            // ... (提取 header, 处理剩余行, 映射到 Map, buffer(10)) ...
            final var headers = /* ... */;
            return Flux.fromIterable(lines.subList(1, lines.size()))
                    .map(line -> /* ... 转换为 Map ... */)
                    .buffer(10);
        })
        .onErrorResume(e -> /* ... 错误处理 ... */);
}

这段代码的意图是好的,它使用了 Files.lines,这似乎是流式处理的起点。但致命的问题在于紧随其后的 .collectList() 操作。你的直觉是对的——collectList() 正是那个和你期望的内存效率背道而驰的操作。

深入剖析:collectList() 的陷阱

Files.lines(path) 本身确实创建了一个 Stream<String>,它是懒加载的,意味着它不会立刻读取整个文件。Flux.fromStream() 将这个 Stream 转换成了一个 Flux<String>,这依然是流式的。

然而,.collectList() 彻底改变了这一切。这个操作符会等待上游的 Flux 发出所有元素(也就是文件中的所有行),然后把它们全部收集到一个 List<String> 。只有当整个文件都被读入内存,这个 List 构建完成后,后续的 .flatMapMany 才会开始执行。

对于小文件,这可能不是问题。但如果你的 CSV 文件有几百 MB 甚至几个 GB 大,collectList() 会无情地消耗大量内存,极有可能导致 OutOfMemoryError,完全违背了流式处理以节省内存的初衷。你的代码实际上是先把整个文件加载到内存,然后再“流式”地处理这个内存中的列表,内存峰值并未降低。

高效读取之道

要真正实现内存高效的文件处理,关键在于保持从头到尾的流式特性。数据应该像水流一样,逐行(或者小批量)地通过处理管道,而不是先汇集成一个巨大的水库。

这意味着我们需要避免任何会阻塞并收集整个流的操作,比如 collectList()。我们需要找到一种方法,在流动的过程中处理每一行数据,包括区分处理表头和数据行。

方案一:纯 Reactor 流式处理

我们可以直接在 Flux<String> 流上进行操作,只在最后阶段按需缓冲。

原理:

  1. 流式读取源 : 继续使用 Flux.using 配合 Files.lines 或更底层的 BufferedReader 来创建代表文件行的 Flux<String>
  2. 分离表头 : 表头(通常是第一行)需要单独处理,并且不能阻塞数据行的流动。一种方法是先用传统方式读取第一行获取表头,然后创建一个跳过第一行Flux 来处理数据行。
  3. 逐行转换 : 对数据行的 Flux<String> 应用 map 操作,使用之前获取的表头将每一行字符串解析并转换为 Map<String, String>
  4. 按需缓冲 : 在得到 Flux<Map<String, String>> 之后,使用 .buffer(10) 操作符将 Map 按 10 个一组进行缓冲,形成 Flux<List<Map<String, String>>>
  5. 资源管理 : Flux.using 确保文件资源(如 Stream 或 Reader)在使用完毕后能被正确关闭。

代码示例 (基于 Files.lines().skip(1)):

import reactor.core.publisher.Flux;
import com.google.common.base.Splitter; // 假设继续使用 Guava Splitter

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.slf4j.Logger; // 引入日志库
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.Validate.notBlank; // 假设使用 Apache Commons Lang 校验

// ... other imports

public class CsvProcessor {

    private static final Logger log = LoggerFactory.getLogger(CsvProcessor.class);

    public Flux<List<Map<String, String>>> processCsvFileStream(File csvFile, String delimiter) {
        notBlank(csvFile.getPath(), "文件路径不能为空");
        notBlank(delimiter, "分隔符不能为空");

        if (!csvFile.exists() || !csvFile.isFile() || !csvFile.canRead()) {
             log.atError().addKeyValue("csvFile", csvFile.getPath()).log("CSV 文件无法访问或读取");
             return Flux.error(new IOException("无法读取文件: " + csvFile.getPath()));
        }

        Path path = csvFile.toPath();
        // 使用 try-with-resources 安全地读取第一行获取表头
        List<String> headers;
        try (BufferedReader reader = Files.newBufferedReader(path)) {
            String headerLine = reader.readLine();
            if (headerLine == null) {
                log.atWarn().addKeyValue("csvFile", csvFile.getName()).log("CSV 文件为空或只有表头");
                return Flux.empty(); // 文件为空或只有一行,返回空 Flux
            }
            // 清理可能的尾部重复分隔符
            String cleanedHeaderLine = headerLine.replaceAll(Pattern.quote(delimiter) + "+
import reactor.core.publisher.Flux;
import com.google.common.base.Splitter; // 假设继续使用 Guava Splitter

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.slf4j.Logger; // 引入日志库
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.Validate.notBlank; // 假设使用 Apache Commons Lang 校验

// ... other imports

public class CsvProcessor {

    private static final Logger log = LoggerFactory.getLogger(CsvProcessor.class);

    public Flux<List<Map<String, String>>> processCsvFileStream(File csvFile, String delimiter) {
        notBlank(csvFile.getPath(), "文件路径不能为空");
        notBlank(delimiter, "分隔符不能为空");

        if (!csvFile.exists() || !csvFile.isFile() || !csvFile.canRead()) {
             log.atError().addKeyValue("csvFile", csvFile.getPath()).log("CSV 文件无法访问或读取");
             return Flux.error(new IOException("无法读取文件: " + csvFile.getPath()));
        }

        Path path = csvFile.toPath();
        // 使用 try-with-resources 安全地读取第一行获取表头
        List<String> headers;
        try (BufferedReader reader = Files.newBufferedReader(path)) {
            String headerLine = reader.readLine();
            if (headerLine == null) {
                log.atWarn().addKeyValue("csvFile", csvFile.getName()).log("CSV 文件为空或只有表头");
                return Flux.empty(); // 文件为空或只有一行,返回空 Flux
            }
            // 清理可能的尾部重复分隔符
            String cleanedHeaderLine = headerLine.replaceAll(Pattern.quote(delimiter) + "+$", "");
            headers = Splitter.on(delimiter)
                               .trimResults()
                               .splitToList(cleanedHeaderLine);
        } catch (IOException e) {
            log.atError()
                .addKeyValue("csvFile", csvFile.getName())
                .setCause(e)
                .log("读取 CSV 文件头时出错");
            return Flux.error(new RuntimeException("读取 CSV 文件头时出错", e));
        }

        // 创建数据行的流,跳过已读取的表头行
        return Flux.using(
                () -> Files.lines(path), // 每次重新打开流以跳过
                stream -> Flux.fromStream(stream.skip(1)), // 从流创建 Flux 并跳过第一行
                Stream::close // 确保流被关闭
            )
            .map(line -> {
                // 清理可能的尾部重复分隔符
                String cleanedLine = line.replaceAll(Pattern.quote(delimiter) + "+$", "");
                List<String> values = Splitter.on(delimiter)
                                             .trimResults()
                                             .splitToList(cleanedLine);
                Map<String, String> outputMap = new HashMap<>();
                for (int i = 0; i < headers.size(); i++) {
                    // 如果值列表长度小于表头长度,用空字符串填充
                    outputMap.put(headers.get(i), i < values.size() ? values.get(i) : "");
                }
                return outputMap;
            })
            .buffer(10) // 在这里缓冲 Map,而不是行字符串
            // .publishOn(Schedulers.boundedElastic()) // 可选:如果转换逻辑CPU密集,或下游处理慢,切换调度器
            .onErrorResume(e -> {
                log.atError()
                        .addKeyValue("csvFile", csvFile.getName())
                        .setCause(e)
                        .log("处理 CSV 数据行时出错");
                // 根据需要决定是继续抛出错误还是返回一个空的 Flux 或特定错误信号
                return Flux.error(new RuntimeException("处理 CSV 数据行时出错", e));
            });
    }

    // ... 可能的辅助方法或主方法调用示例 ...
}
quot;
, ""); headers = Splitter.on(delimiter) .trimResults() .splitToList(cleanedHeaderLine); } catch (IOException e) { log.atError() .addKeyValue("csvFile", csvFile.getName()) .setCause(e) .log("读取 CSV 文件头时出错"); return Flux.error(new RuntimeException("读取 CSV 文件头时出错", e)); } // 创建数据行的流,跳过已读取的表头行 return Flux.using( () -> Files.lines(path), // 每次重新打开流以跳过 stream -> Flux.fromStream(stream.skip(1)), // 从流创建 Flux 并跳过第一行 Stream::close // 确保流被关闭 ) .map(line -> { // 清理可能的尾部重复分隔符 String cleanedLine = line.replaceAll(Pattern.quote(delimiter) + "+
import reactor.core.publisher.Flux;
import com.google.common.base.Splitter; // 假设继续使用 Guava Splitter

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.slf4j.Logger; // 引入日志库
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.Validate.notBlank; // 假设使用 Apache Commons Lang 校验

// ... other imports

public class CsvProcessor {

    private static final Logger log = LoggerFactory.getLogger(CsvProcessor.class);

    public Flux<List<Map<String, String>>> processCsvFileStream(File csvFile, String delimiter) {
        notBlank(csvFile.getPath(), "文件路径不能为空");
        notBlank(delimiter, "分隔符不能为空");

        if (!csvFile.exists() || !csvFile.isFile() || !csvFile.canRead()) {
             log.atError().addKeyValue("csvFile", csvFile.getPath()).log("CSV 文件无法访问或读取");
             return Flux.error(new IOException("无法读取文件: " + csvFile.getPath()));
        }

        Path path = csvFile.toPath();
        // 使用 try-with-resources 安全地读取第一行获取表头
        List<String> headers;
        try (BufferedReader reader = Files.newBufferedReader(path)) {
            String headerLine = reader.readLine();
            if (headerLine == null) {
                log.atWarn().addKeyValue("csvFile", csvFile.getName()).log("CSV 文件为空或只有表头");
                return Flux.empty(); // 文件为空或只有一行,返回空 Flux
            }
            // 清理可能的尾部重复分隔符
            String cleanedHeaderLine = headerLine.replaceAll(Pattern.quote(delimiter) + "+$", "");
            headers = Splitter.on(delimiter)
                               .trimResults()
                               .splitToList(cleanedHeaderLine);
        } catch (IOException e) {
            log.atError()
                .addKeyValue("csvFile", csvFile.getName())
                .setCause(e)
                .log("读取 CSV 文件头时出错");
            return Flux.error(new RuntimeException("读取 CSV 文件头时出错", e));
        }

        // 创建数据行的流,跳过已读取的表头行
        return Flux.using(
                () -> Files.lines(path), // 每次重新打开流以跳过
                stream -> Flux.fromStream(stream.skip(1)), // 从流创建 Flux 并跳过第一行
                Stream::close // 确保流被关闭
            )
            .map(line -> {
                // 清理可能的尾部重复分隔符
                String cleanedLine = line.replaceAll(Pattern.quote(delimiter) + "+$", "");
                List<String> values = Splitter.on(delimiter)
                                             .trimResults()
                                             .splitToList(cleanedLine);
                Map<String, String> outputMap = new HashMap<>();
                for (int i = 0; i < headers.size(); i++) {
                    // 如果值列表长度小于表头长度,用空字符串填充
                    outputMap.put(headers.get(i), i < values.size() ? values.get(i) : "");
                }
                return outputMap;
            })
            .buffer(10) // 在这里缓冲 Map,而不是行字符串
            // .publishOn(Schedulers.boundedElastic()) // 可选:如果转换逻辑CPU密集,或下游处理慢,切换调度器
            .onErrorResume(e -> {
                log.atError()
                        .addKeyValue("csvFile", csvFile.getName())
                        .setCause(e)
                        .log("处理 CSV 数据行时出错");
                // 根据需要决定是继续抛出错误还是返回一个空的 Flux 或特定错误信号
                return Flux.error(new RuntimeException("处理 CSV 数据行时出错", e));
            });
    }

    // ... 可能的辅助方法或主方法调用示例 ...
}
quot;
, ""); List<String> values = Splitter.on(delimiter) .trimResults() .splitToList(cleanedLine); Map<String, String> outputMap = new HashMap<>(); for (int i = 0; i < headers.size(); i++) { // 如果值列表长度小于表头长度,用空字符串填充 outputMap.put(headers.get(i), i < values.size() ? values.get(i) : ""); } return outputMap; }) .buffer(10) // 在这里缓冲 Map,而不是行字符串 // .publishOn(Schedulers.boundedElastic()) // 可选:如果转换逻辑CPU密集,或下游处理慢,切换调度器 .onErrorResume(e -> { log.atError() .addKeyValue("csvFile", csvFile.getName()) .setCause(e) .log("处理 CSV 数据行时出错"); // 根据需要决定是继续抛出错误还是返回一个空的 Flux 或特定错误信号 return Flux.error(new RuntimeException("处理 CSV 数据行时出错", e)); }); } // ... 可能的辅助方法或主方法调用示例 ... }

关键点:

  • 先读表头 : 通过 BufferedReader 单独、安全地读取第一行。
  • skip(1) : 在 Files.lines() 创建的 Stream 上调用 skip(1),确保 Flux 只包含数据行。
  • 流式转换 : map 操作现在作用于每一条数据行 String,将其转换为 Map。整个文件内容不会同时加载到内存。
  • 末端缓冲 : buffer(10) 在所有转换完成后执行,只收集少量的 Map 对象,内存占用可控。

安全与健壮性建议:

  • 文件存在性和权限 : 在尝试读取前,检查文件是否存在、是否是文件以及是否有读取权限。
  • 空文件处理 : 处理空文件或只有表头的文件(如示例中返回 Flux.empty())。
  • 行数据健壮性 : Splitter 对于标准 CSV 可能不够健壮(例如,字段包含引号或分隔符)。对于复杂的 CSV,考虑方案二。当前代码通过 i < values.size() 简单处理了列数不匹配的情况,更复杂的场景可能需要更精细的错误处理或填充逻辑。
  • 字符编码 : Files.lines(path) 使用默认 UTF-8 编码。如果文件编码不同,需使用 Files.lines(path, Charset.forName("Your-Encoding"))

进阶使用技巧:

  • IO 线程 : 文件读取是 IO 密集型操作。默认情况下,Files.lines 可能在调用线程执行。对于需要非阻塞行为的应用(如 Web 服务器),可以将整个 Flux 链(或至少 IO 部分)通过 subscribeOn(Schedulers.boundedElastic()) 切换到适合 IO 操作的线程池执行,避免阻塞主线程或事件循环线程。
  • 背压(Backpressure) : Reactor 的 Flux 原生支持背压。这意味着如果下游消费者(.buffer(10) 或最终的订阅者)处理速度跟不上文件读取和转换的速度,Flux 会自动向上游传递信号,减缓数据产生速度,防止内存因过快的生产者而堆积。buffer(10) 本身也会应用背压。

方案二:拥抱专业 CSV 库

手动解析 CSV 容易出错,特别是当文件包含带引号的字段、字段内换行、转义字符等复杂情况时。使用成熟的 CSV 解析库通常是更稳妥、更省心的选择。

原理:

  1. 引入库 : 添加像 Apache Commons CSV, OpenCSV 或 Jackson Dataformat CSV 这样的库到项目依赖。
  2. 创建解析器 : 这些库通常提供一个能逐条读取记录的解析器(Parser),并能自动处理表头和复杂的 CSV 格式。
  3. 集成 Reactor : 解析器通常是可迭代的(Iterable<CSVRecord>)或提供类似流的API。可以用 Flux.fromIterable()Flux.generate() 将其接入 Reactor 流。
  4. 资源管理 : 依然使用 Flux.using 来管理 CSV 解析器及其底层的 Reader,确保它们在使用后关闭。
  5. 转换与缓冲 : 将库提供的记录对象(如 CSVRecord)映射成 Map<String, String>,然后使用 .buffer(10)

代码示例 (使用 Apache Commons CSV):

首先,添加依赖 (以 Maven 为例):

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.10.0</version> <!-- 使用最新稳定版本 -->
</dependency>

然后,修改处理逻辑:

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;
import reactor.core.publisher.Flux;
// ... 其他必要的 import ...
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.Validate.notBlank;


public class CsvProcessorWithLibrary {

    private static final Logger log = LoggerFactory.getLogger(CsvProcessorWithLibrary.class);

    public Flux<List<Map<String, String>>> processCsvFileWithLibrary(File csvFile, char delimiter) {
        notBlank(csvFile.getPath(), "文件路径不能为空");
        // Delimiter is now char for Commons CSV

        if (!csvFile.exists() || !csvFile.isFile() || !csvFile.canRead()) {
             log.atError().addKeyValue("csvFile", csvFile.getPath()).log("CSV 文件无法访问或读取");
             return Flux.error(new IOException("无法读取文件: " + csvFile.getPath()));
        }

        // 定义 CSV 格式
        CSVFormat csvFormat = CSVFormat.DEFAULT.builder()
                .setDelimiter(delimiter)
                .setHeader() // 告诉库第一行是表头,自动使用
                .setSkipHeaderRecord(true) // 读取时跳过表头行本身
                .setTrim(true) // 去除值两端空白
                .setIgnoreEmptyLines(true) // 忽略空行
                .build();

        // 使用 Flux.using 管理资源
        return Flux.using(
                () -> { // 资源供应商:创建 Reader 和 CSVParser
                    Reader reader = new FileReader(csvFile); // 注意:指定文件编码是个好习惯
                    // CSVParser parser = CSVParser.parse(reader, csvFormat);
                    // 或者更精细控制 Reader 的创建,比如指定 Charset:
                    // Reader reader = Files.newBufferedReader(csvFile.toPath(), StandardCharsets.UTF_8);
                    return CSVParser.parse(reader, csvFormat);
                },
                parser -> Flux.fromIterable(parser), // 从 Iterable<CSVRecord> 创建 Flux
                parser -> { // 资源清理器:关闭 parser (它会关闭底层的 Reader)
                    try {
                        parser.close();
                    } catch (IOException e) {
                        log.atWarn()
                            .addKeyValue("csvFile", csvFile.getName())
                            .setCause(e)
                            .log("关闭 CSVParser 时发生错误");
                        // 根据策略决定是否需要抛出异常影响流
                    }
                }
            )
            .map(CSVRecord::toMap) // CSVRecord 可以直接转为 Map<String, String>
            .buffer(10)
            // .publishOn(Schedulers.boundedElastic()) // 同样可选
            .onErrorResume(e -> {
                log.atError()
                    .addKeyValue("csvFile", csvFile.getName())
                    .setCause(e)
                    .log("使用库处理 CSV 数据时出错");
                // 可能需要区分是解析错误还是 IO 错误
                return Flux.error(new RuntimeException("处理 CSV 数据时出错 (使用库)", e));
            });
    }

     // ... 可能的辅助方法或主方法调用示例 ...
}

关键点:

  • 库的便利 : Apache Commons CSV 负责了复杂的解析逻辑,包括自动处理表头、引号、转义等。
  • 资源管理 : Flux.using 优雅地管理了 CSVParser 和它包装的 Reader 的生命周期。
  • Flux.fromIterable : 这是将实现了 Iterable 接口的 CSV 解析器接入 Reactor 流的关键。
  • CSVRecord.toMap() : 直接将解析出的记录转换为所需的 Map 结构,非常方便。

安全与健壮性建议:

  • 依赖管理 : 引入了外部库,需要管理其版本和潜在冲突。
  • 编码 : 务必确认 CSV 文件的实际编码,并在创建 Reader 时明确指定(如 Files.newBufferedReader(path, StandardCharsets.UTF_8)new InputStreamReader(new FileInputStream(file), "GBK")),否则可能出现乱码。Apache Commons CSV 的 CSVParser.parse(File, Charset, CSVFormat) 是更直接的方式。
  • 错误处理 : 库可能会在解析非法格式的行时抛出特定的异常,需要在 onErrorResume 中适当处理。

进阶使用技巧:

  • 灵活配置 : CSV 库通常提供丰富的配置选项,如不同的引号策略、是否忽略空行、注释标记等,可以根据具体文件格式调整 CSVFormat
  • 性能 : 对于超大文件,某些库(如 univocity-parsers)以高性能著称,可以作为备选方案进行基准测试比较。

不止步于内存:性能考量

虽然上述方案解决了内存瓶颈,但整体性能还受其他因素影响:

  • 磁盘 I/O : 文件读取速度是基础。快速的存储(如 SSD)会有帮助。
  • CPU : 解析字符串、创建 Map 对象、可能的正则表达式操作都会消耗 CPU。如果转换逻辑复杂,这可能成为新的瓶颈。使用高效的库(如方案二)通常比手动解析(方案一的简单 Splitter)在复杂场景下更高效。
  • 缓冲大小 : .buffer(10) 中的 10 是一个权衡。太小可能导致下游处理单元频繁触发但每次处理量少;太大则会增加单批次的内存占用(虽然远小于整个文件)。需要根据下游处理能力和内存限制进行调整。
  • 调度器 : 如前所述,使用 subscribeOn(Schedulers.boundedElastic()) 处理 I/O 和/或 publishOn() 切换计算密集型任务的执行线程,可以提高应用的响应性和吞吐量。

选择哪种方案取决于你的具体需求:如果 CSV 格式简单可控,且不想引入额外依赖,方案一(纯 Reactor 流式处理) 是个不错的起点。如果 CSV 文件结构复杂多变,或者追求代码简洁性和健壮性,强烈推荐方案二(使用专业 CSV 库)

无论选择哪种,核心思想都是一致的:保持数据流的流动性,避免一次性加载整个文件内容 ,这样才能真正有效地处理大型文件,告别内存溢出的烦恼。