.Net Core 商城微服務項目系列(十三):搭建Log4net+ELK+Kafka日誌框架

以前是使用NLog直接將日誌發送到了ELK,本篇將會使用Docker搭建ELK和kafka,同時替換NLog爲Log4net。html

一.搭建kafkagit

1.拉取鏡像github

//下載zookeeper
docker pull wurstmeister/zookeeper //下載kafka
docker pull wurstmeister/kafka:2.11-0.11.0.3

2.啓動docker

//啓動zookeeper
docker run -d --name zookeeper --publish 2181:2181 --volume /etc/localtime:/etc/localtime wurstmeister/zookeeper //啓動kafka
docker run -d --name kafka --publish 9092:9092 \ --link zookeeper \ --env KAFKA_ZOOKEEPER_CONNECT=192.168.3.131:2181 \ --env KAFKA_ADVERTISED_HOST_NAME=192.168.3.131 \ --env KAFKA_ADVERTISED_PORT=9092 \ --volume /etc/localtime:/etc/localtime \ wurstmeister/kafka:2.11-0.11.0.3

3.測試Kafkajson

//查看Kafka容器ID
docker ps 進入容器 docker exec -it [容器ID] bin/bash //建立topic
bin/kafka-topics.sh --create --zookeeper 192.168.3.131:2181 --replication-factor 1 --partitions 1 --topic mykafka //查看topic
bin/kafka-topics.sh --list --zookeeper 192.168.3.131:2181

//建立生產者
bin/kafka-console-producer.sh --broker-list 192.168.3.131:9092 --topic mykafka //再開一個客戶端進入容器 //建立消費者
bin/kafka-console-consumer.sh --zookeeper 192.168.3.131:2181 --topic mykafka --from-beginning

再生產端發送消息,消費端能夠成功接收到就說明沒問題。bootstrap

 

二.Docker安裝ELKapi

1.拉取鏡像 docker pull sebp/elk 2.啓動ELK docker run -p 5601:5601 -p 9200:9200 -p 9300:9300 -p 5044:5044 -e ES_MIN_MEM=128m  -e ES_MAX_MEM=2048m -d --name elk sebp/elk //若啓動過程出錯通常是由於elasticsearch用戶擁有的內存權限過小,至少須要262144
切換到root用戶 執行命令: sysctl -w vm.max_map_count=262144 查看結果: sysctl -a|grep vm.max_map_count 顯示: vm.max_map_count = 262144 上述方法修改以後,若是重啓虛擬機將失效,因此: 解決辦法: 在 /etc/sysctl.conf文件最後添加一行 vm.max_map_count=262144 便可永久修改

等幾十秒,而後訪問9200和5601端口就能夠看到ELK相關的面板。瀏覽器

而後咱們還須要配置下logstash:bash

1.查看elk容器ID docker ps 2.進入elk容器 docker exec -it 容器ID bin/bash 3.執行命令 /opt/logstash/bin/logstash -e 'input { stdin { } } output { elasticsearch { hosts => ["localhost"] } }'

當命令成功被執行後,看到:Successfully started Logstash API endpoint {:port=>9600} 信息後,輸入:this is a dummy entry 而後回車,模擬一條日誌進行測試。app

打開瀏覽器,輸入:http://:9200/_search?pretty 如圖,就會看到咱們剛剛輸入的日誌內容。

注意:若是看到這樣的報錯信息 Logstash could not be started because there is already another instance using the configured data directory. If you wish to run multiple instances, you must change the "path.data" setting. 請執行命令:service logstash stop 而後在執行就能夠了。

 

OK,測試沒問題的話,說明Logstash和ES之間是能夠正常聯通,而後咱們須要配置Logstash從Kafka消費消息:

1.找到config文件 cd /opt/logstash/config 2.編輯配置文件 vi logstash.config
input { kafka{ bootstrap_servers =>["192.168.3.131:9092"] client_id => "test" group_id => "test" consumer_threads => 5 decorate_events => true topics => "mi" } } filter{ json{ source => "message" } } output { elasticsearch { hosts => ["localhost"] index => "mi-%{app_id}" codec => "json" } }

bootstrap_servers:Kafka地址

這裏介紹下這個app_id的做用,生產場景下咱們根據不一樣的項目生成不一樣的ES索引,好比服務是一個單獨的索引,Web是一個,MQ是一個,這些就能夠經過傳入的app_id來區分建立。

配置完成後加載該配置:

/opt/logstash/bin/logstash -f  /opt/logstash/config/logstash.conf

沒問題的話,此時Logstash就會從Kafka消費數據了,而後咱們新建一個.net core API項目測試一下:

1.經過NuGet引用 Microsoft.Extensions.Logging.Log4Net.AspNetCore;

2.啓動文件中注入 Log4Net:

public static IWebHost BuildWebHost(string[] args) => WebHost.CreateDefaultBuilder(args) .ConfigureLogging((logging) => { // 過濾掉 System 和 Microsoft 開頭的命名空間下的組件產生的警告級別如下的日誌
                logging.AddFilter("System", LogLevel.Warning); logging.AddFilter("Microsoft", LogLevel.Warning); logging.AddLog4Net(); }) .UseStartup<Startup>() .Build();

3.根目錄下添加 log4net.config,設置爲 「若是較新則複製」

appender 支持自定義,只要符合 appender 定義規則便可,這裏我使用了公司同事大佬寫的一個將日誌寫入 Kafka 的 Nuget 包, log4net.Kafka.Core,安裝後在 log4net.config 添加 KafkaAppender 配置便可。 log4net.Kafka.Core 源碼 在 github 上 ,能夠 Fork 自行修改。
<?xml version="1.0" encoding="utf-8" ?>
<log4net>
    <appender name="KafkaAppender" type="log4net.Kafka.Core.KafkaAppender, log4net.Kafka.Core">
        <KafkaSettings>
            <broker value="192.168.3.131:9092" />
            <topic value="mi" />
        </KafkaSettings>
        <layout type="log4net.Kafka.Core.KafkaLogLayout,log4net.Kafka.Core" >
            <appid value="api-test" />
        </layout>
    </appender>
    <root>
        <level value="ALL"/>
        <appender-ref ref="KafkaAppender" />
    </root>
</log4net>
broker: Kafka 服務地址,集羣可以使用,分割;
topic:日誌對應的 Topic 名稱;
appid:服務惟一標識,輔助識別日誌來源;
 
在 ValuesController 修改代碼進行測試:
[Route("api/[controller]")] public class ValuesController : Controller { private readonly ILogger _logger; public ValuesController(ILogger<ValuesController> logger) { _logger = logger; } // GET api/values
 [HttpGet] public IEnumerable<string> Get() { _logger.LogInformation("根據appId最後一次測試Kafka!"); return new string[] { "value1", "value2" }; } }

OK,運行而後訪問5601端口查看:

 

 OK,搭建成功!
 
 
參考文章:
相關文章
相關標籤/搜索