最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

今日最新!(六)elasticsearch 源码之选主流程分析

来源:博客园

1.概述

es(elasticsearch,下同)的选举流程相对来说比较简单,使用的bully算法,简而言之,就是谁强谁就是老大,待会儿看下怎么判定谁更强。

2.选主流程

在启动篇中我们讲解了节点启动的一些逻辑

// org.elasticsearch.node.Node        // start after transport service so the local disco is known        discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService        clusterService.start();        assert clusterService.localNode().equals(localNodeFactory.getNode())            : "clusterService has a different local node than the factory provided";        transportService.acceptIncomingRequests(); // 接受请求        discovery.startInitialJoin(); // 开始选主流程

startInitialJoin中使用互斥锁确保同一时刻只有一个join线程


(相关资料图)

// org.elasticsearch.discovery.zen.ZenDiscovery    public void startInitialJoin() {        // start the join thread from a cluster state update. See {@link JoinThreadControl} for details.        synchronized (stateMutex) { // 互斥锁            // do the join on a different thread, the caller of this method waits for 30s anyhow till it is discovered            joinThreadControl.startNewThreadIfNotRunning();        }    }

开启线程进行join

// org.elasticsearch.discovery.zen.ZenDiscovery        public void startNewThreadIfNotRunning() {            assert Thread.holdsLock(stateMutex);            if (joinThreadActive()) { // 已经有了join线程                return;            }            threadPool.generic().execute(new Runnable() {                @Override                public void run() {                    Thread currentThread = Thread.currentThread();                    if (!currentJoinThread.compareAndSet(null, currentThread)) {                        return;                    }                    while (running.get() && joinThreadActive(currentThread)) { // 不断尝试join                        try {                            innerJoinCluster();                            return;                        } catch (Exception e) {                            logger.error("unexpected error while joining cluster, trying again", e);                            // Because we catch any exception here, we want to know in                            // tests if an uncaught exception got to this point and the test infra uncaught exception                            // leak detection can catch this. In practise no uncaught exception should leak                            assert ExceptionsHelper.reThrowIfNotNull(e);                        }                    }                    // cleaning the current thread from currentJoinThread is done by explicit calls.                }            });        }...    private void innerJoinCluster() {        DiscoveryNode masterNode = null;        final Thread currentThread = Thread.currentThread();        nodeJoinController.startElectionContext();        while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {            masterNode = findMaster(); // 先寻找主节点        }        if (!joinThreadControl.joinThreadActive(currentThread)) {            logger.trace("thread is no longer in currentJoinThread. Stopping.");            return;        }        if (transportService.getLocalNode().equals(masterNode)) { // 当前节点是主节点            final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one            logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);            nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout, // 等待被选举为master                    new NodeJoinController.ElectionCallback() {                        @Override                        public void onElectedAsMaster(ClusterState state) {                            synchronized (stateMutex) {                                joinThreadControl.markThreadAsDone(currentThread);                            }                        }                        @Override                        public void onFailure(Throwable t) {                            logger.trace("failed while waiting for nodes to join, rejoining", t);                            synchronized (stateMutex) {                                joinThreadControl.markThreadAsDoneAndStartNew(currentThread);                            }                        }                    }            );        } else { // 当前节点不是主节点            // process any incoming joins (they will fail because we are not the master)            nodeJoinController.stopElectionContext(masterNode + " elected");            // send join request            final boolean success = joinElectedMaster(masterNode); // join master节点            synchronized (stateMutex) {                if (success) {                    DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();                    if (currentMasterNode == null) {                        // Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have                        // a valid master.                        logger.debug("no master node is set, despite of join request completing. retrying pings.");                        joinThreadControl.markThreadAsDoneAndStartNew(currentThread);                    } else if (currentMasterNode.equals(masterNode) == false) {                        // update cluster state                        joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");                    }                    joinThreadControl.markThreadAsDone(currentThread);                } else {                    // failed to join. Try again...                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);                }            }        }    }

这里分为三步

step1, 我们先看如何确定当前节点需要join哪个master节点,es会先ping其他节点获取他们的状态信息

