數據採集系統的優化實戰

1 概述

在歷時2個月的不斷優化過程當中,將數據採集系統的處理能力(kafka一個topic)從2.5萬提高到了10萬,基本符合對下一次峯值的要求了。前端

全部日誌中,其中廣告日誌和做品日誌量是最大的,因此本次的優化也是針對這兩塊進行優化。nginx

廣告日誌接口TPS從以前的不到1k/s,提高到如今的2.1w/s,提高了20倍。 spring

做品日誌接口TPS從以前的不到1k/s,提高到如今的1.4w/s,提高了13倍。apache

在數據採集的優化過程當中,設計到不少地方,包含代碼的優化,框架的優化,服務的優化,如今將對吞吐率的提升有明顯左右的優化點記錄下來。後端

2 面向對象

技術負責人,後端服務開發工程師服務器

3 撰寫時間

2020年04月03日app

4 技術框架圖

arti1.png

5 後端日誌ETL程序LogServer的優化

廣告日誌接口TPS從以前的不到1k/s,提高到如今的2.1w/s,提高了將近20倍。
做品日誌接口TPS從以前的不到1k/s,提高到如今的1.4w/s,提高了13倍。 框架

1.廣告日誌接口的壓測結果部分截圖maven

arti2.png

2.做品日誌接口的壓測結果部分截圖tcp

arti3.png

如下TPS的提高是一個大概的值。

5.1 刪除代碼中沒必要要的打印日誌

例如

System.out.println
    System.out.println
    logger.info

TPS 1k -> 3k

5.2 將logback.xml文件中的打印日誌關閉

例如

<appender-ref ref="action"/>
    <appender-ref ref="console"/>

TPS 3k -> 5k

5.3 對獲取kafka相關的logger的代碼優化

例如
以前的代碼

