1 基本概念node
集羣: 一個集羣有一個或多個節點組織在一塊兒,並將數據組織在一塊兒,提供索引和搜索服務.
節點:一個節點是一個集羣中的服務器,提供存儲數據,提供搜索服務.
索引:文檔的邏輯的集合
分片:一個邏輯索引有若干分片,其中一個分片被設置爲主分片.分片爲索引的存儲位置. 會涉及到分佈式問題.
類型:文檔的類型
文檔:與lucene中的document相似.服務器
若是集羣狀態發生改變,發生改變狀態的節點,先通知其餘節點更新狀態,最後才更新本地節點
的狀態.所以在提交版本信息同步時,都會涉及到這個過程.app
2 提交請求初次建立索引curl
概述:elasticsearch
解析請求,根據請求建立index和mapping,並提交版本信息到集羣.而後根據配置建立0,1,2,3
的shard(分片)而後更新版本信息到集羣.最後建立shard(分片)4 而後更新版本信息到集羣.
最後根據請求更新已有的mapper信息,最後發佈到集羣中各個節點,而後更新版本信息.分佈式
curl -XPOST 'localhost:9200/twitter/tweet/1' -d '{"name":"parker","age":18}'
源碼閱讀:
HttpServer.internalDispatchRequest (HttpRequest,HttpChannel) ->
(RestController)restController.dispatchRequest(HttpRequest,HttpChannel) ->
RestController.executeHandler(HttpRequest,HttpChannel)->
獲取RestHandler->
handler.handleRequest(request, channel)->
BaseRestHandler.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
RestIndexAction.handleRequest(request, channel, new HeadersAndContextCopyClient(client, request, controller.relevantHeaders()))->
建立IndexRequest對象.
client.index(indexRequest,RestBuilderListener)->
AbstraceClient.index(final IndexRequest request, final ActionListener<IndexResponse> listener)->
execute(IndexAction.INSTANCE, request, listener)->
BaseRestHandler.execute(Action<Request, Response, RequestBuilder, Client> action, Request request, ActionListener<Response> listener)
處理前的準備工做copyHeaderSAndContext(request, restRequest, headers)->
<-copyHeaderSAndContext
FilterClient.execute(action, request, listener)->
(TransportAction)transportAction.execute(request, listener)->
TransportAction.doExecute(request, listener)->
TransportIndexAction.doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener)->
判斷是否須要新建索引,若是須要則新建,不然直接創建索引.
若是須要建立索引,先構造建立索引的請求.而後建立索引
(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>)->
TransportMasterNodeOperationAction.doExecute(final Request request, final ActionListener<Response> listener)->
TransportMasterNodeOperationAction.innerExecute(final Request request, final ActionListener<Response> listener, final ClusterStateObserver observer, final boolean retrying)->
先判斷是不是本地master節點的請求
若是經過驗證,將其交給線程池來處理.
TransportMasterNodeOperationAction.masterOperation(request, clusterService.state(), listener);->
TransportCreateIndexAction.masterOperation((final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener)->
建立請求
(CreateIndexClusterStateUpdateRequest)updateRequest
MetaDataCreateIndexService.createIndexService.createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener)->
若是能獲取鎖,直接建立.
不然放入線程池,交由後臺執行獲取鎖操做,何時獲取到何時建立.
MetaDataCreateIndexService.createIndex(request, listener, mdLock);->
建立並完善請求.
(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask(String, Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener))->
首先生成UpdateTask,而後被提交的線程池中,而後被執行UpdateTask的run方法.
UpdateTask.run->
newClusterState = updateTask.execute(previousClusterState);->
MetaDataCreateIndexService.execute->
建立相關配置文件.
在indices中建立index 而且增長mapping
indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());->
建立相關索引,添加模塊.
<-indicesService.createIndex(request.index(), actualIndexSettings, clusterService.localNode().id());
初始化indexService
初始化mapperService
初始化索引解析服務indexQueryParserService
更新相關其餘相關信息
<-MetaDataCreateIndexService.execute
<-newClusterState = updateTask.execute(previousClusterState);ui
Discovery.AckListener ackListener = new NoOpAckListener();
若是是主節點,而後發佈狀態.
更新狀態.
for (ClusterStateListener listener : preAppliedListeners) {
try {
listener.clusterChanged(clusterChangedEvent);
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
}
}
RoutingService.clusterChanged(Event)->
<-RoutingService.clusterChanged(Event)
IndicesClusterStateService.clusterChanged(Event)->
執行一系列與event相關的操做.
cleanFailedShards(event);
applyDeletedIndices(event);
applyNewIndices(event);
applyMappings(event);
applyAliases(event);
applyNewOrUpdatedShards(event);
applyDeletedShards(event);
applyCleanedIndices(event);
applySettings(event);
<-IndicesClusterStateService.clusterChanged(Event)
NodeSettingsService.clusterChanged(Event)->
<-NodeSettingsService.clusterChanged(Event)
InternalClusterInfoService.clusterChanged(Event)->
<-InternalClusterInfoService.clusterChanged(Event)
RepositoriesService.clusterChanged(Event)->
<-RepositoriesService.clusterChanged(Event)
MetaDataUpdateSettingsService.clusterChanged(Event)->
<-MetaDataUpdateSettingsService.clusterChanged(Event)
RestoreService.clusterChanged(Event)->
<-RestoreService.clusterChanged(Event)
RiverRouter.clusterChanged(Event)->
<-RiverRouter.clusterChanged(Event)
org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)->
<-org.elasticsearch.cluster.service.InternalClusterService$LocalNodeMasterListeners.clusterChanged(Event)
LocalGatewayAllocator.clusterChanged(Event)->
<-LocalGatewayAllocator.clusterChanged(Event)
IndexCache.clusterChanged(Event)->
<-IndexCache.clusterChanged(Event)
SnapshotsService.clusterChanged(Event)->
<-SnapshotsService.clusterChanged(Event)
IndicesStore.clusterChanged(Event)->
<-IndicesStore.clusterChanged(Event)
LocalGateway.clusterChanged(Event)->
<-LocalGateway.clusterChanged(Event)
GatewayService.clusterChanged(Event)->
<-GatewayService.clusterChanged(Event)
若是node有掉節點的狀況,卸下節點.
獲取發佈相關請求.
//manual ack only from the master at the end of the publish
<-UpdateTask.run
<-(InternalClusterService extends ClusterService)clusterService.submitStateUpdateTask
<-MetaDataCreateIndexService.createIndexService.createIndex
<-TransportCreateIndexAction.masterOperation
<-TransportMasterNodeOperationAction.masterOperation
<-TransportMasterNodeOperationAction.innerExecute
<-TransportMasterNodeOperationAction.doExecute
<-(TransportCreateIndexAction)createIndexAction.execute(createIndexRequest, new ActionListener<CreateIndexResponse>)
不然直接建立索引
TransportCreateIndexAction.innerExecute(request, listener);
<-TransportIndexAction.doExecute(final IndexRequest request, final ActionListener<IndexResponse> listener)
<-TransportAction.doExecute(request, listener);
<-(TransportAction)transportAction.execute
<-FilterClient.execute
<-BaseRequestHandler.execute
<-AbstraceClient.index
<-RestIndexAction.handleRequest
<-BaseRestHandler.handleRequest
<-RestHandler
<-RestController
<-HttpServerurl
後期會詳細研究這一過程spa