最近在公司的风控系统搬砖,写代码时,其中的某一个步骤是将一个大的Map(内有上万条Key,Value值)遍历一遍,逐一分析每一个Key,Value值,并进行因子解析,以便建立用户画像。其中因子解析的步骤,可能会很长,涉及到数值计算,数据库查询或者接口调用,处理速度从1ms-100ms不等,当总数据量超过一万条时,对整个系统的性能损耗非常大。
使用parallelStream
我接手到的代码是这样的:
1 | map.forEach((key, value) -> { |
既然用到了Stream来处理,于是我便使用了parallelStream来实现集合的并行处理
,只需要对Stream调用链加上parallelStream()
方法即可打开:
1 | map.entrySet().parallelStream().forEach(entry -> { |
该方法即可打开Java并行处理集合的功能,让我们来写方法验证该方法是否可以真的提高处理速度。 首先我们构建一个测试数据,一个只有大小为10的HashMap:
1 | private static final HashMap<String,Integer> map = new HashMap<String,Integer>(); |
编写两个方法(不用并行与使用并行),都输出值,并计算耗时:
1 | long start = System.currentTimeMillis(); |
执行后,得到的结果为:
1 | 0,1,2,3,4,5,6,7,8,9,1129ms |
从以上的输出可以得出的基本结论有:
- 使用parallelStream的代码确实是并行运行了,因为输出不是正序的
- 使用parallelStream确实可以在某种程度提高集合处理速度
线程安全
显而易见,parallelStream是非线程安全的,举个简单的例子:
1 | private static List<Integer> list1 = new ArrayList<>(); |
所以,我们在使用parallelStream时,需要注意线程安全的问题,该加锁的就加锁,外部调用的ArrayList,HashMap等也必须使用和其对等的线程安全类,例如:ConcurrentHashMap等。