ParallelStream 是 Java 8 引入的并行流处理功能,虽然能提高处理效率,但使用不当会导致各种问题。以下是常见问题及其解决方案:
问题:在并行流中使用非线程安全的集合或共享变量会导致数据不一致。
解决方案:
// 错误示例
List<Integer> list = new ArrayList<>();
IntStream.range(0, 1000).parallel().forEach(list::add);
// 正确做法1:使用线程安全集合
List<Integer> safeList = Collections.synchronizedList(new ArrayList<>());
IntStream.range(0, 1000).parallel().forEach(safeList::add);
// 正确做法2:使用collect方法
List<Integer> collectedList = IntStream.range(0, 1000)
.parallel()
.boxed()
.collect(Collectors.toList());
问题:某些操作(如limit、skip、findFirst等)在并行流中性能可能更差。
解决方案:
// 顺序流更适合顺序依赖性操作
List<Integer> result = list.stream()
.filter(i -> i > 10)
.limit(100)
.collect(Collectors.toList());
问题:并行流中修改共享变量会导致竞态条件。
解决方案:
// 错误示例
int[] sum = {0};
IntStream.range(0, 1000).parallel().forEach(i -> sum[0] += i);
// 正确做法:使用reduce或collect
int sum = IntStream.range(0, 1000)
.parallel()
.reduce(0, Integer::sum);
问题:数据量太小或任务太简单时,并行化反而降低性能。
解决方案:
// 仅在数据量大且计算复杂时使用并行流
if(list.size() > 10000) {
list.parallelStream().forEach(this::complexOperation);
} else {
list.stream().forEach(this::complexOperation);
}
问题:并行流中使用阻塞操作可能导致线程饥饿。
解决方案:
// 避免在并行流中使用阻塞IO操作
List<String> results = urls.parallelStream()
.map(url -> fetchWithTimeout(url)) // 确保有超时机制
.collect(Collectors.toList());
问题:默认使用ForkJoinPool.commonPool(),可能影响其他并行任务。
解决方案:
// 使用自定义线程池
ForkJoinPool customPool = new ForkJoinPool(4);
customPool.submit(() -> {
list.parallelStream().forEach(this::process);
}).get();
问题:并行流中异常可能被吞没或难以追踪。
解决方案:
// 使用CompletableFuture包装处理
list.parallelStream()
.map(item -> CompletableFuture.supplyAsync(() -> process(item)))
.map(future -> future.exceptionally(e -> {
log.error("Error processing", e);
return null;
}))
.collect(Collectors.toList());
ParallelStream是一个强大的工具,但"能力越大,责任越大",合理使用才能发挥其优势。