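Parallel Stream Is Not Thread-Safe
Data loss
- Read 1,000 user records from a file
- Extract each user's UserId
Adding the ids to a plain ArrayList inside a parallel stream's forEach loses data. Several worker threads call add() on the same list at the same time, ArrayList is not thread-safe, and the racing writes overwrite one another, so elements go missing.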
public static void main(String[] args) {
UserDataSample userDataSample = new UserDataSample();
List<UserDataSample.User> users = userDataSample.loadUser();
// 1000
System.out.println(users.size());
// Loop 10 times
for (int i = 0; i < 10; i++) {
List<String> ids = new ArrayList<>();
users.parallelStream()
.forEach(
user -> {
ids.add(user.getUserId());
});
// should be 1000
System.out.println(ids.size());
}
}
Output
/Library/Java/JavaVirtualMachines/jdk-17.0.4.1.jdk...
1000
970
814
800
677
704
654
577
629
559
714
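UserDataSample and its users file are not shown, so the sketch below is a self-contained stand-in: the hypothetical sampleIds() generates 1,000 ids ("user-0" to "user-999") in place of loadUser(), and unsafeCount() repeats the same unsynchronized forEach pattern. The race is nondeterministic, so the count changes between runs. ArrayList can also throw an exception such as ArrayIndexOutOfBoundsException when two threads grow its backing array at once; the sketch reports that case as -1.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelForEachRace {

    // Hypothetical stand-in for UserDataSample.loadUser(): 1,000 generated ids.
    static List<String> sampleIds() {
        return IntStream.range(0, 1000)
                .mapToObj(i -> "user-" + i)
                .collect(Collectors.toList());
    }

    // Same pattern as the original: unsynchronized ArrayList.add from a parallel forEach.
    // Returns the resulting size, or -1 if the racing adds made ArrayList throw.
    static int unsafeCount(List<String> source) {
        List<String> ids = new ArrayList<>();
        try {
            source.parallelStream().forEach(ids::add);
        } catch (RuntimeException e) {
            // e.g. ArrayIndexOutOfBoundsException from a concurrent resize
            return -1;
        }
        return ids.size();
    }

    public static void main(String[] args) {
        List<String> source = sampleIds();
        for (int i = 0; i < 5; i++) {
            // May print a value below 1000, 1000 itself, or -1.
            System.out.println(unsafeCount(source));
        }
    }
}
```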
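Option 1: Use Stream collect
Let the stream gather the results itself with collect, instead of adding to a shared list inside forEach: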
/**
* collection
*
* @param users users
* @author Created by ivan
* @since 2024/5/7 11:31
*/
public static void collection(List<UserDataSample.User> users) {
List<String> ids = users.parallelStream()
.map(UserDataSample.User::getUserId)
.collect(Collectors.toList());
// should be 1000
System.out.println(ids.size());
}
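collect is safe because the stream framework, not the caller, owns the intermediate containers: each sub-task accumulates into its own list, and the partial lists are merged by a combiner after the sub-tasks finish. The sketch below spells this out with the three-argument collect(supplier, accumulator, combiner), and also shows Stream.toList(), which is available from Java 16 (the run above used JDK 17). The generated ids are hypothetical stand-ins for the UserDataSample data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelCollect {

    // Hypothetical stand-in for UserDataSample.loadUser(): 1,000 generated ids.
    static List<String> sampleIds() {
        return IntStream.range(0, 1000)
                .mapToObj(i -> "user-" + i)
                .collect(Collectors.toList());
    }

    // Explicit form of a mutable reduction on a parallel stream:
    // one new ArrayList per sub-task (supplier), filled only by that sub-task (accumulator),
    // then partial lists appended to one another in encounter order (combiner).
    static ArrayList<String> collectExplicit(List<String> source) {
        return source.parallelStream()
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    // Java 16+: returns an unmodifiable list.
    static List<String> collectToList(List<String> source) {
        return source.parallelStream().toList();
    }

    public static void main(String[] args) {
        List<String> source = sampleIds();
        System.out.println(collectExplicit(source).size()); // 1000
        System.out.println(collectToList(source).size());   // 1000
        // Both preserve the encounter order of the source list.
        System.out.println(collectExplicit(source).equals(source)); // true
    }
}
```

Unlike forEach, both forms keep the source order, because the combiner merges the partial results left to right.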
Option 2: Use a thread-safe collection
/**
* syncList
*
* <p>More writes than reads: use Collections.synchronizedList(new ArrayList<>())
*
* <p>More reads than writes: use new CopyOnWriteArrayList<>();
*
* @param users users
* @author Created by ivan
* @since 2024/5/7 11:35
*/
public static void syncList(List<UserDataSample.User> users) {
List<String> ids = Collections.synchronizedList(new ArrayList<>());
users.parallelStream()
.forEach(
user -> {
ids.add(user.getUserId());
});
// should be 1000
System.out.println(ids.size());
}
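The Javadoc above names two thread-safe lists. A minimal sketch of both follows, again using hypothetical generated ids. Two caveats apply to this approach: forEach on a parallel stream does not follow encounter order, so the list is complete but generally not in source order (the sketch compares sorted copies); and CopyOnWriteArrayList copies its whole backing array on every add, so filling it with 1,000 writes is expensive and only pays off when reads dominate afterwards.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ThreadSafeLists {

    // Hypothetical stand-in for UserDataSample.loadUser(): 1,000 generated ids.
    static List<String> sampleIds() {
        return IntStream.range(0, 1000)
                .mapToObj(i -> "user-" + i)
                .collect(Collectors.toList());
    }

    // Write-heavy choice: every add() acquires the wrapper's lock.
    static List<String> viaSynchronizedList(List<String> source) {
        List<String> ids = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().forEach(ids::add);
        return ids;
    }

    // Read-heavy choice: every add() copies the backing array; reads take no lock.
    static List<String> viaCopyOnWrite(List<String> source) {
        List<String> ids = new CopyOnWriteArrayList<>();
        source.parallelStream().forEach(ids::add);
        return ids;
    }

    // Parallel forEach ignores encounter order, so compare sorted copies.
    static boolean sameElements(List<String> a, List<String> b) {
        List<String> x = new ArrayList<>(a);
        List<String> y = new ArrayList<>(b);
        Collections.sort(x);
        Collections.sort(y);
        return x.equals(y);
    }

    public static void main(String[] args) {
        List<String> source = sampleIds();
        List<String> sync = viaSynchronizedList(source);
        List<String> cow = viaCopyOnWrite(source);
        System.out.println(sync.size() + " " + sameElements(sync, source)); // 1000 true
        System.out.println(cow.size() + " " + sameElements(cow, source));   // 1000 true
    }
}
```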
Final code
public static void main(String[] args) {
UserDataSample userDataSample = new UserDataSample();
List<UserDataSample.User> users = userDataSample.loadUser();
// 1000
System.out.println(users.size());
System.out.println("--- Start ---");
// Loop 10 times
for (int i = 0; i < 10; i++) {
System.out.println("--- Looping ---");
wrong(users);
collection(users);
syncList(users);
}
}
/**
* wrong
*
* @param users users
* @author Created by ivan
* @since 2024/5/7 11:28
*/
public static void wrong(List<UserDataSample.User> users) {
List<String> ids = new ArrayList<>();
users.parallelStream()
.forEach(
user -> {
ids.add(user.getUserId());
});
// should be 1000
System.out.println(ids.size());
}
/**
* collection
*
* @param users users
* @author Created by ivan
* @since 2024/5/7 11:31
*/
public static void collection(List<UserDataSample.User> users) {
List<String> ids = users.parallelStream()
.map(UserDataSample.User::getUserId)
.collect(Collectors.toList());
// should be 1000
System.out.println(ids.size());
}
/**
* syncList
*
* <p>More writes than reads: use Collections.synchronizedList(new ArrayList<>())
*
* <p>More reads than writes: use new CopyOnWriteArrayList<>();
*
* @param users users
* @author Created by ivan
* @since 2024/5/7 11:35
*/
public static void syncList(List<UserDataSample.User> users) {
List<String> ids = Collections.synchronizedList(new ArrayList<>());
users.parallelStream()
.forEach(
user -> {
ids.add(user.getUserId());
});
// should be 1000
System.out.println(ids.size());
}
Output
1000
--- Start ---
--- Looping ---
972
1000
1000
--- Looping ---
702
1000
1000
--- Looping ---
807
1000
1000
--- Looping ---
827
1000
1000
--- Looping ---
597
1000
1000
--- Looping ---
741
1000
1000
--- Looping ---
919
1000
1000
--- Looping ---
884
1000
1000
--- Looping ---
919
1000
1000
--- Looping ---
1000
1000
1000
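Performance comparison
Main factor: the number of elements
With too few elements, the parallel stream's overhead (starting threads, splitting the source, scheduling tasks) outweighs the gain, so a plain sequential Stream is faster.
With very many elements, collecting the parallel results with toList() can become a bottleneck, because the per-thread partial lists are copied into one another when they are merged. A thread-safe collection can be used instead in that case, but measure first: a synchronized list adds lock contention of its own.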
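To see the element-count effect on a given machine, a rough sketch like the one below times a sequential stream, a parallel collect, and a parallel forEach into a synchronized list, for a small and a large input. The input sizes (1,000 and 1,000,000), the toUpperCase map step, and the five warm-up runs are arbitrary assumptions standing in for the real workload. System.nanoTime() with a short warm-up is only indicative, since JIT compilation, GC and core count all move the numbers; a harness such as JMH is the reliable way to benchmark. The printed timings will differ from machine to machine.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class StreamTiming {

    // Hypothetical input: n generated ids.
    static List<String> ids(int n) {
        return IntStream.range(0, n).mapToObj(i -> "user-" + i).collect(Collectors.toList());
    }

    static List<String> sequential(List<String> src) {
        return src.stream().map(String::toUpperCase).collect(Collectors.toList());
    }

    static List<String> parallelCollect(List<String> src) {
        return src.parallelStream().map(String::toUpperCase).collect(Collectors.toList());
    }

    static List<String> parallelSyncList(List<String> src) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        src.parallelStream().map(String::toUpperCase).forEach(out::add);
        return out;
    }

    // Runs the task five times to warm up, then times one more run in microseconds.
    static long timeMicros(Function<List<String>, List<String>> task, List<String> src) {
        for (int i = 0; i < 5; i++) {
            task.apply(src);
        }
        long start = System.nanoTime();
        task.apply(src);
        return (System.nanoTime() - start) / 1_000;
    }

    public static void main(String[] args) {
        for (int n : new int[] {1_000, 1_000_000}) {
            List<String> src = ids(n);
            System.out.printf("n=%d sequential=%dus parallelCollect=%dus parallelSyncList=%dus%n",
                    n,
                    timeMicros(StreamTiming::sequential, src),
                    timeMicros(StreamTiming::parallelCollect, src),
                    timeMicros(StreamTiming::parallelSyncList, src));
        }
    }
}
```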
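This article was written by Ivan Dong and is licensed under the Creative Commons Attribution 4.0 International License.
Unless otherwise noted, articles on this site are original works or translations; please credit the author before republishing.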
最后编辑时间为: May 7, 2024 at 03:51 am