search(9)- elastic4s logback-appender

前面寫了個cassandra-appender,一個基於cassandra的logback插件。正是cassandra的分佈式數據庫屬性才合適做爲akka-cluster-sharding分佈式應用的logger。因此,cassandra-appender核心功能就是對logback消息的存寫部分了。一樣,基於ES的logback-appender核心部分就是對ES的存寫過程了。在ES裏這個過程還附帶了索引indexing過程。未來對歷史消息的搜索、分析會更加方便。直接看看消息存寫這部分elastic4代碼:java

  def writeLog(event: ILoggingEvent)(client: ElasticClient, idx: String)(appName: String, ip: String, hostName: String, default: String) = { var content: List[(String,Any)] = List( APP_NAME -> appName, HOST_IP -> ip, HOST_NAME -> hostName, LOGGER_NAME -> event.getLoggerName(), LEVEL -> event.getLevel().toString, THREAD_NAME -> event.getThreadName(), LOG_DATE -> logDate, LOG_TIME -> logTime ) try { val callerData = event.getCallerData() if (callerData.nonEmpty) { content = content ++ List( CLASS_NAME -> callerData.head.getClassName(), FILE_NAME -> callerData.head.getFileName(), LINE_NUMBER -> callerData.head.getLineNumber().toString, METHOD_NAME -> callerData.head.getMethodName() ) } } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")} try { if (event.getThrowableProxy() != null) { val throwableStrs = event.getThrowableProxy().getSuppressed().asInstanceOf[List[IThrowableProxy]] val throwableStr = throwableStrs.foldLeft("") { case (b, t) => b + "," + t.getMessage() } content = content :+ (THROWABLE_STR -> throwableStr) } } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")} var logmsgs = event.getMessage() try { val logMap = fromJson[Map[String,String]](logmsgs) logMap.foreach ( m =>  content = content :+ (m._1 -> m._2)) } catch { case e: Throwable => content = content :+ (MESSAGE -> logmsgs) try { val dftMap = fromJson[Map[String,String]](default) dftMap.foreach ( m =>  content = content :+ (m._1 -> m._2)) } catch { case e: Throwable => } } val newRecord = indexInto(idx) .fields( content ).createOnly(true) client.execute(newRecord) //.await
 }

能夠看到,咱們先判斷了一下event.getMessage()消息是不是json格式的:若是是正確的json格式,那麼解析成爲字段名和字段值,不然就直接寫入log_msg字段 + 一串默認的字段和值。幹什麼呢?要知道這個elastic-appender是一個通用的logback-plugin,是能夠在任何軟件中使用的。由於各類軟件對運行狀態跟蹤目標、方式的要求不一樣,爲了知足這些要求,那麼經過用戶自定義跟蹤目標字段的方式應該是一個好的解決方案。從測試例子裏能夠理解:數據庫

  var loggedItems = Map[String,String]() loggedItems = loggedItems ++ Map( ("app_customer" -> "logback.com"), ("app_device" -> "9101"), ("log_msg" -> "specific message for elastic ...")) log.debug(toJson(loggedItems)) //logback.xml
    <appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
        <host>http://localhost</host>
        <port>9200</port>
        <appName>ESLoggerDemo</appName>
        <defaultFieldValues>{"app_customer":"中心書城","app_device":"9013"}</defaultFieldValues>
        <indexName>applog</indexName>
    </appender>

上面代碼裏定義了app_customer,app_device,log_msg這幾個自定義字段和值。這樣作的意思是:logback只定義了log.info(msg)裏msg一個字段。若是存放在數據庫裏咱們只能在msg一個字段裏進行分類、查詢了。但既然已經使用了數據庫做爲存儲咱們更但願用更多的字段來表明一條消息,如用戶號,機器號,店號等等。這樣跟蹤起來方便不少。因此,對於內部的用戶能夠要求把因應特殊須要額外增長的字段-值加密成json,而後傳遞給ElasticAppender去處理。對於應用中引用三方軟件所產生的logback-msg,咱們可沒辦法要求他們按照這個格式來傳遞消息,但仍然會存進ES,因此就用logback.xml中defaultFieldValaues定義的默認字段-值來填寫這些額外的信息了。json

這一篇咱們主要討論一下這個特別的elastic-appender,它的使用方法。那麼先重複一下logback的工做原理:app

