Hive JDBC 的連接方式,只能獲取到 Hive 執行的最終結果。
若是想要獲取執行過程當中的狀態,並且使用取消執行的功能,須要使用 HiveServer2 Thrift 的方式。
執行狀態在TOperationState 類中,包括:
INITIALIZED_STATE(0),
RUNNING_STATE(1),
FINISHED_STATE(2),
CANCELED_STATE(3),
CLOSED_STATE(4),
ERROR_STATE(5),
UKNOWN_STATE(6),
PENDING_STATE(7);
public class QueryInstance { private static String host = "127.0.0.1"; private static int port = 10000; private static String username = "hive"; private static String passsword = "hive"; private static TTransport transport; private static TCLIService.Client client; private TOperationState tOperationState = null; private Map<String, Object> resultMap = new HashMap<String, Object>(); static { try { transport = QueryTool.getSocketInstance(host, port, username, passsword); client = new TCLIService.Client(new TBinaryProtocol(transport)); transport.open(); } catch (TTransportException e) { Log.info("hive collection error!"); } } public TOperationHandle submitQuery(String command) throws Exception { TOperationHandle tOperationHandle; TExecuteStatementResp resp = null; TSessionHandle sessHandle = QueryTool.openSession(client) .getSessionHandle(); TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, command); // 異步運行 execReq.setRunAsync(true); // 執行sql resp = client.ExecuteStatement(execReq);// 執行語句 tOperationHandle = resp.getOperationHandle();// 獲取執行的handle if (tOperationHandle == null) { //語句執行異常時,會把異常信息放在resp.getStatus()中。 throw new Exception(resp.getStatus().getErrorMessage()); } return tOperationHandle; } public String getQueryLog(TOperationHandle tOperationHandle) throws Exception { String log = ""; return log; } public TOperationState getQueryHandleStatus( TOperationHandle tOperationHandle) throws Exception { if (tOperationHandle != null) { TGetOperationStatusReq statusReq = new TGetOperationStatusReq( tOperationHandle); TGetOperationStatusResp statusResp = client .GetOperationStatus(statusReq); tOperationState = statusResp.getOperationState(); } return tOperationState; } public List<String> getColumns(TOperationHandle tOperationHandle) throws Throwable { TGetResultSetMetadataResp metadataResp; TGetResultSetMetadataReq metadataReq; TTableSchema tableSchema; metadataReq = new TGetResultSetMetadataReq(tOperationHandle); metadataResp = 
client.GetResultSetMetadata(metadataReq); List<TColumnDesc> columnDescs; List<String> columns = null; tableSchema = metadataResp.getSchema(); if (tableSchema != null) { columnDescs = tableSchema.getColumns(); columns = new ArrayList<String>(); for (TColumnDesc tColumnDesc : columnDescs) { columns.add(tColumnDesc.getColumnName()); } } return columns; } /** * 獲取執行結果 select語句 */ public List<Object> getResults(TOperationHandle tOperationHandle) throws Throwable{ TFetchResultsReq fetchReq = new TFetchResultsReq(); fetchReq.setOperationHandle(tOperationHandle); fetchReq.setMaxRows(1000); TFetchResultsResp re=client.FetchResults(fetchReq); List<TColumn> list = re.getResults().getColumns(); List<Object> list_row = new ArrayList<Object>(); for(TColumn field:list){ if (field.isSetStringVal()) { list_row.add(field.getStringVal().getValues()); } else if (field.isSetDoubleVal()) { list_row.add(field.getDoubleVal().getValues()); } else if (field.isSetI16Val()) { list_row.add(field.getI16Val().getValues()); } else if (field.isSetI32Val()) { list_row.add(field.getI32Val().getValues()); } else if (field.isSetI64Val()) { list_row.add(field.getI64Val().getValues()); } else if (field.isSetBoolVal()) { list_row.add(field.getBoolVal().getValues()); } else if (field.isSetByteVal()) { list_row.add(field.getByteVal().getValues()); } } for(Object obj:list_row){ System.out.println(obj); } return list_row; } public void cancelQuery(TOperationHandle tOperationHandle) throws Throwable { if (tOperationState != TOperationState.FINISHED_STATE) { TCancelOperationReq cancelOperationReq = new TCancelOperationReq(); cancelOperationReq.setOperationHandle(tOperationHandle); client.CancelOperation(cancelOperationReq); } } }
/**
 * Static helpers for building the HiveServer2 Thrift transport and opening a session.
 */
