Elasticsearch 備份倉庫和快照

安裝ES hdfs插件並重啓

參考插件安裝官方說明。該插件安裝完成後,需要重啓ES。

在hdfs建立目錄

在此處要注意一點:在hdfs上創建的目錄的所有者必須與ES的啓動用戶一致,否則hdfs會拒絕連接(原因請自行Google)。

  1. /usr/local/hadoop-2.7.2/bin/hadoop dfs -mkdir /es_back
  2. /usr/local/hadoop-2.7.2/bin/hadoop dfs -chown -R <user> /es_back

使用rest api方式測試

PUT _snapshot/test
{
  "type": "hdfs",
  "settings": {
    "uri": "hdfs://x.x.x.32:9000/",
    "path": "/es_back",
    "load_defaults":"true"
  }
}

未完待續

注意此處有個坑爹的問題

問題描述忘了,大概是指 es在讀取hdfs時,無訪問權限。這個應該是該插件在實現Java權限控制的時候搞出來的bug,我是通過修改源碼解決掉的。

// Permission grants for the repository-hdfs plugin's security policy
// (the text following this snippet refers to the file as
// repository-hdfs/plugin-security.policy).
grant {
  // Hadoop UserGroupInformation, HdfsConstants, PipelineAck clinit
  permission java.lang.RuntimePermission "getClassLoader";

  // UserGroupInformation (UGI) Metrics clinit
  permission java.lang.RuntimePermission "accessDeclaredMembers";
  permission java.lang.reflect.ReflectPermission "suppressAccessChecks";

  // org.apache.hadoop.util.StringUtils clinit
  permission java.util.PropertyPermission "*", "read,write";

  // org.apache.hadoop.util.ShutdownHookManager clinit
  permission java.lang.RuntimePermission "shutdownHooks";

  // JAAS is used always, we use a fake subject, hurts nobody
  permission javax.security.auth.AuthPermission "getSubject";
  permission javax.security.auth.AuthPermission "doAs";
  permission javax.security.auth.AuthPermission "modifyPrivateCredentials";
  // NOTE(review): the entries from here down to PropertyPermission repeat
  // permissions that are already granted earlier in this block.
  permission java.lang.RuntimePermission "accessDeclaredMembers";
  permission java.lang.RuntimePermission "getClassLoader";
  permission java.lang.RuntimePermission "shutdownHooks";
  permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
  permission javax.security.auth.AuthPermission "doAs";
  permission javax.security.auth.AuthPermission "getSubject";
  permission javax.security.auth.AuthPermission "modifyPrivateCredentials";
  // NOTE(review): AllPermission implies every other permission, so it makes
  // all other entries in this block redundant and removes any sandboxing for
  // the code this grant applies to. Presumably this is the author's
  // workaround for the access-denied error -- confirm it is acceptable
  // before using this policy in production.
  permission java.security.AllPermission;
  permission java.util.PropertyPermission "*", "read,write";
  // Read access to Hadoop Credentials objects held as private credentials.
  permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials * \"*\"", "read";
};

解決方案二: 將repository-hdfs/plugin-security.policy 添加到ES啓動配置中。在ES的 config/jvm.options中,添加:

-Djava.security.policy=file:///data/soft/elasticsearch-5.0.1/plugins/repository-hdfs/plugin-security.policy

然後重啓ES集羣就可以了。

參看

代碼實現

/**
 * Data-access helper around the Elasticsearch cluster admin client for managing
 * snapshot repositories (of type {@code hdfs}) and the snapshots stored in them.
 *
 * <p>Most methods follow a best-effort contract: failures are logged and reported
 * to the caller as {@code false}, {@code null} or an empty result instead of being
 * rethrown. The plain query methods ({@link #queryRepositoryList()},
 * {@link #querySnapShotInfoListByName(String)},
 * {@link #queryIndicsByRepoAndSnapshot(String, String)} and
 * {@link #isIndexExists(String)}) do not catch anything and let client exceptions
 * propagate.
 */
@Repository
public class SnapshotAndRestoreDao {

    private static final Logger logger = LoggerFactory.getLogger(SnapshotAndRestoreDao.class);

    /** Repository type registered by the repository-hdfs plugin. */
    private static final String REPOSITORY_TYPE_HDFS = "hdfs";

