本文主要研究一下flink的Queryable State
@Test public void testValueStateForQuery() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment .createRemoteEnvironment("192.168.99.100", 8081, SubmitTest.JAR_FILE); env.addSource(new RandomTuple2Source()) .keyBy(0) //key by first value of tuple .flatMap(new CountWindowAverage()) .print(); JobExecutionResult result = env.execute("testQueryableState"); LOGGER.info("submit job result:{}",result); }
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum @Override public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { Tuple2<Long, Long> currentSum = sum.value(); if(currentSum == null){ currentSum = Tuple2.of(1L,input.f1); }else{ currentSum.f0 += 1; currentSum.f1 += input.f1; } sum.update(currentSum); if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", // the state name TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); // type information descriptor.setQueryable("query-name"); sum = getRuntimeContext().getState(descriptor); } }
@Test public void testQueryStateByJobId() throws InterruptedException, IOException { //get jobId from flink ui running job page JobID jobId = JobID.fromHexString("793edfa93f354aa0274f759cb13ce79e"); long key = 1L; //flink-core-1.7.0-sources.jar!/org/apache/flink/configuration/QueryableStateOptions.java QueryableStateClient client = new QueryableStateClient("192.168.99.100", 9069); // the state descriptor of the state to be fetched. ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})); CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture = client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor); LOGGER.info("get kv state return future, waiting......"); // org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException: Queryable State Server : No state for the specified key/namespace. ValueState<Tuple2<Long, Long>> res = resultFuture.join(); LOGGER.info("query result:{}",res.value()); client.shutdownAndWait(); }
Started Queryable State Proxy Server @ /172.20.0.3:9069
看到上面這條日誌,就能夠確認已啓用了該功能