Optimizing filebeat performance with pprof

Our log collection component is filebeat 6.6.1. After a certain service went live, we ran into log collection delays; in the worst case logs fell more than two days behind, severely impacting downstream data analysis projects.

Analyzing that service's logs, we found the log volume was very large, but each individual log entry had very few fields.

During earlier load testing we had already configured batched output, and the Kafka cluster's performance monitoring looked healthy, so we could basically rule out the downstream cluster as the cause.

For this problem, today's protagonist takes the stage: pprof.

pprof

pprof is Go's built-in tool for profiling application performance. The two official packages, runtime/pprof and net/http/pprof, make it easy to collect .prof files containing stack, goroutine, memory allocation and usage, and I/O information, which can then be analyzed with go tool pprof. The two packages do the same job and differ only in how they are used.

net/http/pprof is essentially a wrapper around runtime/pprof for use in web servers. filebeat can expose that HTTP endpoint directly, which is what we use below.

The debug process

1 Enable filebeat's pprof endpoint
filebeat's pprof endpoint is disabled by default. Enable it as follows:

./filebeat -c /etc/filebeat.yml --path.data /usr/share/filebeat/data --path.logs /var/log/filebeat --httpprof 0.0.0.0:6060

2 Capture a 30s CPU profile

go tool pprof http://0.0.0.0:6060/debug/pprof/profile

After 30 seconds, entering the top10 command prints:

Showing top 10 nodes out of 197
      flat  flat%   sum%        cum   cum%
    21.45s 13.42% 13.42%     70.09s 43.85%  runtime.gcDrain
    15.49s  9.69% 23.11%     39.83s 24.92%  runtime.scanobject
    11.38s  7.12% 30.23%     11.38s  7.12%  runtime.futex
     7.86s  4.92% 35.15%     16.30s 10.20%  runtime.greyobject
     7.82s  4.89% 40.04%      7.82s  4.89%  runtime.markBits.isMarked (inline)
     5.59s  3.50% 43.53%      5.59s  3.50%  runtime.(*lfstack).pop
     5.51s  3.45% 46.98%      6.05s  3.78%  runtime.heapBitsForObject
     5.26s  3.29% 50.27%     13.92s  8.71%  runtime.sweepone
     4.04s  2.53% 52.80%      4.04s  2.53%  runtime.memclrNoHeapPointers
     3.37s  2.11% 54.91%      4.40s  2.75%  runtime.runqgrab

Far too much CPU time is being spent in GC, so we can be fairly confident that filebeat is creating huge numbers of objects in this small-log scenario. At this point sync.Pool should come to mind.

We need more detail, specifically the call chains, to find where all these objects are being created.

Entering the web command renders the following graph, visualizing the GC overhead:

filebeat-gc.jpg

Following the call graph, we found heavy calls to newobject:

filebeat-new.jpg

And then we found the root cause:

filebeat-sarama.jpg

The root cause is in the sarama library, which filebeat uses to write messages to Kafka, specifically in the encode method (flate NewWriter). As we know, this is compression code: our filebeat uses gzip compression by default.

Next we need to verify this guess against the code. The heap profile below provides supporting evidence:

filebeat-heap.jpg

3 The old code

func (m *Message) encode(pe packetEncoder) error {
    pe.push(newCRC32Field(crcIEEE))

    pe.putInt8(m.Version)

    attributes := int8(m.Codec) & compressionCodecMask
    pe.putInt8(attributes)

    if m.Version >= 1 {
        if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
            return err
        }
    }

    err := pe.putBytes(m.Key)
    if err != nil {
        return err
    }

    var payload []byte

    if m.compressedCache != nil {
        payload = m.compressedCache
        m.compressedCache = nil
    } else if m.Value != nil {
        switch m.Codec {
        case CompressionNone:
            payload = m.Value
        case CompressionGZIP:
            var buf bytes.Buffer
            var writer *gzip.Writer
            if m.CompressionLevel != CompressionLevelDefault {
                writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
                if err != nil {
                    return err
                }
            } else {
                writer = gzip.NewWriter(&buf)
            }
            if _, err = writer.Write(m.Value); err != nil {
                return err
            }
            if err = writer.Close(); err != nil {
                return err
            }
            m.compressedCache = buf.Bytes()
            payload = m.compressedCache
        case CompressionSnappy:
            tmp := snappy.Encode(m.Value)
            m.compressedCache = tmp
            payload = m.compressedCache
        case CompressionLZ4:
            var buf bytes.Buffer
            writer := lz4.NewWriter(&buf)
            if _, err = writer.Write(m.Value); err != nil {
                return err
            }
            if err = writer.Close(); err != nil {
                return err
            }
            m.compressedCache = buf.Bytes()
            payload = m.compressedCache

        default:
            return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
        }
        // Keep in mind the compressed payload size for metric gathering
        m.compressedSize = len(payload)
    }

    if err = pe.putBytes(payload); err != nil {
        return err
    }

    return pe.pop()
}

The code shows that gzip compression calls gzip.NewWriter on every message. The problem is now obvious.

Because of the huge number of small log entries, every single one allocated a fresh gzip writer on its way to Kafka, and the resulting garbage wasted large amounts of CPU time in GC.