    /** HDFS URI used when the caller does not supply one. */
    private static final String DEFAULT_HDFS_URI = "hdfs://10.94.128.32:9000/";

    @Autowired
    private Client client;


    /**
     * Lists every snapshot repository registered in the cluster.
     *
     * @return the repository metadata returned by the cluster
     */
    public List<RepositoryMetaData> queryRepositoryList() {
        return this.client.admin().cluster()
                .prepareGetRepositories().get().repositories();
    }

    /**
     * Checks whether a repository with the given name is registered.
     *
     * @param repositoryName name of the repository to look for
     * @return {@code true} if a repository with that name exists; {@code false} if it
     *         does not exist or the lookup failed (the failure is logged)
     */
    public boolean isRepositoryExist(String repositoryName) {
        try {
            return this.queryRepositoryList().stream()
                    .anyMatch(repositoryMetaData -> StringUtils.equals(repositoryName, repositoryMetaData.name()));
        } catch (Exception ex) {
            // Pass the exception itself so the stack trace is not lost.
            logger.error("Exception in isRepositoryExist method", ex);
            return false;
        }
    }

    /**
     * Creates an HDFS snapshot repository using the default HDFS URI.
     *
     * @param repositoryName name of the repository to create
     * @param path           path of the repository inside HDFS
     * @param compress       value for the repository's {@code compress} setting
     * @return {@code true} only if the repository did not exist yet and the cluster
     *         acknowledged its creation
     */
    public boolean createRepository(String repositoryName,
                                    String path, boolean compress) {
        return createRepository(repositoryName, path, compress, DEFAULT_HDFS_URI);
    }

    /**
     * Creates an HDFS snapshot repository.
     *
     * @param repositoryName name of the repository to create
     * @param path           path of the repository inside HDFS
     * @param compress       value for the repository's {@code compress} setting
     * @param uri            HDFS URI, e.g. {@code hdfs://host:9000/}
     * @return {@code true} only if the repository did not exist yet and the cluster
     *         acknowledged its creation; {@code false} if it already exists, the
     *         request was not acknowledged, or an error occurred (logged)
     */
    public boolean createRepository(String repositoryName,
                                    String path, boolean compress, String uri) {
        try {
            if (isRepositoryExist(repositoryName)) {
                logger.info("{} repository already exists", repositoryName);
                return false;
            }

            Settings settings = Settings.builder()
                    .put("path", path)
                    .put("compress", compress)
                    .put("uri", uri)
                    .build();
            PutRepositoryResponse putRepositoryResponse = client.admin().cluster()
                    .preparePutRepository(repositoryName)
                    .setType(REPOSITORY_TYPE_HDFS).setSettings(settings).get();

            boolean acknowledged = putRepositoryResponse != null && putRepositoryResponse.isAcknowledged();
            // Only report success once the cluster has actually acknowledged the request.
            if (acknowledged) {
                logger.info("Repository was created.");
            } else {
                logger.warn("Creation of repository {} was not acknowledged", repositoryName);
            }
            return acknowledged;
        } catch (Exception ex) {
            logger.error("Exception in createRepository method", ex);
            return false;
        }
    }

    /**
     * Deletes a repository if it exists.
     *
     * @param repositoryName name of the repository to delete
     * @return the delete response, or {@code null} if the repository does not exist
     *         or the deletion failed (the failure is logged)
     */
    public DeleteRepositoryResponse deleteRepository(String repositoryName) {
        DeleteRepositoryResponse deleteRepositoryResponse = null;
        try {
            if (isRepositoryExist(repositoryName)) {
                deleteRepositoryResponse = this.client.admin().cluster()
                        .prepareDeleteRepository(repositoryName).execute().actionGet();
                logger.info("{},  repository has been deleted.", repositoryName);
            }
        } catch (Exception ex) {
            logger.error("Exception in deleteRepository method", ex);
        }
        return deleteRepositoryResponse;
    }


    /**
     * Lists the snapshots stored in a repository.
     *
     * @param repositoryName name of the repository to query
     * @return the snapshot infos returned by the cluster
     */
    public List<SnapshotInfo> querySnapShotInfoListByName(String repositoryName) {
        return client.admin().cluster()
                .prepareGetSnapshots(repositoryName).get().getSnapshots();
    }

