xxl-job核心源码解析

xxl-job源码解析

如何自研一个xxljob

在这里插入图片描述

在这里插入图片描述

  • 注册服务
  • 调度服务
  • RPC组件(基建,底层严重依赖)
  • 日志服务
  • 告警服务

系统架构

在这里插入图片描述

执行流程
在这里插入图片描述

各大调度中心对比

在这里插入图片描述

1)服务端启动流程

在这里插入图片描述

首先找到配置类 XxlJobAdminConfig
在这里插入图片描述

可以发现该类实现 InitializingBean接口,这里直接看 afterPropertiesSet方法即可。

    @Override
    public void afterPropertiesSet() throws Exception {
        //利用静态声明的只会加载一次的特性,初始化一个单例对象。
        adminConfig = this;

        //初始化xxjob调度器
        xxlJobScheduler = new XxlJobScheduler();
        xxlJobScheduler.init();
    }

初始化xxjob调度器

    public void init() throws Exception {
        // init i18n
        initI18n();

        // admin trigger pool start
        JobTriggerPoolHelper.toStart(); //初始化触发器线程池

        // admin registry monitor run  30秒执行一次,维护注册表信息, 判断在线超时时间90s
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run   运行事变监视器,主要失败发送邮箱,重试触发器
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper ) //将丢失主机信息调度日志更改状态
        JobCompleteHelper.getInstance().start();

        // admin log report start  统计一些失败成功报表
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper ) 执行调度器
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }


该方法主要做了如下事情:

  1. init i18n
  2. 初始化触发器线程池
  3. 维护注册表信息(30秒执行一次)
  4. 将丢失主机信息调度日志更改状态
  5. 统计一些失败成功报表,删除过期日志
  6. 执行调度器

1)1:初始化触发器线程池(JobTriggerPoolHelper)

    public static void toStart() {
        helper.start();
    }

 public void start(){
        //最大200线程,最多处理1000任务
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });

        //最大100线程,最多处理2000任务
        //一分钟内超时10次,则采用慢触发器执行
        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }

注意:这里分别初始化了2个线程池,一个快一个慢,优先选择快,当一分钟以内任务超过10次执行时间超过500ms,则加入慢线程池执行。

1)2:维护注册表信息(30秒执行一次)(JobRegistryHelper)

这里分别做了三件事

  1. 初始化注册或者删除线程池,主要负责客户端注册或者销毁到xxl_job_registry表
  2. 剔除超时注册机器
  3. 更新xxl_job_group执行器地址列表
public void start(){

		// for registry or remove
		//初始化注册或者删除线程池
		registryOrRemoveThreadPool = new ThreadPoolExecutor(
				2,
				10,
				30L,
				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(2000),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
					}
				},
				//注意:这里的拒绝策略就是再次执行...^_^'''
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						r.run();
						logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
					}
				});

		// for monitor    30秒执行一次,维护注册表信息, 判断在线超时时间90s
		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						//查询自动注册的数据
						//这里如果没添加自动注册的数据,则不会进入该方法,然后删除register表中超时注册数据。
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							// 1):从注册表中删除超时90s的机器,不分是否自动注册
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								//从数据库删除注册机器信息
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor)
							// 获取所有在线机器,注册表: 见"xxl_job_registry"表, "执行器" 在进行任务注册时将会周期性维护一条注册记录,
							// 即机器地址和AppName的绑定关系; "调度中心" 从而可以动态感知每个AppName在线的机器列表;
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();  //维护注册表注册key和注册value
							// 不分是否自动注册
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									// 2):将注册类型为EXECUTOR的XxlJobRegistry集合改装成appname=>设置触发器的ip地址
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										//AppName: 每个执行器机器集群的唯一标示, 任务注册以 "执行器" 为最小粒度进行注册; 每个任务通过其绑定的执行器可感知对应的执行器机器列表;
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// fresh group address
							// 3):更新xxl_job_group执行器地址列表
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;//将所有配置触发器的ip地址,使用,拼接
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								group.setAddressList(addressListStr);//更新设置了触发器的ip地址
								group.setUpdateTime(new Date());//更新修改时间

								//将注册表中appname对应的多条ip地址,整成appname-> ips(IP1,IP2,IP3)格式存储xxl_job_group表中,只针对自动注册。
								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});
		registryMonitorThread.setDaemon(true);
		registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
		registryMonitorThread.start();
	}

1)3:运行失败监视器,主要失败发送邮箱,重试触发器(JobFailMonitorHelper)

这里分别做了三件事

  1. 失败重试

    这里判断失败有2种情况(trigger_code表示调度中心调用执行器状态,handle_code表示执行器执行结束后回调给调度中心的状态,200均标识成功,500标识失败)
    第一种:trigger_code!=(0,200) 且 handle_code!=0
    第二种:handle_code!=200

    List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
    
    	<select id="findFailJobLogIds" resultType="long" >
    		SELECT id FROM "xxl_job_log"
    		WHERE (
    			(trigger_code = 0 and handle_code != 0)
    			OR
    			(handle_code != 200 )
    		)
    		AND "alarm_status" = 0
    		ORDER BY id ASC
    		LIMIT #{pagesize}
    	</select>
    
  2. 告警(这里可向spring注入JobAlarm)

public void start(){
		monitorThread = new Thread(new Runnable() {

			@Override
			public void run() {

				// monitor
				while (!toStop) {
					try {
						//获取执行失败的日志 调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
						List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
						if (failLogIds!=null && !failLogIds.isEmpty()) {//1:执行触发器成功,返回值失败.2:触发器失败
							for (long failLogId: failLogIds) {

								// lock log   加锁,乐观修改alarm_status=-1
								int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
								if (lockRet < 1) {
									continue;
								}
								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId); //获取失败日志具体信息
								XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId()); //加载job信息

								// 1、fail retry monitor
								if (log.getExecutorFailRetryCount() > 0) { //若可重试次数>0,则再次执行触发器
									JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
									String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
									log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
									XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log); //修改触发器执行信息
								}

								// 2、fail alarm monitor 失败警告监视器
								int newAlarmStatus = 0;		// 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
								if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {//若设置报警邮箱,则执行报警
									boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);  //发送警报
									newAlarmStatus = alarmResult?2:3; //获取警报执行状态
								} else {//没设置报警邮箱,则更改状态为不需要告警
									newAlarmStatus = 1;
								}
								//释放锁
								XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
							}
						}

					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
						}
					}

                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

				logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

			}
		});
		monitorThread.setDaemon(true);
		monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
		monitorThread.start();
	}

在这里可通过实现JobAlarm接口,注入spring容器执行报警

boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);

 public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {

        boolean result = false;
        if (jobAlarmList!=null && jobAlarmList.size()>0) {
            result = true;  // success means all-success
            for (JobAlarm alarm: jobAlarmList) {
                boolean resultItem = false;
                try {
                    resultItem = alarm.doAlarm(info, jobLog);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                if (!resultItem) {
                    result = false;
                }
            }
        }

        return result;
    }

这里jobAlarmList集合里面有很多JobAlarm对象, 都是执行初始化方法从spring注入的,也就是说扩展的话,只需要实现JobAlarm接口,注入spring即可。

[外链图片转存中…(img-vYBUqfpD-1686799715579)]

1)4:将丢失主机信息调度日志更改状态(JobCompleteHelper)

public void start(){

		// for callback  针对回调函数处理的线程池
		callbackThreadPool = new ThreadPoolExecutor(
				2,
				20,
				30L,
				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(3000),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						return new Thread(r, "xxl-job, admin JobLosedMonitorHelper-callbackThreadPool-" + r.hashCode());
					}
				},
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						r.run();
						logger.warn(">>>>>>>>>>> xxl-job, callback too fast, match threadpool rejected handler(run now).");
					}
				});


		// for monitor
		monitorThread = new Thread(new Runnable() {

			@Override
			public void run() {

				// wait for JobTriggerPoolHelper-init
				try {
					TimeUnit.MILLISECONDS.sleep(50);
				} catch (InterruptedException e) {
					if (!toStop) {
						logger.error(e.getMessage(), e);
					}
				}

				// monitor
				while (!toStop) {
					try {
						// 任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
						Date losedTime = DateUtil.addMinutes(new Date(), -10); //调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
						List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);

						if (losedJobIds!=null && losedJobIds.size()>0) {
							for (Long logId: losedJobIds) {

								XxlJobLog jobLog = new XxlJobLog();
								jobLog.setId(logId);

								jobLog.setHandleTime(new Date());
								jobLog.setHandleCode(ReturnT.FAIL_CODE);
								jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

								//更改处理状态
								XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
							}

						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
						}
					}

                    try {
                        TimeUnit.SECONDS.sleep(60);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

				logger.info(">>>>>>>>>>> xxl-job, JobLosedMonitorHelper stop");

			}
		});
		monitorThread.setDaemon(true);
		monitorThread.setName("xxl-job, admin JobLosedMonitorHelper");
		monitorThread.start();
	}

更改处理状态

   public static int updateHandleInfoAndFinish(XxlJobLog xxlJobLog) {

        // finish 若父任务正常结束,则终止子任务,以及设置Childmsg
        finishJob(xxlJobLog);

        // text最大64kb 避免长度过长 截断超过长度限制字符
        if (xxlJobLog.getHandleMsg().length() > 15000) {
            xxlJobLog.setHandleMsg( xxlJobLog.getHandleMsg().substring(0, 15000) );
        }

        // fresh handle 更新超时joblog
        return XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateHandleInfo(xxlJobLog);
    }

1)5:统计一些失败成功报表,删除过期日志(JobLogReportHelper)

  public void start(){
        logrThread = new Thread(new Runnable() {
            //每分钟刷新一次
            @Override
            public void run() {

                // last clean log time   记录上次清除日志时间
                long lastCleanLogTime = 0;


                while (!toStop) {

                    // 1、log-report refresh: refresh log report in 3 days
                    try {

                        for (int i = 0; i < 3; i++) {

                            // today  分别统计今天,昨天,前天0~24点的数据
                            Calendar itemDay = Calendar.getInstance();
                            itemDay.add(Calendar.DAY_OF_MONTH, -i);
                            itemDay.set(Calendar.HOUR_OF_DAY, 0);
                            itemDay.set(Calendar.MINUTE, 0);
                            itemDay.set(Calendar.SECOND, 0);
                            itemDay.set(Calendar.MILLISECOND, 0);

                            Date todayFrom = itemDay.getTime();

                            itemDay.set(Calendar.HOUR_OF_DAY, 23);
                            itemDay.set(Calendar.MINUTE, 59);
                            itemDay.set(Calendar.SECOND, 59);
                            itemDay.set(Calendar.MILLISECOND, 999);

                            Date todayTo = itemDay.getTime();

                            // refresh log-report every minute
                            //设置默认值
                            XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
                            xxlJobLogReport.setTriggerDay(todayFrom);
                            xxlJobLogReport.setRunningCount(0);
                            xxlJobLogReport.setSucCount(0);
                            xxlJobLogReport.setFailCount(0);

                            //查询失败, 成功,总的调用次数
                            Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
                            if (triggerCountMap!=null && triggerCountMap.size()>0) {
                                int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
                                int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
                                int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
                                int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

                                xxlJobLogReport.setRunningCount(triggerDayCountRunning);
                                xxlJobLogReport.setSucCount(triggerDayCountSuc);
                                xxlJobLogReport.setFailCount(triggerDayCountFail);
                            }

                            // do refresh
                            //刷新调用次数,若找不到则默认都是0
                            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
                            if (ret < 1) {
                                //没数据则保存
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
                            }
                        }

                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(">>>>>>>>>>> xxl-job, job log report thread error:{}", e);
                        }
                    }

                    // 2、log-clean: switch open & once each day
                    //设置了保留日志天数且日志保留了24小时,则进入
                    if (XxlJobAdminConfig.getAdminConfig().getLogretentiondays()>0
                            && System.currentTimeMillis() - lastCleanLogTime > 24*60*60*1000) {

                        // expire-time
                        //通过日志保留天数算出清除log时间
                        Calendar expiredDay = Calendar.getInstance();
                        expiredDay.add(Calendar.DAY_OF_MONTH, -1 * XxlJobAdminConfig.getAdminConfig().getLogretentiondays());
                        expiredDay.set(Calendar.HOUR_OF_DAY, 0);
                        expiredDay.set(Calendar.MINUTE, 0);
                        expiredDay.set(Calendar.SECOND, 0);
                        expiredDay.set(Calendar.MILLISECOND, 0);
                        Date clearBeforeTime = expiredDay.getTime();

                        // clean expired log
                        List<Long> logIds = null;
                        do {
                            //这里传了3个0表示查询所有,而不是单个任务id
                            logIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findClearLogIds(0, 0, clearBeforeTime, 0, 1000);
                            //删除过期数据
                            if (logIds!=null && logIds.size()>0) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().clearLog(logIds);
                            }
                        } while (logIds!=null && logIds.size()>0);

                        // update clean time
                        lastCleanLogTime = System.currentTimeMillis();
                    }

                    try {
                        TimeUnit.MINUTES.sleep(1);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, job log report thread stop");

            }
        });
        logrThread.setDaemon(true);
        logrThread.setName("xxl-job, admin JobLogReportHelper");
        logrThread.start();
    }

1)6:执行调度器(核心)(JobScheduleHelper)

这里首先会加锁,然后查询出下次执行时间在未来5秒以内的所有任务,一次最多获取6000条。然后根据过期时间会分成三种对应处理。

  1. 触发器下次执行时间过期时间 > 5S
  2. 触发器下次执行时间过期时间 < 5S
  3. 触发器下次执行时间在未来5S以内。

加锁

                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);

                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

