kafka日誌同步至elasticsearch和kibana展現

原文: kafka日誌同步至elasticsearch和kibana展現

kafka日誌同步至elasticsearch和kibana展現

一 kafka consumer準備                       html

  前面的章節進行了分佈式job的自動計算的概念講解以及實踐。上次分佈式日誌說過日誌寫進kafka,是須要進行處理,以便合理的進行展現,分佈式日誌的量和咱們對日誌的重視程度,決定了咱們必需要有一個大數據檢索,和友好展現的需求。那麼天然就是elasticsearch和kibana,elasticsearch是能夠檢索TB級別數據的一個分佈式NOSQL數據庫,而kibana,不只僅能夠展現詳情,並且有針對不一樣展現需求的功能,而且定製了不少不少日誌格式的模板和採集數據的插件,這裏很少介紹了,我本身感受是比percona的pmm強大不少。git

  書歸正傳,咱們這一節是要作同步前的準備工做。第一,對kafka的consumer進行封裝。第二,讀取kafka數據是須要一個後臺程序去處理,可是不須要job,咱們上次作的框架是基於zookeeper的分佈式job,而kafka的分佈式是在服務器端的,固然將job分佈式設計方案用在輪詢或者阻塞方式的後臺程序,也是能夠的,可是此次就不講解了。下面咱們就將kafka分佈式的原理分析下,kafka的客戶端有一個組的概念,borker端有一個topic的概念,product在發送消息的時候,會有一個key值。由於kafka存數據就是以key-value的方式存儲數據的,因此broker就是用product傳遞過來的這個key進行運算,合理的將數據存儲到某個topic的某個分區。而consumer端訂閱topic,能夠訂閱多個topic,它的分派是這樣的,每個topic下的分區會有多個consuer,可是這些consumer必須屬於不一樣的組,而每個consumer能夠訂閱多個topic下的分區,可是不能重複。下面看圖吧,以咱們此次實際的日誌爲例,在kafka中mylog topic有5個分區。github

  那麼若是咱們有三個程序須要用這個mylog topic怎麼辦?並且咱們須要很快的處理完這個數據,因此有可能這三個程序每個程序都要兩臺服務器。想着都很頭大,對吧?固然若是有咱們前面講解的分佈式job也能夠處理,可是要把分佈式的功能遷移到這個後臺程序,避免不了又大動干戈,開發,調試,測試,修改bug,直到程序穩定,那又是一場苦功。可是在kafka這裏,不用擔憂,三個程序,好比訂單,庫存,顧客,咱們爲這三個程序的kafka客戶端對應的設置爲三個組,每個組中consumer數量只要不超過5個,假如訂單須要用到名爲mylog的topic的消息,只要訂單處理這個topic的實例數量,必須不能超過5個,固然能夠少於5個,也能夠等於0個。而同時一個consumer又能夠去訂閱多個topic,這也是kafka能夠媲美rabbit的重要的一個緣由,先天支持併發和擴展。咱們看圖:redis

 

若是一個組的consumer數量沒有topic的分區多,kafka會自動分派給這個組的consumer,若是某一個consumer失敗,kafka也會自動的將這個consumer的offset記錄而且分派給另一個consumer。數據庫

可是要注意一點,kafka的topic中的每一個分區是線性的,可是全部的分區看起來就不會是線性的,若是須要topic是線性的,就必須將分區設置爲1個。json

下面看看咱們封裝的kafka客戶端方法:bootstrap

複製代碼
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Options;

namespace  Walt.Framework.Service.Kafka
{
    public class KafkaService : IKafkaService
    {

        private KafkaOptions _kafkaOptions;
        private Producer _producer;
        private Consumer _consumer;

        public Action<Message> GetMessageDele{ get; set; }

        public Action<Error> ErrorDele{ get; set; }

        public Action<LogMessage> LogDele{ get; set; }

        public KafkaService(IOptionsMonitor<KafkaOptions>  kafkaOptions)
        {
            _kafkaOptions=kafkaOptions.CurrentValue; 
            kafkaOptions.OnChange((kafkaOpt,s)=>{
                _kafkaOptions=kafkaOpt; 
                    System.Diagnostics.Debug
                    .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
            });
             _producer=new Producer(_kafkaOptions.Properties);

            _consumer=new Consumer(_kafkaOptions.Properties);
        }

        private byte[] ConvertToByte(string str)
        {
            return System.Text.Encoding.Default.GetBytes(str);
        }
 
        public  async Task<Message> Producer<T>(string topic,string key,T t)
        {  
            if(string.IsNullOrEmpty(topic)
            || t==null)
            {
                throw new ArgumentNullException("topic或者value不能爲null.");
            }
            string data = Newtonsoft.Json.JsonConvert.SerializeObject(t);
            var task=  await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(data)); 
           return task;
        }


        public void AddProductEvent()
        {
            _producer.OnError+=new EventHandler<Error>(Error);
            _producer.OnLog+=new EventHandler<LogMessage>(Log);
        }
      ///以事件的方式獲取message
        public void AddConsumerEvent(IEnumerable<string> topics)
        {
            _consumer.Subscribe(topics);
            _consumer.OnMessage += new EventHandler<Message>(GetMessage);
            _consumer.OnError += new EventHandler<Error>(Error);
            _consumer.OnLog += new EventHandler<LogMessage>(Log);
        }

        private void GetMessage(object sender, Message mess)
        {
            if(GetMessageDele!=null)
            {
                GetMessageDele(mess);
            }
        }

        private void Error(object sender, Error mess)
        {
            if(ErrorDele!=null)
            {
                ErrorDele(mess);
            }
        }

        private void Log(object sender, LogMessage mess)
        {
            if(LogDele!=null)
            {
                LogDele(mess);
            }
        }
     //以輪詢的方式獲取message
        public Message Poll(int timeoutMilliseconds)
        {
            Message message =default(Message);
            _consumer.Consume(out message, timeoutMilliseconds);
            return message;
        }
    }
}
複製代碼

以事件激發的方式,由於是線程安全的方式調用,而本實例是後臺方式執行,少不了多線程,因此仍是以輪詢的方式。以輪詢的方式,這樣的程序須要放那塊尼?就是咱們的後臺程序框架。api

 

二 後臺程序管理框架開發                                                    安全

 

他的原理和job幾乎差很少,比job要簡單多了。看入口程序:服務器

複製代碼
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.IO;
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.AspNetCore;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using EnvironmentName = Microsoft.Extensions.Hosting.EnvironmentName;
using Walt.Framework.Log;
using Walt.Framework.Service;
using Walt.Framework.Service.Kafka;
using Walt.Framework.Configuration;
using MySql.Data.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore;
using System.Threading;
using IApplicationLife =Microsoft.Extensions.Hosting;
using IApplicationLifetime = Microsoft.Extensions.Hosting.IApplicationLifetime;

