云计算百科
云计算领域专业知识百科平台

List ToMap优化优化再优化到极致

请对以下代码进行优化!

看着是不是很简单?你是不是日常也会这么写?

当前使用的是java21,

//1-统计blockName出现的次数
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
stockList.parallelStream().forEach(st -> {
String blockName = st.getBlock_name();
if (map.containsKey(blockName)) {
map.put(blockName, map.get(blockName) + 1);
} else {
map.put(blockName, 1);
}
});
//2-保存blockName出现的次数
stockList.parallelStream().forEach(st -> {
String blockName = st.getBlock_name();
st.setFinance_type(String.valueOf(map.get(blockName)));
});

优化一

ConcurrentHashMap 和 parallelStream 来统计 stockList 中每个 blockName 的出现次数,并将统计结果设置到每个 stock 对象的 finance_type 属性中。虽然代码功能正确,但存在以下优化空间:

  • 重复遍历问题:代码使用了两个 parallelStream().forEach 分别处理计数和赋值,增加了开销。可以合并为一次遍历。

  • 线程安全改进:ConcurrentHashMap 的 containsKey 和 get 组合不是原子操作,可能导致并发问题。可以使用 compute 或 merge 方法来确保线程安全。

  • 性能优化:parallelStream 在小数据集上可能引入不必要的线程开销,建议根据数据规模选择是否并行。

  • 代码简洁性:可以使用 Collectors.groupingBy 简化计数逻辑。

  • 以下是优化后的代码:

    ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    // 统计 blockName 出现次数
    stockList.parallelStream()
    .collect(Collectors.groupingBy(
    Stock::getBlock_name,
    Collectors.counting()))
    .forEach((blockName, count) ->
    map.put(blockName, count.intValue()));

    // 设置 finance_type
    stockList.parallelStream().forEach(st ->
    st.setFinance_type(String.valueOf(map.get(st.getBlock_name()))));

    优化点说明:

  • 单次统计:使用 Collectors.groupingBy 和 Collectors.counting 一次性完成 blockName 的计数,替代手动 forEach 和 put 操作。

  • 线程安全:collect 操作是线程安全的,生成的 Map 结果直接写入 ConcurrentHashMap,避免了 containsKey 和 get 的非原子操作。

  • 代码更简洁:groupingBy 是函数式编程风格,逻辑更清晰,减少了手动维护 map 的代码。

  • 保留并行处理:继续使用 parallelStream 保持并发性能,适合大数据量场景。

  • 进一步优化建议:

    • 并行与串行选择:如果 stockList 数据量较小(例如几百条),parallelStream 的线程切换成本可能高于单线程处理,建议测试切换为 stream() 的性能。

    • 合并操作:如果 finance_type 只依赖于计数结果,可以进一步合并计数和赋值逻辑为单次遍历,但需要权衡代码可读性。例如:

      Map<String, Long> countMap = stockList.parallelStream()
      .collect(Collectors.groupingBy(
      Stock::getBlock_name,
      Collectors.counting()));
      stockList.forEach(st ->
      st.setFinance_type(String.valueOf(countMap.get(st.getBlock_name()))));

        这里去掉了 ConcurrentHashMap,因为计数后的 Map 只读不写,无需并发支持。

      • 异常处理:确保 getBlock_name 不会返回 null,否则需添加空值检查以避免 NullPointerException。

      • 性能测试:建议对优化后的代码进行性能测试,确认在你的实际数据规模下是否提升了效率。

      注意事项:

      • 如果 stockList 非常大,parallelStream 能有效利用多核 CPU,但需确保 Stock 对象的 getBlock_name 和 setFinance_type 方法线程安全。

      • 如果 finance_type 的值仅为字符串化的计数,考虑直接在 Stock 类中维护计数逻辑,减少 Map 的使用(视业务需求而定)。

      优化二

      针对上百万条数据的场景,性能和线程安全是关键。以下是针对你的需求(ConcurrentHashMap 统计 blockName 出现次数并设置 finance_type)进一步优化的代码,重点提升性能和确保绝对线程安全,同时保持简洁性。

      优化目标

    • 性能最佳:减少遍历次数,优化并行处理,降低锁竞争。

    • 线程安全:确保所有操作在高并发下无竞争或数据不一致。

    • 适用大数量:针对百万级数据,充分利用多核 CPU,减少内存开销。

    • 优化后的代码

      ConcurrentMap<String, Long> collect1 = stockList.parallelStream().filter(st -> st.getBlock_name() != null).collect(Collectors.groupingByConcurrent(Stock::getBlock_name, Collectors.counting()));
      stockList.parallelStream().forEach(st -> {
      st.setFinance_type(String.valueOf(map.get(st.getBlock_name())));
      });

      优化点详解

    • 使用 **groupingByConcurrent**:

    • Collectors.groupingByConcurrent 专为并行流设计,内部使用 ConcurrentHashMap 存储结果,确保计数过程线程安全。

    • 相比普通 groupingBy,它减少了合并阶段的开销,适合百万级数据的高并发场景。

    • 直接生成计数结果,省去手动维护 ConcurrentHashMap 的步骤。

    • 单次计数 + 单次赋值:

    • 第一次 parallelStream 完成所有 blockName 的计数,生成 countMap。

    • 第二次 parallelStream 遍历 stockList,将计数结果设置到 finance_type。

    • 虽然仍是两次遍历,但每一步都是高度并行的,且避免了在计数阶段对 ConcurrentHashMap 的竞争性写入。

    • 线程安全保证:

    • groupingByConcurrent 确保计数过程无竞争。

    • countMap.get 是只读操作,ConcurrentHashMap 的读操作天然线程安全,无需额外同步。

    • 假设 Stock.setFinance_type 是简单的字段赋值(无复杂逻辑),则线程安全无问题。如果 setFinance_type 包含复杂逻辑,需确保其线程安全。

    • 性能优化:

    • 并行流:parallelStream 充分利用多核 CPU,适合百万级数据。Java 21 的 Fork/Join 框架会根据 CPU 核心数动态分配任务。

    • 避免锁竞争:groupingByConcurrent 内部优化了 ConcurrentHashMap 的分段锁,减少并发写入的瓶颈。

    • 内存效率:countMap 只存储每个 blockName 的计数,内存占用可控。

    • 终极优化

      针对你的需求(百万级数据、所有 finance_type 基于最终计数、性能最佳且线程安全),且考虑到系统支持异步框架(如 Java 21 的虚拟线程或 Reactor),以下是使用 CompletableFuture 和虚拟线程优化的最终代码方案。相比之前的 groupingByConcurrent 方案,异步框架可以进一步提升吞吐量,尤其在高并发场景下。代码确保 finance_type 基于最终计数值

      最终优化代码(基于 CompletableFuture 和虚拟线程)

      ·        

      //使用虚拟线程执行器
      Executor virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
      //异步计数
      CompletableFuture<ConcurrentMap<String, Long>> concurrentMapCompletableFuture = CompletableFuture.supplyAsync(() -> stockList.parallelStream().filter(st -> st.getBlock_name() != null)
      .collect(Collectors.groupingByConcurrent(
      Stock::getBlock_name,
      Collectors.counting()
      ))
      , virtualThreadExecutor);
      //异步赋值
      CompletableFuture<Void> voidCompletableFuture = concurrentMapCompletableFuture.thenAcceptAsync(countMap -> stockList.parallelStream().filter(st -> st.getBlock_name() != null)
      .forEach(st -> st.setFinance_type(String.valueOf(countMap.get(st.getBlock_name())))), virtualThreadExecutor);

      //wait for all
      voidCompletableFuture.join();

      优化说明

    • 虚拟线程(Java 21):

    • 使用 Executors.newVirtualThreadPerTaskExecutor() 创建虚拟线程执行器。虚拟线程是 Java 21 的轻量级线程,创建和切换成本极低,适合高并发任务。

    • 虚拟线程允许为每个任务分配独立线程,最大化并发性,特别适合 I/O 密集或阻塞操作较少的场景(如你的计数和赋值逻辑)。

    • CompletableFuture 异步处理:

    • 将计数和赋值分为两个异步阶段:

      • countFuture:异步执行 groupingByConcurrent,生成最终计数 Map。

      • assignFuture:在计数完成后异步执行 finance_type 赋值。

    • thenAcceptAsync 确保赋值阶段等待计数完成,保证 finance_type 基于最终计数值。

    • 异步执行减少主线程阻塞,提高吞吐量。

    • 线程安全:

    • groupingByConcurrent 使用 ConcurrentHashMap,计数过程线程安全。

    • 赋值阶段只读 countMap,无竞争,Stock.setFinance_type 假设为简单字段赋值(线程安全)。

    • 虚拟线程隔离任务,降低线程切换开销。

    • 性能优化:

    • 并行流 + 虚拟线程:parallelStream 结合虚拟线程充分利用多核 CPU,适合百万级数据。

    • 异步流水线:CompletableFuture 将计数和赋值解耦,允许系统在等待计数完成时调度其他任务。

    • 低开销:虚拟线程的轻量级特性减少上下文切换成本,相比传统线程池更高效。

    • 空值处理:

    • 添加 filter(st -> st.getBlock_name() != null) 防止 blockName 为 null 导致异常。

    • 为什么优于之前的方案

      • 相比 **groupingByConcurrent****(同步并行流)**:

        • 虚拟线程减少线程池管理的开销,适合高并发任务。

        • CompletableFuture 的异步执行允许更灵活的任务调度,可能在多任务环境中提升整体吞吐量。

      • 相比 AtomicInteger** 单次遍历**:

        • 确保 finance_type 基于最终计数值(如 "44444"),符合你的业务需求。

        • 避免了 AtomicInteger 的 CAS 竞争问题,性能更稳定。

      最后执行结果比较

      怎么样,结果是不是出乎意料。

      java8如何切换到java21?

      已经都设置java21了,但是还报错

      java: 找不到符号   符号:   方法 newVirtualThreadPerTaskExecutor()   位置: 类 java.util.concurrent

      原因 没有完全设置

      1-idea里设置。

      2-pom.xml里再继续设置。

      赞(0)
      未经允许不得转载:网硕互联帮助中心 » List ToMap优化优化再优化到极致
      分享到: 更多 (0)

      评论 抢沙发

      评论前必须登录!