具体如下:
在这里插入图片描述

当数据添加到ringData后,ringThread会一秒执行一次,然后从ringData获取数据。ringData本质是一个ConcurrentHashMap,容量60,key => 触发器下次执行时间(秒为单位)%60,value => 触发器jobId。这里采用了时间轮的思想,定时任务一秒执行一次,会以当前时间秒往前递推2秒。将数据从ringData取出来,然后执行任务。比如现在时间为0点0分0秒,当前时间秒为0,则会将ringData中索引为59和58的数据捞出来。xxl-job这样做是为了避免处理耗时太长,所以会跨过刻度,多向前校验一个刻度。
在这里插入图片描述

   public void start(){

        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {

                try {  
                  //下5秒之后执行一次,等待服务器启动。
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)  每个触发器花费50ms,每个线程单位时间内处理20任务,最多同时处理300*20=6000任务
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

                while (!scheduleThreadToStop) {

                    // Scan Job
                    long start = System.currentTimeMillis();

                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;

                    boolean preReadSuc = true;
                    try {
                        //设置手动提交
                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        //获取任务调度锁表内数据信息,加写锁
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();   //获取当前时间后5秒,同时最多负载的分页数
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {

                                // time-ring jump
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {  //触发器过期时间>5s
                                    // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                    logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                                    // 1、misfire match
//                                    - 调度过期策略:
//                                    - 忽略:调度过期后,忽略过期的任务,从当前时间开始重新计算下次触发时间;
//                                    - 立即执行一次:调度过期后,立即执行一次,并从当前时间开始重新计算下次触发时间;
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) { //若过期策略为FIRE_ONCE_NOW,则立即执行一次
                                        // FIRE_ONCE_NOW 》 trigger //执行触发器
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                    }

                                    // 2、fresh next  更新下次执行时间
                                    refreshNextValidTime(jobInfo, new Date());

                                } else if (nowTime > jobInfo.getTriggerNextTime()) { //触发器过期时间<5s
                                    // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                                    // 1、trigger //执行触发器
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                                    // 2、fresh next 更新下次执行时间
                                    refreshNextValidTime(jobInfo, new Date());

                                    // next-trigger-time in 5s, pre-read again 下次触发时间在当前时间往后5秒范围内
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                                        // 1、make ring second 获取下次执行秒
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());

                                        // 3、fresh next 更新下次执行时间
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                    }

                                } else {
                                    // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
                                    //未来五秒以内执行的所有任务添加到ringData
                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());

                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));

                                }

                            }

                            // 3、update trigger info 更新执行时间和上次执行时间到数据库
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }

                        } else {
                            preReadSuc = false;
                        }

                        // tx stop


                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                        }
                    } finally {

                        // commit
                        if (conn != null) {
                            try {
                                conn.commit();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.setAutoCommit(connAutoCommit);
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                            try {
                                conn.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }

                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {
                                preparedStatement.close();
                            } catch (SQLException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
                    }
                    long cost = System.currentTimeMillis()-start;


                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            //若执行成功,下一秒继续执行。执行失败或没查询出数据则5秒执行一次。
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }

                }

                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();


        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {

                while (!ringThreadToStop) {

                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                        for (int i = 0; i < 2; i++) {
                                                      //假设现在为1秒,那么执行任务之后,5秒之后的任务分别会添加到23456下标位置。
                            // i=1:(1+60-1)%60=0
                            // i=2:(1+60-2)%60=59
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }

                        // ring trigger
                        logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                        if (ringItemData.size() > 0) {
                            // do trigger
                            for (int jobId: ringItemData) {
                                // do trigger //执行触发器
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            // clear
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }

2)客户端启动流程

构建作业执行器 , 这里将 com.xxl.job.core.executor.impl.XxlJobSpringExecutor 交由容器托管

@Configuration
@Slf4j
public class XxlJobConfig {

    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    @Value("${xxl.job.accessToken}")
    private String accessToken;

    @Value("${xxl.job.executor.appname}")
    private String appname;

    @Value("${xxl.job.executor.address}")
    private String address;

    @Value("${xxl.job.executor.ip}")
    private String ip;

    @Value("${xxl.job.executor.port}")
    private int port;

    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appname);
        xxlJobSpringExecutor.setAddress(address);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

        return xxlJobSpringExecutor;
    }

}

XxlJobSpringExecutor ,看接口继承体系,可以发现注入了 ApplicationContext 对象。以及实现了 SmartInitializingSingleton 接口,实现该接口的当spring容器初始完成,紧接着执行监听器发送监听后,就会遍历所有的Bean然后初始化所有单例非懒加载的bean,最后在实例化阶段结束时触发回调接口
在这里插入图片描述

 // start
    @Override
    public void afterSingletonsInstantiated() {

        // init JobHandler Repository
        /*initJobHandlerRepository(applicationContext);*/

        // init JobHandler Repository (for method)   初始化调度器资源管理器
        initJobHandlerMethodRepository(applicationContext);

        // refresh GlueFactory 刷新GlueFactory
        GlueFactory.refreshInstance(1);

        // super start
        try {
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

该方法主要做了如下事情:

  1. 初始化调度器资源管理器(从spring容器中将标记了XxlJob注解的方法,将其封装并添加到map中。)
  2. 刷新GlueFactory
  3. 启动服务,接收服务器请求。

2)1:初始化调度器资源管理器

该方法主要做了如下事情:

  1. 从spring容器获取所有对象,并遍历查找方法上标记XxlJob注解的方法
  2. 将xxljob配置的jobname作为key,对象,反射的执行,初始,销毁方法作为value注册jobHandlerRepository 中
  private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
        if (applicationContext == null) {
            return;
        }
        // init job handler from method
        String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
        for (String beanDefinitionName : beanDefinitionNames) {//遍历每个容器对象
            Object bean = applicationContext.getBean(beanDefinitionName);

            Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
            try { //获取每个注解XxlJob方法
                annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                        new MethodIntrospector.MetadataLookup<XxlJob>() {
                            @Override
                            public XxlJob inspect(Method method) {
                                return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                            }
                        });
            } catch (Throwable ex) {
                logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
            }
            if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                continue;
            }
            //遍历标记了XxlJob注解的方法
            for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                Method executeMethod = methodXxlJobEntry.getKey();
                XxlJob xxlJob = methodXxlJobEntry.getValue();
                if (xxlJob == null) {
                    continue;
                }

                String name = xxlJob.value();//获取配置xxjob的触发器名称
                if (name.trim().length() == 0) {
                    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                }
                if (loadJobHandler(name) != null) { //工作处理资源库是否有相同命名
                    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                }

                // execute method
                /*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                    throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }
                if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                    throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                            "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                }*/

                executeMethod.setAccessible(true);//设置可访问,设置后可通过反射调用私有方法

                // init and destory
                Method initMethod = null;
                Method destroyMethod = null;

                if (xxlJob.init().trim().length() > 0) {
                    try {  //获取XxlJob标记的方法,配置的init方法
                        initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                        initMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }
                if (xxlJob.destroy().trim().length() > 0) {
                    try {//获取XxlJob标记的方法,配置的destroy方法
                        destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                        destroyMethod.setAccessible(true);
                    } catch (NoSuchMethodException e) {
                        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");
                    }
                }

                // registry jobhandler 将xxljob配置的jobname作为key,对象,反射的执行,初始,销毁方法作为value注册jobHandlerRepository中
                registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
            }
        }

    }