public synchronized static Logger getLogger(String topic) {
    Logger logger = loggers.get(topic);
    try {
    if (logger == null) {
        logger = LoggerFactory.getLogger(topic);
        loggers.put(topic, logger);
    }
    return logger;
 }

優化後的代碼

public  static Logger getLogger(String topic) {
    if (logger == null) {
        synchronized(KafkaLoggerFactory.class){
            if(logger == null){
                logger = LoggerFactory.getLogger(topic);
                loggers.put(topic, logger);
            }
        }
    }
}

TPS 5k -> 9k

5.4 對流量廣告的邏輯進行簡化

以前的作法:
將廣告數據當成普通的日誌數據來處理,會經歷全部的日誌判斷邏輯,最後通過驗證,數據沒問題,纔會發送到kafka,整個邏輯鏈條比較長。

如今的作法:
先看代碼

ip: String ip = request.getIp();
    collection.put("ip", ip);
    // 國家、地區、城市: collection.putAll(Constant.getRegionInfo(ip));
    server_host: collection.put("srh", Constant.serverHost);
    server_time: collection.put("s_t", System.currentTimeMillis());

    if( "traffic_view".equals(collection.get("product")) ){
        parseAdRecord(collection);
        return Constant.RESPONSE_CODE_NORMAL;
    }

    ...

    public void parseAdRecord(Map<String, Object> collection){
        try {
            collection = Constant.clearAdCollection(collection);
            log2kafka(Constant.eventTopic, JSONObject.toJSONString(collection));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

從上邊的代碼能夠看出,將廣告的邏輯單獨處理進行處理,整個鏈路短了不少,大概總共分3步:
1 必須的公共字段處理
2 判斷是否廣告日誌
3 將廣告日誌發送到kafka

TPS 9k -> 1.2w

5.5 對廣告日誌中的字段進行精簡

將HDFS上廣告日誌總共的85個字段,如今精簡到45個左右。這一步雖然對LogServer的吞吐率沒有太多的提高。可是卻能夠將kafka的吞吐量提高將近一倍。

5.6 升級和簡化依賴
  1. 首先將非必要的maven依賴所有刪除掉了,使得依賴的數量從217個減小到了51個。
  2. 將maven依賴升級成比較新的版本。
  3. 有些依賴被刪掉了,相關的類也作了調整,好比StringUtils.isEmpty()已經從spring類
org.springframework.util.StringUtils

調整成了commons-lang3包內的org.apache.commons.lang3.StringUtils

<dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.10</version>
        </dependency>

6 服務器硬件層面

從以前的4核8G服務器遷移到8核16G的服務器上。
而且對服務器內核參數作了如下優化:

net.core.somaxconn = 10240
net.core.netdev_max_backlog =262144
net.ipv4.tcp_keepalive_intvl = 5
net.ipv4.tcp_keepalive_probes = 3
net.ipv4.tcp_keepalive_time = 600
net.ipv4.tcp_tw_reuse = 1
net.ipv4.ip_local_port_range = 1024     60999
net.ipv4.tcp_fin_timeout = 30
net.ipv4.tcp_syn_retries = 1
net.ipv4.tcp_synack_retries = 1

1.2w -> 2w

7 對前端SDK的優化

通過對kafka的寫入壓測,當日志大小爲1024字節的時候的QPS是2048的接近2倍。

arti4.png

1 減小前端上報日誌字段數量,將暫時未用的的字段刪掉。前端sdk上報的日誌字段從71個刪減到48個字段,減小了32%的字段數量。
2 沒必要要的日誌再也不上報,主要是經過修改前端日誌上報的邏輯來實現。

8 對Nginx的優化:

對Nginx的優化主要有2個方面:

  1. 服務器層面的優化,如上述第5項
  2. Nginx自己的配置優化
  3. 增長了ip防刷的機制
8.1 Nginx一部分配置的優化。

worker_connections從20480上調到了102400,調大了5倍。調大以後nginx的吞吐量從2w/s,提高到3.5w/s,設置的時候最好結合業務和服務器的性能先作個壓測。

worker_processes 默認是1,官方建議和cpu的核數相同,或者直接設置成auto。有人建議設置成cpu核數的2倍,從個人測試狀況來看,不會有明顯的提升,也多是場景覆蓋的有限。

worker_cpu_affinity Nginx默認沒有開啓利用多核cpu,經過worker_cpu_affinity開啓nginx利用多核cpu,並將worker綁定到指定的線程上,以此來提升nginx的性能。

multi_accept 默認Nginx沒有開啓multi_accept。multi_accept可讓nginx worker進程儘量多地接受請求。它的做用是讓worker進程一次性地接受監聽隊列裏的全部請求,而後處理。若是multi_accept的值設爲off,那麼worker進程必須一個一個地接受監聽隊列裏的請求。

worker_processes  8;
worker_cpu_affinity 00000001 00000010 00000100 00001000 00010000 00100000 01000000 10000000;

worker_connections  102400;
multi_accept on;

優化完以後QPS從1萬左右上升到3.5萬。

8.2 ip防刷

在conf/module/定義了一個blacklist文件:

map $http_x_forwarded_for $ip_action{
    default            0;
    ~123\.123\.29      1;
}

在nginx.conf中增長ip過濾的配置:

location /log.gif {
        if ($ip_action) {
            return 403;
        }
        proxy_pass         http://big-da/log-server/push;
        proxy_set_header   Host             $host;
        proxy_set_header   X-Real-IP        $remote_addr;
        proxy_set_header   X-Forwarded-For  $proxy_add_x_forwarded_for;
        client_max_body_size       128k;
        client_body_buffer_size    32k;
        proxy_connect_timeout      5;
        proxy_send_timeout         5;
        proxy_read_timeout         5;
        proxy_http_version           1.1;
        proxy_set_header Connection "";
    }

若是是黑名單中的ip,則直接拒絕請求。

9 對kafka的優化

1.將全部重要topic的Replication從1改爲了2,這樣保證在kafka有1個節點故障的時候,topic也能正常工做。

arti5.png

2.爲每一個節點的kafka設置了獨用的SSD硬盤。
3.topic分區的分區數量根據業務需求設置,咱們設置了6個分區。
3.在producer端寫kafka的使用使用snappy壓縮格式
4.在producer端合理設置batch.size

batch.size用來控制producer在將消息發送到kafka以前須要攢夠多少本身的數據。默認是16kB,通過測試,在32kB的狀況下,吞吐量和壓測都在接受的接受的範圍內。

5.在producer端合理設置linger.ms

默認沒有設置,只要有數據就會當即發送。能夠將linger.ms設置爲100,在流量比較大的時候,能夠減小發送請求的數量,從而提升吞吐量。

6.升級版本,將kafka從0.10升到了2.2.1

做者:易企秀工程師 Peace -> 我的主頁

相關文章
相關標籤/搜索