首先認識一下logback:感受須要重點了解的logging運做核心應該是消息等級level的操做。消息等級是指logback根據不一樣的消息等級來篩選須要記錄的消息。logback支持下面幾個消息等級,按照各自記錄動做覆蓋面由弱到強排列,包括: TRACE -> DEBUG -> INFO -> WARN -> ERROR 分別對應記錄函數 trace(msg),debug(msg),info(msg),warn(msg),error(msg) logback按消息等級進行記錄篩選的規則以下: 假設記錄函數爲p,某個class的消息等級level爲q:當p>=q時選擇記錄消息。換言之調用函數error(msg)時logback會記錄全部等級消息,反之trace(msg)只能記錄TRACE級別的消息。logback手冊中以下表示: TRACE DEBUG INFO WARN ERROR OFF trace() YES NO NO NO NO NO debug() YES YES NO NO NO NO info() YES YES YES NO NO NO warn() YES YES YES YES NO NO error() YES YES YES YES YES NO logback中每一個類的默認消息等級能夠按照類型繼承樹結構繼承。當一個子類沒有定義消息等級時,它繼承對上父類的消息等級,即:X.Y.Z中Z的默認消息等級從Y繼承。

再看看下面logback.xml例子:分佈式

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n </Pattern>
        </encoder>
    </appender>

    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <!-- path to your log file, where you want to store logs -->
        <file>~/logback.log</file>
        <append>false</append>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <appender name="cassandraLogger" class="com.datatech.logback.CassandraAppender">
        <appName>POCServer</appName>
        <defaultFieldValues>{"app_customer":"999999","app_device":"9999"}</defaultFieldValues>
        <keyspaceName>applog</keyspaceName>
        <columnFamily>txnlog</columnFamily>
    </appender>

    <appender name="elasticLogger" class="com.datatech.logback.ElasticAppender">
        <host>http://localhost</host>
        <port>9200</port>
        <appName>ESLoggerDemo</appName>
        <defaultFieldValues>{"app_customer":"中心書城","app_device":"9013"}</defaultFieldValues>
        <indexName>applog</indexName>
    </appender>

    <logger name="com.datatech" level="info" additivity="false">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </logger>

    <logger name="com.datatech.sdp" level="info" additivity="false">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </logger>

    <root level="info">
        <appender-ref ref="cassandraLogger" />
        <appender-ref ref="elasticLogger" />
        <appender-ref ref="STDOUT" />
    </root>

    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</configuration>

上面配置文件中定義了包括STDOUT,FILE,cassandraLoggeer,elasticLogger幾個appender。首先,不一樣level可使用不一樣的appender。cassandraLogger,elasticLogger是咱們自定義的appender。在elasticLogger段落裏定義了ES終端鏈接參數如host,port。在ElasticAppender類源碼中的elastic終端鏈接和關閉以下:ide

override def start(): Unit = { if(! _hosts.isEmpty) { connectES() super.start() } } override def stop(): Unit = { if(optESClient.isDefined) { (optESClient.get).close() optESClient = None } super.stop() } def connectES(): Unit = { try { val url = _hosts + ":" + _port.toString val esjava = JavaClient(ElasticProperties(url)) val client = ElasticClient(esjava) optESClient = Some(client) } catch { case e: Throwable => optESClient = None } }

注意,假如host在logback.xml裏定義了那麼在ElasticAppender實例化時系統會自動直接鏈接,不然須要手工調用logger.start()來鏈接ES。xml文件裏的屬性是經過getter來獲取的,以下:函數

 private var _hosts: String = ""
  def setHost(host: String): Unit = _hosts = host
  def getHost : String = _hosts

  private var _port: Int = 9200
  def setPort(port: Int): Unit = _port = port

  private var _idxname: String = "applog"
  def setIndexName(indexName: String): Unit = _idxname = indexName

  private var _username: String = ""
  def setUsername(username: String): Unit = _username = username

  private var _password: String = ""
  def setPassword(password: String): Unit = _password = password

  private var _defaultFieldValues: String = ""
  def setDefaultFieldValues(defaultFieldValues: String) = _defaultFieldValues = defaultFieldValues

下面是ElasticAppender的使用示範:(先把logback_persist.jar放入lib目錄)測試

