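When building a wide table of payment orders, many tables need to be joined and a payment may arrive long after the order was created, so Flink's table Join is not a good fit. An alternative is to consume data from several Topics, keyBy the order number, and then handle each record in the processing logic according to the Topic it came from. For that to work, each received message should ideally carry a topic field, and JSONKeyValueDeserializationSchema solves exactly this problem.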
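To make the "keyBy the order number, then branch on the Topic" idea concrete, here is a minimal plain-Java sketch of the per-order merge step, without any Flink dependency. The topic names (`order_topic`, `payment_topic`), the field prefixes, and the `Message` class are hypothetical and are not taken from the original job. In a real Flink job the wide row would more likely be held in keyed state than in an in-memory map.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java sketch of the per-order merge that would run after keyBy(orderNo). */
final class WideOrderSketch {

    /** Simplified stand-in for one deserialized Kafka message (hypothetical shape). */
    static final class Message {
        final String topic;               // would come from metadata.topic
        final String orderNo;             // the field used for keyBy
        final Map<String, String> fields; // payload columns

        Message(String topic, String orderNo, Map<String, String> fields) {
            this.topic = topic;
            this.orderNo = orderNo;
            this.fields = fields;
        }
    }

    /** Folds one message into the wide row of its order, branching on the source topic. */
    static void merge(Map<String, Map<String, String>> wideRows, Message msg) {
        Map<String, String> row =
                wideRows.computeIfAbsent(msg.orderNo, k -> new LinkedHashMap<>());
        switch (msg.topic) {
            case "order_topic":   // hypothetical topic name
                msg.fields.forEach((k, v) -> row.put("order_" + k, v));
                break;
            case "payment_topic": // hypothetical topic name
                msg.fields.forEach((k, v) -> row.put("pay_" + k, v));
                break;
            default:
                // Messages from unknown topics are ignored in this sketch.
                break;
        }
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> wideRows = new LinkedHashMap<>();
        merge(wideRows, new Message("order_topic", "A1", Map.of("amount", "10")));
        merge(wideRows, new Message("payment_topic", "A1", Map.of("status", "PAID")));
        System.out.println(wideRows);
    }
}
```

Because the payment message may arrive much later than the order message, the row for an order simply stays incomplete until the second topic delivers its part.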
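import java.util
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema

def getKafkaConsumer(kafkaAddr: String, topicNames: util.ArrayList[String], groupId: String): FlinkKafkaConsumer[ObjectNode] = {
  val properties = getKafkaProperties(groupId, kafkaAddr) // helper not shown in this post
  val consumer = new FlinkKafkaConsumer[ObjectNode](topicNames, new JSONKeyValueDeserializationSchema(true), properties)
  consumer.setStartFromGroupOffsets() // the default behaviour
  consumer
}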
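Here, new JSONKeyValueDeserializationSchema(true) means the metadata (offset, topic, partition) is included in each record; with false it is left out. The source code is as follows: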
public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {

    private static final long serialVersionUID = 1509391548173891955L;
    private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class);

    private final boolean includeMetadata;
    private ObjectMapper mapper;

    public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
        this.includeMetadata = includeMetadata;
    }

    // No try/catch here: a parse failure propagates out of the source.
    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (this.mapper == null) {
            this.mapper = new ObjectMapper();
        }
        ObjectNode node = this.mapper.createObjectNode();
        if (record.key() != null) {
            node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
        }
        if (record.value() != null) {
            node.set("value", this.mapper.readValue(record.value(), JsonNode.class));
        }
        if (this.includeMetadata) {
            node.putObject("metadata")
                .put("offset", record.offset())
                .put("topic", record.topic())
                .put("partition", record.partition());
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}
I thought that was the end of it, but unexpectedly it failed: every single message threw an error during deserialization.
2019-11-29 19:55:15.401 flink [Source: kafkasource (1/1)] ERROR c.y.b.D.JSONKeyValueDeserializationSchema - Unrecognized token 'xxxxx': was expecting ('true', 'false' or 'null')
 at [Source: [B@2e119f0e; line: 1, column: 45]
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'xxxxxxx': was expecting ('true', 'false' or 'null')
 at [Source: [B@2e119f0e; line: 1, column: 45]
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1586)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:521)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3464)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2628)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:854)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:748)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3847)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3792)
    at org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2890)
    at com.xx.xx.DeserializationSchema.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:33)
    at com.xx.xx.DeserializationSchema.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:15)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)
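Because the source code has no try/catch, there is no way to see which data triggered the error, so the only option is to rewrite this method ourselves.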
Create a new DeserializationSchema package and add a JSONKeyValueDeserializationSchema class to it, then have getKafkaConsumer reference our own JSONKeyValueDeserializationSchema class. The log will then show which records cannot be deserialized.
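import java.nio.charset.StandardCharsets;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {

    private static final long serialVersionUID = 1509391548173891955L;
    private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class);

    private final boolean includeMetadata;
    private ObjectMapper mapper;

    public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
        this.includeMetadata = includeMetadata;
    }

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (this.mapper == null) {
            this.mapper = new ObjectMapper();
        }
        ObjectNode node = this.mapper.createObjectNode();
        try {
            if (record.key() != null) {
                node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
            }
            if (record.value() != null) {
                node.set("value", this.mapper.readValue(record.value(), JsonNode.class));
            }
            if (this.includeMetadata) {
                node.putObject("metadata")
                    .put("offset", record.offset())
                    .put("topic", record.topic())
                    .put("partition", record.partition());
            }
        } catch (Exception e) {
            // Log the raw key and value so the offending record can be identified.
            // Either may be null, so decode them defensively.
            String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
            String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
            log.error(e.getMessage(), e);
            log.error("JSONKeyValueDeserializationSchema error: " + record + " =====key: " + key + " =====data: " + value);
        }
        // May be empty or partial if deserialization failed.
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}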
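The log showed that the key is an order number string, not JSON, so parsing it as a JsonNode fails. The topic data is not native canal JSON but has been processed, so the key was presumably set by the upstream producer.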
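So we keep modifying our JSONKeyValueDeserializationSchema. Since the key is not used, the following block is simply commented out; alternatively, the key could be kept as a plain String.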
if (record.key() != null) {
    node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
}
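If the key is needed downstream after all, one possible way to keep it as a plain String is to decode the raw bytes directly instead of running them through the JSON parser. The helper below is only a sketch: the class name `PlainTextKey` is hypothetical, and UTF-8 is an assumption about how the upstream producer encodes its keys.

```java
import java.nio.charset.StandardCharsets;

/** Sketch: treat a Kafka record key as plain UTF-8 text instead of JSON. */
final class PlainTextKey {

    /** Returns the key as text, or null when the record has no key. */
    static String decode(byte[] key) {
        return key == null ? null : new String(key, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // "ORDER-0001" is a made-up order number used only for illustration.
        byte[] rawKey = "ORDER-0001".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(rawKey));
        System.out.println(decode(null));
    }
}
```

Inside deserialize, the result could then be stored with `node.put("key", PlainTextKey.decode(record.key()))`, which avoids the JsonParseException for non-JSON keys.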
We still keep the try/catch here and write the failing records to the log. The modified code is as follows:
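import java.nio.charset.StandardCharsets;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public class JSONKeyValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {

    private static final long serialVersionUID = 1509391548173891955L;
    private final static Logger log = LoggerFactory.getLogger(JSONKeyValueDeserializationSchema.class);

    private final boolean includeMetadata;
    private ObjectMapper mapper;

    public JSONKeyValueDeserializationSchema(boolean includeMetadata) {
        this.includeMetadata = includeMetadata;
    }

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (this.mapper == null) {
            this.mapper = new ObjectMapper();
        }
        ObjectNode node = this.mapper.createObjectNode();
        try {
            // The key is a plain order number, not JSON, and is not needed downstream.
            // if (record.key() != null) {
            //     node.set("key", this.mapper.readValue(record.key(), JsonNode.class));
            // }
            if (record.value() != null) {
                node.set("value", this.mapper.readValue(record.value(), JsonNode.class));
            }
            if (this.includeMetadata) {
                node.putObject("metadata")
                    .put("offset", record.offset())
                    .put("topic", record.topic())
                    .put("partition", record.partition());
            }
        } catch (Exception e) {
            // Log the raw key and value so the offending record can be identified.
            // Either may be null, so decode them defensively.
            String key = record.key() == null ? null : new String(record.key(), StandardCharsets.UTF_8);
            String value = record.value() == null ? null : new String(record.value(), StandardCharsets.UTF_8);
            log.error(e.getMessage(), e);
            log.error("JSONKeyValueDeserializationSchema error: " + record + " =====key: " + key + " =====data: " + value);
        }
        // May be empty if deserialization failed.
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}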
With that, the problem is solved.
Original source: https://www.cnblogs.com/createweb/p/11972125.html