问题分析
将查询条件subList分为70个一组,通过CompletableFuture执行异步多线程分批次查询数据库,查询完成后在whenCompleteAsync方法中将结果存储在resultList中。
诡异的情况发生了,查询出来的结果resultList中有10000个数据,其中70 * n个为null的对象
try {//分页的参数 每个大小70 总大小100000List<List<Long>> subList = ......;//结果集List<QueryEntity> resultList = new ArrayList<>();//全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取CompletableFuture[] cfs = subList.stream().map(list -> CompletableFuture.supplyAsync(() -> querySQLList(list), threadPoolTaskExecutor).whenCompleteAsync((v, e) -> {resultList.addAll(v);})).toArray(CompletableFuture[]::new);//等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取CompletableFuture.allOf(cfs).join();log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());//结果集中空对象的个数: 70} catch (Exception e) {throw new RuntimeException("异常", e);
}
问题排查
猜测1:查询数据库的问题
查询数据库的过程中,因为网络或者其他原因导致查询出来的结果为null
List<QueryEntity> querySQLList(List<Long> paramList) {List<QueryEntity> resultList = this.sqlMapper.querySQL(paramList);log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());//结果集中空对象的个数: 0return resultList;
}
执行方法后,日志打印的空对象个数都是0,所以这一步能查到数据
猜测2: whenCompleteAsync方法的问题
怀疑是whenCompleteAsync方法接受的参数为List中全是空参数,于是在方法中增加日志
CompletableFuture[] cfs = subList.stream().map(list -> CompletableFuture.supplyAsync(() -> querySQLList(list), threadPoolTaskExecutor).whenCompleteAsync((v, e) -> {resultList.addAll(v);log.info("结果集中空对象的个数: " + resultList.stream().filter(Objects::isNull).count());//结果集中空对象的个数: 0})).toArray(CompletableFuture[]::new);//等待总任务完成,但是封装后无返回值,必须自己whenComplete()获取CompletableFuture.allOf(cfs).join();
但是结果也是空对象个数都是0
最终原因
因为是异步多线程,所有有可能会产生并发问题。因为数据大小为10000,一个数据不多也不少,所以一开始没想到是并发导致的问题。
CopyOnWriteArrayList是线程安全的List,替换ArrayList后,就没有发生过List中为空的问题了
//结果集
List<QueryEntity> resultList = new CopyOnWriteArrayList<>();
//....分片查询