namespace Walt.Framework.Console
{
    public class Program
    {
        public static void Main(string[] args)
        {
            //這裏獲取程序及和工做線程配置信息
            Dictionary<string, Assembly> assmblyColl = new Dictionary<string, Assembly>();
            var host = new HostBuilder()
                    .UseEnvironment(EnvironmentName.Development)
                
                    .ConfigureAppConfiguration((hostContext, configApp) =>
                    {
                        //這裏netcore支持多數據源,因此能夠擴展到數據庫或者redis,集中進行配置。
                        //
                        configApp.SetBasePath(Directory.GetCurrentDirectory());
                        configApp.AddJsonFile(
                              $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
                                 optional: true);
                        configApp.AddEnvironmentVariables("PREFIX_");
                        configApp.AddCommandLine(args);
                    }).ConfigureLogging((hostContext, configBuild) =>
                    {
                        configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
                        configBuild.AddConsole();
                        configBuild.AddCustomizationLogger();
                    })
                    .ConfigureServices((hostContext, service) =>
                    {
                        service.Configure<HostOptions>(option =>
                        {
                            option.ShutdownTimeout = System.TimeSpan.FromSeconds(10);
                        });

                        service.AddKafka(KafkaBuilder =>
                        {
                            KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
                        });
                        service.AddElasticsearchClient(config=>{
                            config.AddConfiguration(hostContext.Configuration.GetSection("ElasticsearchService"));
                        });

                        service.AddDbContext<ConsoleDbContext>(option =>
                        option.UseMySQL(hostContext.Configuration.GetConnectionString("ConsoleDatabase")), ServiceLifetime.Transient, ServiceLifetime.Transient);
                        ///TODO 待實現從數據庫中pull數據,再將任務添加進DI
                        service.AddSingleton<IConsole,KafkaToElasticsearch>();
                    })
                    .Build();
             CancellationTokenSource source = new CancellationTokenSource();
            CancellationToken token = source.Token;
            var task=Task.Run(async () =>{
                IConsole console = host.Services.GetService<IConsole>();
                await console.AsyncExcute(source.Token);
            },source.Token);
            Dictionary<string, Task> dictTask = new Dictionary<string, Task>();
            dictTask.Add("kafkatoelasticsearch", task);

            int recordRunCount = 0;
            var fact = host.Services.GetService<ILoggerFactory>();
            var log = fact.CreateLogger<Program>();
            var disp = Task.Run(() =>
            {
                while (true)
                {
                    if (!token.IsCancellationRequested)
                    {
                        ++recordRunCount;
                        foreach (KeyValuePair<string, Task> item in dictTask)
                        {
                            if (item.Value.IsCanceled
                            || item.Value.IsCompleted
                            || item.Value.IsCompletedSuccessfully
                            || item.Value.IsFaulted)
                            {
                                log.LogWarning("console任務:{0},參數:{1},執行異常,task狀態:{2}", item.Key, "", item.Value.Status);
                                if (item.Value.Exception != null)
                                {
                                    log.LogError(item.Value.Exception, "task:{0},參數:{1},執行錯誤.", item.Key, "");
                                    //TODO 根據參數更新數據庫狀態,以便被監控到。
                                }
                                //更新數據庫狀態。
                            }
                        }
                    }
                    System.Threading.Thread.Sleep(2000);
                    log.LogInformation("循環:{0}次,接下來等待2秒。", recordRunCount);
                }
            },source.Token);
            
            IApplicationLifetime appLiftTime = host.Services.GetService<IApplicationLifetime>();
            appLiftTime.ApplicationStopping.Register(()=>{
                log.LogInformation("程序中止中。");
                source.Cancel();
                log.LogInformation("程序中止完成。");
            });
            host.RunAsync().GetAwaiter().GetResult();
        }
    }
}
複製代碼

由於分佈式job有quartz,是有本身的設計理念,可是這個console後臺框架不須要,是本身開發,因此徹底和Host通用主機兼容,全部的部件均可以DI。設計原理就是以數據庫的配置,構造Task,而後使用

CancellationTokenSource和TaskCompletionSource去管理Task。運行結果根據狀態去更新數據庫,以便監控。固然我們這個例子功能沒實現全,後面能夠完善
,感興趣的能夠去個人github上pull代碼。我們看任務中的例子代碼:
複製代碼
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Nest;
using Walt.Framework.Log;
using Walt.Framework.Service.Elasticsearch;
using Walt.Framework.Service.Kafka;

namespace Walt.Framework.Console
{
    public class KafkaToElasticsearch : IConsole
    {
        ILoggerFactory _logFact;

        IConfiguration _config;

        IElasticsearchService _elasticsearch;

        IKafkaService _kafkaService;