2)2:刷新GlueFactory

	public static void refreshInstance(int type){
		if (type == 0) {
			glueFactory = new GlueFactory();
		} else if (type == 1) {
			glueFactory = new SpringGlueFactory();
		}
	}

2)3:核心启动类

该方法主要做了如下事情:

  1. 初始化日志文件
  2. 封装调度中心请求路径,用于访问调度中心
  3. 清除过期日志
  4. 回调调度中心任务执行状态
  5. 执行内嵌服务
    public void start() throws Exception {

        // init logpath
        XxlJobFileAppender.initLogPath(logPath);//初始化日志文件

        // init invoker, admin-client
        initAdminBizList(adminAddresses, accessToken); //初始化admin链接路径存储集合


        // init JobLogFileCleanThread  清除过期日志
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        TriggerCallbackThread.getInstance().start();

        // init executor-server 执行内嵌服务
        initEmbedServer(address, ip, port, appname, accessToken);
    }

2)3)1:初始化日志文件

	public static void initLogPath(String logPath){
		// init
		if (logPath!=null && logPath.trim().length()>0) {
			logBasePath = logPath;
		}
		// mk base dir 日志文件不存在则创建
		File logPathDir = new File(logBasePath);
		if (!logPathDir.exists()) {
			logPathDir.mkdirs();
		}
		logBasePath = logPathDir.getPath();

		// mk glue dir 创建glue目录
		File glueBaseDir = new File(logPathDir, "gluesource");
		if (!glueBaseDir.exists()) {
			glueBaseDir.mkdirs();
		}
		glueSrcPath = glueBaseDir.getPath();
	}

2)3)2:封装调度中心请求路径,用于访问调度中心

   private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            for (String address: adminAddresses.trim().split(",")) { //多个admin地址以,分隔
                if (address!=null && address.trim().length()>0) {

                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    } //将admin地址以及token添加adminBiz中
                    adminBizList.add(adminBiz);
                }
            }
        }
    }

2)3)3:主要清除过期日志

 public void start(final long logRetentionDays){

        // limit min value 日志最大保存天数<3天,直接退出
        if (logRetentionDays < 3 ) {
            return;
        }
        //一天执行一次
        localThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!toStop) {
                    try {
                        // clean log dir, over logRetentionDays 查询目录下所有子文件(包含目录)
                        File[] childDirs = new File(XxlJobFileAppender.getLogPath()).listFiles();
                        if (childDirs!=null && childDirs.length>0) {

                            // today   获取今天0点时间
                            Calendar todayCal = Calendar.getInstance();
                            todayCal.set(Calendar.HOUR_OF_DAY,0);
                            todayCal.set(Calendar.MINUTE,0);
                            todayCal.set(Calendar.SECOND,0);
                            todayCal.set(Calendar.MILLISECOND,0);

                            Date todayDate = todayCal.getTime();

                            for (File childFile: childDirs) {

                                // valid  不是目录跳过
                                if (!childFile.isDirectory()) {
                                    continue;
                                }//查询不到'-'则跳过
                                if (childFile.getName().indexOf("-") == -1) {
                                    continue;
                                }

                                // file create date  获取文件创建时间,文件都是以年-月-日命名的
                                Date logFileCreateDate = null;
                                try {
                                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
                                    logFileCreateDate = simpleDateFormat.parse(childFile.getName());
                                } catch (ParseException e) {
                                    logger.error(e.getMessage(), e);
                                }
                                if (logFileCreateDate == null) {
                                    continue;
                                }
                                //大于日志最大存活时间则清除
                                if ((todayDate.getTime()-logFileCreateDate.getTime()) >= logRetentionDays * (24 * 60 * 60 * 1000) ) {
                                    FileUtil.deleteRecursively(childFile);//超过保存天数则清除日志
                                }

                            }
                        }

                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try { //睡眠一天处理
                        TimeUnit.DAYS.sleep(1);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor JobLogFileCleanThread thread destory.");

            }
        });
        localThread.setDaemon(true);
        localThread.setName("xxl-job, executor JobLogFileCleanThread");
        localThread.start();
    }

2)3)4: 回调调度中心任务执行状态

具体看主动触发->客户端回调服务端

 public void start() {

        // valid 是否有配置admin路径
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
            return;
        }

        // callback
        triggerCallbackThread = new Thread(new Runnable() {

            @Override
            public void run() {

                // normal callback
                while(!toStop){
                    try {
                        //获取回调参数
                        HandleCallbackParam callback = getInstance().callBackQueue.take();
                        if (callback != null) {

                            // callback list param
                            List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                            //移除队列中所有元素到callbackParamList中
                            int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                            callbackParamList.add(callback);

                            // callback, will retry if error 通知admin
                            if (callbackParamList!=null && callbackParamList.size()>0) {
                                doCallback(callbackParamList);
                            }
                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }

                // last callback
                try {
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        doCallback(callbackParamList);
                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor callback thread destory.");

            }
        });
        triggerCallbackThread.setDaemon(true);
        triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
        triggerCallbackThread.start();


        // retry
        triggerRetryCallbackThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while(!toStop){
                    try {
                        retryFailCallbackFile();
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }
                    try {
                        TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destory.");
            }
        });
        triggerRetryCallbackThread.setDaemon(true);
        triggerRetryCallbackThread.start();

    }

2)3)5: 执行内嵌服务(开启端口监听,等待服务端调用)

