(5) Elasticsearch Source Code: Analysis of the Query Flow
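1. Overview
The previous article covered the indexing flow in es (elasticsearch, likewise below). This article covers the query flow; the basic flow diagram follows.
2. Query flow
To make debugging easier, I started two nodes on my machine and created the index shown below. It has two primary shards and no replica shards.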
Send the following request with Postman:
Next, let us look at the code (this series uses the 7.4.0 source). A search is also a REST request:
// org.elasticsearch.action.support.TransportAction
public void proceed(Task task, String actionName, Request request, ActionListener listener) {
    int i = index.getAndIncrement();
    try {
        if (i < this.action.filters.length) {
            this.action.filters[i].apply(task, actionName, request, listener, this); // run the filters first
        } else if (i == this.action.filters.length) {
            this.action.doExecute(task, request, listener); // then execute the action itself
        } else {
            listener.onFailure(new IllegalStateException("proceed was called too many times"));
        }
    } catch(Exception e) {
        logger.trace("Error during transport action execution.", e);
        listener.onFailure(e);
    }
}
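The proceed method above walks a chain of filters and only then runs the action. The following is a minimal sketch of that pattern outside of es. The names FilterChainSketch, Filter, Chain and run are hypothetical, and a list of strings stands in for the real task, request and listener objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class FilterChainSketch {
    /** Hypothetical stand-in for an action filter: it may do work, then hands control back to the chain. */
    interface Filter {
        void apply(String request, List<String> trace, Chain chain);
    }

    /** Walks the filters in order and runs the action once every filter has been applied. */
    static final class Chain {
        private final List<Filter> filters;
        private final AtomicInteger index = new AtomicInteger();

        Chain(List<Filter> filters) {
            this.filters = filters;
        }

        void proceed(String request, List<String> trace) {
            int i = index.getAndIncrement();
            if (i < filters.size()) {
                filters.get(i).apply(request, trace, this); // next filter
            } else if (i == filters.size()) {
                trace.add("execute:" + request);            // all filters done: run the action
            } else {
                throw new IllegalStateException("proceed was called too many times");
            }
        }
    }

    /** Runs a request through two illustrative filters and returns the order of events. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        Filter logging = (req, t, chain) -> { t.add("logging"); chain.proceed(req, t); };
        Filter security = (req, t, chain) -> { t.add("security"); chain.proceed(req, t); };
        new Chain(Arrays.asList(logging, security)).proceed(request, trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run("search")); // [logging, security, execute:search]
    }
}
```

Each filter decides whether to call proceed again, which is why a filter can short-circuit the request simply by not continuing the chain.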
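The class that actually executes the operation is TransportSearchAction. It applies some optimizations to the order in which indices are searched. The search type used here is QUERY_THEN_FETCH.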
// org.elasticsearch.action.search.TransportSearchAction
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) {
    final long relativeStartNanos = System.nanoTime();
    final SearchTimeProvider timeProvider =
        new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
    ActionListener rewriteListener = ActionListener.wrap(source -> {
        if (source != searchRequest.source()) {
            // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
            // situations when source is rewritten to null due to a bug
            searchRequest.source(source);
        }
        final ClusterState clusterState = clusterService.state();
        final Map remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
            searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
        OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        if (remoteClusterIndices.isEmpty()) {
            // no remote clusters involved: search the local cluster
            executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);
        } else {
            if (shouldMinimizeRoundtrips(searchRequest)) { // cross-cluster search that minimizes round trips (ccs_minimize_roundtrips)
                ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
                    remoteClusterService, threadPool, listener,
                    (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
            } else {
                AtomicInteger skippedClusters = new AtomicInteger(0);
                collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                    skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                    ActionListener.wrap(
                        searchShardsResponses -> {
                            List remoteShardIterators = new ArrayList<>();
                            Map remoteAliasFilters = new HashMap<>();
                            BiFunction clusterNodeLookup = processRemoteShards(
                                searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                            int localClusters = localIndices == null ? 0 : 1;
                            int totalClusters = remoteClusterIndices.size() + localClusters;
                            int successfulClusters = searchShardsResponses.size() + localClusters;
                            executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                                remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                        },
                        listener::onFailure));
            }
        }
    }, listener::onFailure);
    if (searchRequest.source() == null) {
        rewriteListener.onResponse(searchRequest.source());
    } else {
        Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
            rewriteListener); // rewrite the source, then call back rewriteListener
    }
}
...
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                           OriginalIndices localIndices, List remoteShardIterators, BiFunction remoteConnections,
                           ClusterState clusterState, Map remoteAliasMap, ActionListener listener,
                           SearchResponse.Clusters clusters) {
    clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.READ); // fail if a global READ block is in place
    // TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
    // date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
    // of just for the _search api
    final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
    Map aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
    Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
        searchRequest.indices());
    routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
    Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);
    if (shouldSplitIndices(searchRequest)) { // search read-only indices and write indices separately, write indices first
        //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
        //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
        //indices (possibly slower) being searched at the same time.
        List writeIndicesList = new ArrayList<>();
        List readOnlyIndicesList = new ArrayList<>();
        splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
        String[] writeIndices = writeIndicesList.toArray(new String[0]);
        String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);
        if (readOnlyIndices.length == 0) {
            executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap,
                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
        } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
            executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap,
                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
        } else {
            //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
            //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
            CountDown countDown = new CountDown(2);
            AtomicReference exceptions = new AtomicReference<>();
            SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
                searchService::createReduceContext);
            CountDownActionListener countDownActionListener = new CountDownActionListener(countDown, exceptions, listener) {
                @Override
                void innerOnResponse(SearchResponse searchResponse) {
                    searchResponseMerger.add(searchResponse);
                }

                @Override
                SearchResponse createFinalResponse() {
                    return searchResponseMerger.getMergedResponse(clusters);
                }
            };

            //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
            //will be provided separately to executeSearch.
            SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
                RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
            executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
                aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
                SearchResponse.Clusters.EMPTY);

            //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
            //will be provided separately to executeSearch.
            SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
                RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
            executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
                aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
                SearchResponse.Clusters.EMPTY);
        }
    } else {
        String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
        executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap,
            aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
    }
}
...
private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
                           OriginalIndices localIndices, String[] concreteIndices, Map<String, Set<String>> routingMap,
                           Map aliasFilter, Map concreteIndexBoosts, List remoteShardIterators,
                           BiFunction remoteConnections, ClusterState clusterState, ActionListener listener,
                           SearchResponse.Clusters clusters) {
    Map nodeSearchCounts = searchTransportService.getPendingSearchRequests();
    GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState,
        concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts);
    GroupShardsIterator shardIterators = mergeShardsIterators(localShardsIterator, localIndices,
        searchRequest.getLocalClusterAlias(), remoteShardIterators);
    failIfOverShardCountLimit(clusterService, shardIterators.size());
    // optimize search type for cases where there is only one shard group to search on
    if (shardIterators.size() == 1) {
        // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
        searchRequest.searchType(QUERY_THEN_FETCH); // a single shard: DFS is unnecessary
    }
    if (searchRequest.allowPartialSearchResults() == null) {
        // No user preference defined in search request - apply cluster service default
        searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
    }
    if (searchRequest.isSuggestOnly()) {
        // disable request cache if we have only suggest
        searchRequest.requestCache(false);
        if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) {
            // convert to Q_T_F if we have only suggest
            searchRequest.searchType(QUERY_THEN_FETCH);
        }
    }
    final DiscoveryNodes nodes = clusterState.nodes();
    BiFunction connectionLookup = buildConnectionLookup(searchRequest.getLocalClusterAlias(),
        nodes::get, remoteConnections, searchTransportService::getConnection);
    boolean preFilterSearchShards = shouldPreFilterSearchShards(searchRequest, shardIterators);
    // start SearchQueryThenFetchAsyncAction; processing is asynchronous
    searchAsyncAction(task, searchRequest, shardIterators, timeProvider, connectionLookup, clusterState.version(),
        Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, routingMap, listener, preFilterSearchShards, clusters).start();
}
Next comes the QUERY_THEN_FETCH logic. As the sequence diagram above shows, QUERY_THEN_FETCH consists of four main phases: init, query, fetch, and send response.
// org.elasticsearch.action.search.AbstractSearchAsyncAction
private void executePhase(SearchPhase phase) {
    try {
        phase.run(); // run the phase
    } catch (Exception e) {
        if (logger.isDebugEnabled()) {
            logger.debug(new ParameterizedMessage("Failed to execute [{}] while moving to [{}] phase", request, phase.getName()), e);
        }
        onPhaseFailure(phase, "", e);
    }
}
First, the init phase:
// org.elasticsearch.action.search.AbstractSearchAsyncAction
public final void run() {
    for (final SearchShardIterator iterator : toSkipShardsIts) {
        assert iterator.skip();
        skipShard(iterator);
    }
    if (shardsIts.size() > 0) {
        assert request.allowPartialSearchResults() != null : "SearchRequest missing setting for allowPartialSearchResults";
        if (request.allowPartialSearchResults() == false) {
            final StringBuilder missingShards = new StringBuilder();
            // Fail-fast verification of all shards being available
            for (int index = 0; index < shardsIts.size(); index++) {
                final SearchShardIterator shardRoutings = shardsIts.get(index);
                if (shardRoutings.size() == 0) {
                    if(missingShards.length() > 0){
                        missingShards.append(", ");
                    }
                    missingShards.append(shardRoutings.shardId());
                }
            }
            if (missingShards.length() > 0) {
                //Status red - shard is missing all copies and would produce partial results for an index search
                final String msg = "Search rejected due to missing shards ["+ missingShards +
                    "]. Consider using `allow_partial_search_results` setting to bypass this error.";
                throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY);
            }
        }
        for (int index = 0; index < shardsIts.size(); index++) { // iterate over the shards and search each one
            final SearchShardIterator shardRoutings = shardsIts.get(index);
            assert shardRoutings.skip() == false;
            performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
        }
    }
}
Next is the query phase, which uses the transport service to query the local node or other nodes for the documents that match the search criteria.
// org.elasticsearch.action.search.SearchQueryThenFetchAsyncAction
protected void executePhaseOnShard(final SearchShardIterator shardIt, final ShardRouting shard,
                                   final SearchActionListener listener) {
    getSearchTransport().sendExecuteQuery(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
        buildShardSearchRequest(shardIt), getTask(), listener);
}
When a node receives the request, it dispatches it to the registered handler:
// org.elasticsearch.action.search.SearchTransportService
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new, // register the handler for query requests
    (request, channel, task) -> {
        searchService.executeQueryPhase(request, (SearchTask) task, new ChannelActionListener<>(
            channel, QUERY_ACTION_NAME, request));
    });
The handler builds a search context (SearchContext) and runs the query:
// org.elasticsearch.search.SearchService
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
    final SearchContext context = createAndPutContext(request);
    context.incRef();
    try {
        context.setTask(task);
        final long afterQueryTime;
        try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
            contextProcessing(context);
            loadOrExecuteQueryPhase(request, context); // main query logic
            if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
                freeContext(context.id());
            } else {
                contextProcessedSuccessfully(context);
            }
            afterQueryTime = executor.success();
        }
        if (request.numberOfShards() == 1) {
            return executeFetchPhase(context, afterQueryTime); // single shard: fetch right away
        }
        return context.queryResult();
    } catch (Exception e) {
        // execution exception can happen while loading the cache, strip it
        if (e instanceof ExecutionException) {
            e = (e.getCause() == null || e.getCause() instanceof Exception) ?
                (Exception) e.getCause() : new ElasticsearchException(e.getCause());
        }
        logger.trace("Query phase failed", e);
        processFailure(context, e);
        throw e;
    } finally {
        cleanContext(context);
    }
}
...
private void loadOrExecuteQueryPhase(final ShardSearchRequest request, final SearchContext context) throws Exception {
    final boolean canCache = indicesService.canCache(request, context);
    context.getQueryShardContext().freezeContext();
    if (canCache) { // try the request cache first
        indicesService.loadIntoContext(request, context, queryPhase);
    } else {
        queryPhase.execute(context);
    }
}
...
// org.elasticsearch.search.query.QueryPhase
public void execute(SearchContext searchContext) throws QueryPhaseExecutionException {
    if (searchContext.hasOnlySuggest()) {
        suggestPhase.execute(searchContext);
        searchContext.queryResult().topDocs(new TopDocsAndMaxScore(
            new TopDocs(new TotalHits(0, TotalHits.Relation.EQUAL_TO), Lucene.EMPTY_SCORE_DOCS), Float.NaN),
            new DocValueFormat[0]);
        return;
    }
    if (LOGGER.isTraceEnabled()) {
        LOGGER.trace("{}", new SearchContextSourcePrinter(searchContext));
    }
    // Pre-process aggregations as late as possible. In the case of a DFS_Q_T_F
    // request, preProcess is called on the DFS phase phase, this is why we pre-process them
    // here to make sure it happens during the QUERY phase
    aggregationPhase.preProcess(searchContext);
    final ContextIndexSearcher searcher = searchContext.searcher();
    boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled); // main query logic
    if (rescore) { // only if we do a regular search
        rescorePhase.execute(searchContext); // rescore
    }
    suggestPhase.execute(searchContext); // handle suggestions and aggregations
    aggregationPhase.execute(searchContext);
    if (searchContext.getProfilers() != null) {
        ProfileShardResult shardResults = SearchProfileShardResults
            .buildShardResults(searchContext.getProfilers());
        searchContext.queryResult().profileResults(shardResults);
    }
}
...
static boolean execute(SearchContext searchContext, final IndexSearcher searcher,
                       Consumer checkCancellationSetter) throws QueryPhaseExecutionException {
    final IndexReader reader = searcher.getIndexReader();
    QuerySearchResult queryResult = searchContext.queryResult();
    queryResult.searchTimedOut(false);
    try {
        queryResult.from(searchContext.from());
        queryResult.size(searchContext.size());
        Query query = searchContext.query();
        assert query == searcher.rewrite(query); // already rewritten
        final ScrollContext scrollContext = searchContext.scrollContext();
        if (scrollContext != null) {
            if (scrollContext.totalHits == null) {
                // first round
                assert scrollContext.lastEmittedDoc == null;
                // there is not much that we can optimize here since we want to collect all
                // documents in order to get the total number of hits
            } else {
                final ScoreDoc after = scrollContext.lastEmittedDoc;
                if (returnsDocsInOrder(query, searchContext.sort())) {
                    // now this gets interesting: since we sort in index-order, we can directly
                    // skip to the desired doc
                    if (after != null) {
                        query = new BooleanQuery.Builder()
                            .add(query, BooleanClause.Occur.MUST)
                            .add(new MinDocQuery(after.doc + 1), BooleanClause.Occur.FILTER)
                            .build();
                    }
                    // ... and stop collecting after ${size} matches
                    searchContext.terminateAfter(searchContext.size());
                } else if (canEarlyTerminate(reader, searchContext.sort())) {
                    // now this gets interesting: since the search sort is a prefix of the index sort, we can directly
                    // skip to the desired doc
                    if (after != null) {
                        query = new BooleanQuery.Builder()
                            .add(query, BooleanClause.Occur.MUST)
                            .add(new SearchAfterSortedDocQuery(searchContext.sort().sort, (FieldDoc) after), BooleanClause.Occur.FILTER)
                            .build();
                    }
                }
            }
        }
        final LinkedList collectors = new LinkedList<>();
        // whether the chain contains a collector that filters documents
        boolean hasFilterCollector = false;
        if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) {
            // add terminate_after before the filter collectors
            // it will only be applied on documents accepted by these filter collectors
            collectors.add(createEarlyTerminationCollectorContext(searchContext.terminateAfter()));
            // this collector can filter documents during the collection
            hasFilterCollector = true;
        }
        if (searchContext.parsedPostFilter() != null) {
            // add post filters before aggregations
            // it will only be applied to top hits
            collectors.add(createFilteredCollectorContext(searcher, searchContext.parsedPostFilter().query()));
            // this collector can filter documents during the collection
            hasFilterCollector = true;
        }
        if (searchContext.queryCollectors().isEmpty() == false) {
            // plug in additional collectors, like aggregations
            collectors.add(createMultiCollectorContext(searchContext.queryCollectors().values()));
        }
        if (searchContext.minimumScore() != null) {
            // apply the minimum score after multi collector so we filter aggs as well
            collectors.add(createMinScoreCollectorContext(searchContext.minimumScore()));
            // this collector can filter documents during the collection
            hasFilterCollector = true;
        }
        boolean timeoutSet = scrollContext == null && searchContext.timeout() != null &&
            searchContext.timeout().equals(SearchService.NO_TIMEOUT) == false;
        final Runnable timeoutRunnable;
        if (timeoutSet) {
            final long startTime = searchContext.getRelativeTimeInMillis();
            final long timeout = searchContext.timeout().millis();
            final long maxTime = startTime + timeout;
            timeoutRunnable = () -> {
                final long time = searchContext.getRelativeTimeInMillis();
                if (time > maxTime) {
                    throw new TimeExceededException();
                }
            };
        } else {
            timeoutRunnable = null;
        }
        final Runnable cancellationRunnable;
        if (searchContext.lowLevelCancellation()) {
            SearchTask task = searchContext.getTask();
            cancellationRunnable = () -> { if (task.isCancelled()) throw new TaskCancelledException("cancelled"); };
        } else {
            cancellationRunnable = null;
        }
        final Runnable checkCancelled;
        if (timeoutRunnable != null && cancellationRunnable != null) {
            checkCancelled = () -> {
                timeoutRunnable.run();
                cancellationRunnable.run();
            };
        } else if (timeoutRunnable != null) {
            checkCancelled = timeoutRunnable;
        } else if (cancellationRunnable != null) {
            checkCancelled = cancellationRunnable;
        } else {
            checkCancelled = null;
        }
        checkCancellationSetter.accept(checkCancelled);
        // add cancellable
        // this only performs segment-level cancellation, which is cheap and checked regardless of
        // searchContext.lowLevelCancellation()
        collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled));
        final boolean doProfile = searchContext.getProfilers() != null;
        // create the top docs collector last when the other collectors are known
        final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader, hasFilterCollector);
        // add the top docs collector, the first collector context in the chain
        collectors.addFirst(topDocsFactory);
        final Collector queryCollector;
        if (doProfile) {
            InternalProfileCollector profileCollector = QueryCollectorContext.createQueryCollectorWithProfiler(collectors);
            searchContext.getProfilers().getCurrentQueryProfiler().setCollector(profileCollector);
            queryCollector = profileCollector;
        } else {
            queryCollector = QueryCollectorContext.createQueryCollector(collectors);
        }
        try {
            searcher.search(query, queryCollector); // call into the Lucene API
        } catch (EarlyTerminatingCollector.EarlyTerminationException e) {
            queryResult.terminatedEarly(true);
        } catch (TimeExceededException e) {
            assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
            if (searchContext.request().allowPartialSearchResults() == false) {
                // Can't rethrow TimeExceededException because not serializable
                throw new QueryPhaseExecutionException(searchContext, "Time exceeded");
            }
            queryResult.searchTimedOut(true);
        } finally {
            searchContext.clearReleasables(SearchContext.Lifetime.COLLECTION);
        }
        if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER
                && queryResult.terminatedEarly() == null) {
            queryResult.terminatedEarly(false);
        }
        final QuerySearchResult result = searchContext.queryResult();
        for (QueryCollectorContext ctx : collectors) {
            ctx.postProcess(result);
        }
        ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH);
        if (executor instanceof QueueResizingEsThreadPoolExecutor) {
            QueueResizingEsThreadPoolExecutor rExecutor = (QueueResizingEsThreadPoolExecutor) executor;
            queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize());
            queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA());
        }
        if (searchContext.getProfilers() != null) {
            ProfileShardResult shardResults = SearchProfileShardResults.buildShardResults(searchContext.getProfilers());
            result.profileResults(shardResults);
        }
        return topDocsFactory.shouldRescore();
    } catch (Exception e) {
        throw new QueryPhaseExecutionException(searchContext, "Failed to execute main query", e);
    }
}
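The way the query phase combines its timeout check and its cancellation check into a single Runnable can be sketched on its own. This is only an illustration under assumptions: the class CancellationCheckSketch and its methods are hypothetical, the clock and the cancelled flag are injected as plain suppliers, and IllegalStateException stands in for the es-specific TimeExceededException and TaskCancelledException.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

public class CancellationCheckSketch {
    /** Builds a check that throws once the injected clock has passed start + timeout. */
    static Runnable timeoutCheck(LongSupplier clockMillis, long timeoutMillis) {
        final long maxTime = clockMillis.getAsLong() + timeoutMillis;
        return () -> {
            if (clockMillis.getAsLong() > maxTime) {
                throw new IllegalStateException("time exceeded"); // stand-in for TimeExceededException
            }
        };
    }

    /** Builds a check that throws once the task reports it was cancelled. */
    static Runnable cancelCheck(BooleanSupplier cancelled) {
        return () -> {
            if (cancelled.getAsBoolean()) {
                throw new IllegalStateException("cancelled"); // stand-in for TaskCancelledException
            }
        };
    }

    /** Combines the two optional checks; returns null when neither is configured. */
    static Runnable combine(Runnable timeout, Runnable cancellation) {
        if (timeout != null && cancellation != null) {
            return () -> {
                timeout.run();
                cancellation.run();
            };
        }
        return timeout != null ? timeout : cancellation;
    }

    /** Runs a check (if any) and reports what happened. */
    static String outcome(Runnable check) {
        try {
            if (check != null) {
                check.run();
            }
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        long[] now = {0};
        Runnable check = combine(timeoutCheck(() -> now[0], 100), cancelCheck(() -> false));
        System.out.println(outcome(check)); // ok
        now[0] = 101;
        System.out.println(outcome(check)); // time exceeded
    }
}
```

Keeping the combined check as one Runnable lets the searcher call it at arbitrary points during collection without knowing which conditions are active.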
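This completes the query logic on the node that holds the shard. The node that issued the query then stores the results it receives: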
// org.elasticsearch.action.search.AbstractSearchAsyncAction
public final void onShardSuccess(Result result) {
    successfulOps.incrementAndGet();
    results.consumeResult(result); // store the shard's query result
    if (logger.isTraceEnabled()) {
        logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
    }
    // clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
    // so its ok concurrency wise to miss potentially the shard failures being created because of another failure
    // in the #addShardFailure, because by definition, it will happen on *another* shardIndex
    AtomicArray shardFailures = this.shardFailures.get();
    if (shardFailures != null) {
        shardFailures.set(result.getShardIndex(), null);
    }
}
// org.elasticsearch.action.search.InitialSearchPhase
void consumeResult(Result result) {
    assert results.get(result.getShardIndex()) == null : "shardIndex: " + result.getShardIndex() + " is already set";
    results.set(result.getShardIndex(), result); // store the result at its shard index
}
The next step is the fetch phase:
// org.elasticsearch.action.search.FetchSearchPhase
private void innerRun() throws IOException {
    final int numShards = context.getNumShards();
    final boolean isScrollSearch = context.getRequest().scroll() != null;
    List phaseResults = queryResults.asList();
    String scrollId = isScrollSearch ? TransportSearchHelper.buildScrollId(queryResults) : null;
    final SearchPhaseController.ReducedQueryPhase reducedQueryPhase = resultConsumer.reduce(); // reduce the query-phase results, mainly the document ids
    final boolean queryAndFetchOptimization = queryResults.length() == 1;
    final Runnable finishPhase = ()
        -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
        queryResults : fetchResults);
    if (queryAndFetchOptimization) {
        assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
            + "], single result: " + phaseResults.get(0).fetchResult();
        // query AND fetch optimization
        finishPhase.run();
    } else {
        ScoreDoc[] scoreDocs = reducedQueryPhase.sortedTopDocs.scoreDocs;
        final IntArrayList[] docIdsToLoad = searchPhaseController.fillDocIdsToLoad(numShards, scoreDocs); // which documents to fetch, per shard
        if (scoreDocs.length == 0) { // no docs to fetch -- sidestep everything and return
            phaseResults.stream()
                .map(SearchPhaseResult::queryResult)
                .forEach(this::releaseIrrelevantSearchContext); // we have to release contexts here to free up resources
            finishPhase.run();
        } else {
            final ScoreDoc[] lastEmittedDocPerShard = isScrollSearch ?
                searchPhaseController.getLastEmittedDocPerShard(reducedQueryPhase, numShards)
                : null;
            final CountedCollector counter = new CountedCollector<>(r -> fetchResults.set(r.getShardIndex(), r),
                docIdsToLoad.length, // we count down every shard in the result no matter if we got any results or not
                finishPhase, context);
            for (int i = 0; i < docIdsToLoad.length; i++) {
                IntArrayList entry = docIdsToLoad[i];
                SearchPhaseResult queryResult = queryResults.get(i);
                if (entry == null) { // no results for this shard ID
                    if (queryResult != null) {
                        // if we got some hits from this shard we have to release the context there
                        // we do this as we go since it will free up resources and passing on the request on the
                        // transport layer is cheap.
                        releaseIrrelevantSearchContext(queryResult.queryResult());
                    }
                    // in any case we count down this result since we don't talk to this shard anymore
                    counter.countDown();
                } else {
                    SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
                    Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(),
                        searchShardTarget.getNodeId());
                    ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getRequestId(), i, entry,
                        lastEmittedDocPerShard, searchShardTarget.getOriginalIndices());
                    executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(),
                        connection); // fetch the document contents from the shard
                }
            }
        }
    }
}
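The fetch phase finishes only after every shard has been counted down, whether or not anything was fetched from it. A minimal sketch of that counting idea follows. CountedCollectorSketch, Counter and fetchAll are hypothetical names, and the fetch here is synchronous, whereas in es the count-down for a fetched shard happens when the asynchronous response arrives.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CountedCollectorSketch {
    /** Runs onFinish exactly once, when the expected number of count-downs has been reached. */
    static final class Counter {
        private final AtomicInteger remaining;
        private final Runnable onFinish;

        Counter(int expected, Runnable onFinish) {
            this.remaining = new AtomicInteger(expected);
            this.onFinish = onFinish;
        }

        void countDown() {
            if (remaining.decrementAndGet() == 0) {
                onFinish.run();
            }
        }
    }

    /**
     * docIdsPerShard[i] == null means shard i contributed nothing to the merged top hits.
     * Returns the order of events so the behaviour is easy to inspect.
     */
    static List<String> fetchAll(int[][] docIdsPerShard) {
        List<String> events = new ArrayList<>();
        Counter counter = new Counter(docIdsPerShard.length, () -> events.add("finish"));
        for (int shard = 0; shard < docIdsPerShard.length; shard++) {
            if (docIdsPerShard[shard] == null) {
                events.add("skip shard " + shard);
            } else {
                events.add("fetch shard " + shard + " docs " + docIdsPerShard[shard].length);
            }
            counter.countDown(); // every shard is counted, with or without hits
        }
        return events;
    }

    public static void main(String[] args) {
        // [fetch shard 0 docs 2, skip shard 1, fetch shard 2 docs 1, finish]
        System.out.println(fetchAll(new int[][] {{1, 2}, null, {7}}));
    }
}
```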
Finally, the results are collected and returned:
// org.elasticsearch.action.search.AbstractSearchAsyncAction
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
    ShardSearchFailure[] failures = buildShardFailures();
    Boolean allowPartialResults = request.allowPartialSearchResults();
    assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
    if (allowPartialResults == false && failures.length > 0){
        raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
    }
    return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
        skippedOps.get(), buildTookInMillis(), failures, clusters);
}
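3. Callbacks in elasticsearch
es relies heavily on listener callbacks, which can be disorienting for readers who are used to sequential logic. Here is an example.
As the code shows, doExecute defines a very long rewriteListener, which is then called back from Rewriteable.
Note that doExecute itself also takes a listener parameter, which is called back after executeLocalSearch runs. Some callbacks are nested several levels deep, and the result has to propagate back up level by level.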
// org.elasticsearch.action.search.TransportSearchAction
protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) {
    final long relativeStartNanos = System.nanoTime();
    final SearchTimeProvider timeProvider =
        new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), relativeStartNanos, System::nanoTime);
    ActionListener rewriteListener = ActionListener.wrap(source -> { // 1. define the listener first
        if (source != searchRequest.source()) {
            // only set it if it changed - we don't allow null values to be set but it might be already null. this way we catch
            // situations when source is rewritten to null due to a bug
            searchRequest.source(source);
        }
        final ClusterState clusterState = clusterService.state();
        final Map remoteClusterIndices = remoteClusterService.groupIndices(searchRequest.indicesOptions(),
            searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
        OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
        if (remoteClusterIndices.isEmpty()) {
            // no remote clusters involved: search the local cluster
            executeLocalSearch(task, timeProvider, searchRequest, localIndices, clusterState, listener);
        } else {
            if (shouldMinimizeRoundtrips(searchRequest)) { // cross-cluster search that minimizes round trips (ccs_minimize_roundtrips)
                ccsRemoteReduce(searchRequest, localIndices, remoteClusterIndices, timeProvider, searchService::createReduceContext,
                    remoteClusterService, threadPool, listener,
                    (r, l) -> executeLocalSearch(task, timeProvider, r, localIndices, clusterState, l));
            } else {
                AtomicInteger skippedClusters = new AtomicInteger(0);
                collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(), searchRequest.routing(),
                    skippedClusters, remoteClusterIndices, remoteClusterService, threadPool,
                    ActionListener.wrap(
                        searchShardsResponses -> {
                            List remoteShardIterators = new ArrayList<>();
                            Map remoteAliasFilters = new HashMap<>();
                            BiFunction clusterNodeLookup = processRemoteShards(
                                searchShardsResponses, remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
                            int localClusters = localIndices == null ? 0 : 1;
                            int totalClusters = remoteClusterIndices.size() + localClusters;
                            int successfulClusters = searchShardsResponses.size() + localClusters;
                            executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
                                remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
                                new SearchResponse.Clusters(totalClusters, successfulClusters, skippedClusters.get()));
                        },
                        listener::onFailure));
            }
        }
    }, listener::onFailure);
    if (searchRequest.source() == null) {
        rewriteListener.onResponse(searchRequest.source());
    } else {
        Rewriteable.rewriteAndFetch(searchRequest.source(), searchService.getRewriteContext(timeProvider::getAbsoluteStartMillis),
            rewriteListener); // 2. rewriteAndFetch
    }
}
// org.elasticsearch.index.query.Rewriteable
static <T extends Rewriteable<T>> void rewriteAndFetch(T original, QueryRewriteContext context,
                                                       ActionListener<T> rewriteResponse, int iteration) {
    T builder = original;
    try {
        for (T rewrittenBuilder = builder.rewrite(context); rewrittenBuilder != builder;
             rewrittenBuilder = builder.rewrite(context)) {
            builder = rewrittenBuilder;
            if (iteration++ >= MAX_REWRITE_ROUNDS) {
                // this is some protection against user provided queries if they don't obey the contract of rewrite we allow 16 rounds
                // and then we fail to prevent infinite loops
                throw new IllegalStateException("too many rewrite rounds, rewriteable might return new objects even if they are not " +
                    "rewritten");
            }
            if (context.hasAsyncActions()) {
                T finalBuilder = builder;
                final int currentIterationNumber = iteration;
                context.executeAsyncActions(ActionListener.wrap(n -> rewriteAndFetch(finalBuilder, context, rewriteResponse,
                    currentIterationNumber), rewriteResponse::onFailure));
                return;
            }
        }
        rewriteResponse.onResponse(builder); // 3. call back rewriteListener
    } catch (IOException|IllegalArgumentException|ParsingException ex) {
        rewriteResponse.onFailure(ex);
    }
}
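The define-first, call-back-later order is easier to see in a stripped-down form. The sketch below is not the es ActionListener: Listener, wrap, rewrite and search are hypothetical stand-ins that only mimic the shape of the code above, with strings in place of search sources and responses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ListenerSketch {
    /** Hypothetical, simplified listener with a success and a failure callback. */
    interface Listener<T> {
        void onResponse(T value);

        void onFailure(Exception e);

        /** Builds a listener from two lambdas; an exception in the response handler is routed to the failure handler. */
        static <T> Listener<T> wrap(Consumer<T> responseHandler, Consumer<Exception> failureHandler) {
            return new Listener<T>() {
                @Override
                public void onResponse(T value) {
                    try {
                        responseHandler.accept(value);
                    } catch (Exception e) {
                        onFailure(e);
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    failureHandler.accept(e);
                }
            };
        }
    }

    /** Stand-in for the rewrite step: does its work, then calls the listener back. */
    static void rewrite(String source, Listener<String> listener) {
        if (source.isEmpty()) {
            listener.onFailure(new IllegalArgumentException("empty source"));
            return;
        }
        listener.onResponse(source.trim()); // 3. call back the listener defined by the caller
    }

    /** Stand-in for doExecute: returns the order in which things happened. */
    static List<String> search(String source) {
        List<String> events = new ArrayList<>();
        Listener<String> outer = Listener.wrap(
            response -> events.add("response:" + response),
            e -> events.add("failure:" + e.getMessage()));
        // 1. define the listener first; nothing in its body runs yet
        Listener<String> rewriteListener = Listener.wrap(rewritten -> {
            events.add("rewritten:" + rewritten);
            outer.onResponse("hits for " + rewritten); // the inner callback completes the outer one
        }, outer::onFailure);
        events.add("calling rewrite");
        rewrite(source, rewriteListener); // 2. hand the listener to the callee
        return events;
    }

    public static void main(String[] args) {
        System.out.println(search(" foo ")); // [calling rewrite, rewritten:foo, response:hits for foo]
        System.out.println(search(""));      // [calling rewrite, failure:empty source]
    }
}
```

Reading such code is usually easiest from the bottom up: find where the listener is handed to a callee, then jump back to the lambda to see what happens next.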
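4. Summary
This article briefly described how es searches documents: it first collects the ids (and related information) of the matching documents from each shard, and then fetches the document contents. DFS is not covered here; later posts will continue to explore these topics.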
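The two-step idea in the summary can be condensed into a toy model. This is a sketch under heavy simplification, not how es stores or scores data: QueryThenFetchSketch, Shard and Hit are hypothetical, each shard is an in-memory map, and scores are supplied by hand.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class QueryThenFetchSketch {
    /** What the query phase returns per document: where it lives and how well it scored. */
    static final class Hit {
        final int shard;
        final int doc;
        final float score;

        Hit(int shard, int doc, float score) {
            this.shard = shard;
            this.doc = doc;
            this.score = score;
        }
    }

    /** Toy shard: document scores and document contents kept in separate maps. */
    static final class Shard {
        private final Map<Integer, Float> scores = new HashMap<>();
        private final Map<Integer, String> sources = new HashMap<>();

        Shard add(int doc, float score, String source) {
            scores.put(doc, score);
            sources.put(doc, source);
            return this;
        }

        /** Query phase: return only ids and scores of the local top hits. */
        List<Hit> query(int shardIndex, int size) {
            List<Hit> hits = new ArrayList<>();
            for (Map.Entry<Integer, Float> e : scores.entrySet()) {
                hits.add(new Hit(shardIndex, e.getKey(), e.getValue()));
            }
            hits.sort((a, b) -> Float.compare(b.score, a.score));
            return new ArrayList<>(hits.subList(0, Math.min(size, hits.size())));
        }

        /** Fetch phase: load the content of one document. */
        String fetch(int doc) {
            return sources.get(doc);
        }
    }

    /** Coordinator: query every shard, merge to a global top list, then fetch only those documents. */
    static List<String> search(List<Shard> shards, int size) {
        List<Hit> merged = new ArrayList<>();
        for (int i = 0; i < shards.size(); i++) {
            merged.addAll(shards.get(i).query(i, size));
        }
        merged.sort((a, b) -> Float.compare(b.score, a.score));
        List<String> result = new ArrayList<>();
        for (Hit hit : merged.subList(0, Math.min(size, merged.size()))) {
            result.add(shards.get(hit.shard).fetch(hit.doc));
        }
        return result;
    }

    public static void main(String[] args) {
        Shard s0 = new Shard().add(1, 0.9f, "a").add(2, 0.2f, "b");
        Shard s1 = new Shard().add(7, 0.5f, "c").add(8, 0.7f, "d");
        System.out.println(search(Arrays.asList(s0, s1), 2)); // [a, d]
    }
}
```

Splitting the work this way means document contents travel over the network only for the hits that survive the global merge.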