分布式任务调度
在分布式架构下,一个服务会部署多个实例来运行业务;如果在这种分布式系统环境下运行任务调度,称为分布式任务调度。
分布式任务调度框架:xxl-job
xxl-job环境搭建
本机
仓库源码:xxl-job
- 初始化调度数据库
- 修改数据库连接信息
此时启动xxl-job-admin项目,在浏览器输入
http://localhost:8080/xxl-job-admin
即可看到调度中心页面
docker
docker安装xxl-job
docker run -d \
-e PARAMS="--spring.datasource.url=jdbc:mysql://192.168.140.102:3306/xxl_job?Unicode=true&characterEncoding=UTF-8 \
--spring.datasource.username=root \
--spring.datasource.password=123" \
-p 8888:8080 -v /tmp:/data/applogs \
--name xxl-job-admin \
--restart=always \
xuxueli/xxl-job-admin:2.3.0
此时在浏览器输入
http://192.168.140.102:8888/xxl-job-admin
即可看到调度中心页面
项目集成xxl-job
- 导入xxl-job依赖
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version>
</dependency>
- 添加配置
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.port}")private int port;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setPort(port);return xxlJobSpringExecutor;}
}
xxl:job:admin:addresses: http://192.168.140.102:8888/xxljob-adminexecutor:appname: xxl-job-executor-sample # 去“任务调度中心-执行器管理”里找port: 9999
- 任务代码
@Component
public class HelloJob {@XxlJob("demoJobHandler") // 去任务调度中心里查public void helloJob() {System.out.println("简单任务执行了");}
}
任务详解
执行器
任务绑定的执行器,任务触发调度时,会自动发现注册成功的执行器,实现任务自动发现功能。执行器也可以方便的进行任务分组,每个任务必须绑定一个执行器。
报警邮件
任务调度失败时邮件通知的邮箱地址
调度配置
一般选用CRON表达式
任务配置
- 运行模式:BEAN模式(任务以JobHandler方式维护在执行器端)
- JobHandler:运行模式为 “BEAN模式” 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值
路由策略
-
FIRST(第一个):固定选择第一个机器;
-
LAST(最后一个):固定选择最后一个机器;
-
ROUND(轮询):每个微服务轮询的去执行任务
-
RANDOM(随机):随机选择在线的机器;
-
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
-
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
-
LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
-
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
-
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
-
SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;
分片广播
任务路由策略选择“分片广播”的情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务。
在同一个时间点,如果需要同时执行大量的任务,肯定是需要用到集群的,此时可以给每台服务器分配多个任务。
假设每秒10000请求,
【轮询方式】:实例A在第一秒接受了全部10000个任务并处理,而实例B空闲,第二秒则是实例B接受并处理10000个任务,A空闲
【分片广播方式】:实例A和实例B两个服务同时执行10000个任务
@Component
public class HelloJob {@XxlJob("shardingJobHandler")public void shardingJobHandler() {// 分片的参数int shardIndex = XxlJobHelper.getShardIndex(); // 当前某个分片int shardTotal = XxlJobHelper.getShardTotal(); // 总分片数// 业务逻辑List<Integer> list = getList();for(Integer i : list) {if(i % shardTotal == shardIndex) {System.out.println("当前第"+shardIndex+"分片执行了,任务项是:" + i);}}}public List getList() {ArrayList<Integer> list = new ArrayList<>();for (int i = 0; i < 10000; i++) {list.add(i);}return list;}
}
应用:热点文章定时计算
spring传统的定时任务@Scheduled,但是存在一些问题:
- 集群任务的重复执行问题
- cron表达式定义在代码中,修改不方便
- 定时任务失败了,无法重试,也没有统计
- 如果任务量过大,不能有效地分片执行
需求
计算热点文章分值业务代码
/*** 计算热点文章*/
@Override
public void computeHotArticle() {// 1. 查询前五天的文章数据Date dayParam = DateTime.now().minusDays(5).toDate();List<ApArticle> articleList = articleMapper.findArticleListByLast5days(dayParam);// 2. 计算文章的分值List<HotArticleVo> hotArticleVoList = computeArticleScore(articleList);// 3. 为每个频道缓存30条分值较高的文章cacheTagToRedis(hotArticleVoList);
}/*** 计算文章分值* @param articleList* @return*/
private List<HotArticleVo> computeArticleScore(List<ApArticle> articleList) {if (articleList == null || articleList.size() == 0) {return null;}List<HotArticleVo> hotArticleVoList = new ArrayList<>();for (ApArticle apArticle : articleList) {int score = computeScore(apArticle);HotArticleVo hotArticleVo = new HotArticleVo();BeanUtils.copyProperties(apArticle, hotArticleVo);hotArticleVo.setScore(score);hotArticleVoList.add(hotArticleVo);}return hotArticleVoList;
}/*** 计算文章的具体分值* @param apArticle* @return*/
private int computeScore(ApArticle apArticle) {Integer scere = 0;if(apArticle.getLikes() != null){scere += apArticle.getLikes() * ArticleConstants.HOT_ARTICLE_LIKE_WEIGHT;}if(apArticle.getViews() != null){scere += apArticle.getViews();}if(apArticle.getComment() != null){scere += apArticle.getComment() * ArticleConstants.HOT_ARTICLE_COMMENT_WEIGHT;}if(apArticle.getCollection() != null){scere += apArticle.getCollection() * ArticleConstants.HOT_ARTICLE_COLLECTION_WEIGHT;}return scere;
}/*** 缓存到redis中* @param hotArticleVoList*/
private void cacheTagToRedis(List<HotArticleVo> hotArticleVoList) {// 1. 为每个频道缓存30条分值较高的文章ResponseResult responseResult = wemediaClient.getChannels();if(responseResult.getCode().equals(200)) {// List<WmChannel> wmChannels = (List<WmChannel>)responseResult.getData();String channelJson = JSON.toJSONString(responseResult.getData());List<WmChannel> wmChannels = JSON.parseArray(channelJson, WmChannel.class);// 检索每个频道的文章if(wmChannels != null && wmChannels.size() > 0) {for(WmChannel wmChannel : wmChannels) {// 当前频道的数据,降序排列,最多30条List<HotArticleVo> list = hotArticleVoList.stream().filter(x -> wmChannel.getId() == x.getChannelId()).limit(30) // 最多30条.sorted(Comparator.comparing(HotArticleVo::getScore).reversed())// 给文章进行排序.collect(Collectors.toList());cacheService.set(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + wmChannel.getId(), JSON.toJSONString(list));}}}// 2. 设置推荐的数据List<HotArticleVo> list = hotArticleVoList.stream().limit(30) // 最多30条.sorted(Comparator.comparing(HotArticleVo::getScore).reversed())// 给文章进行排序.collect(Collectors.toList());cacheService.set(ArticleConstants.HOT_ARTICLE_FIRST_PAGE + ArticleConstants.DEFAULT_TAG, JSON.toJSONString(list));
}
设置定时任务(凌晨两点执行一次)
- 新增执行器
- 新增任务
- 引入依赖
<dependency><groupId>com.xuxueli</groupId><artifactId>xxl-job-core</artifactId><version>2.3.0</version>
</dependency>
- 添加配置
@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.port}")private int port;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setPort(port);return xxlJobSpringExecutor;}
}
xxl:job:admin:addresses: http://192.168.140.102:8888/xxl-job-adminexecutor:appname: leadnews-hotarticle-executor port: 9999
- 创建任务
@Component
@Slf4j
@RequiredArgsConstructor
public class ComputeHotArticleJob {private final HotArticleService hotArticleService;@XxlJob("computeHotArticleJob")public void handle() {log.info("热文章分值计算调度任务开始执行");hotArticleService.computeHotArticle(); // 调用“计算热点文章分值业务代码”log.info("热文章分值计算调度任务执行结束");}
}
只要在查询文章列表的时候,先去判断redis中是否有数据,如果有数据,直接从redis中获取。(redis中存储的数据就是按照文章的热点分值排序后的)