该方法主要做了如下事情:

  1. 使用netty开放端口,等待服务端调用
  2. 注册到服务端(心跳30S)
  3. 向服务端申请剔除服务
    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

        // fill ip port  若没设置端口,则寻找可用端口
        port = port>0?port: NetUtil.findAvailablePort(9999);
        ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp(); //若没设置IP,则获取本机Ip

        // generate address 构造地址,若没设置地址,则将ip,port拼接成地址
        if (address==null || address.trim().length()==0) {
            String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
            address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
        }

        // accessToken
        if (accessToken==null || accessToken.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
        }

        // start  启动嵌入服务器 ,向服务端注册,以及监听端口,主要服务服务端调用。
        embedServer = new EmbedServer();
        embedServer.start(address, port, appname, accessToken);
    }

 public void start(final String address, final int port, final String appname, final String accessToken) {
        executorBiz = new ExecutorBizImpl();
        thread = new Thread(new Runnable() {

            @Override
            public void run() {

                // param
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                        0,
                        200,
                        60L,
                        TimeUnit.SECONDS,
                        new LinkedBlockingQueue<Runnable>(2000),
                        new ThreadFactory() {
                            @Override
                            public Thread newThread(Runnable r) {
                                return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                            }
                        },
                        new RejectedExecutionHandler() {
                            @Override
                            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                            }
                        });


                try {
                    // start server
                    ServerBootstrap bootstrap = new ServerBootstrap();
                    bootstrap.group(bossGroup, workerGroup)
                            .channel(NioServerSocketChannel.class)
                            .childHandler(new ChannelInitializer<SocketChannel>() {
                                @Override
                                public void initChannel(SocketChannel channel) throws Exception {
                                    channel.pipeline()
                                            .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                            .addLast(new HttpServerCodec())
                                            .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                            .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                }
                            })
                            .childOption(ChannelOption.SO_KEEPALIVE, true);

                    // bind 异步绑定port上
                    ChannelFuture future = bootstrap.bind(port).sync();

                    logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                    // start registry  //注册
                    startRegistry(appname, address);

                    // wait util stop
                    future.channel().closeFuture().sync();

                } catch (InterruptedException e) {
                    if (e instanceof InterruptedException) {
                        logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                    } else {
                        logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                    }
                } finally {
                    // stop
                    try {
                        workerGroup.shutdownGracefully();
                        bossGroup.shutdownGracefully();
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    }
                }

            }

        });
        thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
        thread.start();
    }

向服务端注册,默认30秒执行一次

    public void startRegistry(final String appname, final String address) {
        // start registry
        ExecutorRegistryThread.getInstance().start(appname, address);
    }

    public void start(final String appname, final String address){

        // valid appname不允许为null
        if (appname==null || appname.trim().length()==0) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");
            return;
        }//服务端地址不能为null
        if (XxlJobExecutor.getAdminBizList() == null) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");
            return;
        }

        registryThread = new Thread(new Runnable() {
            @Override
            public void run() {

                // registry
                while (!toStop) {
                    try {
                        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                            try {
                                ReturnT<String> registryResult = adminBiz.registry(registryParam); //向server注册服务(http请求),注册内容appname,当前服务监听地址
                                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) { //访问成功
                                    registryResult = ReturnT.SUCCESS;
                                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                    break;
                                } else {
                                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                }
                            } catch (Exception e) {
                                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
                            }

                        }
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }

                    }

                    try {
                        if (!toStop) { //心跳时间30秒
                            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
                        }
                    } catch (InterruptedException e) {
                        if (!toStop) {
                            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
                        }
                    }
                }

                // registry remove  删除注册
                try {
                    RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
                    for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                        try {  
                            ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);
                            if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                                registryResult = ReturnT.SUCCESS;
                                logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                                break;
                            } else {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                            }
                        } catch (Exception e) {
                            if (!toStop) {
                                logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);
                            }

                        }

                    }
                } catch (Exception e) {
                    if (!toStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");

            }
        });
        registryThread.setDaemon(true);
        registryThread.setName("xxl-job, executor ExecutorRegistryThread");
        registryThread.start();
    }

3) 服务注册

上一节中,执行内嵌服务时,客户端会主动向服务端发起注册,每间隔30秒,客户端发送post请求访问调度中心,上报心跳结果

在这里插入图片描述

执行器客户端:

com.xxl.job.core.thread.ExecutorRegistryThread#start

在这里插入图片描述

AdminBizClient#registry

    @Override
    public ReturnT<String> registry(RegistryParam registryParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
    }

服务端 :

com.xxl.job.admin.controller.JobApiController#api

在这里插入图片描述

    public ReturnT<String> registry(RegistryParam registryParam) {
        return JobRegistryHelper.getInstance().registry(registryParam);
    }

	public ReturnT<String> registry(RegistryParam registryParam) {

		// valid  校验
		if (!StringUtils.hasText(registryParam.getRegistryGroup())
				|| !StringUtils.hasText(registryParam.getRegistryKey())
				|| !StringUtils.hasText(registryParam.getRegistryValue())) {
			return new ReturnT<String>(ReturnT.FAIL_CODE, "Illegal Argument.");
		}

		// async execute 异步注册
		registryOrRemoveThreadPool.execute(new Runnable() {
			@Override
			public void run() { //更新修改时间
				int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
				if (ret < 1) {//说明暂未数据,才新增
					XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

					// fresh  空实现
					freshGroupRegistryInfo(registryParam);
				}
			}
		});

		return ReturnT.SUCCESS;
	}

客户端接收响应 :
在这里插入图片描述

4) 主动触发(核心)

  1. 调度中心向客户端发起post请求
  2. client 通过内嵌服务netty接收,异步线程处理
  3. client找到job绑定的线程,将任务丢到阻塞队列中。然后返回结果给调度中心。
  4. 调度中心更改任务状态。
  5. 客户端执行任务后,将执行结果丢到回调线程的阻塞队列处理。
  6. 回调线程通过post请求访问调度中心,调度中心更改job最终结果。
  7. 倘若超过10分钟调度中心没收到回调线程的请求,则设置job最终结果失败。

4)1:服务端执行触发器(可以是客户端主动调用服务端API要求手动执行任务,也可以是服务端的初始化触发线程执行的任务调度)

触发地址:com.xxl.job.admin.controller.JobInfoController#triggerJob

	@RequestMapping("/trigger")
	@ResponseBody
	//@PermissionLimit(limit = false)
	public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
		// force cover job param 设置默认值
		if (executorParam == null) {
			executorParam = "";
		}
		//触发器类型,手动 ,重试次数,'执行器任务分片参数,格式如 1/2',任务参数,机器地址
		JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
		return ReturnT.SUCCESS;
	}

主动触发和被动触发最终都会走到这。

public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
    helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}

添加一个触发任务丢到线程池执行,这里jobTimeoutCountMap会记录一分钟以内每个job执行超过500ms的次数。当某个job一分钟以内多于10次时间超过500ms,则采用慢触发器触发。这里每分钟会清除一次jobTimeoutCountMap。这里通过线程隔离的手段优化执行时间。

JobTriggerPoolHelper#addTrigger

   public void addTrigger(final int jobId,
                           final TriggerTypeEnum triggerType,
                           final int failRetryCount,
                           final String executorShardingParam,
                           final String executorParam,
                           final String addressList) {

        // choose thread pool  获取线程池
        ThreadPoolExecutor triggerPool_ = fastTriggerPool;
        //获取超时次数
        AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
        //一分钟内超时10次,则采用慢触发器执行
        if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
            triggerPool_ = slowTriggerPool;
        }

        // trigger
        triggerPool_.execute(new Runnable() {
            @Override
            public void run() {

                long start = System.currentTimeMillis();

                try {
                    // do trigger //执行触发器
                    XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {

                    // check timeout-count-map  更新成为下一分钟
                    long minTim_now = System.currentTimeMillis()/60000;
                    if (minTim != minTim_now) {
                        minTim = minTim_now; //当达到下一分钟则清除超时任务
                        jobTimeoutCountMap.clear();
                    }

                    // incr timeout-count-map
                    long cost = System.currentTimeMillis()-start;
                    if (cost > 500) {       // ob-timeout threshold 500ms
                        //执行时间超过500ms,则记录执行次数
                        AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                        if (timeoutCount != null) {
                            timeoutCount.incrementAndGet();
                        }
                    }

                }

            }
        });
    }

XxlJobTrigger#trigger

    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {

        // load data  从数据库获取任务详情信息
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }
        if (executorParam != null) {
            //设置任务参数
            jobInfo.setExecutorParam(executorParam);
        }
        //获取失败重试次数
        int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
        //获取job分组信息
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

        // cover addressList  设置地址集合
        if (addressList!=null && addressList.trim().length()>0) {
            group.setAddressType(1);
            group.setAddressList(addressList.trim());
        }

        // sharding param  拆分executorParam任务参数,填入shardingParam数组
        int[] shardingParam = null;
        if (executorShardingParam!=null){
            String[] shardingArr = executorParam.split("/");
            if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                shardingParam = new int[2];
                shardingParam[0] = Integer.valueOf(shardingArr[0]);
                shardingParam[1] = Integer.valueOf(shardingArr[1]);
            }
        } //如果路由策略是分片广播模式,同时注册地址不为空
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                && shardingParam==null) {
            for (int i = 0; i < group.getRegistryList().size(); i++) {  //遍历执行每个注册地址,集群广播
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } else {
            if (shardingParam == null) {  //当shardingParam为空,设置默认值,分别标识
                shardingParam = new int[]{0, 1};
            } //执行触发器
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        }

    }

XxlJobTrigger#processTrigger

这里会依据路由策略模式,选择对应的路由方式处理,采用了策略模式. 默认first

  private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // param  获取阻塞处理策略
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        // route strategy 获取路由策略,默认first
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1、save log-id 保存执行日志
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2、init trigger-param
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3、init address  获取触发器执行地址
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {  //如果是集群广播模式
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {  //查询匹配地址执行
                    address = group.getRegistryList().get(index);
                } else { //超过size,则默认执行第一个.
                    address = group.getRegistryList().get(0);
                }
            } else {
                //根据设置的路由策略,执行路由器,获取返回结果   ,这里用到了策略模式
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else { //获取不到注册地址,返回失败值
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4、trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) { //这里真正的执行触发器
            triggerResult = runExecutor(triggerParam, address);
        } else {//获取不到执行地址直接返回
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

        // 5、collection trigger info
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
        if (shardingParam != null) {
            triggerMsgSb.append("("+shardingParam+")");
        }
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

        // 6、save log trigger-info  更改执行日志状态
        jobLog.setExecutorAddress(address);
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setExecutorShardingParam(shardingParam);
        jobLog.setExecutorFailRetryCount(finalFailRetryCount);
        //jobLog.setTriggerTime();
        jobLog.setTriggerCode(triggerResult.getCode());//设置执行触发器返回值
        jobLog.setTriggerMsg(triggerMsgSb.toString());//设置返回结果信息
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

路由策略

FIRST(I18nUtil.getString(“jobconf_route_first”), new ExecutorRouteFirst()), //(第一个)固定选择第一个机器
LAST(I18nUtil.getString(“jobconf_route_last”), new ExecutorRouteLast()), //(最后一个)固定选择最后一个机器
ROUND(I18nUtil.getString(“jobconf_route_round”), new ExecutorRouteRound()), //(轮询)轮询
RANDOM(I18nUtil.getString(“jobconf_route_random”), new ExecutorRouteRandom()), //(随机)随机选择在线的机器;
CONSISTENT_HASH(I18nUtil.getString(“jobconf_route_consistenthash”), new ExecutorRouteConsistentHash()), //(一致性HASH)每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(I18nUtil.getString(“jobconf_route_lfu”), new ExecutorRouteLFU()), //(最不经常使用)使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(I18nUtil.getString(“jobconf_route_lru”), new ExecutorRouteLRU()), //(最近最久未使用)最久未使用的机器优先被选举;
FAILOVER(I18nUtil.getString(“jobconf_route_failover”), new ExecutorRouteFailover()), //(故障转移)按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(I18nUtil.getString(“jobconf_route_busyover”), new ExecutorRouteBusyover()), //(忙碌转移)按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
SHARDING_BROADCAST(I18nUtil.getString(“jobconf_route_shard”), null); //(分片广播)广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

获取客户端地址,简单拼接http+token地址。

XxlJobTrigger#runExecutor 真正的执行触发器

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//获取业务执行器地址,就执行器地址后面拼接token
          //通过post请求客户端执行job
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }
        //返回结果设置msg
        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

ExecutorBizClient#run 通过post请求客户端执行job

  @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }

XxlJobRemotingUtil#postBody

xxl-job自己服务端向客户端内部发起的的rpc通信,在客户端执行器启动时便在EmbedServer开启了服务端口的监听

在这里插入图片描述

4)2:客户端接收处理任务

在这里插入图片描述

执行器的内部使用netty监听了端口

EmbedServer#start 中会开启netty服务端口监听,具体监听请求处理执行的是EmbedHttpServerHandlerd的channelRead0方法

   @Override
        protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

            // request parse
            //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
            String requestData = msg.content().toString(CharsetUtil.UTF_8);//解析请求数据
            String uri = msg.uri();//获取uri,后面通过uri来处理不同的请求
            HttpMethod httpMethod = msg.method();//获取请求方式,Post/Get
            boolean keepAlive = HttpUtil.isKeepAlive(msg);//保持长连接
            String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

            // invoke
            bizThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // do invoke
                    Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

                    // to json  响应结果转json
                    String responseJson = GsonTool.toJson(responseObj);

                    // write response
                    writeResponse(ctx, keepAlive, responseJson);
                }
            });
        }


执行触发器

   private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

            // valid 不是POST直接返回异常
            if (HttpMethod.POST != httpMethod) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
            }
            if (uri==null || uri.trim().length()==0) { //校验uri
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
            }//校验token是否正确
            if (accessToken!=null
                    && accessToken.trim().length()>0
                    && !accessToken.equals(accessTokenReq)) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
            }

            // services mapping
            try {
                if ("/beat".equals(uri)) {
                    return executorBiz.beat();
                } else if ("/idleBeat".equals(uri)) {
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                } else if ("/run".equals(uri)) { //执行触发器
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);//请求参数解析成TriggerParam
                    return executorBiz.run(triggerParam);
                } else if ("/kill".equals(uri)) {
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                } else if ("/log".equals(uri)) {
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                } else {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
            }
        }


  1. 绑定作业到具体线程(这里绑定的jobHandler在执行器启动时就已经根据@XxlJob注解的方法初始化注册到一个名为jobHandlerRepository的ConcurrentMap中了)
  2. 检查缓存中是否存在调度中心要调度的jobThread,存在则设置调度中心传递来的参数,不存在则新建jobThread并注册到名为jobThreadRepository的ConcurrentMap中了
  3. jobThread在注册到jobThreadRepository时执行了其start方法

