本文主要研究一下flink的Execution Plan Visualization
/**
 * Assembles a small streaming topology (collection source, flat map, keyed sum,
 * print sink) and writes its execution plan, as returned by
 * {@code getExecutionPlan()}, to standard output. The job is only planned here;
 * this method never calls {@code execute()}.
 */
@Test
public void testExecutionPlan() {
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // source -> flatMap(Tokenizer) -> keyBy(tuple field 0) -> sum(tuple field 1)
    DataStream<Tuple2<String, Integer>> wordCounts = environment
            .fromElements(WORDS)
            .flatMap(new WordCountTest.Tokenizer())
            .keyBy(0)
            .sum(1);

    // Attach the print sink before asking for the plan so it is part of the topology.
    wordCounts.print();

    final String planJson = environment.getExecutionPlan();
    System.out.println(planJson);
}
{ "nodes": [ { "id": 1, "type": "Source: Collection Source", "pact": "Data Source", "contents": "Source: Collection Source", "parallelism": 1 }, { "id": 2, "type": "Flat Map", "pact": "Operator", "contents": "Flat Map", "parallelism": 4, "predecessors": [ { "id": 1, "ship_strategy": "REBALANCE", "side": "second" } ] }, { "id": 4, "type": "Keyed Aggregation", "pact": "Operator", "contents": "Keyed Aggregation", "parallelism": 4, "predecessors": [ { "id": 2, "ship_strategy": "HASH", "side": "second" } ] }, { "id": 5, "type": "Sink: Print to Std. Out", "pact": "Data Sink", "contents": "Sink: Print to Std. Out", "parallelism": 4, "predecessors": [ { "id": 4, "ship_strategy": "FORWARD", "side": "second" } ] } ] }
打開flink plan visualizer,將上面的json輸入到文本框,點擊Draw進行可視化,結果如下:
java
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@Public public abstract class StreamExecutionEnvironment { //...... /** * Creates the plan with which the system will execute the program, and * returns it as a String using a JSON representation of the execution data * flow graph. Note that this needs to be called, before the plan is * executed. * * @return The execution plan of the program, as a JSON String. */ public String getExecutionPlan() { return getStreamGraph().getStreamingPlanAsJSON(); } /** * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job. * * @return The streamgraph representing the transformations */ @Internal public StreamGraph getStreamGraph() { if (transformations.size() <= 0) { throw new IllegalStateException("No operators defined in streaming topology. Cannot execute."); } return StreamGraphGenerator.generate(this, transformations); } //...... }
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java
@Internal public class StreamGraph extends StreamingPlan { private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class); private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME; private final StreamExecutionEnvironment environment; private final ExecutionConfig executionConfig; private final CheckpointConfig checkpointConfig; private boolean chaining; private Map<Integer, StreamNode> streamNodes; private Set<Integer> sources; private Set<Integer> sinks; private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes; private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes; private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes; protected Map<Integer, String> vertexIDtoBrokerID; protected Map<Integer, Long> vertexIDtoLoopTimeout; private StateBackend stateBackend; private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs; //...... public String getStreamingPlanAsJSON() { try { return new JSONGenerator(this).getJSON(); } catch (Exception e) { throw new RuntimeException("JSON plan creation failed", e); } } //...... }