Our log collection component is currently filebeat 6.6.1. After a new service went live, we ran into log collection delays; in the worst case the logs were more than two days behind, which seriously affected downstream data analysis projects.
After analyzing that service's logs, we found that the log volume was very large, but each log entry had very few fields.
During earlier load testing we had already configured batched output, and the Kafka cluster's performance monitoring looked healthy, so we could basically rule out the downstream cluster as the cause.
Which brings us to today's protagonist: pprof.
PProf is Go's built-in toolkit for profiling application performance. The two official packages, runtime/pprof and net/http/pprof, make it easy to capture .prof files containing stack, goroutine, memory allocation/usage, and I/O information, which can then be analyzed with go tool pprof. The two packages do the same job and differ only in how they are used.
net/http/pprof is essentially a wrapper around runtime/pprof for web servers. Since filebeat exposes its profiles over HTTP, that is the variant we use here.
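As a reference, exposing these endpoints in your own Go program takes only a blank import. The sketch below is a minimal, self-contained example (the listen address simply mirrors the one filebeat uses later; nothing here is filebeat-specific):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on the default mux
)

func main() {
	// Serve the profiling endpoints on a side port; the real application
	// logic would run in other goroutines.
	log.Println(http.ListenAndServe("0.0.0.0:6060", nil))
}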
1 Enable filebeat's pprof
filebeat's pprof endpoint is disabled by default. Enable it like this:
./filebeat --c /etc/filebeat.yml --path.data /usr/share/filebeat/data --path.logs /var/log/filebeat --httpprof 0.0.0.0:6060
2 Capture 30 seconds of CPU profile data
go tool pprof http://0.0.0.0:6060/debug/pprof/profile
After 30 seconds, enter the top10 command and you will see output like this:
Showing top 10 nodes out of 197
      flat  flat%   sum%        cum   cum%
    21.45s 13.42% 13.42%     70.09s 43.85%  runtime.gcDrain
    15.49s  9.69% 23.11%     39.83s 24.92%  runtime.scanobject
    11.38s  7.12% 30.23%     11.38s  7.12%  runtime.futex
     7.86s  4.92% 35.15%     16.30s 10.20%  runtime.greyobject
     7.82s  4.89% 40.04%      7.82s  4.89%  runtime.markBits.isMarked (inline)
     5.59s  3.50% 43.53%      5.59s  3.50%  runtime.(*lfstack).pop
     5.51s  3.45% 46.98%      6.05s  3.78%  runtime.heapBitsForObject
     5.26s  3.29% 50.27%     13.92s  8.71%  runtime.sweepone
     4.04s  2.53% 52.80%      4.04s  2.53%  runtime.memclrNoHeapPointers
     3.37s  2.11% 54.91%      4.40s  2.75%  runtime.runqgrab
Far too much CPU time is being spent in GC, so we can be fairly certain that in this small-log scenario filebeat is creating a huge number of objects. At this point sync.Pool probably comes to everyone's mind.
But we need more detail first: we have to look at the actual call relationships to find out where all these objects are being created.
Enter the web command and you will see a graph like the following, which visualizes how much time is spent in GC:
Following the call graph, we find a large number of calls to newobject:
And then we find the root cause:
The root cause is in the sarama library, which filebeat uses to write messages to Kafka; specifically its encode method, which calls flate's NewWriter. As we know, that is the compression path, and our filebeat is configured with gzip compression by default.
So next we need to verify this guess in the code. The heap profile below also gives indirect confirmation of it.
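As a side note, the heap profile comes from the same --httpprof endpoint as the CPU profile and is explored with the same top and web commands; assuming the address configured above, the command is:

go tool pprof http://0.0.0.0:6060/debug/pprof/heap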
3 The old code
func (m *Message) encode(pe packetEncoder) error {
	pe.push(newCRC32Field(crcIEEE))

	pe.putInt8(m.Version)

	attributes := int8(m.Codec) & compressionCodecMask
	pe.putInt8(attributes)

	if m.Version >= 1 {
		if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
			return err
		}
	}

	err := pe.putBytes(m.Key)
	if err != nil {
		return err
	}

	var payload []byte

	if m.compressedCache != nil {
		payload = m.compressedCache
		m.compressedCache = nil
	} else if m.Value != nil {
		switch m.Codec {
		case CompressionNone:
			payload = m.Value
		case CompressionGZIP:
			var buf bytes.Buffer
			var writer *gzip.Writer
			if m.CompressionLevel != CompressionLevelDefault {
				writer, err = gzip.NewWriterLevel(&buf, m.CompressionLevel)
				if err != nil {
					return err
				}
			} else {
				writer = gzip.NewWriter(&buf)
			}
			if _, err = writer.Write(m.Value); err != nil {
				return err
			}
			if err = writer.Close(); err != nil {
				return err
			}
			m.compressedCache = buf.Bytes()
			payload = m.compressedCache
		case CompressionSnappy:
			tmp := snappy.Encode(m.Value)
			m.compressedCache = tmp
			payload = m.compressedCache
		case CompressionLZ4:
			var buf bytes.Buffer
			writer := lz4.NewWriter(&buf)
			if _, err = writer.Write(m.Value); err != nil {
				return err
			}
			if err = writer.Close(); err != nil {
				return err
			}
			m.compressedCache = buf.Bytes()
			payload = m.compressedCache
		default:
			return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", m.Codec)}
		}
		// Keep in mind the compressed payload size for metric gathering
		m.compressedSize = len(payload)
	}

	if err = pe.putBytes(payload); err != nil {
		return err
	}

	return pe.pop()
}
As the code shows, a new gzip writer is created via gzip.NewWriter (or gzip.NewWriterLevel) every time a message is compressed. At this point the cause is obvious.
Because of the huge number of small log entries, every message written to Kafka triggers a fresh gzip compressor, and the resulting churn of short-lived objects wastes an enormous amount of CPU time in GC.
4 How to fix it
Anyone familiar with Go will think of sync.Pool here: reuse the objects instead of allocating new ones per message, and the GC pressure goes away.
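To make the cost concrete, here is a minimal benchmark sketch (not from the original post; the message size and contents are made up) contrasting a fresh gzip.Writer per message with a pooled, Reset writer. A gzip.Writer carries a large deflate compressor state, so allocating one per small log line is exactly the kind of garbage the CPU profile was pointing at:

package main

import (
	"bytes"
	"compress/gzip"
	"sync"
	"testing"
)

// A small payload standing in for a single log line (size picked arbitrarily).
var msg = bytes.Repeat([]byte("small log line "), 8)

// One fresh writer per message: every iteration allocates the full deflate
// compressor state, which immediately becomes garbage.
func BenchmarkGzipNewWriterPerMessage(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		w := gzip.NewWriter(&buf)
		if _, err := w.Write(msg); err != nil {
			b.Fatal(err)
		}
		if err := w.Close(); err != nil {
			b.Fatal(err)
		}
	}
}

var gzipPool = sync.Pool{New: func() interface{} { return gzip.NewWriter(nil) }}

// Pooled writer: Reset points the existing compressor at a new buffer, so its
// internal state is reused instead of reallocated for every message.
func BenchmarkGzipPooledWriter(b *testing.B) {
	for i := 0; i < b.N; i++ {
		var buf bytes.Buffer
		w := gzipPool.Get().(*gzip.Writer)
		w.Reset(&buf)
		if _, err := w.Write(msg); err != nil {
			b.Fatal(err)
		}
		if err := w.Close(); err != nil {
			b.Fatal(err)
		}
		gzipPool.Put(w)
	}
}

Running it with go test -bench=. -benchmem should show the per-message variant allocating far more bytes per operation, which is exactly the garbage the CPU profile attributed to GC.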
Here is the latest official sarama code:
import (
	"bytes"
	"compress/gzip"
	"fmt"
	"sync"

	"github.com/eapache/go-xerial-snappy"
	"github.com/pierrec/lz4"
)

var (
	lz4WriterPool = sync.Pool{
		New: func() interface{} {
			return lz4.NewWriter(nil)
		},
	}

	gzipWriterPool = sync.Pool{
		New: func() interface{} {
			return gzip.NewWriter(nil)
		},
	}
)

func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
	switch cc {
	case CompressionNone:
		return data, nil
	case CompressionGZIP:
		var (
			err    error
			buf    bytes.Buffer
			writer *gzip.Writer
		)
		if level != CompressionLevelDefault {
			writer, err = gzip.NewWriterLevel(&buf, level)
			if err != nil {
				return nil, err
			}
		} else {
			writer = gzipWriterPool.Get().(*gzip.Writer)
			defer gzipWriterPool.Put(writer)
			writer.Reset(&buf)
		}
		if _, err := writer.Write(data); err != nil {
			return nil, err
		}
		if err := writer.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	case CompressionSnappy:
		return snappy.Encode(data), nil
	case CompressionLZ4:
		writer := lz4WriterPool.Get().(*lz4.Writer)
		defer lz4WriterPool.Put(writer)

		var buf bytes.Buffer
		writer.Reset(&buf)
		if _, err := writer.Write(data); err != nil {
			return nil, err
		}
		if err := writer.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	case CompressionZSTD:
		return zstdCompress(nil, data)
	default:
		return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
	}
}
As the latest code shows, upstream only reuses a pooled writer when the default compression level is requested (CompressionLevelDefault, i.e. -1000); any explicitly configured gzip level still allocates a new writer for every message.
That is not enough for our use case.
So we changed the code as follows:
package sarama

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"sync"

	snappy "github.com/eapache/go-xerial-snappy"
	"github.com/pierrec/lz4"
)

var (
	lz4WriterPool = sync.Pool{
		New: func() interface{} {
			return lz4.NewWriter(nil)
		},
	}

	gzipWriterPool = sync.Pool{
		New: func() interface{} {
			return gzip.NewWriter(nil)
		},
	}
	gzipWriterPoolForCompressionLevel1 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 1)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel2 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 2)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel3 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 3)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel4 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 4)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel5 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 5)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel6 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 6)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel7 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 7)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel8 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 8)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
	gzipWriterPoolForCompressionLevel9 = sync.Pool{
		New: func() interface{} {
			gz, err := gzip.NewWriterLevel(nil, 9)
			if err != nil {
				panic(err)
			}
			return gz
		},
	}
)

func compress(cc CompressionCodec, level int, data []byte) ([]byte, error) {
	switch cc {
	case CompressionNone:
		return data, nil
	case CompressionGZIP:
		var (
			err    error
			buf    bytes.Buffer
			writer *gzip.Writer
		)
		switch level {
		case CompressionLevelDefault:
			writer = gzipWriterPool.Get().(*gzip.Writer)
			defer gzipWriterPool.Put(writer)
			writer.Reset(&buf)
		case 1:
			writer = gzipWriterPoolForCompressionLevel1.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel1.Put(writer)
			writer.Reset(&buf)
		case 2:
			writer = gzipWriterPoolForCompressionLevel2.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel2.Put(writer)
			writer.Reset(&buf)
		case 3:
			writer = gzipWriterPoolForCompressionLevel3.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel3.Put(writer)
			writer.Reset(&buf)
		case 4:
			writer = gzipWriterPoolForCompressionLevel4.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel4.Put(writer)
			writer.Reset(&buf)
		case 5:
			writer = gzipWriterPoolForCompressionLevel5.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel5.Put(writer)
			writer.Reset(&buf)
		case 6:
			writer = gzipWriterPoolForCompressionLevel6.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel6.Put(writer)
			writer.Reset(&buf)
		case 7:
			writer = gzipWriterPoolForCompressionLevel7.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel7.Put(writer)
			writer.Reset(&buf)
		case 8:
			writer = gzipWriterPoolForCompressionLevel8.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel8.Put(writer)
			writer.Reset(&buf)
		case 9:
			writer = gzipWriterPoolForCompressionLevel9.Get().(*gzip.Writer)
			defer gzipWriterPoolForCompressionLevel9.Put(writer)
			writer.Reset(&buf)
		default:
			writer, err = gzip.NewWriterLevel(&buf, level)
			if err != nil {
				return nil, err
			}
		}
		if _, err := writer.Write(data); err != nil {
			return nil, err
		}
		if err := writer.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	case CompressionSnappy:
		return snappy.Encode(data), nil
	case CompressionLZ4:
		writer := lz4WriterPool.Get().(*lz4.Writer)
		defer lz4WriterPool.Put(writer)

		var buf bytes.Buffer
		writer.Reset(&buf)
		if _, err := writer.Write(data); err != nil {
			return nil, err
		}
		if err := writer.Close(); err != nil {
			return nil, err
		}
		return buf.Bytes(), nil
	case CompressionZSTD:
		return zstdCompress(nil, data)
	default:
		return nil, PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", cc)}
	}
}
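To sanity-check the change, a small round-trip test can sit next to the patched file; this is a sketch of our own, not part of the upstream code, and it has to live in the sarama package because compress is unexported (payload and iteration counts are arbitrary). Repeating each level a few times ensures the later passes go through a pooled, Reset writer rather than a freshly constructed one:

package sarama

import (
	"bytes"
	"compress/gzip"
	"io/ioutil"
	"testing"
)

func TestPooledGzipRoundTrip(t *testing.T) {
	payload := bytes.Repeat([]byte("small log line "), 8)
	for level := 1; level <= 9; level++ {
		// Run each level several times so the 2nd and 3rd passes exercise a
		// pooled writer that has been Reset, not a new one from New().
		for i := 0; i < 3; i++ {
			out, err := compress(CompressionGZIP, level, payload)
			if err != nil {
				t.Fatalf("level %d: compress failed: %v", level, err)
			}
			r, err := gzip.NewReader(bytes.NewReader(out))
			if err != nil {
				t.Fatalf("level %d: invalid gzip stream: %v", level, err)
			}
			got, err := ioutil.ReadAll(r)
			if err != nil {
				t.Fatalf("level %d: decompress failed: %v", level, err)
			}
			if !bytes.Equal(got, payload) {
				t.Fatalf("level %d: round-trip mismatch", level)
			}
		}
	}
}

The repetition is the point: the classic pooling bug is a writer that misbehaves only after Reset, which would corrupt messages only after the first use of each pool.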
Straight to the graphs:
CPU utilization before and after the change.
1: PProf is a killer tool for performance tuning.
2: filebeat still has further optimization opportunities, such as JSON serialization.
3: In practice, CPU usage dropped by half while collection speed increased by 20%.