The earlier analysis of the Elasticsearch transport module covered the overall functionality of the ES Transport module at a high level, which raises the question: how does Elasticsearch actually start its services so that it can receive the Index, GET-document, and other operations sent by clients? This article analyzes the startup process of the Netty HTTP server in Elasticsearch 6.3.2. Starting an ES node means starting its individual services, and their initialization is implemented in the constructor of org.elasticsearch.node.Node, beginning with the creation of an org.elasticsearch.common.network.NetworkModule object. NetworkModule is where all of ES's network-communication functionality is created and registered.
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
        threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
        networkService, restController);
When the NetworkModule object is created, the main work is creating the two servers used for communication:
A client can either be retrieved from a org.elasticsearch.node.Node started, or connected remotely to one or more nodes using org.elasticsearch.client.transport.TransportClient. Every node in the cluster can handle HTTP and Transport traffic by default. The transport layer is used exclusively for communication between nodes and the Java TransportClient; the HTTP layer is used only by external REST clients.
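As a quick illustration of that split, here is a minimal sketch of the two client styles (assuming the 6.x transport-client and low-level REST client dependencies are on the classpath; the hosts and ports are just the defaults):

import java.net.InetAddress;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class SketchClients {
    public static void main(String[] args) throws Exception {
        // Transport layer (TCP, default port 9300): the Java TransportClient speaks the internal node-to-node protocol
        TransportClient transportClient = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));

        // HTTP layer (default port 9200): external REST clients go through the Netty HTTP server analyzed below
        RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200, "http")).build();

        transportClient.close();
        restClient.close();
    }
}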
The Netty4HttpServerTransport object is created as shown below; the TCP transport (Netty4Transport) follows similar logic.
org.elasticsearch.common.network.NetworkModule#NetworkModule:
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(settings, threadPool, bigArrays,
        circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, dispatcher);
for (Map.Entry<String, Supplier<HttpServerTransport>> entry : httpTransportFactory.entrySet()) {
    registerHttpTransport(entry.getKey(), entry.getValue());
}
Netty4Plugin#getHttpTransports creates the Netty HTTP server, Netty4HttpServerTransport:
@Override
public Map<String, Supplier<HttpServerTransport>> getHttpTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
                                                                    CircuitBreakerService circuitBreakerService,
                                                                    NamedWriteableRegistry namedWriteableRegistry,
                                                                    NamedXContentRegistry xContentRegistry,
                                                                    NetworkService networkService,
                                                                    HttpServerTransport.Dispatcher dispatcher) {
    return Collections.singletonMap(NETTY_HTTP_TRANSPORT_NAME,
        () -> new Netty4HttpServerTransport(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher));
}
The constructed Transport object is then wrapped into a TransportService:
// obtain the constructed Netty4Transport
final Transport transport = networkModule.getTransportSupplier().get();
// wrap the Netty4Transport into a TransportService
final TransportService transportService = newTransportService(settings, transport, threadPool,
        networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), taskHeaders);
而後其餘須要使用通訊功能的模塊,只須要封裝 TransportService 對象便可。好比執行用戶SEARCH操做的搜索模塊 TransportSearchAction,它有一個實例屬性SearchTransportService,而SearchTransportService就封裝了 TransportService,這樣TransportSearchAction就能使用TcpTransport進行通訊了。以下代碼所示:
Node.java constructor:
// constructing a SearchTransportService requires a TransportService; the TransportService object is a shared
// "communication object" that many services use
final SearchTransportService searchTransportService = new SearchTransportService(settings, transportService,
        SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
As a side note: the services that the various Action objects depend on are presumably all created in the Node.java constructor; for example, the SearchTransportService and ClusterService that TransportSearchAction depends on are created at node startup.
Once Netty4HttpServerTransport has been created, it still needs to bind a port and start serving. The org.elasticsearch.node.Node.start method is the startup entry point for every service in an ES node (including, of course, the Netty HTTP server).
The org.elasticsearch.node.Node#start method:
if (NetworkModule.HTTP_ENABLED.get(settings)) {
    injector.getInstance(HttpServerTransport.class).start();
}
Because Netty4HttpServerTransport extends AbstractLifecycleComponent, its startup goes through org.elasticsearch.common.component.AbstractLifecycleComponent.start, which calls doStart() to start the Netty HTTP server and bind it to port 9200.
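A rough sketch of this template-method pattern (simplified, not the verbatim ES source: the real AbstractLifecycleComponent uses a Lifecycle state machine and listener callbacks, which a boolean flag stands in for here):

public abstract class SketchLifecycleComponent {

    private volatile boolean started = false;

    public void start() {
        if (started) {        // ES checks a Lifecycle state machine here
            return;
        }
        doStart();            // subclasses do the real work; Netty4HttpServerTransport#doStart() binds port 9200
        started = true;
    }

    protected abstract void doStart();
}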
Netty4HttpServerTransport#doStart():
protected void doStart() {
    boolean success = false;
    try {
        this.serverOpenChannels = new Netty4OpenChannelsHandler(logger); // ---> es for test
        serverBootstrap = new ServerBootstrap();
        // workerCount=8, thread name: elasticsearch[debug_node][http_server_worker]
        // once a channel is assigned to an EventLoop thread of the EventLoopGroup, all events on that channel
        // are handled by that same EventLoop thread
        serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
        // handles connection requests; for every established connection a 'child channel' is created to handle
        // all the I/O events of that connection
        serverBootstrap.channel(NioServerSocketChannel.class);
        // bind a handler to the child channel, i.e. this handler processes the I/O events on that channel
        serverBootstrap.childHandler(configureServerChannelHandler()); // ---> Netty4HttpRequestHandler
        // set options for the child channels (the parent channel accepts connections; a child channel handles
        // the events of an established connection)
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
        serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));

        // ---> TCP send buffer size
        final ByteSizeValue tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
        if (tcpSendBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(tcpSendBufferSize.getBytes()));
        }

        // ---> TCP receive buffer size
        final ByteSizeValue tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
        if (tcpReceiveBufferSize.getBytes() > 0) {
            serverBootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(tcpReceiveBufferSize.getBytes()));
        }

        serverBootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
        serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

        this.boundAddress = createBoundHttpAddress(); // ---> ServerBootstrap binds the port
        if (logger.isInfoEnabled()) {
            logger.info("{}", boundAddress);
        }
        success = true;
    } finally {
        if (success == false) {
            doStop(); // otherwise we leak threads since we never moved to started
        }
    }
}
The number of Netty HTTP server worker threads is twice the number of CPU cores available on the machine the node runs on: Runtime.getRuntime().availableProcessors() * 2.
其餘的一些默認配置以下:
TCP_NODELAY=true, SO_KEEPALIVE=true
ServerBootstrap(ServerBootstrapConfig(group: NioEventLoopGroup, channelFactory: NioServerSocketChannel.class, options: {RCVBUF_ALLOCATOR=io.netty.channel.FixedRecvByteBufAllocator@72ce8a9b, SO_REUSEADDR=true}, childGroup: NioEventLoopGroup, childOptions: {TCP_NODELAY=true, SO_KEEPALIVE=true, RCVBUF_ALLOCATOR=io.netty.channel.FixedRecvByteBufAllocator@72ce8a9b, SO_REUSEADDR=true}, childHandler: org.elasticsearch.http.netty4.Netty4HttpServerTransport$HttpChannelHandler@56ec6ac0))
Since the ES server (I really can't find a better name for it...) is built on Netty, there must be a ChannelHandler responsible for handling the events that occur on the SocketChannel, and that ChannelHandler is org.elasticsearch.http.netty4.Netty4HttpRequestHandler.
Netty4HttpRequestHandler is registered in the org.elasticsearch.http.netty4.Netty4HttpServerTransport.HttpChannelHandler#initChannel method, so user requests are handed to Netty4HttpRequestHandler for processing:
ch.pipeline().addLast("handler", requestHandler); // Netty4HttpRequestHandler handles the business logic
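For context, here is a simplified, illustrative ChannelInitializer showing the kind of pipeline HttpChannelHandler#initChannel assembles before that final "handler" entry. The handler names and the aggregator size here are assumptions; the real ES code also registers the open-channels tracker and, depending on settings, compression, CORS, and pipelining handlers:

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;

public class SketchHttpChannelHandler extends ChannelInitializer<Channel> {

    private final ChannelHandler requestHandler;

    public SketchHttpChannelHandler(ChannelHandler requestHandler) {
        this.requestHandler = requestHandler;
    }

    @Override
    protected void initChannel(Channel ch) {
        ch.pipeline().addLast("decoder", new HttpRequestDecoder());                       // raw bytes -> HTTP messages
        ch.pipeline().addLast("decoder_compress", new HttpContentDecompressor());         // decompress request bodies
        ch.pipeline().addLast("encoder", new HttpResponseEncoder());                      // HTTP responses -> raw bytes
        ch.pipeline().addLast("aggregator", new HttpObjectAggregator(100 * 1024 * 1024)); // chunks -> FullHttpRequest
        ch.pipeline().addLast("handler", requestHandler);                                 // business logic, cf. Netty4HttpRequestHandler
    }
}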
Given how the Netty framework works, the starting point for handling incoming user requests is, without a doubt, the org.elasticsearch.http.netty4.Netty4HttpRequestHandler#channelRead0 method.
So if you want to debug the entry point of an INDEX, GET, or DELETE operation, set a breakpoint at the entry point, org.elasticsearch.http.netty4.Netty4HttpRequestHandler#channelRead0, and another at the response point, org.elasticsearch.http.netty4.Netty4HttpChannel#sendResponse, then use IDEA's debugger frames (stack traces) to follow the execution path of each operation.
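To make that entry point concrete, here is a minimal, self-contained sketch in plain Netty (not the actual ES class) of the overall shape of Netty4HttpRequestHandler#channelRead0: receive the aggregated HTTP request, hand it off for dispatch, and write a response back on the channel:

import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

public class SketchHttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
        // In ES, the FullHttpRequest is wrapped into a RestRequest-style object and passed to the dispatcher,
        // which eventually reaches the registered Rest*Action; here we simply answer with 200 OK.
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }
}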
Since all user operations go through this single entry point, how are the operations parsed and finally handed to the appropriate TransportXXXAction for processing? Roughly, the steps are as follows:
1. The action.accept(channel) statement triggers execution (see the Rest*Action sketch after this list for where that consumer typically comes from).
2. return transportAction(action).execute(request, listener) runs.
3. this.action.doExecute(task, request, listener); invokes the concrete implementation, TransportXXXAction#doExecute(), which carries out the corresponding operation.
4. shardBulkAction.execute(bulkShardRequest, new ActionListener<BulkShardResponse>(){...}); hands the delete operation over to the "shard-level action", TransportShardBulkAction.
5. this.action.doExecute(task, request, listener); now invokes TransportShardBulkAction's doExecute method. TransportShardBulkAction inherits doExecute() from TransportReplicationAction, and what runs here is a ReroutePhase task. That is easy to understand: to delete a document you have to know which shard it lives on and send the delete request to that shard, which is why the ReroutePhase exists:

   protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
       new ReroutePhase((ReplicationTask) task, request, listener).run();
   }
6. Tracing into ReroutePhase's doRun() method: if the target primary shard is on the local node, the delete runs via performLocalAction; if it is on a remote node, it runs via performRemoteAction. Here the request is sent out yet again through TransportService#sendRequest... annoying, so let's keep following it and see where it lands next:
   if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
       performLocalAction(state, primary, node, indexMetaData);
   } else {
       performRemoteAction(state, primary, node);
   }
8. After AsyncPrimaryAction#doRun successfully acquires the lock (a PrimaryShardReference), it calls back into AsyncPrimaryAction#onResponse, where createReplicatedOperation(...).execute() triggers the underlying Lucene delete logic.
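As referenced in step 1, here is an illustrative Rest*Action-style handler, modeled on what ES 6.x's RestDeleteAction roughly looks like (the class name and details are assumptions, not verbatim source): prepareRequest builds the transport-level DeleteRequest and returns the RestChannelConsumer whose accept(channel) call is step 1, and client.delete(...) is where the NodeClient resolves the matching TransportAction and calls execute(request, listener), i.e. step 2:

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestStatusToXContentListener;

// Illustrative sketch only (route registration omitted)
public class SketchRestDeleteAction extends BaseRestHandler {

    public SketchRestDeleteAction(Settings settings) {
        super(settings);
    }

    @Override
    public String getName() {
        return "sketch_document_delete_action";
    }

    @Override
    protected RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) {
        DeleteRequest deleteRequest = new DeleteRequest(
                request.param("index"), request.param("type"), request.param("id"));
        // the lambda below is the `action` whose accept(channel) call is step 1; inside client.delete(...)
        // the NodeClient looks up the matching TransportAction and calls execute(request, listener) (step 2)
        return channel -> client.delete(deleteRequest, new RestStatusToXContentListener<>(channel));
    }
}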
When deleting, there is a corresponding deletion strategy, implemented in org.elasticsearch.index.engine.InternalEngine#planDeletionAsPrimary:
if (versionValue == null) {
    currentVersion = Versions.NOT_FOUND;
    currentlyDeleted = true;
} else {
    currentVersion = versionValue.version;
    currentlyDeleted = versionValue.isDelete();
}
final DeletionStrategy plan;
if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
    final VersionConflictEngineException e =
        new VersionConflictEngineException(shardId, delete, currentVersion, currentlyDeleted);
    plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
    plan = DeletionStrategy.processNormally(
        currentlyDeleted,
        generateSeqNoForOperation(delete),
        delete.versionType().updateVersion(currentVersion, delete.version()));
}
return plan;
When deleting a doc, ES also has to check whether the doc id exists; this is implemented in org.elasticsearch.index.engine.InternalEngine#loadCurrentVersionFromIndex:
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
    assert incrementIndexVersionLookup();
    try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {
        return VersionsAndSeqNoResolver.loadVersion(searcher.reader(), uid);
    }
}
Another thing noticed while reading the source: **delete-by-doc-id does not trigger segment merges**. So deleting by doc id is fast and has very little impact on cluster load:
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
The Lucene-level document deletion finally happens in the org.elasticsearch.index.engine.InternalEngine#delete method:
if (delete.origin() == Operation.Origin.PRIMARY) {
    plan = planDeletionAsPrimary(delete);
} else {
    plan = planDeletionAsNonPrimary(delete);
}

if (plan.earlyResultOnPreflightError.isPresent()) {
    deleteResult = plan.earlyResultOnPreflightError.get();
} else if (plan.deleteFromLucene) {
    deleteResult = deleteInLucene(delete, plan);
} else {
    deleteResult = new DeleteResult(
        plan.versionOfDeletion, plan.seqNoOfDeletion, plan.currentlyDeleted == false);
}
具體實如今:org.elasticsearch.index.engine.InternalEngine#deleteInLucene裏面,代碼就不貼了。以上,就是一個完整的 ES delete by doc id 的執行流程。感興趣的能夠再細究。
The latter part of this article walked through the execution path of the DELETE API in detail; other operations are similar and can be analyzed the same way. Original post: https://www.cnblogs.com/hapjin/p/11018479.html