最新要闻
- 全球今亮点!野性觉醒!动物园办虎兔交接仪式:兔子差点成“盘中餐”
- 【环球新视野】GXG男士休闲长款羊毛大衣:到手199元
- 天天热头条丨行业唯一!雅迪电动车官宣登上2023年央视春晚
- 世界今亮点!寒气逼人!“裁员潮”席卷美科技巨头:谷歌母公司将裁1.2万人
- 王晶武侠电影 《天龙八部之乔峰传》今日上线“爱优腾”三大平台
- 世界第一高楼迪拜塔点亮中国红:春晚吉祥物“兔圆圆”亮相
- 最低气温跌破-50摄氏度 漠河游客:刚出门就想钻回炕上
- 【速看料】用上自研3.0T发动机 坦克700实车曝光:水泥灰质感十足
- 天天微速讯:做清洁的噩梦!潮汕女子晒大扫除:一家人出动擦镂空家具
- 当前快讯:春节假期7天免通行费!交通部:目前全国高速公路通行畅通
- 大批AMD RX 6000挂掉的原因找到了!挖矿、湿度影响
- 专家:春节期间最好每餐七分饱
- 环球热文:比苹果良心!微信支付春节福利来了:App Store充值9折
- 环球最新:毫米波无线投屏带来全新体验 红魔电竞显示器上手
- 环球今日报丨戴尔发布新款32寸6K显示器:剑指苹果Pro Display XDR
- 焦点精选!徐志胜首登春晚:担心在春晚嘴瓢每天练绕口令
手机
iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?
- 警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案
- 男子被关545天申国赔:获赔18万多 驳回精神抚慰金
- 3天内26名本土感染者,辽宁确诊人数已超安徽
- 广西柳州一男子因纠纷杀害三人后自首
- 洱海坠机4名机组人员被批准为烈士 数千干部群众悼念
家电
世界热门:SOFAJRaft模块启动过程
本篇文章旨在分析SOFAJRaft中jraft-example模块的启动过程,由于SOFAJRaft在持续开源的过程中,所以无法保证示例代码永远是最新的,要是有较大的变动或者纰漏、错误的地方,欢迎大家留言讨论。@Author:Akai-yuan更新时间:2023年1月20日
SOFAJRaft-Start Our Journey
1. 开门见山:main方法概览
public static void main(final String[] args) throws IOException { if (args.length != 4) { System.out .println("Usage : java com.alipay.sofa.jraft.example.counter.CounterServer {dataPath} {groupId} {serverId} {initConf}"); System.out .println("Example: java com.alipay.sofa.jraft.example.counter.CounterServer /tmp/server1 counter 127.0.0.1:8081 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083"); System.exit(1); } //日志存储路径 final String dataPath = args[0]; //SOFAJRaft集群的名字 final String groupId = args[1]; //当前节点的ip和端口 final String serverIdStr = args[2]; //集群节点的ip和端口 final String initConfStr = args[3]; final NodeOptions nodeOptions = new NodeOptions(); // for test, modify some params // 设置选举超时时间为 1 秒 nodeOptions.setElectionTimeoutMs(1000); // 关闭 CLI 服务 nodeOptions.setDisableCli(false); // 每隔30秒做一次 snapshot nodeOptions.setSnapshotIntervalSecs(30); // 解析参数 final PeerId serverId = new PeerId(); if (!serverId.parse(serverIdStr)) { throw new IllegalArgumentException("Fail to parse serverId:" + serverIdStr); } final Configuration initConf = new Configuration(); //将raft分组加入到Configuration的peers数组中 if (!initConf.parse(initConfStr)) { throw new IllegalArgumentException("Fail to parse initConf:" + initConfStr); } // 设置初始集群配置 nodeOptions.setInitialConf(initConf); // 启动raft server final CounterServer counterServer = new CounterServer(dataPath, groupId, serverId, nodeOptions); System.out.println("Started counter server at port:" + counterServer.getNode().getNodeId().getPeerId().getPort()); // GrpcServer need block to prevent process exit CounterGrpcHelper.blockUntilShutdown();}
我们在启动CounterServer的main方法的时候,会将传入的String[]类型参数args分别转化为日志存储的路径、SOFAJRaft集群的名字、当前节点的ip和端口、集群节点的ip和端口,并设值到NodeOptions中,作为当前节点启动的参数。
(资料图)
2. 对象转换:创建PeerId
引子:在main方法中,我们可以看到,程序将String类型参数转换成了PeerId对象,那么接下来我们需要探究转换的具体过程。
在转换当前节点并初始化为一个PeerId对象的过程中,调用了PeerId中的parse方法:
public boolean parse(final String s) { if (StringUtils.isEmpty(s)) { return false; } final String[] tmps = Utils.parsePeerId(s); if (tmps.length < 2 || tmps.length > 4) { return false; } try { final int port = Integer.parseInt(tmps[1]); this.endpoint = new Endpoint(tmps[0], port); switch (tmps.length) { case 3: this.idx = Integer.parseInt(tmps[2]); break; case 4: if (tmps[2].equals("")) { this.idx = 0; } else { this.idx = Integer.parseInt(tmps[2]); } this.priority = Integer.parseInt(tmps[3]); break; default: break; } this.str = null; return true; } catch (final Exception e) { LOG.error("Parse peer from string failed: {}.", s, e); return false; }}
该方法内部又调用了工具类Utils.parsePeerId,最终达到的效果如下:其中,a、b分别对应IP和Port端口号,组成了PeerId的EndPoint属性;c指代idx【同一地址中的索引,默认值为0】;d指代priority优先级【节点的本地优先级值,如果节点不支持优先级选择,则该值为-1】。
PeerId.parse("a:b") = new PeerId("a", "b", 0 , -1)PeerId.parse("a:b:c") = new PeerId("a", "b", "c", -1)PeerId.parse("a:b::d") = new PeerId("a", "b", 0, "d")PeerId.parse("a:b:c:d") = new PeerId("a", "b", "c", "d")
3. 渐入佳境:构造CountServer
引子:在main方法中,我们可以看到,进行初步的参数解析后,调用了CountServer的构造器,要说这个构造器,第一次看里面的步骤确实会感觉挺复杂的,接下来我们一起分析一下源码。
CountServer构造器的源码如下:
public CounterServer(final String dataPath, final String groupId, final PeerId serverId, final NodeOptions nodeOptions) throws IOException { // 初始化raft data path, 它包含日志、元数据、快照 FileUtils.forceMkdir(new File(dataPath)); // 这里让 raft RPC 和业务 RPC 使用同一个 RPC server, 通常也可以分开 final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); // GrpcServer need init marshaller CounterGrpcHelper.initGRpc(); CounterGrpcHelper.setRpcServer(rpcServer); // 注册业务处理器 CounterService counterService = new CounterServiceImpl(this); rpcServer.registerProcessor(new GetValueRequestProcessor(counterService)); rpcServer.registerProcessor(new IncrementAndGetRequestProcessor(counterService)); // 初始化状态机 this.fsm = new CounterStateMachine(); // 设置状态机到启动参数 nodeOptions.setFsm(this.fsm); // 设置存储路径 (包含日志、元数据、快照) // 日志(必须) nodeOptions.setLogUri(dataPath + File.separator + "log"); // 元数据(必须) nodeOptions.setRaftMetaUri(dataPath + File.separator + "raft_meta"); // 快照(可选, 一般都推荐) nodeOptions.setSnapshotUri(dataPath + File.separator + "snapshot"); // 初始化 raft group 服务框架 this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer); // 启动 this.node = this.raftGroupService.start(); }
接下来仔细说说CountServer的构造器里面具体做了什么。
4. 追根溯源:RpcServer
引子:CountServer构造器中调用的RaftRpcServerFactory.createRaftRpcServer()方法,底层到底是如何构造出一个RpcServer的呢,接下来会和大家讨论createRaftRpcServer()方法的具体实现
首先请看RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint())方法:createRaftRpcServer方法目前有createRaftRpcServer(final Endpoint endpoint)和createRaftRpcServer(final Endpoint endpoint, final Executor raftExecutor,final Executor cliExecutor)两个重载方法,其实不管哪个方法,本质上实现过程都有如下两个步骤:(1)首先调用了GrpcRaftRpcFactory的createRpcServer方法,这里涉及gRpc构建server的底层知识,有时间会再写一篇文章探究一下gRpc,这里可以简单理解为构建了一个rpc服务端。该方法实现如下:
public RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper helper) { final int port = Requires.requireNonNull(endpoint, "endpoint").getPort(); Requires.requireTrue(port > 0 && port < 0xFFFF, "port out of range:" + port); final MutableHandlerRegistry handlerRegistry = new MutableHandlerRegistry(); final Server server = ServerBuilder.forPort(port) // .fallbackHandlerRegistry(handlerRegistry) // .directExecutor() // .maxInboundMessageSize(RPC_MAX_INBOUND_MESSAGE_SIZE) // .build(); final RpcServer rpcServer = new GrpcServer(server, handlerRegistry, this.parserClasses, getMarshallerRegistry()); if (helper != null) { helper.config(rpcServer); } return rpcServer;}
(2)紧接着调用addRaftRequestProcessors,这个方法为RpcServer添加RAFT和CLI服务核心请求处理器,关于RpcProcessor这个实体类,会在后面的文章中具体分析,这里可以先"不求甚解"。
//添加RAFT和CLI服务请求处理器public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor, final Executor cliExecutor) { // 添加raft核心处理器 final AppendEntriesRequestProcessor appendEntriesRequestProcessor = new AppendEntriesRequestProcessor( raftExecutor); rpcServer.registerConnectionClosedEventListener(appendEntriesRequestProcessor); rpcServer.registerProcessor(appendEntriesRequestProcessor); rpcServer.registerProcessor(new GetFileRequestProcessor(raftExecutor)); rpcServer.registerProcessor(new InstallSnapshotRequestProcessor(raftExecutor)); rpcServer.registerProcessor(new RequestVoteRequestProcessor(raftExecutor)); rpcServer.registerProcessor(new PingRequestProcessor()); rpcServer.registerProcessor(new TimeoutNowRequestProcessor(raftExecutor)); rpcServer.registerProcessor(new ReadIndexRequestProcessor(raftExecutor)); // 添加raft cli服务处理器 rpcServer.registerProcessor(new AddPeerRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new RemovePeerRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new ResetPeerRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new ChangePeersRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new GetLeaderRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new SnapshotRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new TransferLeaderRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new GetPeersRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new AddLearnersRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new RemoveLearnersRequestProcessor(cliExecutor)); rpcServer.registerProcessor(new ResetLearnersRequestProcessor(cliExecutor)); }
5. 一探究竟:CounterGrpcHelper做了什么
CountServer构造器在初步创建RpcServer后,调用了CounterGrpcHelper.initGRpc()和CounterGrpcHelper.setRpcServer(rpcServer)两个方法,接下来和大家分析这两个方法的实现过程
首先请看initGRpc方法:RpcFactoryHelper.rpcFactory()实际是调用了GrpcRaftRpcFactory(因为GrpcRaftRpcFactory实现了RaftRpcFactory接口),GrpcRaftRpcFactory中维护了一个ConcurrentHashMap
public static void initGRpc() { if ("com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory".equals(RpcFactoryHelper.rpcFactory().getClass() .getName())) { RpcFactoryHelper.rpcFactory().registerProtobufSerializer(CounterOutter.GetValueRequest.class.getName(), CounterOutter.GetValueRequest.getDefaultInstance()); RpcFactoryHelper.rpcFactory().registerProtobufSerializer( CounterOutter.IncrementAndGetRequest.class.getName(), CounterOutter.IncrementAndGetRequest.getDefaultInstance()); RpcFactoryHelper.rpcFactory().registerProtobufSerializer(CounterOutter.ValueResponse.class.getName(), CounterOutter.ValueResponse.getDefaultInstance()); try { Class> clazz = Class.forName("com.alipay.sofa.jraft.rpc.impl.MarshallerHelper"); Method registerRespInstance = clazz.getMethod("registerRespInstance", String.class, Message.class); registerRespInstance.invoke(null, CounterOutter.GetValueRequest.class.getName(), CounterOutter.ValueResponse.getDefaultInstance()); registerRespInstance.invoke(null, CounterOutter.IncrementAndGetRequest.class.getName(), CounterOutter.ValueResponse.getDefaultInstance()); } catch (Exception e) { LOG.error("Failed to init grpc server", e); } } }
接着我们再看setRpcServer方法:CounterGrpcHelper里面还维护了一个RpcServer实例,CounterGrpcHelper.setRpcServer(rpcServer)实际上会将构造的RpcServer装配到CounterGrpcHelper里面。
public static void setRpcServer(RpcServer rpcServer) { CounterGrpcHelper.rpcServer = rpcServer;}
6.乘胜追击:RaftGroupService
在CountServer构造器中,经过上述一系列操作步骤,走到了RaftGroupService构造器中,在构造RaftGroupService实体后,调用了它的start方法,这一步在于初始化 raft group 服务框架
public synchronized Node start(final boolean startRpcServer) { //如果已经启动了,那么就返回 if (this.started) { return this.node; } //校验serverId和groupId if (this.serverId == null || this.serverId.getEndpoint() == null || this.serverId.getEndpoint().equals(new Endpoint(Utils.IP_ANY, 0))) { throw new IllegalArgumentException("Blank serverId:" + this.serverId); } if (StringUtils.isBlank(this.groupId)) { throw new IllegalArgumentException("Blank group id:" + this.groupId); } //设置当前node的ip和端口 NodeManager.getInstance().addAddress(this.serverId.getEndpoint()); //创建node this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions); if (startRpcServer) { //启动远程服务 this.rpcServer.init(null); } else { LOG.warn("RPC server is not started in RaftGroupService."); } this.started = true; LOG.info("Start the RaftGroupService successfully."); return this.node; }
这个方法会在一开始的时候对RaftGroupService在构造器实例化的参数进行校验,然后把当前节点的Endpoint添加到NodeManager的addrSet变量中,接着调用RaftServiceFactory#createAndInitRaftNode实例化Node节点。每个节点都会启动一个rpc的服务,因为每个节点既可以被选举也可以投票给其他节点,节点之间需要互相通信,所以需要启动一个rpc服务。
7.刨根问底:Node节点的创建
以下就是Node节点的一系列创建过程,由于嵌套的层数比较多,所以就全部列举出来了,整个过程简而言之就是,createAndInitRaftNode方法首先调用createRaftNode实例化一个Node的实例NodeImpl,然后调用其init方法进行初始化,主要的配置都是在init方法中完成的。代码如下:
this.node = RaftServiceFactory.createAndInitRaftNode(this.groupId, this.serverId, this.nodeOptions);
public static Node createAndInitRaftNode(final String groupId, final PeerId serverId, final NodeOptions opts) { final Node ret = createRaftNode(groupId, serverId); if (!ret.init(opts)) { throw new IllegalStateException("Fail to init node, please see the logs to find the reason."); } return ret; }
public static Node createRaftNode(final String groupId, final PeerId serverId) { return new NodeImpl(groupId, serverId); }
public NodeImpl(final String groupId, final PeerId serverId) { super(); if (groupId != null) { Utils.verifyGroupId(groupId); } this.groupId = groupId; this.serverId = serverId != null ? serverId.copy() : null; this.state = State.STATE_UNINITIALIZED; this.currTerm = 0; updateLastLeaderTimestamp(Utils.monotonicMs()); this.confCtx = new ConfigurationCtx(this); this.wakingCandidate = null; final int num = GLOBAL_NUM_NODES.incrementAndGet(); LOG.info("The number of active nodes increment to {}.", num); }
8.一窥到底:Node节点的初始化
老实说,NodeImpl#init方法确实挺长的,所以我打算分成几个部分来展示,方便分析
(1)《参数的赋值与校验》
这段代码主要是给各个变量赋值,然后进行校验判断一下serverId不能为0.0.0.0,当前的Endpoint必须要在NodeManager里面设置过等等(NodeManager的设置是在RaftGroupService的start方法里)。然后会初始化一个全局的的定时调度管理器TimerManager:
//一系列判空操作Requires.requireNonNull(opts, "Null node options"); Requires.requireNonNull(opts.getRaftOptions(), "Null raft options"); Requires.requireNonNull(opts.getServiceFactory(), "Null jraft service factory"); //JRaftServiceFactory目前有3个实现类 // 1.BDBLogStorageJRaftServiceFactory // 2.DefaultJRaftServiceFactory // 3.HybridLogJRaftServiceFactorythis.serviceFactory = opts.getServiceFactory(); this.options = opts; this.raftOptions = opts.getRaftOptions(); //基于 Metrics 类库的性能指标统计,具有丰富的性能统计指标,默认为false,不开启度量工具 this.metrics = new NodeMetrics(opts.isEnableMetrics()); this.serverId.setPriority(opts.getElectionPriority()); this.electionTimeoutCounter = 0; //Utils.IP_ANY = "0.0.0.0" if (this.serverId.getIp().equals(Utils.IP_ANY)) { LOG.error("Node can"t started from IP_ANY."); return false; } if (!NodeManager.getInstance().serverExists(this.serverId.getEndpoint())) { LOG.error("No RPC server attached to, did you forget to call addService?"); return false; } if (this.options.getAppendEntriesExecutors() == null) { this.options.setAppendEntriesExecutors(Utils.getDefaultAppendEntriesExecutor()); } //定时任务管理器//此处TIMER_FACTORY获取到的是DefaultRaftTimerFactory //this.options.isSharedTimerPool()默认为false//this.options.getTimerPoolSize()取值为Utils.cpus() * 3 > 20 ? 20 : Utils.cpus() * 3 this.timerManager = TIMER_FACTORY.getRaftScheduler(this.options.isSharedTimerPool(), this.options.getTimerPoolSize(), "JRaft-Node-ScheduleThreadPool");
此处浅析一下__TimerManager:初始化一个线程池,根据传入的参数this.options.getTimerPoolSize()==Utils.cpus() * 3 > 20 ? 20 : Utils.cpus() * 3可以分析得知如果当前的服务器的cpu线程数_3 大于20 ,那么这个线程池的coreSize就是20,否则就是cpu线程数的_3倍。
public TimerManager(int workerNum, String name) { this.executor = ThreadPoolUtil.newScheduledBuilder() // .poolName(name) // .coreThreads(workerNum) // .enableMetric(true) // .threadFactory(new NamedThreadFactory(name, true)) // .build(); }
(2)《计时器的初始化》
由于这些计时器的实现比较繁杂,所以具体功能等到后面对应章节再一并梳理。
- voteTimer是用来控制选举的,如果选举超时,当前的节点又是候选者角色,那么就会发起选举。
- electionTimer是预投票计时器。候选者在发起投票之前,先发起预投票,如果没有得到半数以上节点的反馈,则候选者就会识趣的放弃参选。
- stepDownTimer定时检查是否需要重新选举leader。当前的leader可能出现它的Follower可能并没有整个集群的1/2却还没有下台的情况,那么这个时候会定期的检查看leader的Follower是否有那么多,没有那么多的话会强制让leader下台。
- snapshotTimer快照计时器。这个计时器会每隔1小时触发一次生成一个快照。
这些计时器有一个共同的特点就是会根据不同的计时器返回一个在一定范围内随机的时间。返回一个随机的时间可以防止多个节点在同一时间内同时发起投票选举从而降低选举失败的概率。
//设置投票计时器 final String suffix = getNodeId().toString(); String name = "JRaft-VoteTimer-" + suffix; this.voteTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getVoteTimer( this.options.isSharedVoteTimer(), name)) { //处理投票超时 @Override protected void onTrigger() { handleVoteTimeout(); } //在一定范围内返回一个随机的时间戳 @Override protected int adjustTimeout(final int timeoutMs) { return randomTimeout(timeoutMs); } };//设置预投票计时器//当leader在规定的一段时间内没有与 Follower 舰船进行通信时,// Follower 就可以认为leader已经不能正常担任旗舰的职责,则 Follower 可以去尝试接替leader的角色。// 这段通信超时被称为 Election Timeout//候选者在发起投票之前,先发起预投票 name = "JRaft-ElectionTimer-" + suffix; this.electionTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs(), TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)) { @Override protected void onTrigger() { handleElectionTimeout(); } //在一定范围内返回一个随机的时间戳 //为了避免同时发起选举而导致失败 @Override protected int adjustTimeout(final int timeoutMs) { return randomTimeout(timeoutMs); } }; //leader下台的计时器//定时检查是否需要重新选举leader name = "JRaft-StepDownTimer-" + suffix; this.stepDownTimer = new RepeatedTimer(name, this.options.getElectionTimeoutMs() >> 1, TIMER_FACTORY.getStepDownTimer(this.options.isSharedStepDownTimer(), name)) { @Override protected void onTrigger() { handleStepDownTimeout(); } };//快照计时器 name = "JRaft-SnapshotTimer-" + suffix; this.snapshotTimer = new RepeatedTimer(name, this.options.getSnapshotIntervalSecs() * 1000, TIMER_FACTORY.getSnapshotTimer(this.options.isSharedSnapshotTimer(), name)) { private volatile boolean firstSchedule = true; @Override protected void onTrigger() { handleSnapshotTimeout(); } @Override protected int adjustTimeout(final int timeoutMs) { if (!this.firstSchedule) { return timeoutMs; } // Randomize the first snapshot trigger timeout this.firstSchedule = false; if (timeoutMs > 0) { int half = timeoutMs / 2; return half + ThreadLocalRandom.current().nextInt(half); } else { return timeoutMs; } } };
(3)《消费队列Disruptor》
关于Disruptor的内容,后面有时间会写一篇相关的文章进行分享
这里初始化了一个Disruptor作为消费队列,然后校验了metrics是否开启,默认是不开启的
this.configManager = new ConfigurationManager();//初始化一个disruptor,采用多生产者模式 this.applyDisruptor = DisruptorBuilder.newInstance() // //设置disruptor大小,默认16384 .setRingBufferSize(this.raftOptions.getDisruptorBufferSize()) // .setEventFactory(new LogEntryAndClosureFactory()) // .setThreadFactory(new NamedThreadFactory("JRaft-NodeImpl-Disruptor-", true)) // .setProducerType(ProducerType.MULTI) // .setWaitStrategy(new BlockingWaitStrategy()) // .build();//设置事件处理器 this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());//设置异常处理器 this.applyDisruptor.setDefaultExceptionHandler(new LogExceptionHandler
(4)《功能初始化》
对快照、日志、元数据等功能进行初始化
//fsmCaller封装对业务 StateMachine 的状态转换的调用以及日志的写入等this.fsmCaller = new FSMCallerImpl(); //初始化日志存储功能 if (!initLogStorage()) { LOG.error("Node {} initLogStorage failed.", getNodeId()); return false; }//初始化元数据存储功能 if (!initMetaStorage()) { LOG.error("Node {} initMetaStorage failed.", getNodeId()); return false; } //对FSMCaller初始化 if (!initFSMCaller(new LogId(0, 0))) { LOG.error("Node {} initFSMCaller failed.", getNodeId()); return false; } //实例化投票箱 this.ballotBox = new BallotBox(); final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions(); ballotBoxOpts.setWaiter(this.fsmCaller); ballotBoxOpts.setClosureQueue(this.closureQueue); //初始化ballotBox的属性 if (!this.ballotBox.init(ballotBoxOpts)) { LOG.error("Node {} init ballotBox failed.", getNodeId()); return false; } //初始化快照存储功能 if (!initSnapshotStorage()) { LOG.error("Node {} initSnapshotStorage failed.", getNodeId()); return false; } //校验日志文件索引的一致性 final Status st = this.logManager.checkConsistency(); if (!st.isOk()) { LOG.error("Node {} is initialized with inconsistent log, status={}.", getNodeId(), st); return false; } //配置管理raft group中的信息 this.conf = new ConfigurationEntry(); this.conf.setId(new LogId()); // if have log using conf in log, else using conf in options if (this.logManager.getLastLogIndex() > 0) { checkAndSetConfiguration(false); } else { this.conf.setConf(this.options.getInitialConf()); // initially set to max(priority of all nodes) this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers()); } if (!this.conf.isEmpty()) { Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf); } else { LOG.info("Init node {} with empty conf.", this.serverId); }
初始化replicatorGroup、rpcService以及readOnlyService:
// TODO RPC service and ReplicatorGroup is in cycle dependent, refactor it this.replicatorGroup = new ReplicatorGroupImpl(); //收其他节点或者客户端发过来的请求,转交给对应服务处理 this.rpcService = new DefaultRaftClientService(this.replicatorGroup, this.options.getAppendEntriesExecutors()); final ReplicatorGroupOptions rgOpts = new ReplicatorGroupOptions(); rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs())); rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs()); rgOpts.setLogManager(this.logManager); rgOpts.setBallotBox(this.ballotBox); rgOpts.setNode(this); rgOpts.setRaftRpcClientService(this.rpcService); rgOpts.setSnapshotStorage(this.snapshotExecutor != null ? this.snapshotExecutor.getSnapshotStorage() : null); rgOpts.setRaftOptions(this.raftOptions); rgOpts.setTimerManager(this.timerManager); // Adds metric registry to RPC service. this.options.setMetricRegistry(this.metrics.getMetricRegistry()); //初始化rpc服务 if (!this.rpcService.init(this.options)) { LOG.error("Fail to init rpc service."); return false; } this.replicatorGroup.init(new NodeId(this.groupId, this.serverId), rgOpts); this.readOnlyService = new ReadOnlyServiceImpl(); final ReadOnlyServiceOptions rosOpts = new ReadOnlyServiceOptions(); rosOpts.setFsmCaller(this.fsmCaller); rosOpts.setNode(this); rosOpts.setRaftOptions(this.raftOptions); //只读服务初始化 if (!this.readOnlyService.init(rosOpts)) { LOG.error("Fail to init readOnlyService."); return false; }
(5)《逻辑变动》
这段代码里会将当前的状态设置为Follower,然后启动快照定时器定时生成快照。如果当前的集群不是单节点集群需要做一下stepDown,表示新生成的Node节点需要重新进行选举。最下面有一个if分支,如果当前的jraft集群里只有一个节点,那么个节点必定是leader直接进行选举就好了,所以会直接调用electSelf进行选举。
// 将当前的状态设置为Follower this.state = State.STATE_FOLLOWER; if (LOG.isInfoEnabled()) { LOG.info("Node {} init, term={}, lastLogId={}, conf={}, oldConf={}.", getNodeId(), this.currTerm, this.logManager.getLastLogId(false), this.conf.getConf(), this.conf.getOldConf()); } //如果快照执行器不为空,并且生成快照的时间间隔大于0,那么就定时生成快照 if (this.snapshotExecutor != null && this.options.getSnapshotIntervalSecs() > 0) { LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), this.currTerm); this.snapshotTimer.start(); } //新启动的node需要重新选举 if (!this.conf.isEmpty()) { stepDown(this.currTerm, false, new Status()); } if (!NodeManager.getInstance().add(this)) { LOG.error("NodeManager add {} failed.", getNodeId()); return false; } // Now the raft node is started , have to acquire the writeLock to avoid race // conditions this.writeLock.lock(); //这个分支表示当前的jraft集群里只有一个节点,那么个节点必定是leader直接进行选举就好了 if (this.conf.isStable() && this.conf.getConf().size() == 1 && this.conf.getConf().contains(this.serverId)) { // The group contains only this server which must be the LEADER, trigger // the timer immediately. electSelf(); } else { this.writeLock.unlock(); } return true;
9.写在最后
SOFAJRaft 是一个基于 RAFT 一致性算法的生产级高性能 Java 实现。第一次阅读这种复杂的开源代码,老实说确实非常吃力,但其实步步深入,反复推敲,逐渐会从恐惧陌生甚至抵触,转变为惊喜与赞叹。你会慢慢痴迷于里面很多优雅且优秀的实现。在这里,感谢SOFAJRaft的每一位代码贡献者。源码的阅读过程中,的的确确学到了很多东西。我也会继续学习下去,希望能够巩固、深入我对RAFT一致性算法的理解与体悟。
世界热门:SOFAJRaft模块启动过程
全球今亮点!野性觉醒!动物园办虎兔交接仪式:兔子差点成“盘中餐”
【环球新视野】GXG男士休闲长款羊毛大衣:到手199元
天天热头条丨行业唯一!雅迪电动车官宣登上2023年央视春晚
世界今亮点!寒气逼人!“裁员潮”席卷美科技巨头:谷歌母公司将裁1.2万人
王晶武侠电影 《天龙八部之乔峰传》今日上线“爱优腾”三大平台
世界第一高楼迪拜塔点亮中国红:春晚吉祥物“兔圆圆”亮相
最低气温跌破-50摄氏度 漠河游客:刚出门就想钻回炕上
世界今热点:[数据结构] 队列 (C语言)
Wallpaper Engine壁纸提取工具,pkg文件提取静态图片,pkg文件转jpg,pkg文件转png,pkg文件转图像,pkg文件提取图像
【速看料】用上自研3.0T发动机 坦克700实车曝光:水泥灰质感十足
天天微速讯:做清洁的噩梦!潮汕女子晒大扫除:一家人出动擦镂空家具
天天简讯:LeetCode.面试题02.05-链表求和-题解分析
当前快讯:春节假期7天免通行费!交通部:目前全国高速公路通行畅通
大批AMD RX 6000挂掉的原因找到了!挖矿、湿度影响
专家:春节期间最好每餐七分饱
环球热文:比苹果良心!微信支付春节福利来了:App Store充值9折
环球最新:毫米波无线投屏带来全新体验 红魔电竞显示器上手
C 忽远忽近的距离【2023牛客寒假算法基础集训营3 】
环球今日报丨戴尔发布新款32寸6K显示器:剑指苹果Pro Display XDR
焦点精选!徐志胜首登春晚:担心在春晚嘴瓢每天练绕口令
当前快看:土耳其上空出现碟形透镜云:恍若UFO、相当震撼
年终奖入手iPhone 14:听我的别贴膜 没啥用
一口一只小苍蝇:不恶心吗?
3小时挪动300米堵哭了!去海南自驾请注意:需预约购票
世界关注:读函数式编程思维笔记02_转变思维
【世界新视野】《塞尔达传说:王国之泪》即将发售 网友抱怨任天堂还没开始宣发
当前观点:广东游客专程来感受-50℃的漠河 有车企趁极寒测试车辆性能
每日消息!张朝阳用物理知识解构《流浪地球2》:吴京听惊讶 竖拇指点赞
【世界报资讯】首发4899元!机械革命推出新款蛟龙5游戏本:锐龙7 7735H+RTX 3050
热点聚焦:10余款皮肤齐上架!玩家深夜把《王者荣耀》充值系统挤崩溃了
学习笔记——SpringMVC消息转换器概述;使用消息转换器处理请求报文;使用消息转换器处理响应报文;使用消息转换器处理Json格式数据
今日热讯:D 宿命之间的对决【2023牛客寒假算法基础集训营3】
用AutoHotkey解决B站客户端缓存视频m4s合并成mp4提示解析失败
天天热门:刚提特斯拉Model Y电池遇故障 五天坏两次!车主“愤怒”要求换新车
近十日全国平均气温为历史第三暖:大部分地区天气利于出行
世界讯息:HGAME2023_WP_WEEK2
双向链表
每日短讯:爱做美甲女生注意了!过年美甲小心致癌突变
实时焦点:2023央视春晚没有贾玲张小斐:此前连续5年登台
苹果着手开发iPhone 16 Pro!灵动岛要扩大
《流浪地球2》首波口碑好评如潮 吴京:希望中国电影以该片为底气
关注:Intel扼杀超频!13代酷睿再也不能“白嫖”了
世界球精选!iPhone 14 Pro系列供需已平衡:之前要等一个多月 一机难求
【环球时快讯】了解Spring
刷掌支付快到了:腾讯新刷掌设备专利发布
最新消息:身家千万老板被骗到缅甸做诈骗:3个月暴瘦40多斤
世界快消息!Lisp的求值规则
每日速讯:需求激增!国内客运航班量连续13天上涨
焦点热门:禁止中国玩家参加炉石赛事引众怒 用户抵制卸载无惧:暴雪态度没缓和
【世界速看料】全力推动首车落地!贾跃亭回应FF落地湖北黄冈:何时归国依旧成谜
环球即时:打工一年享受享受怎么了 HKC 27寸4K 144Hz显示器3449元
每日热讯!2023春节档预售票房破5亿:张艺谋电影《满江红》领跑
世界快消息!我的2022年终总结
全球热议:春节宠物寄养生意火爆!猫狗“标间”599元/天 “总统套房”5400元/月
广州明起向外卖骑手、快递员发“春节补贴”:每人每天150元
【焦点热闻】男子开“电动爹”回东北:200公里充次电 充了14次
【全球独家】微软XGP彻底赢麻了 索尼再次面临“断气”风险
国产最强皮卡!长城山海炮6x6版内饰谍照:相当豪华
世界观速讯丨您是否存在想在浏览器动态编译razor的组件的想法?
Obsidian 插件(一):DataView 的使用
天天短讯!单向循环链表
京东代言人!中国选手谷爱凌夺2023年首个冠军
马斯克没钱了?推特拍卖数百件办公资产:“小蓝鸟”雕像10万美元成交
今日关注:灵山“网红牛”不幸被车撞死:一片狼藉
天天快报!无短板办公利器!联想小新Air 14 Plus图赏
短讯!深圳一公司年会送员工保时捷跑车:还有宝马、特斯拉
全球聚焦:单向链表
全球今头条!商务部:每卖4辆新车就有1辆新能源
TGA年度最佳游戏 《双人成行》Steam新史低:73.26元
微资讯!八核酷睿+满血RTX 3060!机械革命极光Pro图赏
春节档电影票均价7年来首降:你会选择看哪部电影过年?
一加11独家适配!国产魂Like手游《帕斯卡契约》新增高品质震动
国人不识货?本田思域TYPE R因太畅销在日本暂停接单
全球速读:今年这回 我们总可以对春节档有点期待了吧
热消息:雷军回顾2022年:小米13得到大家认可最开心
育碧喜加一:《雷曼:疯狂兔子》免费领!兔年玩疯兔
世界热门:大摆锤高处停摆近10分钟 游乐场回应:两边游客体重相近所致
焦点快报!新年第一月iPhone已降价2次!苹果14首降超千元 国内用户喊真香抄底
每日视点!印度8岁女孩放弃4亿家产出家 看透世俗享乐:从没看过电视
世界热消息:学习笔记——springMVC中视图及视图解析器对象;视图控制器
当前快讯:撕破脸!暴雪禁止中国玩家参加炉石赛事引众怒 国内玩家地址卸载所有游戏
获得版号后首测!米哈游《崩坏:星穹铁道》三测定档1月24日
环球观速讯丨英首相车内讲话未系安全带遭批 IIHS曾做测试:后排不系结果可怕!
今日要闻!《三体》电视剧更新8集 豆瓣评分增至8.1 网友担忧:原创角色正让抠书式还原走形
荣耀MagicBook V 14可升级MagicOS 7.0:新增键鼠共享功能
数据结构:树状数组 学习笔记
环球观热点:Downie V4.6.4 for Mac 视频下载工具
进海南遭世纪大堵车 3小时挪300米?当地回应:前段时间轮渡停航
每日快讯!奥迪胜诉!蔚来在德国被禁止使用“ES6”、“ES8”等名称
世界快消息!卖8万能成爆款?比亚迪海鸥内饰谍照曝光:看齐大哥海豚
当前观察:担心的事发生了 滑雪游客乘坐缆车时坠落!官方回应
环球新资讯:谁能阻挡!比亚迪双线再“扩军”:产能新增或超百万辆
本故事纯属虚构是什么意思?本故事纯属虚构的下一句是什么?
尺子的刻度是什么意思?尺子的刻度是从几开始的?
社会发展的根本动力是什么?社会发展的源泉是什么?
肌底是什么意思?肌底液是干什么用的?
交通问题有哪些?交通问题反馈打什么电话?
ceb文件怎么打开?ceb文件怎么转换成PDF?
水电煤气费一个月大约多少钱?微信怎么交水电煤气费?