    /**
     * Checks whether a snapshot with the given name exists in the repository.
     *
     * @param repositoryName name of the repository to search
     * @param snapshotName   name of the snapshot to look for
     * @return {@code true} if the snapshot exists; {@code false} if it does not exist
     *         or the lookup failed (the failure is logged)
     */
    public boolean isSnapshotExist(String repositoryName, String snapshotName) {
        try {
            List<SnapshotInfo> snapshotInfoList = this.querySnapShotInfoListByName(repositoryName);
            return snapshotInfoList != null && snapshotInfoList.stream()
                    .anyMatch(snapshotInfo -> StringUtils.equals(snapshotName, snapshotInfo.snapshotId().getName()));
        } catch (Exception ex) {
            logger.error("Exception in isSnapshotExist method", ex);
            return false;
        }
    }

    /**
     * Creates a snapshot of one index and waits for it to complete.
     *
     * @param repositoryName repository to store the snapshot in
     * @param snapshotName   name of the new snapshot
     * @param indexName      index (or index expression) to include in the snapshot
     * @return the create response, or {@code null} if a snapshot with that name
     *         already exists or the creation failed (the failure is logged)
     */
    public CreateSnapshotResponse createSnapshot(String repositoryName,
                                                 String snapshotName, String indexName) {
        CreateSnapshotResponse createSnapshotResponse = null;
        try {
            if (isSnapshotExist(repositoryName, snapshotName)) {
                logger.info("{} snapshot already exists", snapshotName);
            } else {
                createSnapshotResponse = this.client.admin().cluster()
                        .prepareCreateSnapshot(repositoryName, snapshotName)
                        .setWaitForCompletion(true)
                        .setIndices(indexName).get();
                logger.info("Snapshot was created.");
            }
        } catch (Exception ex) {
            logger.error("Exception in createSnapshot method", ex);
        }
        return createSnapshotResponse;
    }

    /**
     * Deletes a snapshot if it exists.
     *
     * @param repositoryName repository that holds the snapshot
     * @param snapshotName   name of the snapshot to delete
     * @return the delete response, or {@code null} if the snapshot does not exist or
     *         the deletion failed (the failure is logged)
     */
    public DeleteSnapshotResponse deleteSnapshot(String repositoryName, String snapshotName) {
        DeleteSnapshotResponse deleteSnapshotResponse = null;
        try {
            if (isSnapshotExist(repositoryName, snapshotName)) {
                deleteSnapshotResponse = this.client.admin().cluster()
                        .prepareDeleteSnapshot(repositoryName, snapshotName)
                        .get();
                logger.info("{} snapshot has been deleted.", snapshotName);
            }
        } catch (Exception ex) {
            logger.error("Exception in deleteSnapshot method", ex);
        }

        return deleteSnapshotResponse;
    }

    /**
     * Queries the status of the snapshots in a repository and converts each entry
     * with {@code SnapshotState.from}.
     *
     * @param repositoryName repository to query
     * @return the converted states; an empty list if the query failed (logged)
     */
    public List<SnapshotState> querySnapshotStatus(String repositoryName) {
        List<SnapshotState> snapshotStates = new ArrayList<>();
        SnapshotsStatusRequest snapshotsStatusRequest = new SnapshotsStatusRequest(repositoryName);

        try {
            snapshotStates = this.client.admin().cluster()
                    .snapshotsStatus(snapshotsStatusRequest)
                    .get().getSnapshots()
                    .stream().map(SnapshotState::from).collect(Collectors.toList());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            logger.error(String.format("獲取倉庫%s中快照信息失敗", repositoryName), e);
        } catch (Exception e) {
            logger.error(String.format("獲取倉庫%s中快照信息失敗", repositoryName), e);
        }
        return snapshotStates;
    }


    /**
     * Queries the status of a single snapshot.
     *
     * @param repositoryName repository that holds the snapshot
     * @param snapshotName   name of the snapshot
     * @return the first state whose snapshot name matches, or an empty
     *         {@link Optional} if none matches
     */
    public Optional<SnapshotState> querySnapshotStatus(String repositoryName, String snapshotName) {
        return this.querySnapshotStatus(repositoryName).stream()
                .filter(snapshotState -> StringUtils.equals(snapshotName, snapshotState.getSnapShotName()))
                .findFirst();
    }


