最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

天天滚动:elastic-job源码(2)-选举机制

来源:博客园


(相关资料图)

选举机制:利用zookeeper分布式锁机制,每一个job都存在节点选举机制,用于job分片处理。Job在初始化的时候就会实施选举机制如下初始化的代码:
public void registerStartUpInfo(final boolean enabled) {    //开始所有的监听器    listenerManager.startAllListeners();    //选举leader /{namespace}/leader/election/instance 放置选举出来的服务器    leaderService.electLeader();    //{namespace}/{ipservers} 设置enable处理    serverService.persistOnline(enabled);    //临时节点   /{namespave}/instances 放置运行服务实例信息    instanceService.persistOnline();    //开启一个异步服务    if (!reconcileService.isRunning()) {        reconcileService.startAsync();    }}
listenerManager.startAllListeners();会开启一个选举相关的listenerManager ElectionListenerManager.classleaderService.electLeader();执行选举功能 第一步:执行选举功能
public void electLeader() {    log.debug("Elect a new leader now.");    this.jobNodeStorage.executeInLeader("leader/election/latch", new LeaderService.LeaderElectionExecutionCallback());    log.debug("Leader election completed.");}
public void executeInLeader(String key, LeaderExecutionCallback callback) {    try {        LeaderLatch latch = new LeaderLatch(this.client, key);        try {            latch.start();            latch.await();            callback.execute();        } catch (Throwable var7) {            try {                latch.close();            } catch (Throwable var6) {                var7.addSuppressed(var6);            }            throw var7;        }        latch.close();    } catch (Exception var8) {        this.handleException(var8);    }}
{job name}/leader/election/latch节点加zk锁,在抢到锁之后,调用callback对象中的execute方法
class LeaderElectionExecutionCallback implements LeaderExecutionCallback {        @Override    public void execute() {        //{jobname}/leader/election/instance 不存在        if (!hasLeader()) {            //创建临时节点 {jobname}/leader/election/instance 值为 当前运行的实例值 例如:10.100.16.75@-@134642 前面是ip地址,后面是产生的随机数            //当应用实例与zk断开重新连接时,该节点信息会清除            jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());        }    }}
第二步:ElectionListenerManager.class开启监听
@Overridepublic void start() {    addDataListener(new LeaderElectionJobListener());    addDataListener(new LeaderAbdicationJobListener());}
执行start方法有两个监听LeaderElectionJobListener:用于leader宕机之后重新选举监听LeaderAbdicationJobListener :用于监听leader宕机数据处理 LeaderElectionJobListener.java
@Overridepublic void onChange(final DataChangedEvent event) {    //1.schedulerMap 和 jobInstanceMap 没有job信息    //2.{jobname}/service/{ip} 节点数据为DISABLE 或者 ({jobname}/leader/election/instance 节点的类型为删除且{jobname}/servers 节点的值是ENABLED 且  {jobname}/instances 节点下有其他的在线实例)    //当前运行的job实例宕机,并且有其他运行的实例    if (!JobRegistry.getInstance().isShutdown(jobName) && (isActiveElection(event.getKey(), event.getValue()) || isPassiveElection(event.getKey(), event.getType()))) {        //重新选举        leaderService.electLeader();    }}

LeaderAbdicationJobListener.java

@Overridepublic void onChange(final DataChangedEvent event) {    //{jobname}/leader/election/instance节点的实例id和JobRegistry对象中的实例id相等    //{jobname}/service/{ip}/ 是DISABLED    //就是实例下线    if (leaderService.isLeader() && isLocalServerDisabled(event.getKey(), event.getValue())) {        //删除{jobname}/leader/election/instance 节点        leaderService.removeLeader();    }}

关键词: