Diagnosing a Kafka.Client Memory Leak with WinDbg

    I have never written a blog before. Come to think of it, after working for more than ten years and building all kinds of architectures and technologies, it would be a real pity not to share them. Besides, since moving from the traditional ERP industry to the Internet, I have run into many problems I had never seen before; I knew there were pitfalls, but I had no idea they were so numerous and so deep. So, taking the opportunity of filling one of those pits, I am logging the process here.

    Back to the topic; first, some background. Teld's business platform carries a huge amount of IoT terminal sensor data and vehicle operation data, and there is a lot of value buried in it. So it has to be stored. Teld's charging terminals are actually quite impressive: there are already 20,000+ of them, each reporting data every 30 seconds, although a single report is not large. And this is just the beginning; according to the national plan, we will reach the million-terminal level by 2020. OK, I am getting ahead of myself! Doing the math, the TPS required just for the charging-terminal reports is already quite demanding. After two months of research and technology selection, we chose Kafka as the middleware for processing this mass of data.

    Alright, Kafka it is; time to start filling pits. Since we are on a .NET technology stack, the Kafka client had to be .NET as well. … (10,000 words omitted here). The Kafka environment was brought up and debugged successfully, but the Consumer program written against Kafka.Client showed a severe memory leak.

      (screenshot: Consumer process memory usage)

    The Consumer is meant to run for long periods, yet in the screenshot above its memory reached 570 MB after only two hours. Time to grab a dump and analyze it in WinDbg.

    Start WinDbg, set the symbol path, and load the dump.

    Run the following commands:

        .loadby sos clr  (note: this program targets .NET 4.0; for 2.0, ask your favorite search engine)

        !dumpheap -stat  (note: lists the objects on the heap, grouped by type, with counts and total sizes)

    Output:

00007ff947e2f2e8  1215019     29160456 Kafka.Client.Common.NoOpTimer
00007ff947e2f1a8  1215019     29160456 Kafka.Client.Metrics.KafkaTimer
00007ff947e39600  1215018     38880576 Kafka.Client.Consumers.FetchRequestAndResponseMetrics
00007ff947e2df70  1215018     38880576 Kafka.Client.Common.ClientIdAndBroker
00007ff947e3a058  1215007     58320336 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
00007ff9a5cc3d60  1267853     86313134 System.String

    From the output we can see 1.2 million+ instances each of NoOpTimer, KafkaTimer, FetchRequestAndResponseMetrics, and the ConcurrentDictionary node type, together occupying close to 200 MB. So these guys look like the culprits, and the finger points straight at Kafka.Client. Let's pick NoOpTimer and look at its GC roots first. Onward!

    Run the command below (there are a lot of objects, so let it run for a bit and then break):

!dumpheap -mt 00007ff947e2f2e8  

    Output:

0000021ae62af490 00007ff947e2f2e8 24

0000021ae62af5a8 00007ff947e2f2e8 24
0000021ae62af6c0 00007ff947e2f2e8 24
0000021ae62af7c0 00007ff947e2f2e8 24
0000021ae62af890 00007ff947e2f2e8 24
0000021ae62af960 00007ff947e2f2e8 24
0000021ae62afa30 00007ff947e2f2e8 24
0000021ae62afb00 00007ff947e2f2e8 24
0000021ae62afc18 00007ff947e2f2e8 24
0000021ae62afd18 00007ff947e2f2e8 24

    The first column of the output is the address of each NoOpTimer object. Let's check the GC roots for one of them.

    Run:

!gcroot 0000021ae62af490

    Output:

0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
-> 0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
-> 0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
-> 0000021ae5897b38 Teld.Core.Log.Processor.LogListener
-> 0000021ae5897b78 Teld.Core.Log.Processor.KafkaConsumer
-> 0000021a8ac0de20 Kafka.Client.Consumers.ZookeeperConsumerConnector
-> 0000021a90839800 Kafka.Client.Consumers.ConsumerFetcherManager
-> 0000021a90839908 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
-> 0000021a92dcd208 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
-> 0000021a962e2710 Kafka.Client.Consumers.ConsumerFetcherThread
-> 0000021a962e2a70 Kafka.Client.Consumers.SimpleConsumer
-> 0000021ae58fcca8 Kafka.Client.Consumers.FetchRequestAndResponseStats
-> 0000021ae58fccd8 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021a91cb17f8 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021af64f1728 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
-> 0000021a91c82b18 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
-> 0000021ae62af470 Kafka.Client.Consumers.FetchRequestAndResponseMetrics
-> 0000021ae62af4a8 Kafka.Client.Metrics.KafkaTimer
-> 0000021ae62af490 Kafka.Client.Common.NoOpTimer

    The output shows that the NoOpTimer object is held by a FetchRequestAndResponseMetrics, and that FetchRequestAndResponseMetrics appears to be cached in a ConcurrentDictionary. That ConcurrentDictionary looks awfully familiar; damn, it was right there in the earlier !dumpheap -stat output! Let's analyze the ConcurrentDictionary type next. Onward!

    Run the command below (00007ff947e3a058 is the MT value from the first column of the ConcurrentDictionary row in the first !dumpheap -stat output):

!dumpheap -mt 00007ff947e3a058

    Output (a randomly chosen excerpt):

0000021aefcd5a90 00007ff947e3a058       48
0000021aefcd5c20 00007ff947e3a058       48
0000021aefcd5d60 00007ff947e3a058       48
0000021aefcd5ef0 00007ff947e3a058       48
0000021aefcd6030 00007ff947e3a058       48
0000021aefcd65e8 00007ff947e3a058       48
0000021aefcd6790 00007ff947e3a058       48
0000021aefcd68d8 00007ff947e3a058       48
0000021aefcd6a68 00007ff947e3a058       48

    Pick one at random and check its GC roots again.

    Run:

!gcroot 0000021aefcd6a68

    Output:

0000021ae58965a8 Teld.Core.Log.Processor.ProcessService
            ->  0000021ae58966a8 System.Collections.Generic.List`1[[Teld.Core.Log.Processor.LogListener, Teld.Core.Log.Processor]]
            ->  0000021ae5898068 Teld.Core.Log.Processor.LogListener[]
            ->  0000021ae58970a8 Teld.Core.Log.Processor.LogListener
            ->  0000021ae58970e8 Teld.Core.Log.Processor.KafkaConsumer
            ->  0000021a8cedba08 Kafka.Client.Consumers.ZookeeperConsumerConnector
            ->  0000021a94f56710 Kafka.Client.Consumers.ConsumerFetcherManager
            ->  0000021a94f56818 System.Collections.Generic.Dictionary`2[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]]
            ->  0000021a94f5bd20 System.Collections.Generic.Dictionary`2+Entry[[Kafka.Client.Server.BrokerAndFetcherId, Kafka.Client],[Kafka.Client.Server.AbstractFetcherThread, Kafka.Client]][]
            ->  0000021a962e5e80 Kafka.Client.Consumers.ConsumerFetcherThread
            ->  0000021a962e61e0 Kafka.Client.Consumers.SimpleConsumer
            ->  0000021ae58f60e8 Kafka.Client.Consumers.FetchRequestAndResponseStats
            ->  0000021ae58f6118 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
            ->  0000021a89deda70 System.Collections.Concurrent.ConcurrentDictionary`2+Tables[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]
            ->  0000021af5a43128 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]][]
            ->  0000021aefcd6a68 System.Collections.Concurrent.ConcurrentDictionary`2+Node[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]

    The output shows that the ConcurrentDictionary is referenced by a Pool, and the Pool in turn by a FetchRequestAndResponseStats. That is strikingly similar to the reference chain for NoOpTimer!

    Searching the first !dumpheap -stat output shows only 11 instances each of FetchRequestAndResponseStats and Pool:

00007ff947e387f8       11          528 Kafka.Client.Consumers.FetchRequestAndResponseStats

00007ff947e397d8       11          792 Kafka.Client.Utils.Pool`2[[Kafka.Client.Common.ClientIdAndBroker, Kafka.Client],[Kafka.Client.Consumers.FetchRequestAndResponseMetrics, Kafka.Client]]

    So those million-plus objects all hang off the Pool instances. Time to open the Kafka.Client source code.

internal class FetchRequestAndResponseStats
{
    private string clientId;

    private Func<ClientIdAndBroker, FetchRequestAndResponseMetrics> valueFactory;
    private Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics> stats;

    private FetchRequestAndResponseMetrics allBrokerStats;

    public FetchRequestAndResponseStats(string clientId)
    {
        this.clientId = clientId;
        this.valueFactory = k => new FetchRequestAndResponseMetrics(k);
        this.stats = new Pool<ClientIdAndBroker, FetchRequestAndResponseMetrics>(this.valueFactory);
        this.allBrokerStats = new FetchRequestAndResponseMetrics(new ClientIdAndBroker(clientId, "AllBrokers"));
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseAllBrokersStats()
    {
        return this.allBrokerStats;
    }

    public FetchRequestAndResponseMetrics GetFetchRequestAndResponseStats(string brokerInfo)
    {
        // A brand-new ClientIdAndBroker is created on every call and used as the pool key.
        return this.stats.GetAndMaybePut(new ClientIdAndBroker(this.clientId, brokerInfo + "-"));
    }
}

    The Pool object is a field of FetchRequestAndResponseStats, and Pool inherits from ConcurrentDictionary with ClientIdAndBroker as the key type. Pool is defined as follows:

public class Pool<TKey, TValue> : ConcurrentDictionary<TKey, TValue>
{
    public Func<TKey, TValue> ValueFactory { get; set; }

    public Pool(Func<TKey, TValue> valueFactory = null)
    {
        this.ValueFactory = valueFactory;
    }

    public TValue GetAndMaybePut(TKey key)
    {
        if (this.ValueFactory == null)
        {
            throw new KafkaException("Empty value factory in pool");
        }
        return this.GetOrAdd(key, this.ValueFactory);
    }
}

    And there is the problem: every call to FetchRequestAndResponseStats.GetFetchRequestAndResponseStats news up a ClientIdAndBroker and hands it to Pool.GetAndMaybePut, which stores it as the ConcurrentDictionary key via ConcurrentDictionary.GetOrAdd. Because each call produces a brand-new key object, and those keys are not compared by value, GetOrAdd can only ever Add; there is no chance it will ever Get an existing entry, so the dictionary grows on every fetch. I was amazed to find such a basic mistake in Kafka.Client; it instantly gave me a new appreciation of open-source components: the pits are deep, and you only find out by filling them.
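    The failure mode is not specific to .NET; here is a minimal sketch of it in Python (the class and key names mirror Kafka.Client but the code is purely illustrative). A key type without value equality means every lookup misses the cache and inserts a fresh entry, exactly like GetOrAdd with a new ClientIdAndBroker on each call:

```python
# Sketch of the leak: a key type with identity-based equality
# (no __eq__/__hash__ override) makes every cache lookup a miss,
# mirroring ConcurrentDictionary.GetOrAdd with a fresh key object.

class ClientIdAndBroker:
    def __init__(self, client_id, broker_info):
        self.client_id = client_id
        self.broker_info = broker_info
    # No __eq__/__hash__: two instances with identical fields are
    # distinct dictionary keys (Python falls back to identity).

stats = {}  # stands in for the Pool / ConcurrentDictionary

def get_stats(client_id, broker_info):
    key = ClientIdAndBroker(client_id, broker_info)  # new key every call!
    return stats.setdefault(key, object())           # GetOrAdd analogue

for _ in range(1000):
    get_stats("consumer-1", "broker-0:9092")

print(len(stats))  # 1000 entries for what should be ONE cache slot
```

    One thousand identical requests leave one thousand entries behind; scale that to every fetch the consumer issues and you get the 1.2 million objects seen in the dump.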

    Time to patch the component: change the Pool key type from ClientIdAndBroker to string. After the fix, below is the memory usage of the Consumer program after running for two days, during which it processed 600,000 log entries.

    (screenshot: Consumer process memory usage after the fix)
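    The same sketch with a fix of the same shape applied, again in Python for illustration (the key format is hypothetical, not the exact string Kafka.Client ends up using): once the key has value semantics, repeated lookups hit the same cache slot and the dictionary stays at one entry per broker. An alternative fix would have been to give ClientIdAndBroker a proper value-equality implementation instead of switching key types.

```python
# The fix, sketched the same way: use a key with value equality
# (a plain string, as in the patched Pool) so repeated lookups
# reuse the cached entry instead of inserting new ones.

stats = {}

def get_stats(client_id, broker_info):
    key = f"{client_id}-{broker_info}"      # string keys compare by value
    return stats.setdefault(key, object())  # now a real cache hit

first = get_stats("consumer-1", "broker-0:9092")
for _ in range(1000):
    last = get_stats("consumer-1", "broker-0:9092")

print(len(stats))     # 1
print(first is last)  # True: the cached object is reused
```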

    The problem is finally solved! Lastly, as is customary, thanks to JuQiang for his guidance. I am a newcomer to the Internet field, and this blog inevitably contains some views that are not objective or mature; please bear with me!
