最新要闻
- 环球头条:【手慢无】速来占便宜!三星128GB存储卡仅需49.8元
- 全球热头条丨汽车雷达在无人陵园内显示全是人影:吓坏车主
- 985硕士男子失业半年 应聘道士35岁已超龄!做实习生都被拒:现送外卖
- 没污染!国内首款氢内燃机飞机成功首飞:中国自研
- 环球新资讯:内地特供的网游性价比神U!酷睿i7-13790F评测:游戏性能强于锐龙9 7950X
- 环球播报:海豚模拟器登陆Steam
- 全球实时:半场战报:踢疯了!梅西第38分钟戴帽 阿根廷5-0暂领先库拉索
- 环球热推荐:Epic独占结束!中国功夫游戏《师父》登陆Steam:139元起
- 全固态电池空间电荷层微观机理揭示
- 环球要闻:核心、显存砍得没法看!RTX 4060 Ti/4060要5月上市:3999元起超值?
- 天天播报:大作《生化危机4重制版》发售 采用Denuvo正版加密 黑客发声:准备出手破解
- 天天快看点丨任天堂《塞尔达传说:王国之泪》限定版Switch发布!金黄手柄太酷炫
- 短讯!《最后生还者》PC版多半差评
- 快讯:成年子女不能与父母在酒店住同一标间引热议 太没道理?酒店回应
- 环球观察:4个多月第一次!Intel Arc Pro专业显卡终于有了新驱动
- 当前最新:小米“退钱”了:27万小米空气净化器初代用户 每人899元
手机
iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
- 警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- 男子被关545天申国赔:获赔18万多 驳回精神抚慰金
- 3天内26名本土感染者,辽宁确诊人数已超安徽
- 广西柳州一男子因纠纷杀害三人后自首
- 洱海坠机4名机组人员被批准为烈士 数千干部群众悼念
家电
zookeeper的Leader选举源码解析
作者:京东物流梁吉超
zookeeper是一个分布式服务框架,主要解决分布式应用中常见的多种数据问题,例如集群管理,状态同步等。为解决这些问题zookeeper需要Leader选举进行保障数据的强一致性机制和稳定性。本文通过集群的配置,对leader选举源进行解析,让读者们了解如何利用BIO通信机制,多线程多层队列实现高性能架构。
【资料图】
01Leader选举机制
Leader选举机制采用半数选举算法。
每一个zookeeper服务端称之为一个节点,每个节点都有投票权,把其选票投向每一个有选举权的节点,当其中一个节点选举出票数过半,这个节点就会成为Leader,其它节点成为Follower。
02Leader选举集群配置
重命名zoo_sample.cfg文件为zoo1.cfg ,zoo2.cfg,zoo3.cfg,zoo4.cfg
修改zoo.cfg文件,修改值如下:
【plain】zoo1.cfg文件内容:dataDir=/export/data/zookeeper-1clientPort=2181server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observerzoo2.cfg文件内容:dataDir=/export/data/zookeeper-2clientPort=2182server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observerzoo3.cfg文件内容:dataDir=/export/data/zookeeper-3clientPort=2183server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observerzoo4.cfg文件内容:dataDir=/export/data/zookeeper-4clientPort=2184server.1=127.0.0.1:2001:3001server.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observer
- server.第几号服务器(对应myid文件内容)=ip:数据同步端口:选举端口:选举标识
- participant默认参与选举标识,可不写. observer不参与选举
4.在/export/data/zookeeper-1,/export/data/zookeeper-2,/export/data/zookeeper-3,/export/data/zookeeper-4目录下创建myid文件,文件内容分别写1 ,2,3,4,用于标识sid(全称:Server ID)赋值。
- 启动三个zookeeper实例:
- bin/zkServer.sh start conf/zoo1.cfg
- bin/zkServer.sh start conf/zoo2.cfg
- bin/zkServer.sh start conf/zoo3.cfg
- 每启动一个实例,都会读取启动参数配置zoo.cfg文件,这样实例就可以知道其作为服务端身份信息sid以及集群中有多少个实例参与选举。
03Leader选举流程
图1 第一轮到第二轮投票流程
前提:
设定票据数据格式vote(sid,zxid,epoch)
- sid是Server ID每台服务的唯一标识,是myid文件内容;
- zxid是数据事务id号;
- epoch为选举周期,为方便理解下面讲解内容暂定为1初次选举,不写入下面内容里。
按照顺序启动sid=1,sid=2节点
第一轮投票:
sid=1节点:初始选票为自己,将选票vote(1,0)发送给sid=2节点;
sid=2节点:初始选票为自己,将选票vote(2,0)发送给sid=1节点;
sid=1节点:收到sid=2节点选票vote(2,0)和当前自己的选票vote(1,0),首先比对zxid值,zxid越大代表数据最新,优先选择zxid最大的选票,如果zxid相同,选举最大sid。当前投票选举结果为vote(2,0),sid=1节点的选票变为vote(2,0);
sid=2节点:收到sid=1节点选票vote(1,0)和当前自己的选票vote(2,0),参照上述选举方式,选举结果为vote(2,0),sid=2节点的选票不变;
第一轮投票选举结束。
第二轮投票:
sid=1节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=2节点;
sid=2节点:当前自己的选票为vote(2,0),将选票vote(2,0)发送给sid=1节点;
sid=1节点:收到sid=2节点选票vote(2,0)和自己的选票vote(2,0), 按照半数选举算法,总共3个节点参与选举,已有2个节点选举出相同选票,推举sid=2节点为Leader,自己角色变为Follower;
sid=2节点:收到sid=1节点选票vote(2,0)和自己的选票vote(2,0),按照半数选举算法推举sid=2节点为Leader,自己角色变为Leader。
这时启动sid=3节点后,集群里已经选举出leader,sid=1和sid=2节点会将自己的leader选票发回给sid=3节点,通过半数选举结果还是sid=2节点为leader。
3.1 Leader选举采用多层队列架构
zookeeper选举底层主要分为选举应用层和消息传输队列层,第一层应用层队列统一接收和发送选票,而第二层传输层队列,是按照服务端sid分成了多个队列,是为了避免给每台服务端发送消息互相影响。比如对某台机器发送不成功不会影响正常服务端的发送。
图2 多层队列上下关系交互流程图
04解析代码入口类
通过查看zkServer.sh文件内容找到服务启动类:
org.apache.zookeeper.server.quorum.QuorumPeerMain
05选举流程代码解析
图3 选举代码实现流程图
- 加载配置文件QuorumPeerConfig.parse(path);
针对 Leader选举关键配置信息如下:
- 读取dataDir目录找到myid文件内容,设置当前应用sid标识,做为投票人身份信息。下面遇到myid变量为当前节点自己sid标识。
- 设置peerType当前应用是否参与选举
- new QuorumMaj()解析server.前缀加载集群成员信息,加载allMembers所有成员,votingMembers参与选举成员,observingMembers观察者成员,设置half值votingMembers.size()/2.
【Java】public QuorumMaj(Properties props) throws ConfigException { for (Entry
QuorumPeerMain.runFromConfig(config) 启动服务;
QuorumPeer.startLeaderElection() 开启选举服务;
- 设置当前选票new Vote(sid,zxid,epoch)
【plain】synchronized public void startLeaderElection(){try { if (getPeerState() == ServerState.LOOKING) { //首轮:当前节点默认投票对象为自己 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); } } catch(IOException e) { RuntimeException re = new RuntimeException(e.getMessage()); re.setStackTrace(e.getStackTrace()); throw re; }//........}
- 创建选举管理类:QuorumCnxnManager;
- 初始化recvQueue
接收投票队列(第二层传输队列); - 初始化queueSendMap
按sid发送投票队列(第二层传输队列); - 初始化senderWorkerMap
发送投票工作线程容器,表示着与sid投票节点已连接; - 初始化选举监听线程类QuorumCnxnManager.Listener。
【Java】//QuorumPeer.createCnxnManager()public QuorumCnxManager(QuorumPeer self, final long mySid, Map view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) { //接收投票队列(第二层传输队列) this.recvQueue = new ArrayBlockingQueue(RECV_CAPACITY); //按sid发送投票队列(第二层传输队列) this.queueSendMap = new ConcurrentHashMap>(); //发送投票工作线程容器,表示着与sid投票节点已连接 this.senderWorkerMap = new ConcurrentHashMap(); this.lastMessageSent = new ConcurrentHashMap(); String cnxToValue = System.getProperty("zookeeper.cnxTimeout"); if(cnxToValue != null){ this.cnxTO = Integer.parseInt(cnxToValue); } this.self = self; this.mySid = mySid; this.socketTimeout = socketTimeout; this.view = view; this.listenOnAllIPs = listenOnAllIPs; initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize, quorumSaslAuthEnabled); // Starts listener thread that waits for connection requests //创建选举监听线程 接收选举投票请求 listener = new Listener(); listener.setName("QuorumPeerListener");}//QuorumPeer.createElectionAlgorithmprotected Election createElectionAlgorithm(int electionAlgorithm){ Election le=null; //TODO: use a factory rather than a switch switch (electionAlgorithm) { case 0: le = new LeaderElection(this); break; case 1: le = new AuthFastLeaderElection(this); break; case 2: le = new AuthFastLeaderElection(this, true); break; case 3: qcm = createCnxnManager();// new QuorumCnxManager(... new Listener()) QuorumCnxManager.Listener listener = qcm.listener; if(listener != null){ listener.start();//启动选举监听线程 FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start(); le = fle; } else { LOG.error("Null listener when initializing cnx manager"); } break; default: assert false; }return le;}
- 开启选举监听线程QuorumCnxnManager.Listener;
- 创建ServerSockket等待大于自己sid节点连接,连接信息存储到senderWorkerMap
; - sid>self.sid才可以连接过来。
【Java】//上面的listener.start()执行后,选择此方法public void run() { int numRetries = 0; InetSocketAddress addr; Socket client = null; while((!shutdown) && (numRetries < 3)){ try { ss = new ServerSocket(); ss.setReuseAddress(true); if (self.getQuorumListenOnAllIPs()) { int port = self.getElectionAddress().getPort(); addr = new InetSocketAddress(port); } else { // Resolve hostname for this server in case the // underlying ip address has changed. self.recreateSocketAddresses(self.getId()); addr = self.getElectionAddress(); } LOG.info("My election bind port: " + addr.toString()); setName(addr.toString()); ss.bind(addr); while (!shutdown) { client = ss.accept(); setSockOpts(client); LOG.info("Received connection request " + client.getRemoteSocketAddress()); // Receive and handle the connection request // asynchronously if the quorum sasl authentication is // enabled. This is required because sasl server // authentication process may take few seconds to finish, // this may delay next peer connection requests. if (quorumSaslAuthEnabled) { receiveConnectionAsync(client); } else {//接收连接信息 receiveConnection(client); } numRetries = 0; } } catch (IOException e) { if (shutdown) { break; } LOG.error("Exception while listening", e); numRetries++; try { ss.close(); Thread.sleep(1000); } catch (IOException ie) { LOG.error("Error closing server socket", ie); } catch (InterruptedException ie) { LOG.error("Interrupted while sleeping. " + "Ignoring exception", ie); } closeSocket(client); } } LOG.info("Leaving listener"); if (!shutdown) { LOG.error("As I"m leaving the listener thread, " + "I won"t be able to participate in leader " + "election any longer: " + self.getElectionAddress()); } else if (ss != null) { // Clean up for shutdown. try { ss.close(); } catch (IOException ie) { // Don"t log an error for shutdown. LOG.debug("Error closing server socket", ie); } }}//代码执行路径:receiveConnection()->handleConnection(...)private void handleConnection(Socket sock, DataInputStream din) throws IOException {//...省略 if (sid < self.getId()) { /* * This replica might still believe that the connection to sid is * up, so we have to shut down the workers before trying to open a * new connection. */ SendWorker sw = senderWorkerMap.get(sid); if (sw != null) { sw.finish(); } /* * Now we start a new connection */ LOG.debug("Create new connection to server: {}", sid); closeSocket(sock); if (electionAddr != null) { connectOne(sid, electionAddr); } else { connectOne(sid); } } else { // Otherwise start worker threads to receive data. SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if (vsw != null) { vsw.finish(); } //存储连接信息 senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue(SEND_CAPACITY)); sw.start(); rw.start(); }}
- 创建FastLeaderElection快速选举服务;
- 初始选票发送队列sendqueue(第一层队列)
- 初始选票接收队列recvqueue(第一层队列)
- 创建线程WorkerSender
- 创建线程WorkerReceiver
【Java】//FastLeaderElection.starterprivate void starter(QuorumPeer self, QuorumCnxManager manager) { this.self = self; proposedLeader = -1; proposedZxid = -1; //发送队列sendqueue(第一层队列) sendqueue = new LinkedBlockingQueue(); //接收队列recvqueue(第一层队列) recvqueue = new LinkedBlockingQueue(); this.messenger = new Messenger(manager);}//new Messenger(manager)Messenger(QuorumCnxManager manager) { //创建线程WorkerSender this.ws = new WorkerSender(manager); this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]"); this.wsThread.setDaemon(true); //创建线程WorkerReceiver this.wr = new WorkerReceiver(manager); this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]"); this.wrThread.setDaemon(true);}
- 开启WorkerSender和WorkerReceiver线程。
WorkerSender线程自旋获取sendqueue第一层队列元素
- sendqueue队列元素内容为相关选票信息详见ToSend类;
- 首先判断选票sid是否和自己sid值相同,相等直接放入到recvQueue队列中;
- 不相同将sendqueue队列元素转储到queueSendMap
第二层传输队列中。
【Java】//FastLeaderElection.Messenger.WorkerSenderclass WorkerSender extends ZooKeeperThread{//... public void run() { while (!stop) { try { ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; //将投票信息发送出去 process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); }}//QuorumCnxManager#toSendpublic void toSend(Long sid, ByteBuffer b) { /* * If sending message to myself, then simply enqueue it (loopback). */ if (this.mySid == sid) { b.position(0); addToRecvQueue(new Message(b.duplicate(), sid)); /* * Otherwise send to the corresponding thread to send. */ } else { /* * Start a new connection if doesn"t have one already. */ ArrayBlockingQueue bq = new ArrayBlockingQueue( SEND_CAPACITY); ArrayBlockingQueue oldq = queueSendMap.putIfAbsent(sid, bq); //转储到queueSendMap第二层传输队列中 if (oldq != null) { addToSendQueue(oldq, b); } else { addToSendQueue(bq, b); } connectOne(sid); }}
WorkerReceiver线程自旋获取recvQueue第二层传输队列元素转存到recvqueue第一层队列中。
【Java】//WorkerReceiverpublic void run() { Message response; while (!stop) { // Sleeps on receive try { //自旋获取recvQueue第二层传输队列元素 response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS); if(response == null) continue; // The current protocol and two previous generations all send at least 28 bytes if (response.buffer.capacity() < 28) { LOG.error("Got a short response: " + response.buffer.capacity()); continue; } //... if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){ //第二层传输队列元素转存到recvqueue第一层队列中 recvqueue.offer(n); //... } }//...}
06选举核心逻辑
- 启动线程QuorumPeer
开始Leader选举投票makeLEStrategy().lookForLeader();
sendNotifications()向其它节点发送选票信息,选票信息存储到sendqueue队列中。sendqueue队列由WorkerSender线程处理。
【plain】//QuorunPeer.run//...try { reconfigFlagClear(); if (shuttingDownLE) { shuttingDownLE = false; startLeaderElection(); } //makeLEStrategy().lookForLeader() 发送投票 setCurrentVote(makeLEStrategy().lookForLeader());} catch (Exception e) { LOG.warn("Unexpected exception", e); setPeerState(ServerState.LOOKING);} //...//FastLeaderElection.lookLeaderpublic Vote lookForLeader() throws InterruptedException {//... //向其他应用发送投票sendNotifications();//...}private void sendNotifications() { //获取应用节点 for (long sid : self.getCurrentAndNextConfigVoters()) { QuorumVerifier qv = self.getQuorumVerifier(); ToSend notmsg = new ToSend(ToSend.mType.notification, proposedLeader, proposedZxid, logicalclock.get(), QuorumPeer.ServerState.LOOKING, sid, proposedEpoch, qv.toString().getBytes()); if(LOG.isDebugEnabled()){ LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" + Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get()) + " (n.round), " + sid + " (recipient), " + self.getId() + " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)"); } //储存投票信息 sendqueue.offer(notmsg); }}class WorkerSender extends ZooKeeperThread { //... public void run() { while (!stop) { try {//提取已储存的投票信息 ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS); if(m == null) continue; process(m); } catch (InterruptedException e) { break; } } LOG.info("WorkerSender is down"); }//...}
自旋recvqueue队列元素获取投票过来的选票信息:
【Java】public Vote lookForLeader() throws InterruptedException {//.../* * Loop in which we exchange notifications until we find a leader */while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ /* * Remove next notification from queue, times out after 2 times * the termination time */ //提取投递过来的选票信息 Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);/* * Sends more notifications if haven"t received enough. * Otherwise processes new notification. */if(n == null){ if(manager.haveDelivered()){ //已全部连接成功,并且前一轮投票都完成,需要再次发起投票 sendNotifications(); } else { //如果未收到选票信息,manager.contentAll()自动连接其它socket节点 manager.connectAll(); } /* * Exponential backoff */ int tmpTimeOut = notTimeout*2; notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval); LOG.info("Notification time out: " + notTimeout); } //.... } //...}
【Java】//manager.connectAll()->connectOne(sid)->initiateConnection(...)->startConnection(...)private boolean startConnection(Socket sock, Long sid) throws IOException { DataOutputStream dout = null; DataInputStream din = null; try { // Use BufferedOutputStream to reduce the number of IP packets. This is // important for x-DC scenarios. BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream()); dout = new DataOutputStream(buf); // Sending id and challenge // represents protocol version (in other words - message type) dout.writeLong(PROTOCOL_VERSION); dout.writeLong(self.getId()); String addr = self.getElectionAddress().getHostString() + ":" + self.getElectionAddress().getPort(); byte[] addr_bytes = addr.getBytes(); dout.writeInt(addr_bytes.length); dout.write(addr_bytes); dout.flush(); din = new DataInputStream( new BufferedInputStream(sock.getInputStream())); } catch (IOException e) { LOG.warn("Ignoring exception reading or writing challenge: ", e); closeSocket(sock); return false; } // authenticate learner QuorumPeer.QuorumServer qps = self.getVotingView().get(sid); if (qps != null) { // TODO - investigate why reconfig makes qps null. authLearner.authenticate(sock, qps.hostname); } // If lost the challenge, then drop the new connection //保证集群中所有节点之间只有一个通道连接 if (sid > self.getId()) { LOG.info("Have smaller server identifier, so dropping the " + "connection: (" + sid + ", " + self.getId() + ")"); closeSocket(sock); // Otherwise proceed with the connection } else { SendWorker sw = new SendWorker(sock, sid); RecvWorker rw = new RecvWorker(sock, din, sid, sw); sw.setRecv(rw); SendWorker vsw = senderWorkerMap.get(sid); if(vsw != null) vsw.finish(); senderWorkerMap.put(sid, sw); queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue( SEND_CAPACITY)); sw.start(); rw.start(); return true; } return false;}
如上述代码中所示,sid>self.sid才可以创建连接Socket和SendWorker,RecvWorker线程,存储到senderWorkerMap 图4 节点之间连接方式 在上代码中,自旋从recvqueue队列中获取到选票信息。开始进行选举: 在上述代码中的totalOrderPredicate方法逻辑如下: 在上述代码中:recvset是存储每个sid推举的选票信息。 第一轮 sid1:vote(1,0,1) ,sid2:vote(2,0,1); 第二轮 sid1:vote(2,0,1) ,sid2:vote(2,0,1)。 最终经过选举信息vote(2,0,1)为推荐leader,并用推荐leader在recvset选票池里比对持相同票数量为2个。因为总共有3个节点参与选举,sid1和sid2都选举sid2为leader,满足票数过半要求,故确认sid2为leader。 通过对Leader选举源码的解析,可以了解到: 多个应用节点之间网络通信采用BIO方式进行相互投票,同时保证每个节点之间只使用一个通道,减少网络资源的消耗,足以见得在BIO分布式中间件开发中的技术重要性。 基于BIO的基础上,灵活运用多线程和内存消息队列完好实现多层队列架构,每层队列由不同的线程分工协作,提高快速选举性能目的。 为BIO在多线程技术上的实践带来了宝贵的经验。
关键词:
SAMSUNG三星MB-MC DMicro-SD128GB存储卡支持U3、calss10速度等级,读速最高可达130MB s,支持4K视频和...【Java】public Vote lookForLeader() throws InterruptedException {//... if (n.electionEpoch > logicalclock.get()) { //当前选举周期小于选票周期,重置recvset选票池 //大于当前周期更新当前选票信息,再次发送投票 logicalclock.set(n.electionEpoch); recvset.clear(); if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) { updateProposal(n.leader, n.zxid, n.peerEpoch); } else { updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); } sendNotifications(); } else if (n.electionEpoch < logicalclock.get()) { if(LOG.isDebugEnabled()){ LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch) + ", logicalclock=0x" + Long.toHexString(logicalclock.get())); } break; } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {//相同选举周期 //接收的选票与当前选票PK成功后,替换当前选票 updateProposal(n.leader, n.zxid, n.peerEpoch); sendNotifications(); }//...}
【Java】//接收的选票与当前选票PKprotected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) { LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" + Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid)); if(self.getQuorumVerifier().getWeight(newId) == 0){ return false; } /* * We return true if one of the following three cases hold: * 1- New epoch is higher * 2- New epoch is the same as current epoch, but new zxid is higher * 3- New epoch is the same as current epoch, new zxid is the same * as current zxid, but server id is higher. */ return ((newEpoch > curEpoch) || ((newEpoch == curEpoch) && ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));wId > curId))))); }
【Java】public Vote lookForLeader() throws InterruptedException {//... //存储节点对应的选票信息 // key:选票来源sid value:选票推举的Leader sid recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch)); //半数选举开始 if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch))) { // Verify if there is any change in the proposed leader while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){ if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)){ recvqueue.put(n); break; } } /*WorkerSender * This predicate is true once we don"t read any new * relevant message from the reception queue */ if (n == null) { //已选举出leader 更新当前节点是否为leader self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState()); Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch); leaveInstance(endVote); return endVote; } }//...}/** * Termination predicate. Given a set of votes, determines if have * sufficient to declare the end of the election round. * * @param votes * Set of votes * @param vote * Identifier of the vote received last PK后的选票 */private boolean termPredicate(HashMap
07总结
环球头条:【手慢无】速来占便宜!三星128GB存储卡仅需49.8元
zookeeper的Leader选举源码解析
焦点要闻:前端设计模式——享元模式
每日观察!使用NSIS打包超大型软件的几个注意事项
环球头条:【手慢无】速来占便宜!三星128GB存储卡仅需49.8元
全球热头条丨汽车雷达在无人陵园内显示全是人影:吓坏车主
985硕士男子失业半年 应聘道士35岁已超龄!做实习生都被拒:现送外卖
没污染!国内首款氢内燃机飞机成功首飞:中国自研
环球新资讯:内地特供的网游性价比神U!酷睿i7-13790F评测:游戏性能强于锐龙9 7950X
环球播报:海豚模拟器登陆Steam
全球实时:半场战报:踢疯了!梅西第38分钟戴帽 阿根廷5-0暂领先库拉索
环球热推荐:Epic独占结束!中国功夫游戏《师父》登陆Steam:139元起
【天天时快讯】Native开发过程中容易忽略的注意事项
全球快播:【国际大宗商品早报】美豆反弹收涨近2% 伊拉克库尔德地区原油出口停顿支撑油价续涨
全固态电池空间电荷层微观机理揭示
环球要闻:核心、显存砍得没法看!RTX 4060 Ti/4060要5月上市:3999元起超值?
天天播报:大作《生化危机4重制版》发售 采用Denuvo正版加密 黑客发声:准备出手破解
天天快看点丨任天堂《塞尔达传说:王国之泪》限定版Switch发布!金黄手柄太酷炫
短讯!《最后生还者》PC版多半差评
每日消息!读SQL进阶教程笔记01_CASE表达式
快讯:成年子女不能与父母在酒店住同一标间引热议 太没道理?酒店回应
Spring整合Mybatis遇到的问题(一)
有关Mongodb 在windows上安装的问题
今头条!JNI知识点总结
天天日报丨【Visual Leak Detector】配置项 ReportTo
【Visual Leak Detector】配置项 SelfTest
环球观察:4个多月第一次!Intel Arc Pro专业显卡终于有了新驱动
当前最新:小米“退钱”了:27万小米空气净化器初代用户 每人899元
全球快报:小金刚手机杀到1999元 卢伟冰喊话友商:欢迎光明磊落竞争
全球播报:人体工学椅从没想过:自己真正的对手会是汽车
天天微资讯!日本海滩惊现大量乌贼尸体 绵延200米
通讯!青石板路图片(青石板规格)
微头条丨喊老公过时了!赵丽颖唐嫣孙俪都这样称呼另一半,网友:甜炸了
项目中如何对XSS统一处理
面向对象设计原则
instanceof的使用
GPT-4救了我狗的命
【全球快播报】你敢坐吗?滴滴能打到自动驾驶汽车了
环球看点!三爱健康集团(01889)发盈喜 预计2022年度股东应占溢利同比增加约223.6%至2922万元
世界头条:用gpt4训练一个简易真人代理
全球热头条丨【Visual Leak Detector】配置项 ReportFile
环球通讯!融创百亿美元境外债务重组成功在即 持债金额超30%的债权人小组已签订重组支持协议
环球新消息丨OpenAI创始人:AI可能毁灭人类 必须开发新技术来解决
焦点速递!美亚柏科:公司将对各类 AIGC 内容的检测、AI 生成文本的检测技术及产品进行布局
聚焦:免费Midjourney AI绘画Prompt提示词平台合集
NCNN 模型推理详解及实战
基于中断的字符串动态显示
全球热推荐:[NOI1999] 生日蛋糕
绿牌将会取消?网友:走好不送
一图看懂Note 12 Turbo:性价比进行到底 16+1TB仅售2599元
一加李杰:用户不会被蒙蔽 将旗舰体验普及到底
视讯!哈迷必备!Redmi Buds 4哈利波特版图赏
首发第二代骁龙7+!Redmi Note 12 Turbo图赏
设备树的概念(四):平台设备驱动和设备树
天天最资讯丨Apache iotdb-web-workbench 认证绕过漏洞(CVE-2023-24829)
Halcon学习教程(一) 之提取十字线中心 图像分割
甲流吃退烧药不退烧怎么办_吃了一粒退烧药多久可以喂奶
环球今日报丨卢伟冰:Note系列全球销量破3.2亿 进入全球单品十强
头条:马斯克为何没做出ChatGPT?揭秘OpenAI创始人的权力斗争
日本推出佛祖版ChatGPT:已经为20多万人解决烦恼
观天下!合资车还咋玩!奇瑞艾瑞泽5 GT上市:起售价仅7.99万
每日快报!Redmi Note 12 Turbo搭载超细四窄边直屏:边框窄至1.42mm!
热消息:一篇文章带你了解面积图
京沪杭等地近期明确将有序放开设摊、允许外摆
全球资讯:德创环保:宁波甬德拟以1.61亿元收购飞乐环保100%股权
每日讯息!12306回应免费坐高铁:积分存在有效期
环球即时看!甄子丹主演!《疾速追杀4》豆瓣8.2分:纯粹动作爽片 超越前作
男子求职竟被要求在杭州本地买房 网友:不是招员工是招客户
环球微动态丨试过网易的新AI后 我发现它很笨 但又很聪明
广汽传祺GS4追尾后定损巡航坏了?保险公司不想理赔
记录--vue刷新当前页面
全球关注:借助 mperf 进行矩阵乘法极致优化
MySQL学习笔记-存储引擎
当前看点!低代码起势,开发者可以早日脱离996了?
今日热议:南钢股份:钢材销量同比下降6.28%,2022年年归母净利润同比下滑48.59%,拟10派2.5元
快讯:《生化危机4:重制版》D加密惹争议 又导致性能问题了
全球观热点:女子地铁照被AI一键脱衣传播 网友:无下限的开发
每日焦点!网友准备冲Redmi Note 12 Turbo哈利·波特版:预算3900元
头条:男子乘错出租车 起步1秒被强制收费6元:走了不到10米
【世界新视野】内存掉入无底洞:没有最便宜 只有更便宜
环球观天下!32、K8S-配置管理之Configmap
世界看点:Python中21道个程序小练习
css设置超过固定长度以省略号显示
全球看点:乒乓球技巧训练这就是高手的水准_乒乓球技巧
全球简讯:瑞银:看好亚股增长潜力 偏好亚洲投资级债券
Twitter将上线离谱新规:非会员甚至无法参与投票
热头条丨九识自动驾驶物流车被曝闯红灯 网友:该怎么记分处罚
速读:“网红”威震天在北京环球影城摔了一跤 客服回应:后续演出正常
【环球新要闻】一个时代结束了!Wii U和3DS在线商店已正式关闭
实时:透明外壳设计!Nothing Ear(2)无线耳机上市 999元
热资讯![HTML]表单标签(form表单域、input输入表单、label标签、select下拉表单、textarea文本域)
C#List的3种排序方法
环球观焦点:韩国有意解禁福岛水产品?在野党呼吁韩总统表态
全球新资讯:收评:两市震荡走弱沪指跌0.19% 人工智能概念冲高回落
全球热头条丨想玩必须升级!Steam将于2024年终止对Win7/8/8.1系统支持
中国科学家发现月球水库 估计蓄水量多达2700亿吨
父母做生意给儿子取名为顾客 当事人:家里有个“顾客”能带来更多顾客
世界最高安全标准 我国自研核电华龙一号西部首堆全面建成
【时快讯】249元 TP-LINK推出新款玄鸟AX3000路由器:3000Mbps满血Wi-Fi 6
观热点:Rust 备忘清单_开发速查表分享
即时焦点:郑州大学河南先进技术研究院2023年硕士研究生拟调剂信息公告已公布