寬帶優化之Flume Avro在個推的實踐

引言
帶寬不夠用,靠這個方法我讓數據壓縮率達到了80%以上apache

如何在有限的資源下解決性能瓶頸問題是運維永恆的痛點。這期文章,Mr.Tech 邀請了在性能優化方面有着豐富經驗的個推高級運維工程師白子畫,爲你們分享寬帶優化之Flume Avro在個推的實踐。性能優化

在異地日誌數據互傳的場景下,咱們從傳輸數據着手,藉助Avro的特性使數據壓縮率達80%以上,解決了個推在實際生產過程當中遇到的帶寬不夠用的問題。本文咱們將向你們介紹Flume Avro在數據傳輸過程當中所承擔的不一樣角色,以及如何保證數據的完整性和傳輸的高效性,並分享在實際業務中取得的優化效果。服務器

背景
個推做爲專業的數據智能服務商,已經成功服務了數十萬APP,每日的消息下發量達百億級別,由此產生了海量日誌數據。爲了應對業務上的各類需求,咱們須要採集並集中化日誌進行計算,爲此個推選用了高可用的、高可靠的、分佈式的Flume系統以對海量日誌進行採集、聚合和傳輸。此外,個推也不斷對Flume進行迭代升級,以實現本身對日誌的特定需求。網絡

原有的異地機房日誌匯聚方式,整個流程相對來講比較簡單,A機房業務產生的日誌經過多種方式寫入該機房Kafka集羣,而後B機房的Flume經過網絡專線實時消費A機房Kafka的日誌數據後寫入本機房的Kafka集羣,全部機房的數據就是經過相同方式在B機房Kakfa集羣中集中化管理。如圖一所示:數據結構

圖一:原有異地日誌傳輸模式運維

可是隨着業務量的不斷增長,日誌數據在逐漸增多的過程當中對帶寬要求變高,帶寬的瓶頸問題日益凸顯。按照1G的專線帶寬成本2~3w/月來計算,一個異地機房一年僅專線帶寬擴容成本就高達30w以上。對此,如何找到一種成本更加低廉且符合當前業務預期的傳輸方案呢?Avro有快速壓縮的二進制數據形式,並能有效節約數據存儲空間和網絡傳輸帶寬,從而成爲優選方案。分佈式

優化思路
Avro簡介工具

Avro是一個數據序列化系統。它是Hadoop的一個子項目,也是Apache的一個獨立的項目,其主要特色以下:
● 豐富的數據結構;
● 可壓縮、快速的二進制數據類型;
● 可持久化存儲的文件類型;
● 遠程過程調用(RPC);
● 提供的機制使動態語言能夠方便地處理數據。
具體可參考官方網站:http://avro.apache.org/oop

Flume Avro方案性能

Flume的RPC Source是Avro Source,它被設計爲高擴展的RPC服務端,能從其餘Flume Agent 的Avro Sink或者Flume SDK客戶端,接收數據到Flume Agent中,具體流程如圖二所示:

圖二:Avro Source流程

針對該模式,咱們的日誌傳輸方案計劃變動爲A機房部署Avro Sink用以消費該機房Kafka集羣的日誌數據,壓縮後發送到B機房的Avro Source,而後解壓寫入B機房的Kafka集羣,具體的傳輸模式如圖三所示:

圖三:Flume Avro傳輸模式

可能存在的問題

咱們預估可能存在的問題主要有如下三點:
● 當專線故障的時候,數據是否能保證完整性;
● 該模式下CPU和內存等硬件的消耗評估;
● 傳輸性能問題。

驗證狀況
針對以上的幾個問題,咱們作了幾項對比實驗。
環境準備狀況說明:

  1. 兩臺服務器192.168.10.81和192.168.10.82,以及每臺服務器上對應一個Kakfa集羣,模擬A機房和B機房;
  2. 兩個Kafka集羣中對應topicA(源端)和topicB(目標端)。在topicA中寫入合計大小11G的日誌數據用來模擬原始端日誌數據。
  3. 192.168.10.82上部署一個Flume,模擬原有傳輸方式。
  4. 192.168.10.81服務器部署Avro Sink,192.168.10.82部署Avro Source,模擬Flume Avro傳輸模式。

原有Flume模式驗證(非Avro)

監控Kafka消費狀況:

81流量統計:

82流量統計:

消費所有消息耗時:20min
消費總日誌條數統計:129,748,260
總流量:13.5G

Avro模式驗證

配置說明:

Avro Sink配置:

kafkasink 是kafkatokafka的sinks的名字,可配多個,空格分開

kafkatokafka.sources = kafka_dmc_bullet
kafkatokafka.channels = channel_dmc_bullet
kafkatokafka.sinks = kafkasink_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.type = org.apache.flume.source.kafka.KafkaSource
kafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.zookeeperConnect = 192.168.10.81:2181
kafkatokafka.sources.kafka_dmc_bullet.topic = topicA
kafkatokafka.sources.kafka_dmc_bullet.kafka.zookeeper.connection.timeout.ms = 150000
kafkatokafka.sources.kafka_dmc_bullet.kafka.consumer.timeout.ms = 10000
kafkatokafka.sources.kafka_dmc_bullet.kafka.group.id = flumeavro
kafkatokafka.sources.kafka_dmc_bullet.batchSize = 5000

