Java parallelStream Is Not Thread-Safe
in developCo-De with 0 comment

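Parallel streams are not thread-safe

Problem: data loss

A parallel stream runs forEach on several threads at once (the common ForkJoinPool plus the calling thread). If the lambda adds to a plain ArrayList, which is not thread-safe, the concurrent add calls race: some elements are silently dropped, and an ArrayIndexOutOfBoundsException is also possible. The two fixes below either avoid the shared mutable list or make it thread-safe.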

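Option 1: use Stream collect

Let the stream build the list with collect(Collectors.toList()) instead of adding to a shared list. The collector gives each worker thread its own list and merges the partial results, so no list is ever mutated by two threads at once: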

/**
 * collection
 *
 * @param users users
 * @author Created by ivan
 * @since 2024/5/7 11:31
 */
public static void collection(List<UserDataSample.User> users) {
  List<String> ids = users.parallelStream()
      .map(UserDataSample.User::getUserId)
      .collect(Collectors.toList());
  // should be 1000
  System.out.println(ids.size());
}
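A self-contained sketch of Option 1 that runs without the post's UserDataSample class. The User record and the helper names (sampleUsers, collectIds, toListIds) are hypothetical stand-ins, and records require Java 16 or later. It also shows Stream.toList() (Java 16+), which returns an unmodifiable list, and that collecting from an ordered source such as a List keeps the original order even in parallel.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectDemo {
    // Hypothetical stand-in for UserDataSample.User (not shown in the post).
    record User(String userId) {
        String getUserId() { return userId; }
    }

    // Builds n users with IDs "u0" .. "u<n-1>".
    static List<User> sampleUsers(int n) {
        return IntStream.range(0, n)
                .mapToObj(i -> new User("u" + i))
                .collect(Collectors.toList());
    }

    // Option 1: each worker fills its own partial list; the collector merges them.
    static List<String> collectIds(List<User> users) {
        return users.parallelStream()
                .map(User::getUserId)
                .collect(Collectors.toList());
    }

    // Java 16+ shorthand; the returned list is unmodifiable.
    static List<String> toListIds(List<User> users) {
        return users.parallelStream()
                .map(User::getUserId)
                .toList();
    }

    public static void main(String[] args) {
        List<User> users = sampleUsers(1000);
        List<String> ids = collectIds(users);
        System.out.println(ids.size());                      // 1000
        System.out.println(ids.get(0) + " " + ids.get(999)); // u0 u999 (encounter order kept)
        System.out.println(toListIds(users).size());         // 1000
    }
}
```

Because the collector only combines thread-local lists, this version needs no locking at all, which is why it is usually the first choice.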

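Option 2: use a thread-safe collection

Keep forEach but make the shared list thread-safe. Note that forEach on a parallel stream does not follow encounter order, so the IDs end up in an arbitrary order: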
/**
 * syncList
 *
 * <p>Write-heavy (more writes than reads): use Collections.synchronizedList(new ArrayList<>())
 *
 * <p>Read-heavy (more reads than writes): use new CopyOnWriteArrayList<>()
 *
 * @param users users
 * @author Created by ivan
 * @since 2024/5/7 11:35
 */
public static void syncList(List<UserDataSample.User> users) {
  List<String> ids = Collections.synchronizedList(new ArrayList<>());
  users.parallelStream()
      .forEach(
          user -> {
            ids.add(user.getUserId());
          });
  // should be 1000
  System.out.println(ids.size());
}
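A self-contained sketch of Option 2 that exercises both lists named in the javadoc. To stay runnable it uses plain String IDs instead of UserDataSample.User; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ThreadSafeListDemo {
    // Builds IDs "u0" .. "u<n-1>".
    static List<String> sampleIds(int n) {
        return IntStream.range(0, n).mapToObj(i -> "u" + i).collect(Collectors.toList());
    }

    // Every add() synchronizes on the wrapper, so concurrent adds are not lost.
    static List<String> viaSynchronizedList(List<String> source) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        source.parallelStream().forEach(out::add);
        return out;
    }

    // Every add() copies the backing array: reads are cheap, writes are expensive.
    static List<String> viaCopyOnWrite(List<String> source) {
        List<String> out = new CopyOnWriteArrayList<>();
        source.parallelStream().forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> source = sampleIds(1000);
        List<String> a = viaSynchronizedList(source);
        List<String> b = viaCopyOnWrite(source);
        System.out.println(a.size() + " " + b.size()); // 1000 1000
        // forEach ignores encounter order, so compare contents as sets.
        System.out.println(new HashSet<>(a).equals(new HashSet<>(source))); // true
        System.out.println(new HashSet<>(b).equals(new HashSet<>(source))); // true
    }
}
```

If the original order matters, Option 1 (or forEachOrdered, at the cost of parallelism) is the better fit.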

Full code (UserDataSample is not shown in the post; it supplies the nested User class with getUserId() and a loadUser() method that returns 1,000 users)

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// The enclosing class name is illustrative; the post does not show it.
public class ParallelStreamDemo {

  public static void main(String[] args) {
    UserDataSample userDataSample = new UserDataSample();
    List<UserDataSample.User> users = userDataSample.loadUser();
    // 1000
    System.out.println(users.size());
    System.out.println("--- Start ---");

    // Loop 10 times: the race in wrong() does not lose data on every run
    for (int i = 0; i < 10; i++) {
      System.out.println("--- Looping ---");
      wrong(users);
      collection(users);
      syncList(users);
    }
  }

  /**
   * wrong: several threads call ArrayList.add concurrently, so updates race and
   * elements are lost (an ArrayIndexOutOfBoundsException is also possible).
   *
   * @param users users
   * @author Created by ivan
   * @since 2024/5/7 11:28
   */
  public static void wrong(List<UserDataSample.User> users) {
    List<String> ids = new ArrayList<>();
    users.parallelStream()
        .forEach(
            user -> {
              ids.add(user.getUserId());
            });
    // should be 1000, but usually prints less
    System.out.println(ids.size());
  }

  /**
   * collection
   *
   * @param users users
   * @author Created by ivan
   * @since 2024/5/7 11:31
   */
  public static void collection(List<UserDataSample.User> users) {
    List<String> ids = users.parallelStream()
        .map(UserDataSample.User::getUserId)
        .collect(Collectors.toList());
    // should be 1000
    System.out.println(ids.size());
  }

  /**
   * syncList
   *
   * <p>Write-heavy (more writes than reads): use Collections.synchronizedList(new ArrayList<>())
   *
   * <p>Read-heavy (more reads than writes): use new CopyOnWriteArrayList<>()
   *
   * @param users users
   * @author Created by ivan
   * @since 2024/5/7 11:35
   */
  public static void syncList(List<UserDataSample.User> users) {
    List<String> ids = Collections.synchronizedList(new ArrayList<>());
    users.parallelStream()
        .forEach(
            user -> {
              ids.add(user.getUserId());
            });
    // should be 1000
    System.out.println(ids.size());
  }
}
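The listing above does not compile on its own because UserDataSample is not shown. A hypothetical minimal stand-in, inferred only from how the code uses it (a nested User with getUserId() and a loadUser() that returns 1,000 users), could look like this; the "user-N" ID format is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in: the post does not show the real UserDataSample.
public class UserDataSample {

    public static class User {
        private final String userId;

        public User(String userId) {
            this.userId = userId;
        }

        public String getUserId() {
            return userId;
        }
    }

    // Returns 1,000 users with distinct IDs "user-0" .. "user-999" (format is illustrative).
    public List<User> loadUser() {
        List<User> users = new ArrayList<>(1000);
        for (int i = 0; i < 1000; i++) {
            users.add(new User("user-" + i));
        }
        return users;
    }
}
```

Putting this class in the same package as the demo class is enough to compile and run the full listing.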

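Output. The first number after each "--- Looping ---" comes from wrong(): it falls short of 1000 in 9 of the 10 runs, and the last run reaching 1000 shows that the race is nondeterministic rather than absent. collection() and syncList() print 1000 every time: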

1000
--- Start ---
--- Looping ---
972
1000
1000
--- Looping ---
702
1000
1000
--- Looping ---
807
1000
1000
--- Looping ---
827
1000
1000
--- Looping ---
597
1000
1000
--- Looping ---
741
1000
1000
--- Looping ---
919
1000
1000
--- Looping ---
884
1000
1000
--- Looping ---
919
1000
1000
--- Looping ---
1000
1000
1000

Performance comparison

The main factor is the number of elements.

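With few elements, a parallel stream is usually slower than a sequential one: the cost of splitting the source, creating fork/join tasks and scheduling them on worker threads outweighs the work saved.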
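One way to act on this is to fall back to a sequential stream below a size threshold. A minimal sketch: PARALLEL_THRESHOLD = 10_000 is an arbitrary placeholder, not a number from the post, and AdaptiveStream, streamFor and mapAll are hypothetical names.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class AdaptiveStream {
    // Hypothetical cut-over point; measure your own workload before picking a value.
    static final int PARALLEL_THRESHOLD = 10_000;

    // Small inputs get a sequential stream to avoid fork/join overhead.
    static <T> Stream<T> streamFor(List<T> input) {
        return input.size() < PARALLEL_THRESHOLD ? input.stream() : input.parallelStream();
    }

    // Maps every element and collects the results in encounter order.
    static <T, R> List<R> mapAll(List<T> input, Function<? super T, ? extends R> f) {
        return streamFor(input).map(f).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> small = IntStream.range(0, 100).boxed().collect(Collectors.toList());
        List<Integer> large = IntStream.range(0, 100_000).boxed().collect(Collectors.toList());
        System.out.println(streamFor(small).isParallel()); // false
        System.out.println(streamFor(large).isParallel()); // true
        System.out.println(mapAll(small, x -> x * 2).get(99)); // 198
    }
}
```

The right threshold also depends on how expensive the per-element work is: a costly mapping step can justify going parallel with far fewer elements.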

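With very many elements, collecting with toList can become a bottleneck, because each worker's partial list has to be merged (copied) into the final result. Filling a thread-safe collection from forEach is one alternative, but it adds lock contention on every add, so measure both before switching.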
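To check this trade-off on your own hardware, here is a rough timing sketch comparing the two strategies. It uses System.nanoTime with a few rounds for JIT warm-up rather than a proper harness such as JMH, so treat the numbers as indicative only; CollectVsSyncList and its methods are hypothetical names, and the element count is arbitrary.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CollectVsSyncList {
    // Strategy 1: per-thread partial lists merged by the collector.
    static List<Integer> viaCollect(List<Integer> src) {
        return src.parallelStream().map(x -> x + 1).collect(Collectors.toList());
    }

    // Strategy 2: one shared list; every add takes the same lock.
    static List<Integer> viaSyncList(List<Integer> src) {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>(src.size()));
        src.parallelStream().map(x -> x + 1).forEach(out::add);
        return out;
    }

    // Wall-clock time of one run in milliseconds (no GC or JIT isolation).
    static long timeMillis(Runnable r) {
        long start = System.nanoTime();
        r.run();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        List<Integer> src = IntStream.range(0, 2_000_000).boxed().collect(Collectors.toList());
        for (int round = 0; round < 5; round++) { // early rounds mostly measure JIT warm-up
            long a = timeMillis(() -> viaCollect(src));
            long b = timeMillis(() -> viaSyncList(src));
            System.out.printf("round %d: collect=%d ms, synchronizedList=%d ms%n", round, a, b);
        }
    }
}
```

Which strategy wins depends on core count, element count and the cost of the mapping step; because a synchronizedList serializes every add on a single lock, it can just as well turn out slower than collect.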
