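(6) Elasticsearch Source Code: Analysis of the Master Election Process
1. Overview
The election process in es (Elasticsearch, likewise below) is relatively simple. It uses the bully algorithm: in short, whichever node is strongest becomes the leader. We will see shortly how "stronger" is determined.
2. Master election process
In the startup article we covered part of the node startup logic.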
// org.elasticsearch.node.Node
// start after transport service so the local disco is known
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
clusterService.start();
assert clusterService.localNode().equals(localNodeFactory.getNode())
    : "clusterService has a different local node than the factory provided";
transportService.acceptIncomingRequests(); // accept incoming requests
discovery.startInitialJoin(); // start the master election process
startInitialJoin uses a mutex to ensure that only one join thread exists at any moment.
// org.elasticsearch.discovery.zen.ZenDiscovery
public void startInitialJoin() {
    // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
    synchronized (stateMutex) { // mutex
        // do the join on a different thread, the caller of this method waits for 30s anyhow till it is discovered
        joinThreadControl.startNewThreadIfNotRunning();
    }
}
A thread is then started to perform the join.
// org.elasticsearch.discovery.zen.ZenDiscovery
public void startNewThreadIfNotRunning() {
    assert Thread.holdsLock(stateMutex);
    if (joinThreadActive()) { // a join thread already exists
        return;
    }
    threadPool.generic().execute(new Runnable() {
        @Override
        public void run() {
            Thread currentThread = Thread.currentThread();
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
                return;
            }
            while (running.get() && joinThreadActive(currentThread)) { // keep trying to join
                try {
                    innerJoinCluster();
                    return;
                } catch (Exception e) {
                    logger.error("unexpected error while joining cluster, trying again", e);
                    // Because we catch any exception here, we want to know in
                    // tests if an uncaught exception got to this point and the test infra uncaught exception
                    // leak detection can catch this. In practise no uncaught exception should leak
                    assert ExceptionsHelper.reThrowIfNotNull(e);
                }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
        }
    });
}
...
private void innerJoinCluster() {
    DiscoveryNode masterNode = null;
    final Thread currentThread = Thread.currentThread();
    nodeJoinController.startElectionContext();
    while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
        masterNode = findMaster(); // first look for a master node
    }

    if (!joinThreadControl.joinThreadActive(currentThread)) {
        logger.trace("thread is no longer in currentJoinThread. Stopping.");
        return;
    }

    if (transportService.getLocalNode().equals(masterNode)) { // the current node is the master
        final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
        logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
        nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout, // wait to be elected as master
            new NodeJoinController.ElectionCallback() {
                @Override
                public void onElectedAsMaster(ClusterState state) {
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDone(currentThread);
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    logger.trace("failed while waiting for nodes to join, rejoining", t);
                    synchronized (stateMutex) {
                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                    }
                }
            }
        );
    } else { // the current node is not the master
        // process any incoming joins (they will fail because we are not the master)
        nodeJoinController.stopElectionContext(masterNode + " elected");

        // send join request
        final boolean success = joinElectedMaster(masterNode); // join the master node

        synchronized (stateMutex) {
            if (success) {
                DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
                if (currentMasterNode == null) {
                    // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
                    // a valid master.
                    logger.debug("no master node is set, despite of join request completing. retrying pings.");
                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                } else if (currentMasterNode.equals(masterNode) == false) {
                    // update cluster state
                    joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
                }

                joinThreadControl.markThreadAsDone(currentThread);
            } else {
                // failed to join. Try again...
                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
            }
        }
    }
}
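The "only one join thread" guarantee rests on two things: the mutex around startNewThreadIfNotRunning and the compareAndSet on currentJoinThread. The following is a minimal sketch of the compareAndSet part only. The class JoinThreadGuardSketch and its method names are hypothetical and are not part of Elasticsearch; it only illustrates how an AtomicReference lets exactly one thread claim the join role.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration, not the real JoinThreadControl.
final class JoinThreadGuardSketch {
    // Holds the thread that currently owns the join role, or null if nobody does.
    private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

    // Succeeds only if no thread owns the role yet.
    boolean tryClaim(Thread t) {
        return currentJoinThread.compareAndSet(null, t);
    }

    // True while t is still the owner; the join loop re-checks this on every iteration.
    boolean isActive(Thread t) {
        return currentJoinThread.get() == t;
    }

    // Releases the role, but only if t is the current owner.
    void markDone(Thread t) {
        currentJoinThread.compareAndSet(t, null);
    }

    public static void main(String[] args) {
        JoinThreadGuardSketch guard = new JoinThreadGuardSketch();
        Thread first = new Thread(() -> { });
        Thread second = new Thread(() -> { });
        System.out.println(guard.tryClaim(first));  // first claim wins
        System.out.println(guard.tryClaim(second)); // second claim is rejected
    }
}
```

Because release is also a compareAndSet, a stale thread that lost the role cannot clear the reference that a newer thread has claimed.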
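This breaks down into three steps.
Step 1. First we look at how the current node decides which master node to join. es first pings the other nodes to obtain their state information.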
// org.elasticsearch.discovery.zen.ZenDiscovery
private DiscoveryNode findMaster() {
    logger.trace("starting to ping");
    List<ZenPing.PingResponse> fullPingResponses = pingAndWait(pingTimeout).toList();
    if (fullPingResponses == null) {
        logger.trace("No full ping responses");
        return null;
    }
    if (logger.isTraceEnabled()) {
        StringBuilder sb = new StringBuilder();
        if (fullPingResponses.size() == 0) {
            sb.append(" {none}");
        } else {
            for (ZenPing.PingResponse pingResponse : fullPingResponses) {
                sb.append("\n\t--> ").append(pingResponse);
            }
        }
        logger.trace("full ping responses:{}", sb);
    }

    final DiscoveryNode localNode = transportService.getLocalNode();

    // add our selves
    assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
        .filter(n -> n.equals(localNode)).findAny().isPresent() == false;

    fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));

    // filter responses
    final List<ZenPing.PingResponse> pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);

    List<DiscoveryNode> activeMasters = new ArrayList<>(); // active masters
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        // We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
        // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
        if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
            activeMasters.add(pingResponse.master());
        }
    }

    // nodes discovered during pinging
    List<ElectMasterService.MasterCandidate> masterCandidates = new ArrayList<>(); // master candidates
    for (ZenPing.PingResponse pingResponse : pingResponses) {
        if (pingResponse.node().isMasterNode()) {
            masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
        }
    }

    if (activeMasters.isEmpty()) {
        if (electMaster.hasEnoughCandidates(masterCandidates)) {
            // no active master: pick one from all master-eligible nodes
            final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
            logger.trace("candidate {} won election", winner);
            return winner.getNode();
        } else {
            // if we don't have enough master nodes, we bail, because there are not enough master to elect from
            logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                        masterCandidates, electMaster.minimumMasterNodes());
            return null;
        }
    } else {
        assert !activeMasters.contains(localNode) :
            "local node should never be elected as master when other nodes indicate an active master";
        // lets tie break between discovered nodes
        return electMaster.tieBreakActiveMasters(activeMasters); // the tie-break!
    }
}
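The branching at the end of findMaster can be condensed into a small sketch. This is a simplification under stated assumptions, not the real implementation: nodes are reduced to plain id strings, cluster state versions are ignored, "enough candidates" is assumed to mean at least minimumMasterNodes of them, and the tie-break among active masters is assumed to pick the smallest id. FindMasterSketch and chooseMaster are hypothetical names.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical illustration of the decision at the end of findMaster.
final class FindMasterSketch {
    static Optional<String> chooseMaster(String localId,
                                         List<String> reportedMasters,
                                         List<String> candidateIds,
                                         int minimumMasterNodes) {
        // Masters reported by ping responses, excluding "no master" (null) and ourselves.
        List<String> activeMasters = reportedMasters.stream()
            .filter(Objects::nonNull)
            .filter(id -> !id.equals(localId))
            .collect(Collectors.toList());

        if (activeMasters.isEmpty()) {
            // Nobody knows a master: elect one, but only with enough candidates (assumed rule).
            if (candidateIds.isEmpty() || candidateIds.size() < minimumMasterNodes) {
                return Optional.empty(); // caller pings again
            }
            return candidateIds.stream().min(Comparator.naturalOrder());
        }
        // Someone already follows a master: tie-break among the reported masters.
        return activeMasters.stream().min(Comparator.naturalOrder());
    }

    public static void main(String[] args) {
        // No active master, three candidates, quorum of two.
        System.out.println(chooseMaster("n2", Arrays.asList(null, "n2"),
                                        Arrays.asList("n3", "n1", "n2"), 2));
    }
}
```

An empty result corresponds to findMaster returning null, which makes innerJoinCluster loop and ping again.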
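As we can see, es decides which node becomes master by comparing the cluster state version first (higher wins) and then the node id (smaller wins).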
// org.elasticsearch.discovery.zen.ElectMasterService
public static int compare(MasterCandidate c1, MasterCandidate c2) {
    // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
    // list, so if c2 has a higher cluster state version, it needs to come first.
    int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion); // higher cluster state version first
    if (ret == 0) {
        ret = compareNodes(c1.getNode(), c2.getNode());
    }
    return ret;
}
...
private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {
    if (o1.isMasterNode() && !o2.isMasterNode()) { // master-eligible nodes first
        return -1;
    }
    if (!o1.isMasterNode() && o2.isMasterNode()) {
        return 1;
    }
    return o1.getId().compareTo(o2.getId()); // then compare node ids
}
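To make the ordering concrete, here is a minimal sketch with made-up data. Candidate and elect are hypothetical stand-ins for MasterCandidate and the election call, and the master-eligibility check is left out on the assumption that every candidate is master-eligible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the candidate ordering, not the real ElectMasterService.
final class CandidateOrderingSketch {
    static final class Candidate {
        final String nodeId;
        final long clusterStateVersion;

        Candidate(String nodeId, long clusterStateVersion) {
            this.nodeId = nodeId;
            this.clusterStateVersion = clusterStateVersion;
        }
    }

    // "Better" sorts lower: higher cluster state version first, then smaller node id.
    static int compare(Candidate c1, Candidate c2) {
        int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
        if (ret == 0) {
            ret = c1.nodeId.compareTo(c2.nodeId);
        }
        return ret;
    }

    // Sorts a copy of the list and returns the best candidate; the list must not be empty.
    static Candidate elect(List<Candidate> candidates) {
        List<Candidate> sorted = new ArrayList<>(candidates);
        sorted.sort(CandidateOrderingSketch::compare);
        return sorted.get(0);
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
            new Candidate("node-a", 3L),
            new Candidate("node-c", 5L),
            new Candidate("node-b", 5L));
        System.out.println(elect(candidates).nodeId);
    }
}
```

Here node-a has the smallest id but an older cluster state, so it loses to the two nodes at version 5, and node-b beats node-c on id.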
Step 2. If the current node is the master, it waits for other nodes to join. The logic to watch is in waitToBeElectedAsMaster, which checks whether enough join requests have arrived.
// org.elasticsearch.discovery.zen.NodeJoinController
public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {
    final CountDownLatch done = new CountDownLatch(1);
    final ElectionCallback wrapperCallback = new ElectionCallback() {
        @Override
        public void onElectedAsMaster(ClusterState state) {
            done.countDown();
            callback.onElectedAsMaster(state);
        }

        @Override
        public void onFailure(Throwable t) {
            done.countDown();
            callback.onFailure(t);
        }
    };

    ElectionContext myElectionContext = null;

    try {
        // check what we have so far..
        // capture the context we add the callback to make sure we fail our own
        synchronized (this) {
            assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
            myElectionContext = electionContext;
            electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback); // perform some checks
            checkPendingJoinsAndElectIfNeeded(); // see whether there are enough join requests
        }

        try {
            if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) { // wait with a timeout
                // callback handles everything
                return;
            }
        } catch (InterruptedException e) {

        }
        if (logger.isTraceEnabled()) {
            final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
            logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
        }
        failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
    } catch (Exception e) {
        logger.error("unexpected failure while waiting for incoming joins", e);
        if (myElectionContext != null) {
            failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + e.getMessage() + "]");
        }
    }
}

private synchronized void checkPendingJoinsAndElectIfNeeded() {
    assert electionContext != null : "election check requested but no active context";
    final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
    if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) { // not enough join requests
        if (logger.isTraceEnabled()) {
            logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                electionContext.requiredMasterJoins);
        }
    } else {
        if (logger.isTraceEnabled()) {
            logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
                electionContext.requiredMasterJoins);
        }
        electionContext.closeAndBecomeMaster(); // the election succeeded
        electionContext = null; // clear this out so future joins won't be accumulated
    }
}
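The wait-for-quorum pattern above can be reduced to a counter plus a CountDownLatch. The sketch below is a simplified illustration, not the real NodeJoinController: JoinQuorumSketch and its methods are hypothetical, callbacks and failure handling are omitted, and every join is assumed to come from a master-eligible node.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "accumulate joins, then become master".
final class JoinQuorumSketch {
    private final int requiredJoins;
    private int pendingJoins;
    private final CountDownLatch elected = new CountDownLatch(1);

    JoinQuorumSketch(int requiredJoins) {
        this.requiredJoins = requiredJoins;
    }

    // The local node counts as one vote, so one fewer join is needed than the quorum size.
    static int requiredJoins(int minimumMasterNodes) {
        return Math.max(0, minimumMasterNodes - 1);
    }

    // Called for each incoming join request while the election is open.
    synchronized void addJoin() {
        pendingJoins++;
        checkAndElect();
    }

    // Releases the waiter once enough joins have accumulated.
    private synchronized void checkAndElect() {
        if (pendingJoins >= requiredJoins) {
            elected.countDown();
        }
    }

    // Returns true if elected within the timeout, false on timeout or interruption.
    boolean awaitElected(long timeoutMillis) {
        checkAndElect(); // joins may already be sufficient, e.g. when requiredJoins is 0
        try {
            return elected.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        JoinQuorumSketch quorum = new JoinQuorumSketch(requiredJoins(3));
        quorum.addJoin();
        quorum.addJoin();
        System.out.println(quorum.awaitElected(10));
    }
}
```

In the real code a timeout fails the election context, and the onFailure callback in innerJoinCluster starts a new join thread.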
Step 3. If the current node is not the master, it joins that master by simply sending a request.
// org.elasticsearch.discovery.zen.ZenDiscovery
private boolean joinElectedMaster(DiscoveryNode masterNode) {
    try {
        // first, make sure we can connect to the master
        transportService.connectToNode(masterNode);
    } catch (Exception e) {
        logger.warn(() -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);
        return false;
    }
    int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
    while (true) {
        try {
            logger.trace("joining master {}", masterNode);
            membership.sendJoinRequestBlocking(masterNode, transportService.getLocalNode(), joinTimeout); // send the join request, blocking
            return true;
        }
        ...
Finally, let's look at how es handles a join request.
// org.elasticsearch.discovery.zen.MembershipAction
private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {

    @Override
    public void messageReceived(final JoinRequest request, final TransportChannel channel, Task task) throws Exception {
        listener.onJoin(request.getNode(), new JoinCallback() {
            @Override
            public void onSuccess() {
                try {
                    channel.sendResponse(TransportResponse.Empty.INSTANCE);
                } catch (Exception e) {
                    onFailure(e);
                }
            }

            @Override
            public void onFailure(Exception e) {
                try {
                    channel.sendResponse(e);
                } catch (Exception inner) {
                    inner.addSuppressed(e);
                    logger.warn("failed to send back failure on join request", inner);
                }
            }
        });
    }
}
// org.elasticsearch.discovery.zen.ZenDiscovery
private class MembershipListener implements MembershipAction.MembershipListener {
    @Override
    public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
        handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);
    }

    @Override
    public void onLeave(DiscoveryNode node) {
        handleLeaveRequest(node);
    }
}

void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
    if (nodeJoinController == null) {
        throw new IllegalStateException("discovery module is not yet started");
    } else {
        // we do this in a couple of places including the cluster update thread. This one here is really just best effort
        // to ensure we fail as fast as possible.
        onJoinValidators.stream().forEach(a -> a.accept(node, state));
        if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
            JoinTaskExecutor.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
        }
        // try and connect to the node, if it fails, we can raise an exception back to the client...
        transportService.connectToNode(node); // check that the requesting node is reachable

        // validate the join request, will throw a failure if it fails, which will get back to the
        // node calling the join request
        try {
            membership.sendValidateJoinRequestBlocking(node, state, joinTimeout); // validate the request with the sending node, a bit like TCP
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node), e);
            callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
            return;
        }
        nodeJoinController.handleJoinRequest(node, callback);
    }
}

// org.elasticsearch.discovery.zen.NodeJoinController
public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
    if (electionContext != null) {
        electionContext.addIncomingJoin(node, callback); // add the node to the map
        checkPendingJoinsAndElectIfNeeded();
    } else {
        masterService.submitStateUpdateTask("zen-disco-node-join",
            new JoinTaskExecutor.Task(node, "no election context"), ClusterStateTaskConfig.build(Priority.URGENT),
            joinTaskExecutor, new JoinTaskListener(callback, logger));
    }
}
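3. Summary
The es election algorithm is fairly simple, and most of the code is procedural flow. Compared with other election algorithms such as Raft and ZAB, the bully algorithm is also relatively simple.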