// org.elasticsearch.discovery.zen.ZenDiscovery       private DiscoveryNode findMaster() {        logger.trace("starting to ping");        List fullPingResponses = pingAndWait(pingTimeout).toList();        if (fullPingResponses == null) {            logger.trace("No full ping responses");            return null;        }        if (logger.isTraceEnabled()) {            StringBuilder sb = new StringBuilder();            if (fullPingResponses.size() == 0) {                sb.append(" {none}");            } else {                for (ZenPing.PingResponse pingResponse : fullPingResponses) {                    sb.append("\n\t--> ").append(pingResponse);                }            }            logger.trace("full ping responses:{}", sb);        }        final DiscoveryNode localNode = transportService.getLocalNode();        // add our selves        assert fullPingResponses.stream().map(ZenPing.PingResponse::node)            .filter(n -> n.equals(localNode)).findAny().isPresent() == false;        fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));        // filter responses        final List pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);        List activeMasters = new ArrayList<>(); // 活跃主节点        for (ZenPing.PingResponse pingResponse : pingResponses) {            // We can"t include the local node in pingMasters list, otherwise we may up electing ourselves without            // any check / verifications from other nodes in ZenDiscover#innerJoinCluster()            if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {                activeMasters.add(pingResponse.master());            }        }        // nodes discovered during pinging        List masterCandidates = new ArrayList<>(); // 候选主节点        for (ZenPing.PingResponse pingResponse : pingResponses) {            if (pingResponse.node().isMasterNode()) {                masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));            }        }        if (activeMasters.isEmpty()) {            if (electMaster.hasEnoughCandidates(masterCandidates)) {                final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates); // 没有活跃的master,从所有master节点中选取                logger.trace("candidate {} won election", winner);                return winner.getNode();            } else {                // if we don"t have enough master nodes, we bail, because there are not enough master to elect from                logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",                            masterCandidates, electMaster.minimumMasterNodes());                return null;            }        } else {            assert !activeMasters.contains(localNode) :                "local node should never be elected as master when other nodes indicate an active master";            // lets tie break between discovered nodes            return electMaster.tieBreakActiveMasters(activeMasters); // 决胜!!!        }    }

可以看到es根据集群状态版本号,和节点id大小(小的优先)来判定哪个节点能成为master

// org.elasticsearch.discovery.zen.ElectMasterService        public static int compare(MasterCandidate c1, MasterCandidate c2) {            // we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted            // list, so if c2 has a higher cluster state version, it needs to come first.            int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion); // 集群状态版本号高的优先            if (ret == 0) {                ret = compareNodes(c1.getNode(), c2.getNode());            }            return ret;        }...    private static int compareNodes(DiscoveryNode o1, DiscoveryNode o2) {        if (o1.isMasterNode() && !o2.isMasterNode()) { // 主节点优先            return -1;        }        if (!o1.isMasterNode() && o2.isMasterNode()) {            return 1;        }        return o1.getId().compareTo(o2.getId()); // 然后比较节点id    }

step 2.如果当前节点是master节点,那么等待其他节点join,关注 waitToBeElectedAsMaster 中的逻辑,检查是否有足够多的join请求

// org.elasticsearch.discovery.zen.NodeJoinController        public void waitToBeElectedAsMaster(int requiredMasterJoins, TimeValue timeValue, final ElectionCallback callback) {        final CountDownLatch done = new CountDownLatch(1);        final ElectionCallback wrapperCallback = new ElectionCallback() {            @Override            public void onElectedAsMaster(ClusterState state) {                done.countDown();                callback.onElectedAsMaster(state);            }            @Override            public void onFailure(Throwable t) {                done.countDown();                callback.onFailure(t);            }        };        ElectionContext myElectionContext = null;        try {            // check what we have so far..            // capture the context we add the callback to make sure we fail our own            synchronized (this) {                assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";                myElectionContext = electionContext;                electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback); // 做一些检查工作                checkPendingJoinsAndElectIfNeeded(); // 看下是否有足够的join请求            }            try {                if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) { // 超时判断                    // callback handles everything                    return;                }            } catch (InterruptedException e) {            }            if (logger.isTraceEnabled()) {                final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();                logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);            }            failContextIfNeeded(myElectionContext, "timed out waiting to be elected");        } catch (Exception e) {            logger.error("unexpected failure while waiting for incoming joins", e);            if (myElectionContext != null) {                failContextIfNeeded(myElectionContext, "unexpected failure while waiting for pending joins [" + e.getMessage() + "]");            }        }    }    private synchronized void checkPendingJoinsAndElectIfNeeded() {        assert electionContext != null : "election check requested but no active context";        final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();        if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) { // 没有足够的join请求            if (logger.isTraceEnabled()) {                logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,                    electionContext.requiredMasterJoins);            }        } else {            if (logger.isTraceEnabled()) {                logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,                    electionContext.requiredMasterJoins);            }            electionContext.closeAndBecomeMaster(); // 选主成功了            electionContext = null; // clear this out so future joins won"t be accumulated        }    }