        public KafkaToElasticsearch(ILoggerFactory logFact,IConfiguration config
        ,IElasticsearchService elasticsearch
        ,IKafkaService kafkaService)
        {
            _logFact = logFact;
            _config = config;
            _elasticsearch = elasticsearch;
            _kafkaService = kafkaService;
        }
        public async Task AsyncExcute(CancellationToken cancel=default(CancellationToken))
        {
            var log = _logFact.CreateLogger<KafkaToElasticsearch>();
            _kafkaService.AddConsumerEvent(new List<string>(){"mylog"});

        //以事件方式獲取message不工做,由於跨線程 // _kafkaService.GetMessageDele = (message) => { // var id = message.Key; // var offset = string.Format("{0}---{2}",message.Offset.IsSpecial,message.Offset.Value); // var topic = message.Topic; // var topicPartition = message.TopicPartition.Partition.ToString(); // var topicPartitionOffsetValue = message.TopicPartitionOffset.Offset.Value; // // log.LogInformation("id:{0},offset:{1},topic:{2},topicpatiton:{3},topicPartitionOffsetValue:{4}" // // ,id,offset,topic,topicPartition,topicPartitionOffsetValue); // }; // _kafkaService.ErrorDele = (message) => { // log.LogError(message.ToString()); // }; // _kafkaService.LogDele = (message) => { // log.LogInformation(message.ToString()); // }; // log.LogInformation("事件添加完畢"); // var waitForStop = // new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously); // cancel.Register(()=>{ // log.LogInformation("task執行被取消回掉函數"); // waitForStop.SetResult(null); // }); // waitForStop.Task.Wait(); // log.LogInformation("任務已經被取消。");
        //下面以輪詢方式。 if(!cancel.IsCancellationRequested) { while (true) { Message message = _kafkaService.Poll(2000); if (message != null) { if(message.Error!=null&&message.Error.Code!=ErrorCode.NoError) { //log.LogError("consumer獲取message出錯,詳細信息:{0}",message.Error); System.Console.WriteLine("consumer獲取message出錯,詳細信息:{0}",message.Error); System.Threading.Thread.Sleep(200); continue; } var id =message.Key==null?"":System.Text.Encoding.Default.GetString(message.Key); var offset = string.Format("{0}---{1}", message.Offset.IsSpecial, message.Offset.Value); var topic = message.Topic; var topicPartition = message.TopicPartition.Partition.ToString(); var topicPartitionOffsetValue = message.TopicPartitionOffset.Offset.Value; var val =System.Text.Encoding.Default.GetString( message.Value); EntityMessages entityMess = Newtonsoft.Json.JsonConvert.DeserializeObject<EntityMessages>(val); await _elasticsearch.CreateIndexIfNoExists<LogElasticsearch>("mylog"+entityMess.OtherFlag); // _elasticsearch.CreateMappingIfNoExists<LogElasticsearch>("mylog"+entityMess.OtherFlag // ,"mylog"+entityMess.OtherFlag+"type",null);

              //爲elasticsearch添加document var addDocumentResponse = await _elasticsearch.CreateDocument<LogElasticsearch>("mylog" + entityMess.OtherFlag , new LogElasticsearch() { Id = entityMess.Id, Time = entityMess.DateTime, LogLevel = entityMess.LogLevel, Exception = entityMess.Message } ); if (addDocumentResponse != null) { if (!addDocumentResponse.ApiCall.Success) { } } } } } return ; } } }
複製代碼

 

 

 三 elasticsearch 服務開發                                         

  服務已經開發不少了,主要就是構建和配置的設計,還有就是對組件的封裝,看程序結構:

配置:

複製代碼
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "System": "Information",
      "Microsoft": "Information"
    },
    "KafkaLog":{
      "Prix":"console", //目前這個屬性,能夠放程序類別,好比用戶中心,商品等。
      "LogStoreTopic":"mylog"
    }
  },
  "KafkaService":{
    "Properties":{
      "bootstrap.servers":"192.168.249.106:9092",
      "group.id":"group2"
    }
  },
  "ConnectionStrings": {
    "ConsoleDatabase":"Server=192.168.249.106;Database=quartz;Uid=quartz;Pwd=quartz"
  },
  "ElasticsearchService":{
    "Host":["http://192.168.249.105:9200","http://localhost:9200"],
    "TimeOut":"10000",
    "User":"",
    "Pass":""
  }
}
複製代碼

服務類:這裏有必要說下,elasticsearch是基於api的接口,最底層就是http請求,在接口上,實現了一個高級的接口和一個低級別的接口,固然低級別的接口須要熟悉elasticsearch的協議,

而高級別的api,使用強類型去使用,對開發頗有幫助。下面是封裝elasticsearch的服務類:

複製代碼
using System;
using System.Net.Http;
using Elasticsearch.Net;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Nest;

namespace Walt.Framework.Service.Elasticsearch
{
    public class ElasticsearchService:IElasticsearchService
    {

        private  ElasticsearchOptions _elasticsearchOptions=null;