核心调用XxlJobExecutor.registJobThread

 @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        //获取job绑定线程
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        //获取作业处理器
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        //获取任务运行模式
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        //任务以JobHandler方式维护在执行器端;需要结合 "JobHandler" 属性匹配执行器中任务;
        if (GlueTypeEnum.BEAN == glueTypeEnum) {
            // new jobhandler  获取job处理器,也就是方法声明
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread 如果job处理器不一样,则kill旧处理器绑定的线程
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            //执行到这儿,要么新旧处理器不一致,要么没有绑定过任何线程
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                //没找到处理器,直接返回异常
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        //获取任务阻塞策略
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        //作业没绑定过线程,则绑定作业到具体线程
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue  将任务参数设置到任务内部的阻塞队列
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

jobThread具体的run方法

  @Override
	public void run() {

    	// init
    	try {
			handler.init();//执行初始化任务
		} catch (Throwable e) {
    		logger.error(e.getMessage(), e);
		}

		// execute
		while(!toStop){
			running = false;
			idleTimes++;//统计空闲执行次数

            TriggerParam triggerParam = null;
            try {
				// to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout)
				triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); //获取触发器任务
				if (triggerParam!=null) {
					running = true;
					idleTimes = 0;//空闲次数重置
					triggerLogIdSet.remove(triggerParam.getLogId());  //删除logId主要用来判断是否重复执行

					// log filename, like "logPath/yyyy-MM-dd/9999.log"  写入log文件
					String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
					XxlJobContext xxlJobContext = new XxlJobContext(
							triggerParam.getJobId(),
							triggerParam.getExecutorParams(),
							logFileName,
							triggerParam.getBroadcastIndex(),
							triggerParam.getBroadcastTotal());

					// init job context
					XxlJobContext.setXxlJobContext(xxlJobContext);

					// execute
					XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
					//设置了超时就异步线程处理
					if (triggerParam.getExecutorTimeout() > 0) {
						// limit timeout
						Thread futureThread = null;
						try {
							FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
								@Override
								public Boolean call() throws Exception {

									// init job context
									XxlJobContext.setXxlJobContext(xxlJobContext);

									handler.execute();
									return true;
								}
							});
							futureThread = new Thread(futureTask);
							futureThread.start();
							//异步线程处理并获取返回值
							Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
						} catch (TimeoutException e) {

							XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
							XxlJobHelper.log(e);

							// handle result
							XxlJobHelper.handleTimeout("job execute timeout ");
						} finally {
							futureThread.interrupt();
						}
					} else {
						// just execute  没设置超时时间,则立刻执行触发器
						handler.execute();
					}

					// valid execute handle data
					if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
						XxlJobHelper.handleFail("job handle result lost.");
					} else {
						String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
						tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
								?tempHandleMsg.substring(0, 50000).concat("...")
								:tempHandleMsg;
						XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
					}
					XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
							+ XxlJobContext.getXxlJobContext().getHandleCode()
							+ ", handleMsg = "
							+ XxlJobContext.getXxlJobContext().getHandleMsg()
					);

				} else {
					if (idleTimes > 30) { //空闲执行次数超过30次,且队列没任务,则删除并终止线程
						if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
							XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
						}
					}
				}
			} catch (Throwable e) {
				if (toStop) {
					XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
				}

				// handle result
				StringWriter stringWriter = new StringWriter();
				e.printStackTrace(new PrintWriter(stringWriter));
				String errorMsg = stringWriter.toString();

				XxlJobHelper.handleFail(errorMsg);

				XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
			} finally {
                if(triggerParam != null) {
                    // callback handler info
                    if (!toStop) {
                        // commonm  加入回调队列
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        		triggerParam.getLogId(),
								triggerParam.getLogDateTime(),
								XxlJobContext.getXxlJobContext().getHandleCode(),
								XxlJobContext.getXxlJobContext().getHandleMsg() )
						);
                    } else {
                        // is killed
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        		triggerParam.getLogId(),
								triggerParam.getLogDateTime(),
								XxlJobContext.HANDLE_COCE_FAIL,
								stopReason + " [job running, killed]" )
						);
                    }
                }
            }
        }

		// callback trigger request in queue
		while(triggerQueue !=null && triggerQueue.size()>0){
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam!=null) {
				// is killed 加入回调队列
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
						triggerParam.getLogId(),
						triggerParam.getLogDateTime(),
						XxlJobContext.HANDLE_COCE_FAIL,
						stopReason + " [job not executed, in the job queue, killed.]")
				);
			}
		}

		// destroy
		try { //销毁,执行调度器设置的销毁方法
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

		logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
	}

handler.execute(); 这里利用反射调用方法。

com.xxl.job.core.handler.impl.MethodJobHandler

@Override
public void execute() throws Exception {
    Class<?>[] paramTypes = method.getParameterTypes();
    if (paramTypes.length > 0) {
        method.invoke(target, new Object[paramTypes.length]);       // method-param can not be primitive-types
    } else {
        method.invoke(target);
    }
}

4)3:客户端回调服务端

当服务端调用客户端执行后,实际上会直接返回;想要监听客户端任务执行过程中的日志,客户端启动时启动了一个TriggerCallbackThread的线程,会将客户端任务执行过程中的日志回调发送给服务端

在上面的ExecutorBizImpl#run方法中的最后是直接

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/28799.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

批量生成,本地推理,人工智能声音克隆框架PaddleSpeech本地批量克隆实践(Python3.10)

云端炼丹固然是极好的&#xff0c;但不能否认的是&#xff0c;成本要比本地高得多&#xff0c;同时考虑到深度学习的训练相对于推理来说成本也更高&#xff0c;这主要是因为它需要大量的数据、计算资源和时间等资源&#xff0c;并且对超参数的调整也要求较高&#xff0c;更适合…

熬夜三晚之深度解析DuckDB MetaPipeline

深度解析DuckDB MetaPipeline 深度解析DuckDB MetaPipeline 1.导语 2.基础理论 3.HashJoin深度解析 3.1 RESULT_COLLECTOR 3.2 PROJECTION 3.3 HASH_JOIN 4.Ready 4.1 翻转 4.2 MetaPipeline与pipeline 5.汇总 1.导语 DuckDB 是…

深入理解深度学习——Transformer:基础知识

分类目录&#xff1a;《深入理解深度学习》总目录 相关文章&#xff1a; 作为当下最先进的深度学习架构之一&#xff0c;Transformer被广泛应用于自然语言处理领域。它不单替代了以前流行的循环神经网络(recurrent neural network, RNN)和长短期记忆(long short-term memory, …

从 Google 删库,到蚂蚁跑路,Care 与 Fear 点燃的 Flare