step 3.如果当前节点不是master节点,那么join此master节点,发送请求即可

// org.elasticsearch.discovery.zen.ZenDiscovery    private boolean joinElectedMaster(DiscoveryNode masterNode) {        try {            // first, make sure we can connect to the master            transportService.connectToNode(masterNode);        } catch (Exception e) {            logger.warn(() -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);            return false;        }        int joinAttempt = 0; // we retry on illegal state if the master is not yet ready        while (true) {            try {                logger.trace("joining master {}", masterNode);                membership.sendJoinRequestBlocking(masterNode, transportService.getLocalNode(), joinTimeout); // 发送join请求,以阻塞的方式                return true;            }...

最后我们再来看看es是如何处理join 请求的

// org.elasticsearch.discovery.zen.MembershipAction    private class JoinRequestRequestHandler implements TransportRequestHandler {        @Override        public void messageReceived(final JoinRequest request, final TransportChannel channel, Task task) throws Exception {            listener.onJoin(request.getNode(), new JoinCallback() {                @Override                public void onSuccess() {                    try {                        channel.sendResponse(TransportResponse.Empty.INSTANCE);                    } catch (Exception e) {                        onFailure(e);                    }                }                @Override                public void onFailure(Exception e) {                    try {                        channel.sendResponse(e);                    } catch (Exception inner) {                        inner.addSuppressed(e);                        logger.warn("failed to send back failure on join request", inner);                    }                }            });        }    }
// org.elasticsearch.discovery.zen.ZenDiscovery    private class MembershipListener implements MembershipAction.MembershipListener {        @Override        public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {            handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);        }        @Override        public void onLeave(DiscoveryNode node) {            handleLeaveRequest(node);        }    }    void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {        if (nodeJoinController == null) {            throw new IllegalStateException("discovery module is not yet started");        } else {            // we do this in a couple of places including the cluster update thread. This one here is really just best effort            // to ensure we fail as fast as possible.            onJoinValidators.stream().forEach(a -> a.accept(node, state));            if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {                JoinTaskExecutor.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());            }            // try and connect to the node, if it fails, we can raise an exception back to the client...            transportService.connectToNode(node); // 看下发送请求的节点是否可以连通            // validate the join request, will throw a failure if it fails, which will get back to the            // node calling the join request            try {                membership.sendValidateJoinRequestBlocking(node, state, joinTimeout); // 和发送请求的节点验证下请求的合法性,有点儿像tcp            } catch (Exception e) {                logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),                    e);                callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));                return;            }            nodeJoinController.handleJoinRequest(node, callback);        }    }    public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {        if (electionContext != null) {            electionContext.addIncomingJoin(node, callback);// 将node节点加入到map中            checkPendingJoinsAndElectIfNeeded();        } else {            masterService.submitStateUpdateTask("zen-disco-node-join",                new JoinTaskExecutor.Task(node, "no election context"), ClusterStateTaskConfig.build(Priority.URGENT),                joinTaskExecutor, new JoinTaskListener(callback, logger));        }    }

3.总结

es的选举算法流程比较简单,流程性的代码比较多。bully算法相对其他选举算法,比如raft,zab,来说也比较简单。

关键词: 发送请求 比较简单