import scala.concurrent.ExecutionContext.Implicits.global import com.sksamuel.elastic4s.ElasticDsl._ import com.sksamuel.elastic4s.http.JavaClient import com.sksamuel.elastic4s.{ElasticClient, ElasticProperties} import ch.qos.logback.classic.Logger import ch.qos.logback.core.{ConsoleAppender, FileAppender} import com.datatech.logback.{CassandraAppender,ElasticAppender, JsonConverter} import ch.qos.logback.classic.spi.ILoggingEvent import org.slf4j.LoggerFactory import ch.qos.logback.classic.LoggerContext import java.time._ import java.time.format._ import java.util.Locale object ElasticAppenderDemo extends App with JsonConverter { val log: Logger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger] val elasticAppender = log.getAppender("elasticLogger").asInstanceOf[ElasticAppender] val stdoutAppender = log.getAppender("STDOUT").asInstanceOf[ConsoleAppender[ILoggingEvent]] val fileAppender = log.getAppender("FILE").asInstanceOf[FileAppender[ILoggingEvent]] val cassAppender = log.getAppender("cassandraLogger").asInstanceOf[CassandraAppender] //stop other appenders
  if (stdoutAppender != null) stdoutAppender.stop() if (fileAppender != null) fileAppender.stop() if (cassAppender != null) cassAppender.stop() //check if host not set in logback.xml
  if(elasticAppender != null) { if (elasticAppender.getHost.isEmpty) { elasticAppender.setHost("http://localhost") elasticAppender.setPort(9200) elasticAppender.start() } } val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.US) val now = LocalDateTime.now.format(dateTimeFormatter) (1 to 100).foreach { idx => log.info(s"************this is a info message $idx ") } log.debug("***********debugging message here ..." + now) log.debug(toJson(loggedItems)) //stop the logger
 val loggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] loggerContext.stop() }

在Appender實例化時getAppender("elasticLogger")中這個elasticLogger是xml文件中appender段落名稱。若是host,port沒在xml文件中定義的話能夠手工用setter setHost,setPort在程序裏設置。loggerContext.stop()一次性關閉全部appender,包括它們鏈接的數據庫。也能夠用elasticAppender.stop()來關閉獨立的appender。ui

咱們能夠用elastic4自定義一個表結構mapping, 以下:this

    val esjava = JavaClient(ElasticProperties("http://localhost:9200")) val client = ElasticClient(esjava) //刪除索引
    val rspExists = client.execute(indexExists("applog")).await
    if (rspExists.result.exists) client.execute(deleteIndex("applog")).await

    //構建索引
    val idxCreate = client.execute(createIndex("applog") .shards(1).replicas(1)).await
    //建立表結構
    if(idxCreate.isSuccess) { val applogMapping = client.execute( putMapping("applog").fields( textField("class_name"), textField("file_name"), ipField("host_ip"), textField("host_name"), keywordField("level"), keywordField("line_number"), keywordField("logger_name"), keywordField("method_name"), keywordField("thread_name"), textField("throwable_str_rep"), dateField("log_date").format("basic_date").ignoreMalformed(true), dateField("log_time").format("basic_date_time").ignoreMalformed(true), textField("log_msg"), keywordField("app_name"), keywordField("app_customer"), keywordField("app_device") )).await
      if(applogMapping.isSuccess) println(s"mapping successfully created.") else println(s"mapping creation error: ${applogMapping.error.reason}") } else { println(s"index creation error: ${idxCreate.error.reason}") } client.close()

依賴引用在build.sbt裏:

name := "logback-persist-demo" version := "0.1" scalaVersion := "2.12.9" val elastic4sVersion = "7.6.0" libraryDependencies ++= Seq( "com.datastax.cassandra" % "cassandra-driver-core" % "3.6.0", "com.datastax.cassandra" % "cassandra-driver-extras" % "3.6.0", "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sVersion, // for the default http client
  "com.sksamuel.elastic4s" %% "elastic4s-client-esjava" % elastic4sVersion, "ch.qos.logback"  %  "logback-classic"   % "1.2.3", "org.typelevel" %% "cats-core" % "2.0.0-M1", "org.json4s" %% "json4s-native" % "3.6.1", "org.json4s" %% "json4s-jackson" % "3.6.7", "org.json4s" %% "json4s-ext" % "3.6.7" )
相關文章
相關標籤/搜索