elasticsearch index 之 create index(-)

從本篇開始,就進入了Index的核心代碼部分。這裏首先分析一下索引的建立過程。elasticsearch中的索引是多個分片的集合,它只是邏輯上的索引,並不具有實際的索引功能,全部對數據的操做最終仍是由每一個分片完成。建立索引的過程,從elasticsearch集羣上來講就是寫入索引元數據的過程,這一操做只能在master節點上完成。這是一個阻塞式動做,在加上分配在集羣上均衡的過程也很是耗時,所以在一次建立大量索引的過程master節點會出現單點性能瓶頸,可以看到響應過程很慢。html

在開始具體源碼分析以前,首先回顧一下Action部分的內容(參考index action分析),elasticsearch的每個功能都對應兩個Action,*action和Transport*action。*action中定義了每一個功能對應的路徑,同時Action的instance綁定對應的Transport*Action。全部功能請求都須要在集羣上轉發,這大概也是每一個功能都有Transport*Action的緣由吧。對於create固然也不例外,它的開始點也是TransportCreateAction。另外,在action support分析中分析過,不一樣的action須要通過和須要操做的節點也不一樣。create index只能由master節點進行,並且也只在master節點上進行,保證集羣數據的一致性。所以TransportCreateAction繼承了TransportMasterNodeOperationAction,並實現了materOperation方法。它的方法以下所示:api

   protected void masterOperation(final CreateIndexRequest request, final ClusterState state, final ActionListener<CreateIndexResponse> listener) throws ElasticsearchException {
        String cause = request.cause();
        if (cause.length() == 0) {
            cause = "api";
        }

        final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(request, cause, request.index())
                .ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
                .settings(request.settings()).mappings(request.mappings())
                .aliases(request.aliases()).customs(request.customs());

        createIndexService.createIndex(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {

            @Override
            public void onResponse(ClusterStateUpdateResponse response) {
                listener.onResponse(new CreateIndexResponse(response.isAcknowledged()));
            }

            @Override
            public void onFailure(Throwable t) {
                if (t instanceof IndexAlreadyExistsException) {
                    logger.trace("[{}] failed to create", t, request.index());
                } else {
                    logger.debug("[{}] failed to create", t, request.index());
                }
                listener.onFailure(t);
            }
        });
    }

這裏看上很簡單,只是調用了createIndexService(它實際上是MetaDataCreateIndexService)的方法,就是修改集羣matedata過程。修改前首先獲取到index名稱對應的lock,這樣保證操做數據一致性,而後生成updatetask,交給clusterservice處理。代碼以下所示:app

public void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {

        // 獲取鎖,只對該索引的操做加鎖,而不是整個cluster
        final Semaphore mdLock = metaDataService.indexMetaDataLock(request.index());

        // 若是可以獲取鎖離開建立索引,不然在下面啓動新的線程進行
        if (mdLock.tryAcquire()) {
            createIndex(request, listener, mdLock);
            return;
        }
        threadPool.executor(ThreadPool.Names.MANAGEMENT).execute(new ActionRunnable(listener) {
            @Override
            public void doRun() throws InterruptedException {
                if (!mdLock.tryAcquire(request.masterNodeTimeout().nanos(), TimeUnit.NANOSECONDS)) {
                    listener.onFailure(new ProcessClusterEventTimeoutException(request.masterNodeTimeout(), "acquire index lock"));
                    return;
                }
                createIndex(request, listener, mdLock);
            }
        });
    }

createIndex方法,會封裝create請求,而後向cluster發送一個updatetask。代碼以下所示:elasticsearch

 private void createIndex(final CreateIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener, final Semaphore mdLock) {

        ImmutableSettings.Builder updatedSettingsBuilder = ImmutableSettings.settingsBuilder();
        updatedSettingsBuilder.put(request.settings()).normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX);
        request.settings(updatedSettingsBuilder.build());

        clusterService.submitStateUpdateTask("create-index [" + request.index() + "], cause [" + request.cause() + "]", Priority.URGENT, new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(request, listener)

創建索引,修改配置,增長或者修改mapping都是對集羣狀態修改,它們的過程都很類似,都是經過clusterService提交一個更新操做,同時附帶有優先級。clusterservice會根據優先級和更新狀態task的類型來進行對應的操做。以下所示:ide

 public void submitStateUpdateTask(final String source, Priority priority, final ClusterStateUpdateTask updateTask) {
        if (!lifecycle.started()) {
            return;
        }
        try {
            final UpdateTask task = new UpdateTask(source, priority, updateTask);//根據優先級新建不一樣的task
            if (updateTask instanceof TimeoutClusterStateUpdateTask) {//超時任務,這類任務須要即時返回,所以馬上執行。
                final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
                updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
                    @Override
                    public void run() {
                        threadPool.generic().execute(new Runnable() {
                            @Override
                            public void run() {
                                timeoutUpdateTask.onFailure(task.source(), new ProcessClusterEventTimeoutException(timeoutUpdateTask.timeout(), task.source()));
                            }
                        });
                    }
                });
            } else {//其它類型,能夠延遲執行,則交給線程池來執行。
                updateTasksExecutor.execute(task);
            }
        } catch (EsRejectedExecutionException e) {
            // ignore cases where we are shutting down..., there is really nothing interesting
            // to be done here...
            if (!lifecycle.stoppedOrClosed()) {
                throw e;
            }
        }
    }

說完它們的執行過程,再來看一下create index的具體邏輯。這個邏輯在matedataservice所提交的AckedClusterStateUpdateTask中的execute方法中。整體來講,這一過程就是將request中關於索引的配置mapping等取出來加入到當前的clustermatedata中,構造一個新的matedata的過程。這一過程仍是比較複雜,限於篇幅將在下次中進行分析。源碼分析

總結:建立索引的過程就是master節點更新集羣matedata的過程,爲了保證數據一致性,須要獲取鎖。所以存在單點瓶頸。對於外部調用來講,跟其它功能同樣,外部接口調用CreateIndexAction的相關方法,而後經過TransPortCreateIndexAction講請求發送到集羣上,進行索引建立。post

相關文章
相關標籤/搜索