This time we need to build a monitoring project: metric computation over the logs of our entire network. Once in production, the volume should be on the order of ten billion records per day.
The SQL for a single source is as follows.
The original SQL:

select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl
from (
    select pro,throwable,level,ip,
        count(*) as `count`,
        lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
        firstLong(l) as firstl,
        lastLong(l) as lastl,
        TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time`
    from input.`ymm-appmetric-dev-self1`
    where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL
    group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)
)
where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)
--- To validate the approach technically first, I wrote the following SQL:
select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl
from (
    select pro,throwable,level,ip,
        count(*) as `count`,
        lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
        firstLong(l) as firstl,
        lastLong(l) as lastl,
        TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time`
    from (
        select pro,throwable,level,ip
        from input.`ymm-appmetric-dev-self1`
        where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL
        union
        select pro,throwable,level,ip
        from input.`ymm-appmetric-dev-self2`
        where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL
    )
    group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)
)
where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)
Then I brought up the Flink job to see whether it would start cleanly --- and sure enough, it failed:
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'SPT' not found in any table
Let's track down the cause. Looking at the SQL written above, my guess was that the UNION branches did not carry the SPT time attribute (and the other referenced fields) from each table. After adding them, the SQL becomes:
select pro,throwable,level,ip,`count`,id,`time`,firstl,lastl
from (
    select pro,throwable,level,ip,
        count(*) as `count`,
        lastStrInGroupSkipNull(CONCAT_WS('_',KAFKA_TOPIC,CAST(KAFKA_PARTITION AS VARCHAR),CAST(KAFKA_OFFSET as VARCHAR))) as id,
        firstLong(l) as firstl,
        lastLong(l) as lastl,
        TUMBLE_END(SPT, INTERVAL '3' SECOND) as `time`
    from (
        select pro,throwable,level,ip,l,KAFKA_TOPIC,KAFKA_PARTITION,KAFKA_OFFSET,SPT
        from input.`ymm-appmetric-dev-self1`
        where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL
        union
        select pro,throwable,level,ip,l,KAFKA_TOPIC,KAFKA_PARTITION,KAFKA_OFFSET,SPT
        from input.`ymm-appmetric-dev-self2`
        where pro IS NOT NULL and throwable IS NOT NULL and level IS NOT NULL and level='ERROR' and ip IS NOT NULL
    )
    group by pro,throwable,level,ip,TUMBLE(SPT,INTERVAL '3' SECOND)
)
where 1=uniqueWithin100MS(pro,throwable,level,ip,`time`)
Restart and see; this time it should be about right --- the SQL now compiles, but there is still an error.
Strange, this error never appeared before. Great, let's see where the problem is!
We open the class hierarchy diagram, shown below:
Let's take this opportunity to deepen our understanding of these classes!
--- After some debugging, I found the problem lies in the UNION: remove it and everything is fine; add it back and the job fails. Let's return to the call stack.
One person, one whole afternoon of debugging, -_-||, and in the end it turned out only a single change was needed:
union -> union all
Impressive, hats off to the expert! (Presumably the plain UNION implies a DISTINCT over the unioned streams, and that extra aggregation conflicts with the SPT time attribute needed by the downstream window, while UNION ALL simply concatenates the two streams and keeps the time attribute usable.)
---- Good. With that solved, let's continue and debug the underlying principles!
A quick test shows a difference between multi-source and single-source jobs: with a single source the watermark is easy to understand, but with multiple sources it gets a bit more complicated. Let's dig into how it works!
First, look at the current job graph, shown below:
Next, let's study the threads; take a jstack dump:
Let's analyse the threads above and see what we can learn! I'll pick a few key threads to explain.
"VM Periodic Task Thread" os_prio=0 tid=0x00007f366825e800 nid=0x63d waiting on condition 百度能夠知道 該線程是JVM週期性任務調度的線程,它由WatcherThread建立,是一個單例對象。該線程在JVM內使用得比較頻繁,好比:按期的內存監控、JVM運行情況監控。
The following are GC threads:

"Gang worker#0 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668031800 nid=0x626 runnable
"Gang worker#1 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668033800 nid=0x627 runnable
"Gang worker#2 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668035800 nid=0x628 runnable
"Gang worker#3 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668037800 nid=0x629 runnable
"Gang worker#4 (Parallel GC Threads)" os_prio=0 tid=0x00007f3668039800 nid=0x62a runnable
"Gang worker#5 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803b000 nid=0x62b runnable
"Gang worker#6 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803d000 nid=0x62c runnable
"Gang worker#7 (Parallel GC Threads)" os_prio=0 tid=0x00007f366803f000 nid=0x62d runnable
"Concurrent Mark-Sweep GC Thread" os_prio=0 tid=0x00007f36680b7000 nid=0x630 runnable
"Gang worker#0 (Parallel CMS Threads)" os_prio=0 tid=0x00007f36680b2800 nid=0x62e runnable
"Gang worker#1 (Parallel CMS Threads)" os_prio=0 tid=0x00007f36680b4800 nid=0x62f runnable
---
"main" #1 prio=5 os_prio=0 tid=0x00007f3668019800 nid=0x625 waiting on condition [0x00007f3670010000] 主線程,在flink內部等待全部事情結束
"New I/O worker #1" #24 prio=5 os_prio=0 tid=0x00007f366995f000 nid=0x648 runnable [0x00007f3642cd1000] 內部netty線程
---
"Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #51 prio=5 os_prio=0 tid=0x00007f363d11a800 nid=0x65e in Object.wait() [0x00007f3641ac3000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74) - locked <0x00000000e6ee2df0> (a java.lang.Object) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:133) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:721) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) "Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #50 prio=5 os_prio=0 tid=0x00007f363d120800 nid=0x65d in Object.wait() [0x00007f3641bc4000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.flink.streaming.connectors.kafka.internal.Handover.pollNext(Handover.java:74) - locked <0x00000000e6ee2e98> (a java.lang.Object) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:133) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:721) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:87) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:56) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748)
Two threads are responsible for obtaining messages. Note that these two do not read Kafka directly; other threads read Kafka and feed the records to them.
---
"time attribute: (SPT) (1/1)" #53 prio=5 os_prio=0 tid=0x00007f363d8e4000 nid=0x662 in Object.wait() [0x00007f36418c1000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:205) - locked <0x00000000e6ee8210> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163) at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) 這個線程對應了咱們sql裏的union算子
---
"groupBy: (pro, throwable, level, ip), window: (TumblingGroupWindow('w$, 'SPT, 3000.millis)), select: (pro, throwable, level, ip, COUNT(*) AS count, lastStrInGroupSkipNull($f5) AS id, firstLong(l) AS firstl, lastLong(l) AS lastl, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (=(1, uniqueWithin100MS(pro, throwable, _UTF-16LE'ERROR', ip, w$end))), select: (pro, throwable, level, ip, count, id, w$end AS time, firstl, lastl) -> to: Row -> Sink: Kafka010JsonTableSink(pro, throwable, level, ip, count, id, time, firstl, lastl) (1/1)" #54 prio=5 os_prio=0 tid=0x00007f363fde3800 nid=0x664 in Object.wait() [0x00007f3641127000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:533) - locked <0x00000000e6ee2d48> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502) at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) 這個對應了group by算子
--- The producer:
"kafka-producer-network-thread | producer-1" #55 daemon prio=5 os_prio=0 tid=0x00007f364d0f0800 nid=0x667 runnable [0x00007f3640a26000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x00000000e6ef3358> (a sun.nio.ch.Util$3) - locked <0x00000000e6ef3340> (a java.util.Collections$UnmodifiableSet) - locked <0x00000000e6eedbd8> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:225) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:126) at java.lang.Thread.run(Thread.java:748) 對應着生產者,直連kafka
---
"Time Trigger for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #57 daemon prio=5 os_prio=0 tid=0x00007f364d264800 nid=0x669 waiting on condition [0x00007f3640624000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000e6ef84c0> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) "Time Trigger for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #56 daemon prio=5 os_prio=0 tid=0x00007f363e937800 nid=0x668 waiting on condition [0x00007f3640725000] java.lang.Thread.State: TIMED_WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000e6ee2bc8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:748) 每一個流對應着一個水印定時發送線程,由於我這邊的輸入是2個流 因此有2個水印發送線程
---
"Kafka Partition Discovery for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #61 prio=5 os_prio=0 tid=0x00007f364d25f000 nid=0x66c waiting on condition [0x00007f3640121000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:701) at java.lang.Thread.run(Thread.java:748) "Kafka Partition Discovery for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #59 prio=5 os_prio=0 tid=0x00007f363f4bc800 nid=0x66a waiting on condition [0x00007f3640323000] java.lang.Thread.State: TIMED_WAITING (sleeping) at java.lang.Thread.sleep(Native Method) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase$2.run(FlinkKafkaConsumerBase.java:701) at java.lang.Thread.run(Thread.java:748) 2個自動分區發現線程
---
"Kafka 0.10 Fetcher for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #60 daemon prio=5 os_prio=0 tid=0x00007f364d269800 nid=0x66d runnable [0x00007f363bffe000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x00000000e73f0888> (a sun.nio.ch.Util$3) - locked <0x00000000e73f0870> (a java.util.Collections$UnmodifiableSet) - locked <0x00000000e7279b20> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - locked <0x00000000e7497ec0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257) "Kafka 0.10 Fetcher for Source: MyKafka010JsonTableSource -> from: (l, KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_OFFSET, pro, throwable, level, ip, SPT) -> Timestamps/Watermarks -> where: (AND(=(level, _UTF-16LE'ERROR'), IS NOT NULL(pro), IS NOT NULL(throwable), IS NOT NULL(ip))), select: (pro, throwable, CAST(_UTF-16LE'ERROR') AS level, ip, SPT, CONCAT_WS(_UTF-16LE'_', KAFKA_TOPIC, CAST(KAFKA_PARTITION), CAST(KAFKA_OFFSET)) AS $f5, l) (1/1)" #58 daemon prio=5 os_prio=0 tid=0x00007f363f4be800 nid=0x66b runnable [0x00007f3640222000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x00000000e6ef0758> (a sun.nio.ch.Util$3) - locked <0x00000000e6ef0740> (a java.util.Collections$UnmodifiableSet) - locked <0x00000000e6ee0248> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.kafka.common.network.Selector.select(Selector.java:489) at org.apache.kafka.common.network.Selector.poll(Selector.java:298) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226) - locked <0x00000000e6f03398> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1047) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995) at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:257) 對應着2個直連kafka的生產者線程
That wraps up the thread inventory; next, let's see what each thread does! First, some brief background on records and watermarks.
For each stream, one consumer thread reads messages from Kafka and feeds them, through an in-memory exchange, to another thread, the one whose stack shows Handover; that thread pushes the records downstream. Meanwhile, one watermark thread periodically checks whether a larger timestamp has appeared, and if so wraps that timestamp in a watermark event and broadcasts it downstream. (A minimal sketch of the handover idea follows below.)
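To make the handover concrete, here is a minimal sketch of the idea: a one-slot exchange guarded by wait/notify. The class MiniHandover is made up for illustration; the real logic lives in org.apache.flink.streaming.connectors.kafka.internal.Handover, which the stacks above show blocking in pollNext.

public final class MiniHandover<T> {
    private final Object lock = new Object();
    private T next; // at most one batch in flight

    /** Called by the Kafka consumer thread. */
    public void produce(T element) throws InterruptedException {
        synchronized (lock) {
            while (next != null) {
                lock.wait(); // wait until the consumer has taken the previous batch
            }
            next = element;
            lock.notifyAll();
        }
    }

    /** Called by the source task thread; this is where "Handover.pollNext" blocks. */
    public T pollNext() throws InterruptedException {
        synchronized (lock) {
            while (next == null) {
                lock.wait();
            }
            T result = next;
            next = null;
            lock.notifyAll();
            return result;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniHandover<String> handover = new MiniHandover<>();
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("consumer got: " + handover.pollNext());
            } catch (InterruptedException ignored) { }
        });
        consumer.start();
        handover.produce("record batch #1");
        consumer.join();
    }
}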
--- Let's first debug the Handover thread and see how records are fed toward the UnionInputGate thread.
The breakpoint is at:
stop at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher:154
Run it!
Then send a message to Kafka; the breakpoint is hit as expected.
Next, let's follow the record's journey in detail!
While a record is being processed, the current event's timestamp is recorded; the location is:
The purpose: if this timestamp is larger than the current value, the stored value is updated; later, the watermark thread periodically reads it to decide whether a watermark needs to be sent. (Sketched below.)
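A sketch of that bookkeeping under the same assumptions (illustrative names, not Flink's actual classes): the record path advances a max-seen timestamp, and the timer thread emits a watermark only when that maximum has advanced.

public final class MaxTimestampTracker {
    private long maxSeenTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Called for every record on the hot path. */
    public synchronized void onRecord(long eventTimestamp) {
        if (eventTimestamp > maxSeenTimestamp) {
            maxSeenTimestamp = eventTimestamp; // only ever moves forward
        }
    }

    /** Called periodically by the watermark timer thread. */
    public synchronized Long maybeNextWatermark() {
        if (maxSeenTimestamp > lastEmittedWatermark) {
            lastEmittedWatermark = maxSeenTimestamp;
            return lastEmittedWatermark; // broadcast downstream as a watermark event
        }
        return null; // no progress since the last check, emit nothing
    }
}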
OK, keep following the record; execution reaches the following spot:
[1] org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit (RecordWriter.java:104)
[2] org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit (StreamRecordWriter.java:81)
[3] org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter (RecordWriterOutput.java:107)
[4] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:89)
[5] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:45)
[6] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[7] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[8] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[9] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
[10] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
[11] DataStreamCalcRule$69.processElement (null)
[12] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
[13] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
[14] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
[15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[17] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[19] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[20] org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement (TimestampsAndPeriodicWatermarksOperator.java:67)
[21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[23] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[25] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[26] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
[28] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
[29] DataStreamSourceConversion$23.processElement (null)
[30] org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement (CRowOutputProcessRunner.scala:67)
[31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
[32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[37] org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp (StreamSourceContexts.java:310)
[38] org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp (StreamSourceContexts.java:409)
[39] org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp (AbstractFetcher.java:398)
[40] org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord (Kafka010Fetcher.java:89)
[41] org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:154)
[42] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:721)
[43] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:87)
[44] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:56)
[45] org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:99)
[46] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:306)
[47] org.apache.flink.runtime.taskmanager.Task.run (Task.java:703)
[48] java.lang.Thread.run (Thread.java:748)
Let's look at the code about to execute:
public void emit(T record) throws IOException, InterruptedException {
    for (int targetChannel : channelSelector.selectChannels(record, numChannels)) {
        sendToTarget(record, targetChannel);
    }
}
Printing numChannels here gives:
numChannels = 1 ---> because we have a union, and a union naturally funnels all sources into one downstream task! That checks out!
--- Finally the record is enqueued and the consuming thread is notified; the complete call stack is:
[1] org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.queueChannel (SingleInputGate.java:623)
[2] org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.notifyChannelNonEmpty (SingleInputGate.java:612)
[3] org.apache.flink.runtime.io.network.partition.consumer.InputChannel.notifyChannelNonEmpty (InputChannel.java:121)
[4] org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.notifyDataAvailable (LocalInputChannel.java:202)
[5] org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView.notifyDataAvailable (PipelinedSubpartitionView.java:56)
[6] org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.notifyDataAvailable (PipelinedSubpartition.java:290)
[7] org.apache.flink.runtime.io.network.partition.PipelinedSubpartition.flush (PipelinedSubpartition.java:76)
[8] org.apache.flink.runtime.io.network.partition.ResultPartition.flush (ResultPartition.java:269)
[9] org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget (RecordWriter.java:149)
[10] org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit (RecordWriter.java:105)
[11] org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit (StreamRecordWriter.java:81)
[12] org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter (RecordWriterOutput.java:107)
[13] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:89)
[14] org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect (RecordWriterOutput.java:45)
[15] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[16] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[17] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[18] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
[19] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
[20] DataStreamCalcRule$69.processElement (null)
[21] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
[22] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
[23] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
[24] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[25] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[26] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[27] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[28] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[29] org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement (TimestampsAndPeriodicWatermarksOperator.java:67)
[30] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[31] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[33] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[34] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[35] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[36] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
[37] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
[38] DataStreamSourceConversion$23.processElement (null)
[39] org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement (CRowOutputProcessRunner.scala:67)
[40] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
[41] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[42] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[43] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[44] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[45] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[46] org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp (StreamSourceContexts.java:310)
[47] org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp (StreamSourceContexts.java:409)
[48] org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp (AbstractFetcher.java:398)
[49] org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord (Kafka010Fetcher.java:89)
[50] org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:154)
[51] org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:721)
[52] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:87)
[53] org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:56)
[54] org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:99)
[55] org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:306)
[56] org.apache.flink.runtime.taskmanager.Task.run (Task.java:703)
[57] java.lang.Thread.run (Thread.java:748)
--- Watermark handling should be similar, so next let's look at the thread running the Union.
Let's review that thread's call stack from earlier:
"time attribute: (SPT) (1/1)" #53 prio=5 os_prio=0 tid=0x00007f363d8e4000 nid=0x662 in Object.wait() [0x00007f36418c1000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:205) - locked <0x00000000e6ee8210> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163) at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) 這個線程對應了咱們sql裏的union算子
The stack above shows a thread that pulls a message as soon as one arrives: a message on any of the sources triggers the fetch; otherwise the thread just waits.
--- Note: there are four element types here (record, watermark, stream status, and latency marker); normally we only need to care about records and watermarks.
The exact location is:
--- A quick note on LatencyMarker: by default one is emitted every 2 seconds; screenshot below.
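If that interval needs tuning, there is a knob on the execution config; a minimal sketch, assuming the Flink 1.5-era API (interval in milliseconds):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Emit a LatencyMarker every 2000 ms, matching the 2-second default mentioned above.
        env.getConfig().setLatencyTrackingInterval(2000);
    }
}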
Everything else, record or watermark alike, is forwarded downstream!
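Putting the four element types together, here is an illustrative sketch of the dispatch shape (modeled loosely on StreamInputProcessor.processInput; simplified, not Flink's actual API):

public class DispatchSketch {
    static class StreamElement {}
    static class StreamRecord extends StreamElement { final Object value; StreamRecord(Object v) { value = v; } }
    static class Watermark extends StreamElement { final long timestamp; Watermark(long t) { timestamp = t; } }
    static class LatencyMarker extends StreamElement { final long markedTime; LatencyMarker(long t) { markedTime = t; } }
    static class StreamStatus extends StreamElement { final boolean active; StreamStatus(boolean a) { active = a; } }

    // The four-way branching described above; records and watermarks are the common cases.
    static void process(StreamElement e) {
        if (e instanceof Watermark) {
            System.out.println("watermark: advance event time to " + ((Watermark) e).timestamp);
        } else if (e instanceof StreamStatus) {
            System.out.println("stream status: active=" + ((StreamStatus) e).active);
        } else if (e instanceof LatencyMarker) {
            System.out.println("latency marker emitted at " + ((LatencyMarker) e).markedTime);
        } else {
            System.out.println("record: " + ((StreamRecord) e).value);
        }
    }

    public static void main(String[] args) {
        process(new StreamRecord("log line"));
        process(new Watermark(1545703896000L));
    }
}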
Next, set breakpoints in the union path for both records and watermarks; guess which breakpoint fires first?
The breakpoints are located at [for flink-1.5]:
Breakpoints set:
    breakpoint org.apache.flink.streaming.runtime.io.StreamInputProcessor:184
    breakpoint org.apache.flink.streaming.runtime.io.StreamInputProcessor:198
They fire in the following order:
--- Exactly as expected! Now let's look into the groupby thread.
"groupBy: (pro, throwable, level, ip), window: (TumblingGroupWindow('w$, 'SPT, 3000.millis)), select: (pro, throwable, level, ip, COUNT(*) AS count, lastStrInGroupSkipNull($f5) AS id, firstLong(l) AS firstl, lastLong(l) AS lastl, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime) -> where: (=(1, uniqueWithin100MS(pro, throwable, _UTF-16LE'ERROR', ip, w$end))), select: (pro, throwable, level, ip, count, id, w$end AS time, firstl, lastl) -> to: Row -> Sink: Kafka010JsonTableSink(pro, throwable, level, ip, count, id, time, firstl, lastl) (1/1)" #54 prio=5 os_prio=0 tid=0x00007f363fde3800 nid=0x664 in Object.wait() [0x00007f3641127000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:502) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:533) - locked <0x00000000e6ee2d48> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:502) at org.apache.flink.streaming.runtime.io.BarrierTracker.getNextNonBlocked(BarrierTracker.java:94) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:209) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:103) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703) at java.lang.Thread.run(Thread.java:748) 這個對應了group by算子
針對group by來講,最重要的環節,這個其實跟union線程同樣的,也是在
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput
where events are dispatched, so the breakpoints are identical.
---
The main thing to highlight here is where groupby handles the watermark, shown below [this is especially easy to get wrong with multiple sources].
At this point I realized it was too early to observe the watermark in the groupby thread, because the union thread's watermark handling still holds some secrets.
So let's go back to the union thread and dig out those secrets: suspend the groupby thread with the suspend command and debug only the union thread!
--- Set a breakpoint [for flink-1.5]:
stop at org.apache.flink.streaming.runtime.io.StreamInputProcessor:184
After some digging, the principle is roughly clear. Put simply, the thread model looks like this:
stream1 -------+
               |
               +----------> union thread's watermark ----------> groupby thread's watermark
               |
stream2 -------+
Streams 1 and 2 each forward the largest timestamp they have seen so far to the downstream (when they see a smaller one, they do nothing).
The union dynamically tracks the largest timestamp each of stream 1 and stream 2 has seen, takes Min(stream1's max timestamp, stream2's max timestamp), and compares it with the previous value.
If it is greater than the previous Min, it is forwarded to the group by. (A sketch of this rule follows.)
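A minimal sketch of that forwarding rule, with the semantics reconstructed from the debugging above (the class and method names are made up, not Flink's actual API): keep each input channel's largest watermark, and forward min(per-channel maxima) only when that minimum advances.

import java.util.Arrays;

public final class UnionWatermarkValve {
    private final long[] channelWatermarks; // largest watermark seen per input channel
    private long lastForwarded = Long.MIN_VALUE;

    public UnionWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Returns the new combined watermark to forward downstream, or null if nothing changes. */
    public Long onWatermark(int channel, long watermark) {
        if (watermark <= channelWatermarks[channel]) {
            return null; // a channel's watermark only ever moves forward
        }
        channelWatermarks[channel] = watermark;
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w); // the slowest channel bounds the global watermark
        }
        if (min > lastForwarded) {
            lastForwarded = min;
            return min; // forward min(per-channel maxima) to the group by
        }
        return null;
    }
}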
--- I suspect that by this point readers are thoroughly confused, so let me explain the idea.
One thing to emphasise: records are not intercepted anywhere along the way; they travel straight to the final WindowOperator, which performs the window-lateness check and decides whether to drop them!
===========================================================================================
Stream 1 keeps sending the largest timestamp it has seen to the downstream, as if saying: "Hello downstream, anything below this timestamp counts as late to me; handle it as you see fit."
Stream 2 does exactly the same.
--- For the union, things are more involved.
It compares min(stream1's max timestamp, stream2's max timestamp) against the previous min(stream1's max timestamp, stream2's max timestamp);
if the value has increased, it sends the new, larger min downstream, as if saying: "Hello downstream, globally, anything below this timestamp counts as late. This is the best I can do; I have held the timestamp back as long as possible; handle it as you see fit."
--- The groupby receives these timestamps, keeps the maximum, and then uses that maximum to quickly decide whether each record is late (max value minus the tolerated lateness).
So in the multi-source case, whether a record is globally late is in effect decided by min(stream1's max timestamp, stream2's max timestamp).
---
Let's step back and think about this again. I suspect the part that confuses readers most is why union takes the minimum over the streams rather than the maximum.
Think of it this way: if it took the maximum, most of the data from the slower stream would become late data and be dropped, and the union would take the beating.
So, to avoid the beating, union does not want to anger anyone: it takes min(over every stream), and now nobody has grounds to complain.
Union's inner monologue: "I already take the global minimum of the per-stream maxima you each reported; what more do you want? Even the slowest stream has nothing to say, because the value I use is the maximum that very stream reported itself."
That was the technical view; now let's raise the level of abstraction.
The point is to keep as much data as possible from becoming late data and instead inside the normal computation, with the advance of min(stream1's max timestamp, stream2's max timestamp) driving the global WindowOperator to compute and emit results.
Correspondingly, the slowest-consuming stream delays the production of the final business output.
--- Readers can chew over the details; now let's run a logic test to verify that we have truly understood the rules of the game!
Background: the tolerated lateness is 3000 milliseconds.
Each line below has the format: stream name + timestamp; exactly one record is emitted at a time.
1) stream1 + 1545703896000
2) stream1 + 1545703896000
3) stream2 + 1545703896000
4) stream2 + 1545703898999
5) stream2 + 1545703899000
6) stream1 + 1545703899000
7) stream1 + 1545703900000
8) stream2 + 1545703902000-1 ---> this does not trigger the WindowOperator's output, because stream1's contribution to the minimum is still too small
9) stream1 + 1545703902000-1 ---> only this triggers the WindowOperator's output
The output came out correct. Remember, you need both streams [advancing side by side, theory and practice in accord]. The sketch below replays these nine steps.
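As a cross-check, this replay drives the UnionWatermarkValve sketch from earlier with the nine steps above. It assumes each stream emits a watermark equal to its largest timestamp minus the 3000 ms tolerated lateness, and that the tumbling window [1545703896000, 1545703899000) fires once the combined watermark reaches 1545703898999 (window end minus one); both are assumptions inferred from the behaviour above rather than taken from Flink's code.

public class WatermarkReplay {
    public static void main(String[] args) {
        UnionWatermarkValve valve = new UnionWatermarkValve(2); // channel 0 = stream1, channel 1 = stream2
        long delay = 3000L; // tolerated lateness
        emit(valve, 0, 1545703896000L, delay); // 1) stream1, no combined watermark yet
        emit(valve, 0, 1545703896000L, delay); // 2) stream1, duplicate, nothing changes
        emit(valve, 1, 1545703896000L, delay); // 3) stream2, combined watermark -> 1545703893000
        emit(valve, 1, 1545703898999L, delay); // 4) stream2, min still bounded by stream1
        emit(valve, 1, 1545703899000L, delay); // 5) stream2, combined watermark unchanged
        emit(valve, 0, 1545703899000L, delay); // 6) stream1, combined watermark -> 1545703896000
        emit(valve, 0, 1545703900000L, delay); // 7) stream1, min now bounded by stream2
        emit(valve, 1, 1545703901999L, delay); // 8) stream2, combined -> 1545703897000, window does NOT fire
        emit(valve, 0, 1545703901999L, delay); // 9) stream1, combined -> 1545703898999, window fires
    }

    static void emit(UnionWatermarkValve valve, int channel, long timestamp, long delay) {
        Long combined = valve.onWatermark(channel, timestamp - delay);
        boolean fires = combined != null && combined >= 1545703899000L - 1; // window end - 1
        System.out.println("stream" + (channel + 1) + " ts=" + timestamp
                + " -> combined watermark=" + combined + (fires ? "  <== window fires" : ""));
    }
}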
But the investigation does not fully end here. To find out what happens next, stay tuned for the next installment :)