Bytebase 第一次完成融资后写了一篇文章&#xff0c;主要讲了从行业层面做 Bytebase 的逻辑。一年过去了&#xff0c;这一年我们所处的开源/infra/数据库/企业服务赛道从热点归于平静&#xff0c;尤其在国内&#xff0c;又习惯性地反应过度&#xff0c;直接降到冰点。但从全球来…

Apache RocketMQ RCE漏洞复现(CVE-2023-33246)

RocketMQ RocketMQ是阿里巴巴在2012年开发的分布式消息中间件&#xff0c;专为万亿级超大规模的消息处理而设计&#xff0c;具有高吞吐量、低延迟、海量堆积、顺序收发等特点。它是阿里巴巴双十一购物狂欢节和众多大规模互联网业务场景的必备基础设施。 漏洞概述 在其5.1.0版…

基于Servlet+mysql+jsp学生宿舍信息管理系统

基于Servletmysqljsp学生宿舍信息管理系统 一、系统介绍二、功能展示1.用户登陆2.学生-主页面3.学生-缺勤记录4.学生-修改密码5.宿舍管理员-主页面6.宿舍管理员-学生查看7.宿舍管理员-缺勤记录8.系统管理员-宿舍管理员管理9.系统管理员-学生管理10.系统管理员-宿舍楼管理11.系统…

5大趋势与10大应用场景!未来的智能工厂要这么建...

在经济下行压力、人口红利消失、消费结构升级、疫情冲击等多种因素推动下&#xff0c;制造企业加快转型步伐&#xff0c;工厂正向高效化、智能化、绿色化方向跃迁升级&#xff0c;不断涌现出技术创新、应用领先、成效显著的智能工厂。 近日&#xff0c;中国信息通信研究院发布…

外观模式(十三)

每天都是全新的一天&#xff0c;感谢今日努力的自己。 上一章简单介绍了组合模式(十二), 如果没有看过, 请观看上一章 一. 外观模式 引用 菜鸟教程里面的外观模式介绍: https://www.runoob.com/design-pattern/facade-pattern.html 外观模式&#xff08;Facade Pattern&…

OpenCV 项目开发实战--用 (C++ / Python)实现伪着色

文末附相关测试代码下载链接 在本教程中,我们将首先展示一种使用 OpenCV 的预定义颜色图对灰度图像进行伪彩色/伪彩色 的简单方法。如果您更愿意使用自己的颜色图,我们还将展示一种定义自定义颜色图的方法。 行星和太空中其他物体的灰度图像通常是伪彩色的,以显示细…

Vue中如何进行滚动吸顶与侧边栏固定

Vue中如何进行滚动吸顶与侧边栏固定 在Vue应用程序中&#xff0c;当需要实现滚动吸顶和侧边栏固定效果时&#xff0c;我们可以使用一些技术来实现。这些技术包括CSS和JavaScript&#xff0c;可以帮助我们实现各种各样的滚动效果。 如何实现滚动吸顶&#xff1f; 滚动吸顶是指…

[Halcon3D] 3D鞋点胶的点云边界提取

&#x1f4e2;博客主页&#xff1a;https://loewen.blog.csdn.net&#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;本文由 丶布布原创&#xff0c;首发于 CSDN&#xff0c;转载注明出处&#x1f649;&#x1f4e2;现…

JavaWeb之JSP

文章目录 JSP的基本介绍JSP的本质JSP的三种语法JSP头部的page指令language属性contentType属性image.pngpageEncoding属性import属性autoFlush属性 - 给out输出流使用buffer属性 - 给out输出流使用errorPage属性isErrorPage属性session属性extends属性 JSP中的常用脚本声明脚本…

什么是oa系统,什么是工单系统,有啥区别?

什么是oa系统&#xff0c;什么是工单系统&#xff0c;有啥区别&#xff1f; 一、OA系统与工单系统介绍 1、什么是OA系统 OA系统全称为Office Automation&#xff0c;即办公自动化系统。它是一种专门为企业和机构的日常办公工作提供服务的综合性软件平台&#xff0c;具有信息…

Python抓取商品详情方法的几种方法比较

抓取商品详情的方法有很多种&#xff0c;以下是其中几种常见的方法及其优缺点&#xff1a; 1.使用requests库发送HTTP请求&#xff0c;然后解析HTML或JSON格式的数据&#xff1a; 优点&#xff1a;这种方法可以抓取几乎所有网站上的数据&#xff0c;支持GET和POST请求&#xff…

佩戴比较舒适的蓝牙耳机有哪些?值得入手的蓝牙耳机分享

​对于年轻人来说&#xff0c;耳机使用场景丰富&#xff0c;时尚追求度高&#xff0c;喜好的音乐类型也是多种多样&#xff0c;需求侧重也不尽相同。下面我来推荐几款相当不错的蓝牙耳机给大家&#xff0c;总会有喜欢那款&#xff01; 一、南卡OE蓝牙耳机 佩戴舒适度打分&…

界面开发框架Qt新手入门指南 - 使用Calendar组件创建日历(一)

Qt 是目前最先进、最完整的跨平台C开发工具。它不仅完全实现了一次编写&#xff0c;所有平台无差别运行&#xff0c;更提供了几乎所有开发过程中需要用到的工具。如今&#xff0c;Qt已被运用于超过70个行业、数千家企业&#xff0c;支持数百万设备及应用。 本文中的CalendarWi…

快速傅里叶变换python实现

img { margin: auto; display: block } 一、前言 我想认真写好快速傅里叶变换(Fast Fourier Transform&#xff0c;FFT)&#xff0c;所以这篇文章会由浅到细&#xff0c;由窄到宽的讲解&#xff0c;但是傅里叶变换对于寻常人并不是很容易理解的&#xff0c;所以对于基础不牢的人…

RK3588平台开发系列讲解(USB篇)USB Device端口组合配置过程

平台内核版本安卓版本RK3588Linux 5.10Android 12文章目录 一、configfs二、configfs 配置过程2.1、使能相关的宏2.2、挂载configfs2.3、创建名为g1的usb复合设备2.4、配置PID和VID2.5、创建并配置strings子目录2.6、创建configuration和字符串2.7、创建functions2.8、将functi…

【C#】并行编程实战:任务并行性(上)

在 .NET 的初始版本中&#xff0c;我们只能依赖线程&#xff08;线程可以直接创建或者使用 ThreadPool 类创建&#xff09;。ThreadPool 类提供了一个托管抽象层&#xff0c;但是开发人员仍然需要依靠 Thread 类来进行更好的控制。而 Thread 类维护困难&#xff0c;且不可托管&…

【半监督图像分割 2023 CVPR】UniMatch

【半监督图像分割 2023 CVPR】UniMatch 论文题目&#xff1a;Revisiting Weak-to-Strong Consistency in Semi-Supervised Semantic Segmentation 中文题目&#xff1a;重新审视半监督语义分割中的强弱一致性 论文链接&#xff1a;https://arxiv.org/abs/2208.09910 论文代码&a…
最新文章