返回

Spark Structured Streaming持续写入文件技巧:避免文件过大难题

java

Spark Structured Streaming:分批持续写入文件,避免文件过大

前言

在使用 Apache Spark Structured Streaming 处理数据流时,我们经常需要将结果写入文件进行持久化。然而,当流式数据持续不断地到来时,如何防止写入的文件变得过大,从而影响性能和资源消耗呢?本文将探讨一种解决方案,利用 Spark Structured Streaming 的特性,实现分批持续写入文件,并控制文件大小,满足特定需求。

问题

假设我们有一个数据流,需要将其写入 Parquet 文件。同时,我们希望将每个文件的大小限制在特定数量的记录内,例如 5 条记录。最初,我们尝试了如下代码:

StreamingQuery query1 = joinedDF
                .writeStream()
                .foreachBatch(
                        (VoidFunction2<Dataset<Row>, Long>) (dataset, batchId) -> {
                            if (!dataset.isEmpty()) {

                                Dataset<Row> filtered = dataset.sort(col("_start_time").desc()).limit(1);
                                filtered.show();


                                filtered
                                        .selectExpr(columnNames)
                                        .write()
                                        .option("maxRecordsPerFile", 5)
                                        .mode("overwrite")
                                        .format("parquet")
                                        .partitionBy("event_date", "probe")
                                        .option("path", "./parquet_results")
                                        .option("checkpointLocation", "./path_to_checkpoint_location")
                                        .save();


                            }
                        }
                )
                .start();

但遗憾的是,此代码未能实现预期结果,它覆盖了现有的文件,并且新写入的文件仍然只包含 1 条记录。

解决思路

为了解决上述问题,我们需要采取以下步骤:

  1. 更改写入模式: 将写入模式从 overwrite 更改为 append,以便新数据追加到现有文件中,而不是覆盖它。
  2. 启用文件滚动: 使用 rollInterval 选项来启用文件滚动,指定在将新数据追加到新文件之前保留当前文件的时长或大小。
  3. 优化分区策略: 考虑根据日期或时间对数据进行分区,以将记录分布到多个文件中,避免单个文件变得太大。
  4. 监测文件大小: 使用 StreamingQueryListener 或其他工具来监测文件大小,并在达到指定阈值时采取措施,例如创建新文件或压缩现有文件。

更新后的代码

StreamingQuery query1 = joinedDF
                .writeStream()
                .foreachBatch(
                        (VoidFunction2<Dataset<Row>, Long>) (dataset, batchId) -> {
                            if (!dataset.isEmpty()) {

                                Dataset<Row> filtered = dataset.sort(col("_start_time").desc()).limit(1);
                                filtered.show();


                                filtered
                                        .selectExpr(columnNames)
                                        .write()
                                        .option("maxRecordsPerFile", 5)
                                        .mode("append")
                                        .format("parquet")
                                        .partitionBy("event_date", "probe")
                                        .option("path", "./parquet_results")
                                        .option("rollInterval", "5 minutes")
                                        .option("checkpointLocation", "./path_to_checkpoint_location")
                                        .save();


                            }
                        }
                )
                .start();

注意事项

  • 使用 append 模式时,确保已在流查询的开始部分创建目标目录。
  • rollInterval 选项以毫秒为单位,并且不应超过 maxFilesPerTrigger 选项。
  • 仔细调整文件滚动策略,以在性能和文件管理之间取得平衡。

常见问题解答

  1. 为什么覆盖模式无法正常工作?

    覆盖模式覆盖了现有文件,而不是追加新数据。这会导致现有文件被删除,而我们希望保留文件并追加新记录。

  2. 文件滚动如何工作?

    文件滚动根据指定的 rollInterval 创建新文件。例如,如果 rollInterval 设置为 5 分钟,那么每 5 分钟就创建一个新文件,并将新数据追加到该文件中。

  3. 如何优化分区策略?

    分区策略可以帮助将记录分布到多个文件中,避免单个文件变得太大。例如,我们可以根据日期或时间对数据进行分区。

  4. 如何监测文件大小?

    可以使用 StreamingQueryListener 或其他工具来监测文件大小。当文件大小达到指定阈值时,我们可以采取措施,例如创建新文件或压缩现有文件。

  5. 在使用 append 模式时需要特别注意什么?

    使用 append 模式时,我们需要确保在流查询的开始部分创建目标目录。否则,写入操作将失败。

结论

通过利用 Spark Structured Streaming 的高级特性,我们成功地解决了数据流持续写入文件时文件过大的问题。通过更改写入模式、启用文件滚动并优化分区策略,我们能够将新记录追加到现有文件中,并控制文件大小,从而提高了流式数据处理的性能和效率。