source kafkasink_dmc_bullet的配置,可配置多個sink提升壓縮傳輸效率

kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.AvroSink
kafkatokafka.sinks.kafkasink_dmc_bullet.hostname = 192.168.10.82
kafkatokafka.sinks.kafkasink_dmc_bullet.port = 55555 //與source的rpc端口一一對應
kafkatokafka.sinks.kafkasink_dmc_bullet.compression-type = deflate //壓縮模式
kafkatokafka.sinks.kafkasink_dmc_bullet.compression-level = 6 //壓縮率1~9
kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bullet
kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bullet
kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks = 1
kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize = 5000

source kafkasink_dmc_bullet配的channel,只配一個

kafkatokafka.channels.channel_dmc_bullet.type = memory
kafkatokafka.channels.channel_dmc_bullet.capacity = 100000

kafkatokafka.channels.channel_dmc_bullet.byteCapacity = 10000

kafkatokafka.channels.channel_dmc_bullet.byteCapacityBufferPercentage = 10

kafkatokafka.channels.channel_dmc_bullet.transactionCapacity = 5000
kafkatokafka.channels.channel_dmc_bullet.keep-alive = 60

Avro Source配置:

kafkasink 是kafkatokafka的sinks的名字,可配多個,空格分開

kafkatokafka.sources = kafka_dmc_bullet
kafkatokafka.channels = channel_dmc_bullet
kafkatokafka.sinks = kafkasink_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.type = avro
kafkatokafka.sources.kafka_dmc_bullet.channels = channel_dmc_bullet
kafkatokafka.sources.kafka_dmc_bullet.bind = 0.0.0.0
kafkatokafka.sources.kafka_dmc_bullet.port = 55555 //rpc端口綁定
kafkatokafka.sources.kafka_dmc_bullet.compression-type = deflate //壓縮模式
kafkatokafka.sources.kafka_dmc_bullet.batchSize = 100

source kafkasink_dmc_bullet的配置

kafkatokafka.sinks.kafkasink_dmc_bullet.type = org.apache.flume.sink.kafka.KafkaSink
kafkatokafka.sinks.kafkasink_dmc_bullet.kafka.partitioner.class = com.gexin.rp.base.kafka.SimplePartitioner
kafkatokafka.sinks.kafkasink_dmc_bullet.channel = channel_dmc_bullet
kafkatokafka.sinks.kafkasink_dmc_bullet.topic = topicB
kafkatokafka.sinks.kafkasink_dmc_bullet.brokerList = 192.168.10.82:9091,192.168.10.82:9092,192.168.10.82:9093
kafkatokafka.sinks.kafkasink_dmc_bullet.requiredAcks = 1
kafkatokafka.sinks.kafkasink_dmc_bullet.batchSize = 500
kafkatokafka.channels.channel_dmc_bullet.type = memory
kafkatokafka.channels.channel_dmc_bullet.capacity = 100000
kafkatokafka.channels.channel_dmc_bullet.transactionCapacity = 1000

監控Kafka消費狀況

81流量統計:

82流量統計:

消費所有消息耗時:26min
消費總日誌條數統計:129,748,260
總流量:1.69G

故障模擬

  1. 模擬專線故障,在A、B兩機房不通的狀況下,Avro Sink報錯以下:

  1. 監控Kafka消費狀況,發現消費者已中止消費:

  1. 故障處理恢復後繼續消費剩餘日誌,經統計,總日誌條數爲:129,747,255。

結論

  1. 當專線發生故障時,正在網絡傳輸中的通道外數據可能會有少部分丟失,其丟失緣由爲網絡緣由,與Avro模式無關;故障後中止消費的數據不會有任何的丟失問題,因爲網絡緣由丟失的數據須要評估其重要性以及是否須要補傳。
  2. 流量壓縮率達80%以上,同時咱們也測試了等級爲1~9的壓縮率,6跟9很是接近,CPU和內存的使用率與原有傳輸模式相差不大,帶寬的優化效果比較明顯。
  3. 傳輸性能因爲壓縮的緣由適當變弱,單Sink由原先20分鐘延長至26分鐘,可適當增長Sink的個數來提升傳輸速率。

生產環境實施結果

實施結果以下:

  1. 因爲還有其它業務的帶寬佔用,總帶寬使用率節省了50%以上,現階段高峯期帶寬速率不超過400Mbps;
  2. 每一個Sink傳輸速率的極限大概是3000條每秒,壓縮傳輸速率問題經過增長Sink的方式解決,但會適當增長CPU和內存的損耗。

全文總結
Flume做爲個推日誌傳輸的主要工具之一,Source的類型選擇尤其重要(如avro、thrif、exec、kafka和spooling directory等等)。不管選擇哪一種Source,都是爲了實現日誌數據的高效傳輸。本文經過Avro的方式,解決了帶寬資源瓶頸的問題。

將來,咱們但願與更多開發者一塊兒探索如何用更多的技術手段來節約控制成本,並知足更多的業務場景需求。

相關文章
相關標籤/搜索