The project is built with Maven, so add the following dependencies to pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-json -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.8</version>
</dependency>
<dependency>
    <groupId>joda-time</groupId>
    <artifactId>joda-time</artifactId>
    <version>2.10.1</version>
</dependency>
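The job shown later also consumes from Kafka through the 0.11 connector; that dependency is not in the list above, so it is presumably already part of the project. If it is not, a likely addition (coordinates assumed for Flink 1.7.2 built against Scala 2.11) is:

<!-- assumed: Kafka connector matching the Kafka("0.11") descriptor used later -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
    <version>1.7.2</version>
</dependency>

The test producer additionally needs the protobuf runtime (com.google.protobuf:protobuf-java) and the library providing com.googlecode.protobuf.format.JsonFormat on its classpath.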
The response.proto file:
syntax = "proto3"; package com.google.protos; //搜索響應 message SearchResponse { uint64 search_time = 1; uint32 code = 2; Result results = 3; } //搜索結果 message Result { string id = 1; repeated Item items = 2; } //搜索結果項 message Item{ string id = 1; string name = 2; string title = 3; string url = 4; uint64 publish_time = 5; float score = 6; //推薦或者類似加權分值 }
An example message, containing the nested object results and the array of objects items:
{ "search_time":1553650604, "code":200, "results":{ "id":"449", "items":[ { "id":"47", "name":"name47", "title":"標題47", "url":"https://www.google.com.hk/item-47", "publish_time":1552884870, "score":96.03 }, { "id":"2", "name":"name2", "title":"標題2", "url":"https://www.google.com.hk/item-2", "publish_time":1552978902, "score":16.06 }, { "id":"60", "name":"name60", "title":"標題60", "url":"https://www.google.com.hk/item-60", "publish_time":1553444982, "score":62.58 }, { "id":"67", "name":"name67", "title":"標題67", "url":"https://www.google.com.hk/item-67", "publish_time":1553522957, "score":12.17 }, { "id":"15", "name":"name15", "title":"標題15", "url":"https://www.google.com.hk/item-15", "publish_time":1553525421, "score":32.36 }, { "id":"53", "name":"name53", "title":"標題53", "url":"https://www.google.com.hk/item-53", "publish_time":1553109227, "score":52.13 }, { "id":"70", "name":"name70", "title":"標題70", "url":"https://www.google.com.hk/item-70", "publish_time":1552781921, "score":1.72 }, { "id":"53", "name":"name53", "title":"標題53", "url":"https://www.google.com.hk/item-53", "publish_time":1553229003, "score":5.31 }, { "id":"30", "name":"name30", "title":"標題30", "url":"https://www.google.com.hk/item-30", "publish_time":1553282629, "score":26.51 }, { "id":"36", "name":"name36", "title":"標題36", "url":"https://www.google.com.hk/item-36", "publish_time":1552665833, "score":48.76 } ] } }
import com.google.protos.GoogleProtobuf.*;
import com.googlecode.protobuf.format.JsonFormat;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.time.Instant;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author lynn
 * @ClassName com.lynn.kafka.SearchResponsePublisher
 * @Description TODO
 * @Date 19-3-26 8:17 AM
 * @Version 1.0
 **/
public class SearchResponsePublisher {

    private static final Logger LOG = LoggerFactory.getLogger(SearchResponsePublisher.class);

    public String randomMessage(int results) {
        Random random = new Random();
        DecimalFormat fmt = new DecimalFormat("##0.00");
        SearchResponse.Builder response = SearchResponse.newBuilder();
        response.setSearchTime(Instant.now().getEpochSecond())
                .setCode(random.nextBoolean() ? 200 : 404);
        Result.Builder result = Result.newBuilder()
                .setId("" + random.nextInt(1000));
        for (int i = 0; i < results; i++) {
            int number = random.nextInt(100);
            Item.Builder builder = Item.newBuilder()
                    .setId(number + "")
                    .setName("name" + number)
                    .setTitle("標題" + number)
                    .setUrl("https://www.google.com.hk/item-" + number)
                    .setPublishTime(Instant.now().getEpochSecond() - random.nextInt(1000000))
                    .setScore(Float.parseFloat(fmt.format(random.nextInt(99) + random.nextFloat())));
            result.addItems(builder.build());
        }
        response.setResults(result.build());
        return new JsonFormat().printToString(response.build());
    }

    /**
     *
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        if (args.length < 3) {
            System.err.println("Please input broker.servers and topic and records number!");
            System.exit(-1);
        }
        String brokers = args[0];
        String topic = args[1];
        int recordsNumber = Integer.parseInt(args[2]);
        LOG.info("I will publish {} records...", recordsNumber);
        SearchResponsePublisher publisher = new SearchResponsePublisher();
//        System.out.println(publisher.randomMessage(10));
//        if(recordsNumber == 1000) return;

        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        //all:-1
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        int count = 0;
        while (count++ < recordsNumber) {
            producer.send(new ProducerRecord<String, String>(topic,
                    String.valueOf(Instant.now().toEpochMilli()),
                    publisher.randomMessage(10)));
            TimeUnit.MILLISECONDS.sleep(100);
        }
//        producer.flush();
        producer.close();
    }
}
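The producer expects three arguments: the broker list, the target topic, and the number of records to publish. A typical invocation (the jar name, broker address, and topic below are only illustrative) looks like:

java -cp target/search-response-demo.jar com.lynn.kafka.SearchResponsePublisher localhost:9092 search-response 1000

The Flink Table program that consumes and parses these messages follows; note that its imports include the custom PrintTableSink shown at the end of this post.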
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.sinks.PrintTableSink;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setParallelism(1);
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

Kafka kafka = new Kafka().version("0.11")
        .topic(sourceTopic)
        .startFromEarliest()
//        .startFromLatest()
        .property("bootstrap.servers", brokers)
        .property("group.id", "res")
        .property("session.timeout.ms", "30000")
        .sinkPartitionerFixed();

tableEnv.connect(kafka)
        .withFormat(new Json()
                .failOnMissingField(false)
                .deriveSchema())
        .withSchema(new Schema()
                .field("search_time", Types.LONG())
                .field("code", Types.INT())
                .field("results", Types.ROW(
                        new String[]{"id", "items"},
                        new TypeInformation[]{
                                Types.STRING(),
                                ObjectArrayTypeInfo.getInfoFor(Row[].class, //Array.newInstance(Row.class, 10).getClass(),
                                        Types.ROW(
                                                new String[]{"id", "name", "title", "url", "publish_time", "score"},
                                                new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(), Types.LONG(), Types.FLOAT()}
                                        ))})
                )).inAppendMode().registerTableSource("tb_json");

// item[1] ... item[10]: the array index starts from 1
String sql4 = "select search_time, code, results.id as result_id, items[1].name as item_1_name, items[2].id as item_2_id\n" +
        "from tb_json";
Table table4 = tableEnv.sqlQuery(sql4);
tableEnv.registerTable("tb_item_2", table4);
LOG.info("------------------print {} schema------------------", "tb_item_2");
table4.printSchema();

tableEnv.registerTableSink("console4",
        new String[]{"f0", "f1", "f2", "f3", "f4"},
        new TypeInformation[]{
                Types.LONG(), Types.INT(), Types.STRING(), Types.STRING(), Types.STRING()
        },
        new PrintTableSink());
table4.insertInto("console4");

// execute program
env.execute("Flink Table Json Engine");
select
  search_time,
  code,
  results.id as result_id,         -- nested JSON sub-field
  items[1].name as item_1_name,    -- sub-field of an array element; the array index starts from 1
  items[2].id as item_2_id
from tb_json
A nested field can be read directly with the dot (.) operator, and an array element with [index]; the index starts at 1, unlike Java arrays, which are 0-based.
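As an extra sketch that is not part of the original job: Flink SQL also provides a CARDINALITY built-in for arrays, which, assuming it is available in this version and resolves the nested field the same way as above, could report how many items a response carries:

-- hedged sketch; CARDINALITY(...) is assumed to work on the derived ARRAY type
select
  search_time,
  results.id as result_id,
  items[1].name as first_item_name,
  CARDINALITY(items) as item_count
from tb_json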
The schema is defined following the nesting and array structure of the JSON object, so there is no need to flatten every field: a nested object is declared as a Row type, and an array as ObjectArrayTypeInfo or BasicArrayTypeInfo. The first parameter of ObjectArrayTypeInfo is the array class, obtained as Row[].class (as in the example) or via Array.newInstance(Row.class, 10).getClass(), as sketched below.
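Pulled out of the full example above, the pattern looks like this sketch (field names and types are the ones already used; only the local variables itemType, itemsType and resultsType are introduced here for illustration):

// Element type of the "items" array: one Row per search-result item.
TypeInformation<Row> itemType = Types.ROW(
        new String[]{"id", "name", "title", "url", "publish_time", "score"},
        new TypeInformation[]{Types.STRING(), Types.STRING(), Types.STRING(),
                Types.STRING(), Types.LONG(), Types.FLOAT()});

// The array itself: the first argument is the array class (Row[].class, or
// Array.newInstance(Row.class, 10).getClass()), the second the element type.
TypeInformation<Row[]> itemsType = ObjectArrayTypeInfo.getInfoFor(Row[].class, itemType);

// The nested "results" object is itself a Row that contains the array.
TypeInformation<Row> resultsType = Types.ROW(
        new String[]{"id", "items"},
        new TypeInformation[]{Types.STRING(), itemsType});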
The type checks in these convert methods originally compare TypeInformation with ==. In Java, == cannot be overloaded and only tests reference identity, so with this Flink version the check can fail when an equal but distinct TypeInformation instance is passed in. The comparisons are therefore replaced with the .equals() method.
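A minimal, self-contained demo (a hypothetical class, not part of the patch) of why the comparison style matters for TypeInformation:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;

public class TypeInfoEqualityDemo {
    public static void main(String[] args) {
        // Two logically identical row types created independently.
        TypeInformation<?> a = new RowTypeInfo(Types.STRING, Types.LONG);
        TypeInformation<?> b = new RowTypeInfo(Types.STRING, Types.LONG);

        System.out.println(a == b);      // false: reference identity, different instances
        System.out.println(a.equals(b)); // true: value equality, which is what the check needs
    }
}

The patched methods below therefore test Types.XXX.equals(info) instead of info == Types.XXX.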
JsonRowDeserializationSchema.java
private Object convert(JsonNode node, TypeInformation<?> info) {
    if (Types.VOID.equals(info) || node.isNull()) {
        return null;
    } else if (Types.BOOLEAN.equals(info)) {
        return node.asBoolean();
    } else if (Types.STRING.equals(info)) {
        return node.asText();
    } else if (Types.BIG_DEC.equals(info)) {
        return node.decimalValue();
    } else if (Types.BIG_INT.equals(info)) {
        return node.bigIntegerValue();
    } else if (Types.LONG.equals(info)) {
        return node.longValue();
    } else if (Types.INT.equals(info)) {
        return node.intValue();
    } else if (Types.FLOAT.equals(info)) {
        return node.floatValue();
    } else if (Types.DOUBLE.equals(info)) {
        return node.doubleValue();
    } else if (Types.SQL_DATE.equals(info)) {
        return Date.valueOf(node.asText());
    } else if (Types.SQL_TIME.equals(info)) {
        // according to RFC 3339 every full-time must have a timezone;
        // until we have full timezone support, we only support UTC;
        // users can parse their time as string as a workaround
        final String time = node.asText();
        if (time.indexOf('Z') < 0 || time.indexOf('.') >= 0) {
            throw new IllegalStateException(
                "Invalid time format. Only a time in UTC timezone without milliseconds is supported yet. " +
                    "Format: HH:mm:ss'Z'");
        }
        return Time.valueOf(time.substring(0, time.length() - 1));
    } else if (Types.SQL_TIMESTAMP.equals(info)) {
        // according to RFC 3339 every date-time must have a timezone;
        // until we have full timezone support, we only support UTC;
        // users can parse their time as string as a workaround
        final String timestamp = node.asText();
        if (timestamp.indexOf('Z') < 0) {
            throw new IllegalStateException(
                "Invalid timestamp format. Only a timestamp in UTC timezone is supported yet. " +
                    "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        }
        return Timestamp.valueOf(timestamp.substring(0, timestamp.length() - 1).replace('T', ' '));
    } else if (info instanceof RowTypeInfo) {
        return convertRow(node, (RowTypeInfo) info);
    } else if (info instanceof ObjectArrayTypeInfo) {
        return convertObjectArray(node, ((ObjectArrayTypeInfo) info).getComponentInfo());
    } else if (info instanceof BasicArrayTypeInfo) {
        return convertObjectArray(node, ((BasicArrayTypeInfo) info).getComponentInfo());
    } else if (info instanceof PrimitiveArrayTypeInfo &&
            ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
        return convertByteArray(node);
    } else {
        // for types that were specified without JSON schema
        // e.g. POJOs
        try {
            return objectMapper.treeToValue(node, info.getTypeClass());
        } catch (JsonProcessingException e) {
            throw new IllegalStateException("Unsupported type information '" + info + "' for node: " + node);
        }
    }
}
JsonRowSerializationSchema.java
private JsonNode convert(ContainerNode<?> container, JsonNode reuse, TypeInformation<?> info, Object object) {
    if (Types.VOID.equals(info) || object == null) {
        return container.nullNode();
    } else if (Types.BOOLEAN.equals(info)) {
        return container.booleanNode((Boolean) object);
    } else if (Types.STRING.equals(info)) {
        return container.textNode((String) object);
    } else if (Types.BIG_DEC.equals(info)) {
        // convert decimal if necessary
        if (object instanceof BigDecimal) {
            return container.numberNode((BigDecimal) object);
        }
        return container.numberNode(BigDecimal.valueOf(((Number) object).doubleValue()));
    } else if (Types.BIG_INT.equals(info)) {
        // convert integer if necessary
        if (object instanceof BigInteger) {
            return container.numberNode((BigInteger) object);
        }
        return container.numberNode(BigInteger.valueOf(((Number) object).longValue()));
    } else if (Types.LONG.equals(info)) {
        if (object instanceof Long) {
            return container.numberNode((Long) object);
        }
        return container.numberNode(Long.valueOf(((Number) object).longValue()));
    } else if (Types.INT.equals(info)) {
        if (object instanceof Integer) {
            return container.numberNode((Integer) object);
        }
        return container.numberNode(Integer.valueOf(((Number) object).intValue()));
    } else if (Types.FLOAT.equals(info)) {
        if (object instanceof Float) {
            return container.numberNode((Float) object);
        }
        return container.numberNode(Float.valueOf(((Number) object).floatValue()));
    } else if (Types.DOUBLE.equals(info)) {
        if (object instanceof Double) {
            return container.numberNode((Double) object);
        }
        return container.numberNode(Double.valueOf(((Number) object).doubleValue()));
    } else if (Types.SQL_DATE.equals(info)) {
        return container.textNode(object.toString());
    } else if (Types.SQL_TIME.equals(info)) {
        final Time time = (Time) object;
        // strip milliseconds if possible
        if (time.getTime() % 1000 > 0) {
            return container.textNode(timeFormatWithMillis.format(time));
        }
        return container.textNode(timeFormat.format(time));
    } else if (Types.SQL_TIMESTAMP.equals(info)) {
        return container.textNode(timestampFormat.format((Timestamp) object));
    } else if (info instanceof RowTypeInfo) {
        if (reuse != null && reuse instanceof ObjectNode) {
            return convertRow((ObjectNode) reuse, (RowTypeInfo) info, (Row) object);
        } else {
            return convertRow(null, (RowTypeInfo) info, (Row) object);
        }
    } else if (info instanceof ObjectArrayTypeInfo) {
        if (reuse != null && reuse instanceof ArrayNode) {
            return convertObjectArray((ArrayNode) reuse, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        } else {
            return convertObjectArray(null, ((ObjectArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        }
    } else if (info instanceof BasicArrayTypeInfo) {
        if (reuse != null && reuse instanceof ArrayNode) {
            return convertObjectArray((ArrayNode) reuse, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        } else {
            return convertObjectArray(null, ((BasicArrayTypeInfo) info).getComponentInfo(), (Object[]) object);
        }
    } else if (info instanceof PrimitiveArrayTypeInfo &&
            ((PrimitiveArrayTypeInfo) info).getComponentType() == Types.BYTE) {
        return container.binaryNode((byte[]) object);
    } else {
        // for types that were specified without JSON schema
        // e.g. POJOs
        try {
            return mapper.valueToTree(object);
        } catch (IllegalArgumentException e) {
            throw new IllegalStateException("Unsupported type information '" + info + "' for object: " + object, e);
        }
    }
}
Add the following file:
resources/META-INF/services/org.apache.flink.table.factories.TableFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.Kafka011TableSourceSinkFactory
This is necessary because, when the job is packaged, the identically named service file in the kafka-connector jar and in the flink-json jar would otherwise overwrite each other; the contents of both files have to be kept.
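One way to keep both, assuming the job jar is built with the maven-shade-plugin, is the ServicesResourceTransformer, which appends identically named META-INF/services files instead of letting one overwrite the other (the plugin version here is only an example):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.1</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- merges META-INF/services entries from all shaded jars -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>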
The print sink below is adapted from Alibaba's blink branch.
scala:
BatchCompatibleStreamTableSink.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.sinks

import org.apache.flink.table.api._
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}

/** Defines an external [[TableSink]] to emit a batch [[Table]],
  * for compatibility with the stream connector plugin.
  */
trait BatchCompatibleStreamTableSink[T] extends TableSink[T] {

  /** Emits the DataStream. */
  def emitBoundedStream(boundedStream: DataStream[T]): DataStreamSink[_]
}
PrintTableSink.scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.sinks

import java.lang.{Boolean => JBool}
import java.util.TimeZone
import java.util.{Date => JDate}
import java.sql.Date
import java.sql.Time
import java.sql.Timestamp

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext
import org.apache.flink.types.Row
import org.apache.flink.configuration.Configuration
import org.apache.flink.table.runtime.functions.DateTimeFunctions
import org.apache.flink.util.StringUtils

/**
  * A simple [[TableSink]] to output data to console.
  *
  */
class PrintTableSink()
  extends TableSinkBase[JTuple2[JBool, Row]]
  with BatchCompatibleStreamTableSink[JTuple2[JBool, Row]]
  with UpsertStreamTableSink[Row] {

  override def emitDataStream(dataStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction()
    dataStream.addSink(sink).name(sink.toString)
  }

  override protected def copy: TableSinkBase[JTuple2[JBool, Row]] = new PrintTableSink()

  override def setKeyFields(keys: Array[String]): Unit = {}

  override def setIsAppendOnly(isAppendOnly: JBool): Unit = {}

  //  override def getRecordType: DataType = DataTypes.createRowType(getFieldTypes, getFieldNames)
  override def getRecordType: TypeInformation[Row] = {
    new RowTypeInfo(getFieldTypes, getFieldNames)
  }

  /** Emits the DataStream. */
  override def emitBoundedStream(boundedStream: DataStream[JTuple2[JBool, Row]]) = {
    val sink: PrintSinkFunction = new PrintSinkFunction()
    boundedStream.addSink(sink).name(sink.toString)
  }
}

/**
  * Implementation of the SinkFunction writing every tuple to the standard output.
  *
  */
class PrintSinkFunction() extends RichSinkFunction[JTuple2[JBool, Row]] {
  private var prefix: String = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
    prefix = "task-" + (context.getIndexOfThisSubtask + 1) + "> "
  }

  override def invoke(in: JTuple2[JBool, Row]): Unit = {
    val sb = new StringBuilder
    val row = in.f1
    for (i <- 0 until row.getArity) {
      if (i > 0) sb.append(",")
      val f = row.getField(i)
      if (f.isInstanceOf[Date]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd"))
      } else if (f.isInstanceOf[Time]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "HH:mm:ss"))
      } else if (f.isInstanceOf[Timestamp]) {
        sb.append(DateTimeFunctions.dateFormat(f.asInstanceOf[JDate].getTime, "yyyy-MM-dd HH:mm:ss.SSS"))
      } else {
        sb.append(StringUtils.arrayAwareToString(f))
      }
    }
    if (in.f0) {
      System.out.println(prefix + "(+)" + sb.toString())
    } else {
      System.out.println(prefix + "(-)" + sb.toString())
    }
  }

  override def close(): Unit = {
    this.prefix = ""
  }

  override def toString: String = "Print to System.out"
}