        private ElasticClient _elasticClient = null;

        private ILoggerFactory _loggerFac;

        public ElasticsearchService(IOptionsMonitor<ElasticsearchOptions>  options
        ,ILoggerFactory loggerFac)
        {
            _elasticsearchOptions = options.CurrentValue;
             options.OnChange((elasticsearchOpt,s)=>{
                _elasticsearchOptions=elasticsearchOpt; 
                    System.Diagnostics.Debug
                    .WriteLine(Newtonsoft.Json.JsonConvert.SerializeObject(elasticsearchOpt)+"---"+s);
            });
       //鏈接客戶端需,支持多個節點,防止單點故障
            var lowlevelClient = new ElasticLowLevelClient();
            var urlColl = new Uri[_elasticsearchOptions.Host.Length];
            for (int i = 0; i < _elasticsearchOptions.Host.Length;i++)
            {
                urlColl[i] = new Uri(_elasticsearchOptions.Host[i]);
            }
            _loggerFac = loggerFac;
            var connectionPool = new SniffingConnectionPool(urlColl);
            var settings = new ConnectionSettings(connectionPool)
            .RequestTimeout(TimeSpan.FromMinutes(_elasticsearchOptions.TimeOut))
            .DefaultIndex("mylogjob");//設置默認的index
            _elasticClient = new ElasticClient(settings);
        }
     //若是index存在,則返回,若是不存在,則建立,type的建立方式是爲文檔類型打標籤ElasticsearchTypeAttribute
public async Task<bool> CreateIndexIfNoExists<T>(string indexName) where T : class
        {

            var log = _loggerFac.CreateLogger<ElasticsearchService>();
            var exists = await _elasticClient.IndexExistsAsync(Indices.Index(indexName));
            if (exists.Exists)
            {
                log.LogWarning("index:{0}已經存在", indexName.ToString());
                return await Task.FromResult(true);
            }
            var response = await _elasticClient.CreateIndexAsync(indexName
                ,c=>c.Mappings(mm=>mm.Map<T>(m=>m.AutoMap())));//將類型的屬性自動映射到index的type上,也能夠打標籤控制那個能夠映射,那些不能夠
            log.LogInformation(response.DebugInformation);
            if (response.Acknowledged)
            {
                log.LogInformation("index:{0},建立成功", indexName.ToString());
                return await Task.FromResult(false);
            }
            else
            {
                log.LogError(response.ServerError.ToString());
                log.LogError(response.OriginalException.ToString());
                return await Task.FromResult(false);
            }
        }



     //建立document
        public async Task<ICreateResponse> CreateDocument<T>(string indexName,T  t) where T:class
        {
            var log=_loggerFac.CreateLogger<ElasticsearchService>(); 
            if(t==null)
            {
                log.LogError("bulk 參數不能爲空。");
                return null;
            }
            IndexRequest<T> request = new IndexRequest<T>(indexName, TypeName.From<T>()) { Document = t };
             
             var createResponse = await _elasticClient.CreateDocumentAsync<T>(t);
             log.LogInformation(createResponse.DebugInformation);
            if (createResponse.ApiCall.Success)
            {
                log.LogInformation("index:{0},type:{1},建立成功", createResponse.Index, createResponse.Type);
                return createResponse;
            }
            else
            {
                log.LogError(createResponse.ServerError.ToString());
                log.LogError(createResponse.OriginalException.ToString());
                return null;
            }
        }
    }
}
複製代碼

poco類型,這個類會和index的typ相關聯的:

複製代碼
using System;
using Nest;

namespace Walt.Framework.Console
{
    [ElasticsearchTypeAttribute(Name="LogElasticsearchDefaultType")] //可使用類型生成和查找type
    public class LogElasticsearch
    {
        public string Id { get; set; }

        public DateTime Time { get; set; }

        public string LogLevel{ get; set; }

        public string Exception{ get; set; }

        public string Mess{ get; set; }
    }
}
複製代碼

而後就是執行咱們console後臺程序,就能夠在kibana看到日誌被同步的狀況:

 全部程序都提交到github,若是調試代碼,再看這篇文章,或許理解能更快。

相關文章
相關標籤/搜索