    /**
     * Restores a snapshot.
     *
     * <p>Indices contained in the snapshot that currently exist in the cluster are
     * closed before the restore is started (the original author's note: an index
     * must not be open while it is being restored). The restore is only started if
     * the close request was acknowledged.
     *
     * @param repositoryName repository that holds the snapshot
     * @param snapshotName   name of the snapshot to restore
     * @return the restore response, or {@code null} if the repository or snapshot
     *         does not exist, no snapshot info was found, closing the indices was
     *         not acknowledged, or an error occurred (logged)
     */
    public RestoreSnapshotResponse restoreSnapshot(String repositoryName, String snapshotName) {
        try {
            if (!isRepositoryExist(repositoryName) || !isSnapshotExist(repositoryName, snapshotName)) {
                return null;
            }

            // Look up the indices contained in this snapshot.
            List<SnapshotInfo> snapshotInfoList = this.queryIndicsByRepoAndSnapshot(repositoryName, snapshotName);
            if (snapshotInfoList.isEmpty()) {
                return null;
            }

            // Only indices that currently exist in the cluster need to be closed.
            List<String> existingIndices = snapshotInfoList.get(0).indices().stream()
                    .filter(this::isIndexExists)
                    .collect(Collectors.toList());

            if (existingIndices.isEmpty()) {
                return this.restore(repositoryName, snapshotName);
            }

            CloseIndexRequestBuilder closeIndexRequestBuilder =
                    new CloseIndexRequestBuilder(client.admin().indices(), CloseIndexAction.INSTANCE);
            closeIndexRequestBuilder.setIndices(existingIndices.toArray(new String[0]));
            CloseIndexResponse closeIndexResponse = closeIndexRequestBuilder.get();

            if (null != closeIndexResponse && closeIndexResponse.isAcknowledged()) {
                return this.restore(repositoryName, snapshotName);
            }
            logger.warn("Closing indices {} was not acknowledged, snapshot {} was not restored",
                    existingIndices, snapshotName);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            logger.error("Exception in restoreSnapshot method", e);
        } catch (Exception e) {
            logger.error("Exception in restoreSnapshot method", e);
        }
        return null;
    }


    /**
     * Issues the restore request and blocks until the response is available.
     *
     * @param repositoryName repository that holds the snapshot
     * @param snapshotName   name of the snapshot to restore
     * @return the restore response
     * @throws ExecutionException   if the restore request failed
     * @throws InterruptedException if the thread was interrupted while waiting
     */
    private RestoreSnapshotResponse restore(String repositoryName, String snapshotName) throws ExecutionException, InterruptedException {
        RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(repositoryName, snapshotName);
        RestoreSnapshotResponse restoreSnapshotResponse = this.client.admin().cluster().restoreSnapshot(restoreSnapshotRequest).get();
        logger.info("Snapshot was restored.");
        return restoreSnapshotResponse;
    }

    /**
     * Fetches the snapshot info (which includes the index list) for one snapshot.
     *
     * @param repositoryName repository that holds the snapshot
     * @param snapshotName   name of the snapshot
     * @return the matching snapshot infos, or an empty list if the client returned
     *         no response
     */
    public List<SnapshotInfo> queryIndicsByRepoAndSnapshot(String repositoryName, String snapshotName) {
        GetSnapshotsRequestBuilder builder = new GetSnapshotsRequestBuilder(
                this.client.admin().cluster(), GetSnapshotsAction.INSTANCE, repositoryName);
        builder.setSnapshots(snapshotName);
        GetSnapshotsResponse getSnapshotsResponse = builder.get();

        // Guard the response (the original checked the action constant, which is never null).
        if (getSnapshotsResponse != null) {
            return getSnapshotsResponse.getSnapshots();
        }

        return Lists.newArrayList();
    }

    /**
     * Checks whether an index exists.
     *
     * @param indexname name of the index to check
     * @return {@code true} if the cluster reports the index as existing
     */
    public boolean isIndexExists(String indexname) {
        IndicesExistsRequestBuilder existsRequestBuilder = new IndicesExistsRequestBuilder(
                this.client.admin().indices(), IndicesExistsAction.INSTANCE, indexname);

        IndicesExistsResponse indicesExistsResponse = existsRequestBuilder.get();

        return indicesExistsResponse != null && indicesExistsResponse.isExists();
    }


}
相關文章
相關標籤/搜索