4 How to fix it?

Anyone familiar with Go will think of sync.Pool here: reuse the objects and avoid the constant GC pressure.

The latest upstream sarama code:

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "sync"

    "github.com/eapache/go-xerial-snappy"
    "github.com/pierrec/lz4"
)

var (
    lz4WriterPool = sync.Pool{
        New: func() interface{} {
            return lz4.NewWriter(nil)
        },
    }

    gzipWriterPool = sync.Pool{
        New: func() interface{} {
            return gzip.NewWriter(nil)
        },
    }
)

func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
    switch cc {
    case CompressionNone:
        return data, nil
    case CompressionGZIP:
        var (
            err    error
            buf    bytes.Buffer
            writer *gzip.Writer
        )
        if level != CompressionLevelDefault {
            writer, err = gzip.NewWriterLevel(&buf, level)
            if err != nil {
                return nil, err
            }
        } else {
            writer = gzipWriterPool.Get().(*gzip.Writer)
            defer gzipWriterPool.Put(writer)
            writer.Reset(&buf)
        }
        if _, err := writer.Write(data); err != nil {
            return nil, err
        }
        if err := writer.Close(); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    case CompressionSnappy:
        return snappy.Encode(data), nil
    case CompressionLZ4:
        writer := lz4WriterPool.Get().(*lz4.Writer)
        defer lz4WriterPool.Put(writer)

        var buf bytes.Buffer
        writer.Reset(&buf)

        if _, err := writer.Write(data); err != nil {
            return nil, err
        }
        if err := writer.Close(); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    case CompressionZSTD:
        return zstdCompress(nil, data)
    default:
        return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
    }
}

The latest code shows that upstream only uses the writer pool when gzip runs at the default compression level (CompressionLevelDefault, i.e. -1000); any explicitly configured level still allocates a new writer on every call.

That does not cover our case.

So we changed the code as follows:

package sarama

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "sync"

    snappy "github.com/eapache/go-xerial-snappy"
    "github.com/pierrec/lz4"
)

var (
    lz4WriterPool = sync.Pool{
        New: func() interface{} {
            return lz4.NewWriter(nil)
        },
    }

    gzipWriterPool = sync.Pool{
        New: func() interface{} {
            return gzip.NewWriter(nil)
        },
    }

    // One dedicated pool per explicit gzip compression level (1-9),
    // so pooled writers are reused regardless of the configured level.
    gzipWriterPoolForCompressionLevel1 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 1)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel2 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 2)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel3 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 3)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel4 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 4)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel5 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 5)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel6 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 6)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel7 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 7)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel8 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 8)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }

    gzipWriterPoolForCompressionLevel9 = sync.Pool{
        New: func() interface{} {
            gz, err := gzip.NewWriterLevel(nil, 9)
            if err != nil {
                panic(err)
            }
            return gz
        },
    }
)

func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
    switch cc {
    case CompressionNone:
        return data, nil
    case CompressionGZIP:
        var (
            err    error
            buf    bytes.Buffer
            writer *gzip.Writer
        )
        switch level {
        case CompressionLevelDefault:
            writer = gzipWriterPool.Get().(*gzip.Writer)
            defer gzipWriterPool.Put(writer)
            writer.Reset(&buf)
        case 1:
            writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel1.Put(writer)
            writer.Reset(&buf)
        case 2:
            writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel2.Put(writer)
            writer.Reset(&buf)
        case 3:
            writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel3.Put(writer)
            writer.Reset(&buf)
        case 4:
            writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel4.Put(writer)
            writer.Reset(&buf)
        case 5:
            writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel5.Put(writer)
            writer.Reset(&buf)
        case 6:
            writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel6.Put(writer)
            writer.Reset(&buf)
        case 7:
            writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel7.Put(writer)
            writer.Reset(&buf)
        case 8:
            writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel8.Put(writer)
            writer.Reset(&buf)
        case 9:
            writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
            defer gzipWriterPoolForCompressionLevel9.Put(writer)
            writer.Reset(&buf)
        default:
            writer, err = gzip.NewWriterLevel(&buf, level)
            if err != nil {
                return nil, err
            }
        }
        if _, err := writer.Write(data); err != nil {
            return nil, err
        }
        if err := writer.Close(); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    case CompressionSnappy:
        return snappy.Encode(data), nil
    case CompressionLZ4:
        writer := lz4WriterPool.Get().(*lz4.Writer)
        defer lz4WriterPool.Put(writer)

        var buf bytes.Buffer
        writer.Reset(&buf)

        if _, err := writer.Write(data); err != nil {
            return nil, err
        }
        if err := writer.Close(); err != nil {
            return nil, err
        }
        return buf.Bytes(), nil
    case CompressionZSTD:
        return zstdCompress(nil, data)
    default:
        return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
    }
}

Results in production

Straight to the chart:

filebeat-net.png

CPU utilization before and after the upgrade.

Summary

1: pprof is a powerful tool for performance tuning.

2: filebeat has further optimization opportunities, such as JSON serialization.

3: In practice, CPU usage dropped by half while collection throughput increased by 20%.
