1. 自定義Sink寫入hbase?java
使用的是原生的hbase客戶端,能夠本身控制每多少條記錄刷新一次。遇到了幾個坑致使數據寫不到hbase裏邊去:node
- 集羣hbase版本和客戶端版本不一致(版本1和版本2相互之間會有衝突)
- Jar包衝突
例如protobuf-java版本衝突,常見的是兩個關鍵錯誤,java.io.IOException: java.lang.reflect.InvocationTargetException 和 Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hbase.protobuf.ProtobufUtil。apache
2. Flink 消費Kafka偏移量
Flink讀寫Kafka,若是使用Consumer08的話,偏移量會提交Zk,下邊這個配置能夠寫在Conf文件中,提交偏移量的Zk能夠直接指定。Consumer09之後版本就不向Zk提交了,Kafka本身會單獨搞一個Topic存儲消費狀態。bootstrap
1 xxxx08 { 2 bootstrap.servers = "ip:9092" 3 zookeeper.connect = "ip1:2181,ip2/vio" 4 group.id = "group1" 5 auto.commit.enable = true 6 auto.commit.interval.ms = 30000 7 zookeeper.session.timeout.ms = 60000 8 zookeeper.connection.timeout.ms = 30000 9 }
1 final Properties consumerProps = ConfigUtil 2 .getProperties(config, 「xxxx08");// 使用本身編寫的Util函數讀取配置 3 4 final FlinkKafkaConsumer08<String> source = 5 new FlinkKafkaConsumer08<String>(topic, new SimpleStringSchema(), consumerProps);
3. Flink 的日誌打印
Flink打印日誌的時候,日誌打印到哪,日誌文件是否是切塊,並非在工程resource下配置文件裏指定的!!!而是在flink/conf中指定的,好比我安裝的Flink On Yarn模式,只須要在安裝的機器上flink/conf文件夾下修改對應的配置文件便可,以下:session
具體能夠參考:Flink日誌配置ide
4. Flink 的akka時間超時
這個問題比較常見,遇見過兩次,總結下:函數
首先是集羣機器負載比較高,有的機器負載百分之幾萬都有,在這時候taskmanager、jobmanager就會報akka超時的異常,能夠適當增大akka超時時間扛過這段時間;oop
而後最多見的是程序裏調用外部接口,延遲較高,有的是5秒甚至10秒,這種時候akka就會超時測試
5. Flink 的讀HDFS寫Kafka
flink讀hdfs的時候用了DataSet,本身在中間map裏邊已經寫到kafka裏邊了,因此不想要sink,但flink要求必須有sink,因此只能加個.output(new DiscardingOutputFormat<>()),這樣對程序不會形成影響。 fetch
6. 本地測試Flink
本地測試Flink偶爾會報錯,記錄下:
(1)本地Apache flink集羣沒有運行,會報下面鏈接被拒絕的錯誤,你只須要啓動它:./bin/start-cluster.sh
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel $ AnnotatedConnectException:拒絕鏈接:localhost / 127.0.0.1:8081
7. Flink on YARN
8. Flink並行度設置


java.io.IOException: Insufficient number of network buffers: required 4, but only 0 available. The total number of network buffers is currently set to 6472 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'. at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:272) at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257) at org.apache.flink.runtime.io.network.NetworkEnvironment.setupInputGate(NetworkEnvironment.java:278) at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:224) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608) at java.lang.Thread.run(Thread.java:748)
並行度設置不合理,按報的錯設置便可:
source.parallelism = 40 map.parallelism = 40 sink.parallelism = 40
9. Flink常見報錯
- java.lang.Exception: Container released on a lost node
異常緣由是 Container 運行所在節點在 YARN 集羣中被標記爲 LOST,該節點上的全部 Container 都將被 YARN RM 主動釋放並通知 AM,JobManager 收到此異常後會 Failover 自行恢復(從新申請資源並啓動新的 TaskManager),遺留的 TaskManager 進程可在超時後自行退出
- Could not build the program from JAR file.
這個問題的迷惑性較大,不少時候並不是指定運行的 JAR 文件問題,而是提交過程當中發生了異常,須要根據日誌信息進一步排查。
10. Flink消費Kafka單條數據過大起Lag
能夠在kafka consumer中設置下列參數:
pro.put("fetch.message.max.bytes", "8388608"); pro.put("max.partition.fetch.bytes", "8388608");