最新要闻

广告

手机

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

iphone11大小尺寸是多少?苹果iPhone11和iPhone13的区别是什么?

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

警方通报辅警执法直播中被撞飞:犯罪嫌疑人已投案

家电

(三)elasticsearch 源码之启动流程分析

来源:博客园

1.前面我们在《(一)elasticsearch 编译和启动》和 《(二)elasticsearch 源码目录 》简单了解下es(elasticsearch,下同),现在我们来看下启动代码


(相关资料图)

下面是启动流程图,我们按照流程图的顺序依次描述

2.启动流程

org.elasticsearch.bootstrap.Elasticsearchpublic static void main(final String[] args) throws Exception {        overrideDnsCachePolicyProperties();        /*         * We want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the         * presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy). This         * forces such policies to take effect immediately.         */        System.setSecurityManager(new SecurityManager() {            @Override            public void checkPermission(Permission perm) {                // grant all permissions so that we can later set the security manager to the one that we want            }        });        LogConfigurator.registerErrorListener();        final Elasticsearch elasticsearch = new Elasticsearch();        int status = main(args, elasticsearch, Terminal.DEFAULT);        if (status != ExitCodes.OK) {            exit(status);        }    }

后续执行 Elasticsearch.execute -> Elasticsearch.init -> Bootstrap.init

org.elasticsearch.bootstrap.Bootstrapstatic void init(            final boolean foreground,            final Path pidFile,            final boolean quiet,            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {        // force the class initializer for BootstrapInfo to run before        // the security manager is installed        BootstrapInfo.init();        INSTANCE = new Bootstrap();        // 安全配置文件        final SecureSettings keystore = loadSecureSettings(initialEnv);        final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));        try {            LogConfigurator.configure(environment);        } catch (IOException e) {            throw new BootstrapException(e);        }        if (JavaVersion.current().compareTo(JavaVersion.parse("11")) < 0) {            final String message = String.format(                            Locale.ROOT,                            "future versions of Elasticsearch will require Java 11; " +                                    "your Java version from [%s] does not meet this requirement",                            System.getProperty("java.home"));            new DeprecationLogger(LogManager.getLogger(Bootstrap.class)).deprecated(message);        }        // 处理pidFile        if (environment.pidFile() != null) {            try {                PidFile.create(environment.pidFile(), true);            } catch (IOException e) {                throw new BootstrapException(e);            }        }        // 如果是后台启动,则不打印日志        final boolean closeStandardStreams = (foreground == false) || quiet;        try {            if (closeStandardStreams) {                final Logger rootLogger = LogManager.getRootLogger();                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);                if (maybeConsoleAppender != null) {                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);                }                closeSystOut();            }            // fail if somebody replaced the lucene jars            checkLucene();            // 通用异常捕获            // install the default uncaught exception handler; must be done before security is            // initialized as we do not want to grant the runtime permission            // setDefaultUncaughtExceptionHandler            Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());            INSTANCE.setup(true, environment);            try {                // any secure settings must be read during node construction                IOUtils.close(keystore);            } catch (IOException e) {                throw new BootstrapException(e);            }            INSTANCE.start();            if (closeStandardStreams) {                closeSysError();            }        }

这里我们可以关注下 INSTANCE.setup(true, environment);

org.elasticsearch.bootstrap.Bootstrapprivate void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {        Settings settings = environment.settings();        try {            spawner.spawnNativeControllers(environment);        } catch (IOException e) {            throw new BootstrapException(e);        }        // 检查一些mlock设定        initializeNatives(                environment.tmpFile(),                BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),                BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),                BootstrapSettings.CTRLHANDLER_SETTING.get(settings));        // 探针        // initialize probes before the security manager is installed        initializeProbes();        if (addShutdownHook) {            Runtime.getRuntime().addShutdownHook(new Thread() {                @Override                public void run() {                    try {                        IOUtils.close(node, spawner);                        LoggerContext context = (LoggerContext) LogManager.getContext(false);                        Configurator.shutdown(context);                        if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {                            throw new IllegalStateException("Node didn"t stop within 10 seconds. " +                                    "Any outstanding requests or tasks might get killed.");                        }                    } catch (IOException ex) {                        throw new ElasticsearchException("failed to stop node", ex);                    } catch (InterruptedException e) {                        LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");                        Thread.currentThread().interrupt();                    }                }            });        }        try {            // 检查类加载的一些问题            // look for jar hell            final Logger logger = LogManager.getLogger(JarHell.class);            JarHell.checkJarHell(logger::debug);        } catch (IOException | URISyntaxException e) {            throw new BootstrapException(e);        }        // Log ifconfig output before SecurityManager is installed        IfConfig.logIfNecessary();        // 安全处理        // install SM after natives, shutdown hooks, etc.        try {            Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));        } catch (IOException | NoSuchAlgorithmException e) {            throw new BootstrapException(e);        }        node = new Node(environment) {            @Override            protected void validateNodeBeforeAcceptingRequests(                final BootstrapContext context,                final BoundTransportAddress boundTransportAddress, List checks) throws NodeValidationException {                BootstrapChecks.check(context, boundTransportAddress, checks);            }        };    }

最后一句 node = new Node(environment) 初始化了节点,里面做了许多工作

org.elasticsearch.node.Nodeprotected Node(            final Environment environment, Collection> classpathPlugins, boolean forbidPrivateIndexSettings) {    ...            // 打印jvm信息            final JvmInfo jvmInfo = JvmInfo.jvmInfo();            logger.info(                "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",                Build.CURRENT.getQualifiedVersion(),                jvmInfo.pid(),                Build.CURRENT.flavor().displayName(),                Build.CURRENT.type().displayName(),                Build.CURRENT.hash(),                Build.CURRENT.date(),                Constants.OS_NAME,                Constants.OS_VERSION,                Constants.OS_ARCH,                Constants.JVM_VENDOR,                Constants.JVM_NAME,                Constants.JAVA_VERSION,                Constants.JVM_VERSION);...            // 初始化各类服务,以及他们相关的依赖            this.pluginsService = new PluginsService(tmpSettings, environment.configFile(), environment.modulesFile(),                environment.pluginsFile(), classpathPlugins);            final Settings settings = pluginsService.updatedSettings();            final Set possibleRoles = Stream.concat(                    DiscoveryNodeRole.BUILT_IN_ROLES.stream(),                    pluginsService.filterPlugins(Plugin.class)                            .stream()                            .map(Plugin::getRoles)                            .flatMap(Set::stream))                    .collect(Collectors.toSet());            DiscoveryNode.setPossibleRoles(possibleRoles);            localNodeFactory = new LocalNodeFactory(settings, nodeEnvironment.nodeId());...            // guice注入            modules.add(b -> {                    b.bind(Node.class).toInstance(this);                    b.bind(NodeService.class).toInstance(nodeService);                    b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);                    b.bind(PluginsService.class).toInstance(pluginsService);                    b.bind(Client.class).toInstance(client);                    b.bind(NodeClient.class).toInstance(client);                    b.bind(Environment.class).toInstance(this.environment);                    b.bind(ThreadPool.class).toInstance(threadPool);

es 使用 guice注入框架,guice是个非常轻量级的依赖注入框架,既然各个组件都已经注入好了,我们现在可以启动了。

INSTANCE.start -> Bootstrap.start

org.elasticsearch.bootstrap.Bootstrapprivate void start() throws NodeValidationException {        node.start();        keepAliveThread.start();    }

node.start中启动各个组件。es中的各个组件继承了 AbstractLifecycleComponent。start方法会调用组件的doStart方法。

org.elasticsearch.node.Nodepublic Node start() throws NodeValidationException {        if (!lifecycle.moveToStarted()) {            return this;        }        logger.info("starting ...");        pluginLifecycleComponents.forEach(LifecycleComponent::start);        injector.getInstance(MappingUpdatedAction.class).setClient(client);        injector.getInstance(IndicesService.class).start();        injector.getInstance(IndicesClusterStateService.class).start();        injector.getInstance(SnapshotsService.class).start();        injector.getInstance(SnapshotShardsService.class).start();        injector.getInstance(SearchService.class).start();        nodeService.getMonitorService().start();        final ClusterService clusterService = injector.getInstance(ClusterService.class);        final NodeConnectionsService nodeConnectionsService = injector.getInstance(NodeConnectionsService.class);        nodeConnectionsService.start();        clusterService.setNodeConnectionsService(nodeConnectionsService);    ...

具体的我们看两个比较重要的服务 transportService.start();

org.elasticsearch.transport.TransportService@Override    protected void doStart() {        transport.setMessageListener(this);        connectionManager.addListener(this);        // 建立网络连接        transport.start();        if (transport.boundAddress() != null && logger.isInfoEnabled()) {            logger.info("{}", transport.boundAddress());            for (Map.Entry entry : transport.profileBoundAddresses().entrySet()) {                logger.info("profile [{}]: {}", entry.getKey(), entry.getValue());            }        }        localNode = localNodeFactory.apply(transport.boundAddress());        if (connectToRemoteCluster) {            // here we start to connect to the remote clusters            remoteClusterService.initializeRemoteClusters();        }    }

启动transport的实现类是 SecurityNetty4HttpServerTransport

另一个比较重要的服务,discovery.start(),具体实现类是 Coordinator

org.elasticsearch.cluster.coordination.Coordinator@Override    protected void doStart() {        synchronized (mutex) {            CoordinationState.PersistedState persistedState = persistedStateSupplier.get();            coordinationState.set(new CoordinationState(getLocalNode(), persistedState, electionStrategy));            peerFinder.setCurrentTerm(getCurrentTerm());            configuredHostsResolver.start();            final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();            if (lastAcceptedState.metaData().clusterUUIDCommitted()) {                logger.info("cluster UUID [{}]", lastAcceptedState.metaData().clusterUUID());            }            final VotingConfiguration votingConfiguration = lastAcceptedState.getLastCommittedConfiguration();            if (singleNodeDiscovery &&                votingConfiguration.isEmpty() == false &&                votingConfiguration.hasQuorum(Collections.singleton(getLocalNode().getId())) == false) {                throw new IllegalStateException("cannot start with [" + DiscoveryModule.DISCOVERY_TYPE_SETTING.getKey() + "] set to [" +                    DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE + "] when local node " + getLocalNode() +                    " does not have quorum in voting configuration " + votingConfiguration);            }            ...

关键词: 网络连接 我们来看 配置文件