public class QueryTool {
    /** Timeout value passed to {@code HiveAuthFactory.getSocketTransport}. */
    private static final int LOGIN_TIMEOUT = 99999;

    /**
     * Creates a socket transport to {@code host:port} and wraps it in a PLAIN SASL transport
     * carrying the given credentials. The returned transport is not opened here.
     *
     * @param host     server host name or address
     * @param port     server port
     * @param USER     user name for PLAIN SASL authentication
     * @param PASSWORD password for PLAIN SASL authentication
     * @return the SASL-wrapped transport
     * @throws TTransportException if the socket transport cannot be created, or if the SASL
     *                             wrapper cannot be set up (the {@code SaslException} is the cause)
     */
    public static TTransport getSocketInstance(String host, int port, String USER, String PASSWORD)
            throws TTransportException {
        TTransport socketTransport = HiveAuthFactory.getSocketTransport(host, port, LOGIN_TIMEOUT);
        try {
            return PlainSaslHelper.getPlainTransport(USER, PASSWORD, socketTransport);
        } catch (SaslException e) {
            // Previously the stack trace was printed and the raw, unauthenticated socket
            // transport was returned; the failure only surfaced later in an unrelated call.
            throw new TTransportException(
                    "Could not create PLAIN SASL transport to " + host + ":" + port, e);
        }
    }

    /**
     * Opens a new session using a default {@code TOpenSessionReq}.
     *
     * @param client the Thrift client to open the session with
     * @return the server's open-session response
     * @throws TException if the Thrift call fails
     */
    public static TOpenSessionResp openSession(TCLIService.Client client) throws TException {
        TOpenSessionReq openSessionReq = new TOpenSessionReq();
        return client.OpenSession(openSessionReq);
    }
}
/**
 * Demo entry point: submits a statement, waits for it to finish, then fetches its results.
 */
public class Test {
    /** Pause between two state polls, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 500L;

    public static void main(String[] args) {
        try {
            QueryInstance base = new QueryInstance();
            TOperationHandle handle = base.submitQuery("show partitions nubiabase.event_base");
            // submitQuery runs the statement asynchronously, so the results must not be
            // fetched until the operation has reached FINISHED_STATE.
            if (waitForCompletion(base, handle)) {
                base.getResults(handle);
            } else {
                System.err.println("Query did not finish successfully; no results fetched.");
            }
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    /**
     * Polls the operation state until it is terminal.
     *
     * @return {@code true} if the operation reached {@code FINISHED_STATE}; {@code false} if
     *         it was cancelled, closed, failed, reported an unknown state, or the wait was
     *         interrupted
     */
    private static boolean waitForCompletion(QueryInstance base, TOperationHandle handle)
            throws Exception {
        while (true) {
            // Switching on the enum keeps this block free of any additional type names.
            switch (base.getQueryHandleStatus(handle)) {
                case FINISHED_STATE:
                    return true;
                case CANCELED_STATE:
                case CLOSED_STATE:
                case ERROR_STATE:
                case UKNOWN_STATE:
                    return false;
                default:
                    // INITIALIZED_STATE, PENDING_STATE or RUNNING_STATE: still in progress.
                    try {
                        Thread.sleep(POLL_INTERVAL_MILLIS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return false;
                    }
            }
        }
    }
}
pom文件
<dependencies>
    <dependency>
        <groupId>jdk.tools</groupId>
        <artifactId>jdk.tools</artifactId>
        <version>1.8</version>
        <scope>system</scope>
        <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
    </dependency>
    <!-- lombok (was declared twice with the same coordinates; the duplicate is removed) -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.12.2</version>
    </dependency>
    <!-- guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.poi</groupId>
        <artifactId>poi-excelant</artifactId>
        <version>3.10-FINAL</version>
    </dependency>
    <!-- spring data -->
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop</artifactId>
        <version>2.2.0.RELEASE</version>
    </dependency>
    <!-- NOTE(review): hive-jdbc is pulled in from two different groupIds and versions
         (org.apache.hive 1.1.1 and org.spark-project.hive 1.2.1.spark); confirm both are
         needed, since they may put overlapping classes on the classpath. -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.spark-project.hive</groupId>
        <artifactId>hive-jdbc</artifactId>
        <version>1.2.1.spark</version>
    </dependency>
    <dependency>
        <groupId>org.spark-project.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>1.2.1.spark</version>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.3</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>
</dependencies>