返回

如何用Java Stream合并不同数据源的DTO对象?

java

如何使用 Java Stream 合并不同数据源的 DTO 对象属性

在实际开发中,从不同数据源获取数据并整合到统一的视图是常见的需求。本文将探讨如何利用 Java Stream 的强大功能,高效地将来自不同数据库结果集的数据映射到同一个 DTO 对象。

问题场景

假设我们有一个 ItemDTO 对象,需要从 BigQuery 和 DB2 两个数据库中获取数据并填充其属性。BigQuery 数据库包含商品编号 (itemNumber)、商品名称 (itemName)、邮编 (postcode) 信息,而 DB2 数据库包含商品编号 (itemNumber)、地点 (location) 和价格 (price) 信息。

我们的目标是将这两个数据源的数据合并到同一个 ItemDTO 对象列表中,最终结果应包含所有字段的有效值。

代码示例分析

你提供的代码示例中,使用 filteranyMatch 方法尝试匹配来自两个数据源的 itemNumber 字段,并将匹配的结果收集到 resultList 中。

List<ItemDTO> resultList = db2List.stream()
                .filter(a -> bqList.stream().anyMatch(item -> item.getItem_nbr().equals(a.getItem_nbr())))
                .collect(Collectors.toList());

这段代码存在一个问题:它只保留了第一个数据源 (BigQuery) 中的字段值,而第二个数据源 (DB2) 中的字段值则被忽略了。这显然不符合我们的预期。

解决方案

为了实现将两个数据源的字段合并到同一个 ItemDTO 对象的目标,我们需要更强大的 Stream 操作,例如 flatMapgroupingBy,以及自定义的合并逻辑。以下是一种可行的解决方案:

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ItemDTOMerger {

    public static class ItemDTO {
        String itemNumber;
        String itemName;
        int postcode;
        String location;
        String price;

        // 构造函数、Getter 和 Setter 方法
    }

    public static List<ItemDTO> mergeItemDTOs(List<ItemDTO> bigQueryResults, List<ItemDTO> db2Results) {
        // 将两个数据源的 Stream 合并
        Map<String, ItemDTO> mergedMap = Stream.of(bigQueryResults, db2Results)
                .flatMap(Collection::stream)
                // 根据 itemNumber 分组
                .collect(Collectors.groupingBy(ItemDTO::getItemNumber,
                        // 使用自定义的合并逻辑
                        Collectors.collectingAndThen(Collectors.toList(), list -> mergeItems(list))));

        // 将合并后的 Map 转换为 List
        return mergedMap.values().stream().toList();
    }

    // 自定义的合并逻辑
    private static ItemDTO mergeItems(List<ItemDTO> items) {
        ItemDTO mergedItem = new ItemDTO();
        items.forEach(item -> {
            if (item.getItemNumber() != null) mergedItem.setItemNumber(item.getItemNumber());
            if (item.getItemName() != null) mergedItem.setItemName(item.getItemName());
            if (item.getPostcode() != 0) mergedItem.setPostcode(item.getPostcode());
            if (item.getLocation() != null) mergedItem.setLocation(item.getLocation());
            if (item.getPrice() != null) mergedItem.setPrice(item.getPrice());
        });
        return mergedItem;
    }
}

代码解读

让我们逐步分析这段代码是如何工作的:

  1. 合并 Stream: 首先,我们使用 Stream.of() 将两个数据源的 List 转换为 Stream,然后使用 flatMap() 将它们合并成一个包含所有 ItemDTO 对象的 Stream。

  2. 分组: 我们使用 collect(Collectors.groupingBy()) 方法根据 itemNumber 字段对合并后的 Stream 进行分组。分组的结果是一个 Map<String, List<ItemDTO>> 结构,其中 key 是 itemNumber,value 是具有相同 itemNumberItemDTO 对象列表。

  3. 自定义合并逻辑: 为了将每个分组内的多个 ItemDTO 对象合并成一个,我们使用 Collectors.collectingAndThen() 方法对每个分组应用自定义的 mergeItems() 方法。mergeItems() 方法遍历分组内的所有 ItemDTO 对象,并将非空字段值合并到一个新的 ItemDTO 对象中。

  4. 转换为 List: 最后,我们将合并后的 Map 的 values 转换为最终的 List<ItemDTO> 对象。

常见问题

以下是使用 Java Stream 合并 DTO 对象属性时,开发者常遇到的问题及其解答:

1. 为什么需要使用 flatMap 方法?

flatMap 方法用于将多个 Stream 合并成一个 Stream。在本例中,我们需要将来自 BigQuery 和 DB2 两个数据源的 Stream 合并成一个包含所有 ItemDTO 对象的 Stream,以便后续进行分组和合并操作。

2. groupingBy 方法的作用是什么?

groupingBy 方法用于根据指定的字段对 Stream 进行分组。在本例中,我们使用 itemNumber 字段对合并后的 Stream 进行分组,以便将具有相同 itemNumberItemDTO 对象放在一起。

3. 如何自定义合并逻辑?

可以通过实现 BinaryOperator<ItemDTO> 接口或使用 lambda 表达式来自定义合并逻辑。在本例中,我们定义了一个名为 mergeItems() 的方法,该方法接受一个 List<ItemDTO> 对象作为参数,并返回一个合并后的 ItemDTO 对象。

4. 如何处理字段值冲突?

在合并 ItemDTO 对象时,可能会遇到来自不同数据源的字段值冲突的情况。例如,BigQuery 中的 itemName 字段值可能与 DB2 中的 itemName 字段值不同。在这种情况下,需要根据业务需求定义明确的冲突解决策略。例如,可以优先使用来自某个特定数据源的字段值,或者将多个字段值拼接成一个字符串。

5. 如何提高合并效率?

如果数据量较大,可以使用并行流来提高合并效率。可以通过调用 parallelStream() 方法将 Stream 转换为并行流。

希望本文能够帮助你理解如何使用 Java Stream 合并不同数据源的 DTO 对象属性。这种方法清晰简洁,易于理解和维护,能够有效解决实际开发中遇到的数据合并问题。