Environment: Flink 1.7.2
Add the following jars to Flink 1.7.2's lib directory, otherwise you will get class-not-found errors:
avro-1.8.2.jar
flink-connector-kafka-0.10_2.12-1.7.2.jar
flink-connector-kafka-base_2.12-1.7.2.jar
flink-json-1.7.2.jar
kafka-clients-0.11.0.0.jar
flink-avro-1.7.2.jar
flink-connector-kafka-0.11_2.12-1.7.2.jar
flink-core-1.7.2.jar
flink-python_2.12-1.7.2.jar
log4j-1.2.17.jar
flink-cep_2.12-1.7.2.jar
flink-connector-kafka-0.9_2.12-1.7.2.jar
flink-dist_2.12-1.7.2.jar
flink-table_2.12-1.7.2.jar
slf4j-log4j12-1.7.15.jar
Define the table in the SQL Client environment file (e.g. conf/sql-client-defaults.yaml):

tables:
  - name: myTable
    type: source
    update-mode: append
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: im-message-topic2
      startup-mode: earliest-offset
      properties:
        - key: bootstrap.servers
          value: kafkaip:9092
        - key: group.id
          value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(sessionId STRING, fromUid STRING, toUid STRING, chatType STRING, type STRING, msgId STRING, msg STRING, timestampSend STRING)"
    schema:
      - name: sessionId
        type: STRING
      - name: fromUid
        type: STRING
      - name: toUid
        type: STRING
      - name: chatType
        type: STRING
      - name: type
        type: STRING
      - name: msgId
        type: STRING
      - name: msg
        type: STRING
      - name: rowTime
        type: TIMESTAMP
        rowtime:
          timestamps:
            type: "from-field"
            from: "timestampSend"
          watermarks:
            type: "periodic-bounded"
            delay: "60"
      - name: procTime
        type: TIMESTAMP
        proctime: true
Start the SQL Client and verify the table with a simple query:

./bin/sql-client.sh embedded

select * from myTable;
Then run a query using MATCH_RECOGNIZE:
SELECT *
FROM myTable
MATCH_RECOGNIZE (
    PARTITION BY sessionId
    ORDER BY rowTime
    MEASURES
        e2.procTime AS answerTime,
        LAST(e1.procTime) AS customer_event_time,
        e2.fromUid AS empUid,
        e1.procTime AS askTime,
        1 AS total_talk
    ONE ROW PER MATCH
    AFTER MATCH SKIP TO LAST e2
    PATTERN (e1 e2)
    DEFINE
        e1 AS e1.type = 'yonghu',
        e2 AS e2.type = 'guanjia'
);
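To make the semantics concrete, here is a small worked example with hypothetical data (not from the original post): suppose one sessionId receives three events in rowTime order with type values 'yonghu', 'yonghu', 'guanjia'. PATTERN (e1 e2) only matches a 'yonghu' event immediately followed by a 'guanjia' event, so the second and third events form the single match; the emitted row takes customer_event_time from the second 'yonghu' event and answerTime from the 'guanjia' event. AFTER MATCH SKIP TO LAST e2 then resumes matching at the 'guanjia' event.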
The above uses the SQL Client, so no code has to be written. Of course, you can also do the same in code; the corresponding program is below.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    tableEnv.connect(new Kafka()
            .version("0.11")
            .topic("im-message-topic3")
            //.property("zookeeper.connect", "")
            .property("bootstrap.servers", "kafkaip:9092")
            .startFromEarliest()
            .sinkPartitionerRoundRobin() // distribute Flink partitions across Kafka partitions round-robin
        ).withFormat(new Json()
            .failOnMissingField(false)
            .deriveSchema()
        ).withSchema(new Schema()
            .field("sessionId", Types.STRING).from("sessionId")
            .field("fromUid", Types.STRING).from("fromUid")
            .field("toUid", Types.STRING).from("toUid")
            .field("chatType", Types.STRING).from("chatType")
            .field("type", Types.STRING).from("type")
            .field("msgId", Types.STRING).from("msgId")
            .field("msg", Types.STRING).from("msg")
            //.field("timestampSend", Types.SQL_TIMESTAMP)
            .field("rowtime", Types.SQL_TIMESTAMP)
                .rowtime(new Rowtime()
                    .timestampsFromField("timestampSend")
                    .watermarksPeriodicBounded(1000))
            .field("proctime", Types.SQL_TIMESTAMP).proctime()
        ).inAppendMode().registerTableSource("myTable");

    Table tb2 = tableEnv.sqlQuery(
        "SELECT " +
        "  answerTime, customer_event_time, empUid, noreply_counts, total_talk " +
        "FROM myTable " +
        "MATCH_RECOGNIZE ( " +
        "  PARTITION BY sessionId " +
        "  ORDER BY rowtime " +
        "  MEASURES " +
        "    e2.rowtime as answerTime, " +
        "    LAST(e1.rowtime) as customer_event_time, " +
        "    e2.fromUid as empUid, " +
        "    1 as noreply_counts, " +
        "    e1.rowtime as askTime, " +
        "    1 as total_talk " +
        "  ONE ROW PER MATCH " +
        "  AFTER MATCH SKIP TO LAST e2 " +
        "  PATTERN (e1 e2) " +
        "  DEFINE " +
        "    e1 as e1.type = 'yonghu', " +
        "    e2 as e2.type = 'guanjia' " +
        ")");

    DataStream<Row> appendStream = tableEnv.toAppendStream(tb2, Row.class);
    System.out.println("schema is:");
    tb2.printSchema();
    appendStream.writeAsText("/usr/local/whk", WriteMode.OVERWRITE);
    logger.info("stream end");

    Table tb3 = tableEnv.sqlQuery("select sessionId, type from myTable");
    DataStream<Row> temp = tableEnv.toAppendStream(tb3, Row.class);
    tb3.printSchema();
    temp.writeAsText("/usr/local/whk2", WriteMode.OVERWRITE);

    env.execute("msg test");
}
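To feed test data into the topic you can use a small producer. The following is a minimal sketch that is not part of the original post: it assumes the kafka-clients jar listed above, a broker at kafkaip:9092, and that timestampSend is serialized in a string format the Flink JSON format can parse as a timestamp (ISO-8601 is assumed here; adjust the field values and timestamp format to whatever your deployment actually expects).

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MessageProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafkaip:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One 'yonghu' (customer) message followed by one 'guanjia' (agent) reply
        // in the same session, so that PATTERN (e1 e2) can match.
        // The field values and timestamp format below are assumptions for illustration.
        String ask = "{\"sessionId\":\"s1\",\"fromUid\":\"u1\",\"toUid\":\"g1\"," +
                "\"chatType\":\"single\",\"type\":\"yonghu\",\"msgId\":\"m1\"," +
                "\"msg\":\"hello\",\"timestampSend\":\"2019-05-01T10:00:00Z\"}";
        String answer = "{\"sessionId\":\"s1\",\"fromUid\":\"g1\",\"toUid\":\"u1\"," +
                "\"chatType\":\"single\",\"type\":\"guanjia\",\"msgId\":\"m2\"," +
                "\"msg\":\"hi\",\"timestampSend\":\"2019-05-01T10:00:05Z\"}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("im-message-topic2", "s1", ask));
            producer.send(new ProducerRecord<>("im-message-topic2", "s1", answer));
            producer.flush();
        }
    }
}

With these two records in im-message-topic2, the MATCH_RECOGNIZE query above should emit one match row for session s1.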
Done. There are actually quite a few pitfalls along the way.
Note: if you use TimeCharacteristic.EventTime, do not use procTime as well.