XXL-JOB真的要凉了?出现了一个王炸级别的分布式任务调度与计算框架?( 五 )

执行结果

XXL-JOB真的要凉了?出现了一个王炸级别的分布式任务调度与计算框架?

文章插图
并行处理器:MapReduceProcessorMapReduce 是最复杂也是最强大的一种执行器,它允许开发者完成任务的拆分 , 将子任务派发到集群中其他Worker 执行,是执行大批量处理任务的不二之?。∈迪?MapReduce 处理器需要继承 MapReduceProcessor类,具体用法如下示例代码所示:
@Slf4j@Componentpublic class MapReduceProcessorDemo implements MapReduceProcessor {@Overridepublic ProcessResult process(TaskContext context) throws Exception {final OmsLogger omsLogger = context.getOmsLogger();// 判断是否为根任务if (isRootTask()) {// 构造子任务List<SubTask> subTaskList = Lists.newLinkedList();SubTask subTask=new SubTask();subTask.setSiteId(1L);subTask.setName("iron.guo");subTaskList.add(subTask);/** 子任务的构造由开发者自己定义* eg. 现在需要从文件中读取100W个ID,并处理数据库中这些ID对应的数据,那么步骤如下:* 1. 根任务(RootTask)读取文件 , 流式拉取100W个ID,并按1000个一批的大小组装成子任务进行派发* 2. 非根任务获取子任务,完成业务逻辑的处理*/// 调用 map 方法 , 派发子任务(map 可能会失败并抛出异常,做好业务操作)map(subTaskList, "DATA_PROCESS_TASK");omsLogger.info("执行根任务-派发子任务");return new ProcessResult(true, "ROOT_PROCESS_SUCCESS");}// 非子任务 , 可根据 subTask 的类型 或 TaskName 来判断分支if (context.getSubTask() instanceof SubTask) {omsLogger.info("执行子任务开始");omsLogger.info("Get from SubTask : name is {} and id is {}",((SubTask) context.getSubTask()).getName(),((SubTask) context.getSubTask()).getSiteId());// 执行子任务,注:子任务人可以 map 产生新的子任务,可以构建任意级的 MapReduce 处理器return new ProcessResult(true, "PROCESS_SUB_TASK_SUCCESS");}return new ProcessResult(false, "UNKNOWN_BUG");}@Overridepublic ProcessResult reduce(TaskContext taskContext, List<TaskResult> taskResults) {// 所有 Task 执行结束后 , reduce 将会被执行// taskResults 保存了所有子任务的执行结果// 用法举例,统计执行结果AtomicLong successCnt = new AtomicLong(0);taskResults.forEach(tr -> {if (tr.isSuccess()) {successCnt.incrementAndGet();}});// 该结果将作为任务最终的执行结果return new ProcessResult(true, "success task num:" + successCnt.get());}// 自定义的子任务@Getter@Setter@NoArgsConstructor@AllArgsConstructorprivate static class SubTask {private Long siteId;private String name;}}执行结果
XXL-JOB真的要凉了?出现了一个王炸级别的分布式任务调度与计算框架?

文章插图
注:Map 处理器相当于 MapReduce 处理器的阉割版本(阉割了 reduce 方法),此处不再单独举例 。
工作流
XXL-JOB真的要凉了?出现了一个王炸级别的分布式任务调度与计算框架?

文章插图
点击右上角按钮 新建工作流,即可录入新的工作流,具体界面和说明如下所示 。
  • 工作流名称:名称,无实际业务用途,请尽量精简字段
  • 工作流描述:描述,无实际业务用途 , 请尽量精简字段
  • 定时信息:该工作流的触发方式的触发方式,包含时间表达式类型选择框和时间表达式输入框
CRON -> 填写 CRON 表达式(在线生成网站)
API -> 不需要填写任何参数,表明该任务由 OpenAPI 触发
  • 生命周期:定时策略生效的时间段
  • 最大实例:该工作流同时执行的数量
  • 任务依赖关系:提供编辑界面可视化操作,绘制 DAG(有向无环图),配置工作流内各个任务的依赖关系
DAG 操作指南编辑依赖关系v4.0.0 以后支持节点的自由拖拉拽,不用再点点点了,哈哈哈 ~