參考插件安裝官方說明。該插件安裝完成後,須要重啓 ES。
在此處要注意一點:在 HDFS 上建立的目錄的所有者須與 ES 的啓動用戶一致,否則 HDFS 會拒絕連接(原因請自行 Google)。
PUT _snapshot/test
{
  "type": "hdfs",
  "settings": {
    "uri": "hdfs://x.x.x.32:9000/",
    "path": "/es_back",
    "load_defaults": "true"
  }
}
未完待續
問題描述忘了,大概是指 ES 在讀取 HDFS 時無訪問權限。這個應該是該插件在實現 Java 權限控制的時候搞出來的 bug,我是通過修改源碼解決掉的。
grant {
  // Hadoop UserGroupInformation, HdfsConstants, PipelineAck clinit
  permission java.lang.RuntimePermission "getClassLoader";

  // UserGroupInformation (UGI) Metrics clinit
  permission java.lang.RuntimePermission "accessDeclaredMembers";
  permission java.lang.reflect.ReflectPermission "suppressAccessChecks";

  // org.apache.hadoop.util.StringUtils clinit
  permission java.util.PropertyPermission "*", "read,write";

  // org.apache.hadoop.util.ShutdownHookManager clinit
  permission java.lang.RuntimePermission "shutdownHooks";

  // JAAS is used always, we use a fake subject, hurts nobody
  permission javax.security.auth.AuthPermission "getSubject";
  permission javax.security.auth.AuthPermission "doAs";
  permission javax.security.auth.AuthPermission "modifyPrivateCredentials";

  // Read access to the Hadoop Credentials objects held as private credentials.
  permission javax.security.auth.PrivateCredentialPermission "org.apache.hadoop.security.Credentials * \"*\"", "read";

  // NOTE(review): AllPermission implies every other permission, so this single
  // entry makes all of the fine-grained grants above redundant and effectively
  // disables the security manager for code covered by this policy. It is kept
  // because it is part of the workaround this policy applies; narrow it to the
  // specific missing permission before using this file in production.
  permission java.security.AllPermission;
};
解決方案二:將 repository-hdfs/plugin-security.policy 添加到 ES 啓動配置中。在 ES 的 config/jvm.options 中,添加:
-Djava.security.policy=file:///data/soft/elasticsearch-5.0.1/plugins/repository-hdfs/plugin-security.policy
然後重啓 ES 集羣就可以了。
參看:
@Repository public class SnapshotAndRestoreDao { private Logger logger = LoggerFactory.getLogger(SnapshotAndRestoreDao.class); @Autowired private Client client; /** * @return */ public List<RepositoryMetaData> queryRepositoryList() { List<RepositoryMetaData> repositories = this.client.admin().cluster() .prepareGetRepositories().get().repositories(); return repositories; } public boolean isRepositoryExist(String repositoryName) { boolean result = false; try { List<RepositoryMetaData> repositories = this.queryRepositoryList(); if (repositories.size() > 0) { result = repositories.stream().filter(repositoryMetaData -> StringUtils.equals(repositoryName, repositoryMetaData.name())) .findAny().isPresent(); } } catch (Exception ex) { logger.error("Exception in getRepository method: " + ex.toString()); } return result; } /** * 建立倉庫 * * @param repositoryName * @param path * @param compress * @return */ public boolean createRepository(String repositoryName, String path, boolean compress) { boolean result = false; PutRepositoryResponse putRepositoryResponse = null; try { if (!isRepositoryExist(repositoryName)) { Settings settings = Settings.builder() .put("path", path) .put("compress", compress) .put("uri", "hdfs://10.94.128.32:9000/") .build(); putRepositoryResponse = client.admin().cluster().preparePutRepository(repositoryName) .setType("hdfs").setSettings(settings).get(); logger.info("Repository was created."); return (putRepositoryResponse != null && putRepositoryResponse.isAcknowledged()); } else { logger.info(repositoryName + " repository already exists"); } } catch (Exception ex) { logger.error("Exception in createRepository method", ex); } return result; } /** * 刪除倉庫 * * @param repositoryName * @return */ public DeleteRepositoryResponse deleteRepository(String repositoryName) { DeleteRepositoryResponse deleteRepositoryResponse = null; try { if (isRepositoryExist(repositoryName)) { deleteRepositoryResponse = this.client.admin().cluster() 
.prepareDeleteRepository(repositoryName).execute().actionGet(); logger.info("{}, repository has been deleted.", repositoryName); } } catch (Exception ex) { logger.error("Exception in deleteRepository method: " + ex.toString()); } return deleteRepositoryResponse; } public List<SnapshotInfo> querySnapShotInfoListByName(String repositoryName) { List<SnapshotInfo> snapshotInfoList = client.admin().cluster() .prepareGetSnapshots(repositoryName).get().getSnapshots(); return snapshotInfoList; } /** * 判斷快照名稱是否在倉庫中存在 * * @param repositoryName * @param snapshotName * @return */ public boolean isSnapshotExist(String repositoryName, String snapshotName) { boolean result = false; try { List<SnapshotInfo> snapshotInfoList = this.querySnapShotInfoListByName(repositoryName); if (null != snapshotInfoList && !snapshotInfoList.isEmpty()) { result = snapshotInfoList.stream().filter( snapshotInfo -> StringUtils.equals(snapshotName, snapshotInfo.snapshotId().getName())).findAny().isPresent(); } } catch (Exception ex) { logger.error("Exception in getSnapshot method: " + ex.toString()); } return result; } /** * 建立快照 * * @param repositoryName * @param snapshotName * @param indexName * @return */ public CreateSnapshotResponse createSnapshot(String repositoryName, String snapshotName, String indexName) { CreateSnapshotResponse createSnapshotResponse = null; // String snapshotName = snapshotPrefix + "-" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")); try { if (isSnapshotExist(repositoryName, snapshotName)) logger.info(snapshotName + " snapshot already exists"); else { createSnapshotResponse = this.client.admin().cluster() .prepareCreateSnapshot(repositoryName, snapshotName) .setWaitForCompletion(true) .setIndices(indexName).get(); logger.info("Snapshot was created."); } } catch (Exception ex) { logger.error("Exception in createSnapshot method: " + ex.toString()); } return createSnapshotResponse; } /** * 刪除快照 * * @param repositoryName * @param snapshotName * 
@return */ public DeleteSnapshotResponse deleteSnapshot(String repositoryName, String snapshotName) { DeleteSnapshotResponse deleteSnapshotResponse = null; try { if (isSnapshotExist(repositoryName, snapshotName)) { deleteSnapshotResponse = this.client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotName) .get(); logger.info(snapshotName + " snapshot has been deleted."); } } catch (Exception ex) { logger.error("Exception in deleteSnapshot method: " + ex.toString()); } return deleteSnapshotResponse; } /** * @param repositoryName * @return */ public List<SnapshotState> querySnapshotStatus(String repositoryName) { List<SnapshotState> snapshotStatses = new ArrayList<>(); SnapshotsStatusRequest snapshotsStatusRequest = new SnapshotsStatusRequest(repositoryName); try { snapshotStatses = this.client.admin().cluster() .snapshotsStatus(snapshotsStatusRequest) .get().getSnapshots() .stream().map(SnapshotState::from).collect(Collectors.toList()); } catch (Exception e) { logger.error(String.format("獲取倉庫%s中快照信息失敗", repositoryName), e); } return snapshotStatses; } /** * @param repositoryName * @param snapshotName * @return */ public Optional<SnapshotState> querySnapshotStatus(String repositoryName, String snapshotName) { List<SnapshotState> snapshotStates = this.querySnapshotStatus(repositoryName); return snapshotStates.stream().filter( snapshotState -> { return StringUtils.equals(snapshotName, snapshotState.getSnapShotName()); } ).findFirst(); } /** * 備份恢復 * 注意,備份恢復時,待備份的index不能打開 * * @param repositoryName * @param snapshotName * @return */ public RestoreSnapshotResponse restoreSnapshot(String repositoryName, String snapshotName) { RestoreSnapshotResponse restoreSnapshotResponse = null; try { if (isRepositoryExist(repositoryName) && isSnapshotExist(repositoryName, snapshotName)) { //查詢該快照下全部的index信息 List<SnapshotInfo> snapshotInfoList = this.queryIndicsByRepoAndSnapshot(repositoryName, snapshotName); if (!snapshotInfoList.isEmpty()) { List<String> indicesList = 
snapshotInfoList.get(0).indices(); indicesList = indicesList.stream().filter( indice -> this.isIndexExists(indice) ).collect(Collectors.toList()); if (!indicesList.isEmpty()) { CloseIndexRequestBuilder closeIndexRequestBuilder = new CloseIndexRequestBuilder(client.admin().indices(), CloseIndexAction.INSTANCE); closeIndexRequestBuilder.setIndices(indicesList.toArray(new String[indicesList.size()])); CloseIndexResponse closeIndexResponse = closeIndexRequestBuilder.get(); if (null != closeIndexResponse && closeIndexResponse.isAcknowledged()) { return this.restore(repositoryName, snapshotName); } } else { return this.restore(repositoryName, snapshotName); } } } } catch (Exception e) { logger.error("Exception in restoreSnapshot method", e); } return null; } /** * @param repositoryName * @param snapshotName * @return * @throws ExecutionException * @throws InterruptedException */ private RestoreSnapshotResponse restore(String repositoryName, String snapshotName) throws ExecutionException, InterruptedException { RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest(repositoryName, snapshotName); RestoreSnapshotResponse restoreSnapshotResponse = this.client.admin().cluster().restoreSnapshot(restoreSnapshotRequest).get(); logger.info("Snapshot was restored."); return restoreSnapshotResponse; } /** * @param repositoryName * @param snapshotName * @return */ public List<SnapshotInfo> queryIndicsByRepoAndSnapshot(String repositoryName, String snapshotName) { GetSnapshotsAction getSnapshotsAction = GetSnapshotsAction.INSTANCE; GetSnapshotsRequestBuilder builder = new GetSnapshotsRequestBuilder(this.client.admin().cluster(), getSnapshotsAction, repositoryName); builder.setSnapshots(snapshotName); GetSnapshotsResponse getSnapshotsResponse = builder.get(); if (getSnapshotsAction != null) { return getSnapshotsResponse.getSnapshots(); } return Lists.newArrayList(); } /** * 查詢索引是否存在 * * @param indexname * @return */ public boolean isIndexExists(String indexname) { 
IndicesExistsRequestBuilder existsRequestBuilder = new IndicesExistsRequestBuilder(this.client.admin().indices() , IndicesExistsAction.INSTANCE, indexname); IndicesExistsResponse indicesExistsResponse = existsRequestBuilder.get(); return indicesExistsResponse != null && indicesExistsResponse.isExists(); } }