CassandraAppender - distributed logging,分佈式軟件logback-appender

   農曆年最後一場scala-meetup聽劉穎分享專業軟件開發經驗,大受啓發。忽然意識到一直以來都沒有徹底按照任何標準的開發規範作事。誠然,在作技術調研和學習的過程當中不會對規範操做有什麼嚴格要求,一旦技術落地進入應用階段,開始進行產品開發時,只有嚴格按照專業的軟件開發規範才能保證軟件產品的質量。劉穎在meetup中提到了異常處理(exception handling)和過程跟蹤(logging)做爲軟件開發規範中的重要環節。咱們在這篇先討論logging。logging經過記錄軟件運行過程幫助開發者跟蹤軟件運行狀況,分析運算結果或者異常產生緣由,是一個成功完整的軟件不可缺乏的環節。 logback應該是java生態鏈中最流行、最通用的logger了。雖然logback已經提供了STDOUT、FILE、DB等多種跟蹤信息輸出方式,即ConsoleAppender、FileAppender、DBAppender,但針對分佈式應用的appender仍是須要定製。由於分佈式軟件是跨系統運行的,跟蹤信息天然也會在不一樣的系統中產生並存儲,因此分佈式應用須要分佈式存儲才能實現跟蹤信息的全局管理。logback是一套開發架構,任何定製的appender能夠很方便地整合入logback。那麼咱們就嘗試開發一套基於cassandra的logback-appender。java

首先認識一下logback:感受須要重點了解的logging運做核心應該是消息等級level的操做。消息等級是指logback根據不一樣的消息等級來篩選須要記錄的消息。logback支持下面幾個消息等級,按照各自記錄動做覆蓋面由弱到強排列,包括:數據庫

TRACE -> DEBUG -> INFO -> WARN -> ERROR 分別對應記錄函數 trace(msg),debug(msg),info(msg),warn(msg),error(msg)json

logback按消息等級進行記錄篩選的規則以下:session

假設記錄函數爲p,某個class的消息等級level爲q:當p>=q時選擇記錄消息。換言之調用函數error(msg)時logback會記錄全部等級消息,反之trace(msg)只能記錄TRACE級別的消息。logback手冊中以下表示:架構

                 TRACE    DEBUG   INFO    WARN    ERROR   OFF
trace()         YES          NO      NO         NO         NO        NO
debug()       YES         YES      NO         NO         NO        NO
info()           YES         YES     YES         NO         NO        NO
warn()         YES         YES     YES        YES         NO        NO
error()         YES         YES     YES        YES        YES        NOapp

logback中每一個類的默認消息等級能夠按照類型繼承樹結構繼承。當一個子類沒有定義消息等級時,它繼承對上父類的消息等級,即:X.Y.Z中Z的默認消息等級從Y繼承。框架

好了,以上運做流程都包括在logback的功能裏了,跟消息的存儲appender沒什麼關係。下面咱們就開始自制一套基於cassandra的appender。上面提過,logback是一套開放的框架,任何按照logback要求開發的appender均可以很方便的整合入logback的功能中去。下面是一個logback的appender框架:分佈式

package com.datatech.logback import ch.qos.logback.classic.spi.ILoggingEvent import ch.qos.logback.core.UnsynchronizedAppenderBase import com.datastax.driver.core.querybuilder.QueryBuilder class CassandraAppender extends UnsynchronizedAppenderBase[ILoggingEvent]{ override def append(eventObject: ILoggingEvent): Unit = { //write log message to cassandra
 } override def start(): Unit = { //setup cassandra
 super.start() } override def stop(): Unit = { super.stop() //clean up, closing cassandra
 } }

咱們先實現一個完整的logback配置文件logback.xml,包括ConsoleAppender,FileAppender,CassandraAppender ide

<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <Pattern>
                %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n </Pattern>
        </encoder>
    </appender>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <!-- path to your log file, where you want to store logs -->
        <file>/Users/Tiger/logback.log</file>
        <append>false</append>
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <appender name="cassandraLogger" class="com.datatech.logback.CassandraAppender">
      <hosts>192.168.0.189</hosts>
        <port>9042</port>
        <appName>posware</appName>
        <defaultFieldValues>{"app_customer":"bayakala.com","app_device":"1001"}</defaultFieldValues>
      <keyspaceName>applog</keyspaceName>
      <columnFamily>txnlog</columnFamily>
    </appender>
    <root level="debug">
      <appender-ref ref="cassandraLogger" />
      <appender-ref ref="STDOUT" />
    </root>
    <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
</configuration>

配置文件裏CassandraAppender的屬性,如hosts,port,keyspaceName等在scala程序實現方法以下:函數

 private var _hosts: String = "" def setHosts(hosts: String): Unit = _hosts = hosts private var _port: Int = 9042 // for the binary protocol, 9160 is default for thrift
  def setPort(port: Int): Unit = _port = port private var _username: String = "" def setUsername(username: String): Unit = _username = username private var _password: String = "" def setPassword(password: String): Unit = _password = password

屬性的使用以下:

  writeLog(eventObject)(optSession.get, _keyspaceName, _columnFamily)(_appName,ip,hostname,_defaultFieldValues)

實際上logback.xml裏的這些屬性能夠在runtime時設定,以下:

//get appender instances
  val log: Logger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger] val cassAppender = log.getAppender("cassandraLogger").asInstanceOf[CassandraAppender] val stdoutAppender = log.getAppender("STDOUT").asInstanceOf[ConsoleAppender[ILoggingEvent]] val fileAppender = log.getAppender("FILE").asInstanceOf[FileAppender[ILoggingEvent]] if(cassAppender != null) { cassAppender.setHosts("192.168.0.189") cassAppender.setPort(9042) cassAppender.start() }

與通用的appender不一樣的是:咱們須要在應用中與logback進行互動,由於咱們須要把具體應用中一些特定的跟蹤目標看成數據庫字段記錄起來。這些特定的跟蹤目標如userid,productid等是應用業務特有的,通用的logger是沒法覆蓋的。因此咱們關注的是一套在應用層面通用的logger。爲了實現這一目標,首先能夠在數據庫表結構schema裏表現應用的業務特色,下面是個例子: 

CREATE TABLE IF NOT EXISTS applog.txnlog (
    class_name text,
    file_name text,
    host_ip text,
    host_name text,
    level text,
    line_number text,
    logger_name text,
    method_name text,
    thread_name text,
    throwable_str_rep text,
    log_date text,
    log_time text,
    log_msg text,
    app_name text,
    app_customer text,
    app_device text,
    PRIMARY KEY (app_customer, app_device, log_date, log_time)
);

以上的schema中app_customer,app_device屬於應用業務屬性,由於咱們但願從用戶或設備角度對消息進行分類管理。以此類推對其它應用咱們也是經過設計另外一套涵蓋業務特性的schema。這些反應業務特性的字段必須在應用中調用消息記錄函數時提供,由於這些字段的內容是動態的(如:一個服務端軟件的用戶可能有幾百上千個)。咱們只能經過記錄的消息來傳遞這些字段的值。記住,logback能夠同時支持自帶的appender如ConsoleAppender,FileAppender等,以及CassandraAppender,你們共用logback獲取的msg,但咱們又必須經過對msg的處理才能加入動態屬性的值。爲了避免影響msg的可讀性,能夠用json來處理msg,以下:

   var msg = event.getMessage() try { val logMap = fromJson[Map[String,String]](msg) logMap.foreach ( m => qryInsert = qryInsert.value(m._1, m._2)) } catch { case e: Throwable => qryInsert = qryInsert.value(MESSAGE, msg) try { val dftMap = fromJson[Map[String,String]](default) dftMap.foreach ( m => qryInsert = qryInsert.value(m._1, m._2)) } catch { case e: Throwable => } } session.execute(qryInsert)

若是event.getMessage()獲取的msg不是json格式(如:消息是應用中引用的第三方工具庫產生的),就採用在配置文件中定義的默認值(也是json格式的),如上面配置文件中的<defaultFieldValues>屬性。

cassandra的使用比較簡單,並且咱們只使用了insert一項操做。完整的CassandraAppender源代碼以下:

package com.datatech.logback import ch.qos.logback.classic.spi._ import ch.qos.logback.core.UnsynchronizedAppenderBase import com.datastax.driver.core._ import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder} import java.net.InetAddress import java.time._ import java.time.format._ import java.util.Locale class CassandraAppender extends UnsynchronizedAppenderBase[ILoggingEvent]{ import CassandraAppender._ private var _hosts: String = "" def setHosts(hosts: String): Unit = _hosts = hosts private var _port: Int = 9042 // for the binary protocol, 9160 is default for thrift
  def setPort(port: Int): Unit = _port = port private var _username: String = "" def setUsername(username: String): Unit = _username = username private var _password: String = "" def setPassword(password: String): Unit = _password = password private var _defaultFieldValues: String = "" def setDefaultFieldValues(defaultFieldValues: String) = _defaultFieldValues = defaultFieldValues private val ip: String = getIP() private val hostname: String = getHostName() // Keyspace/ColumnFamily information
  private var _keyspaceName: String = "Logging" def setKeyspaceName(keyspaceName: String): Unit = _keyspaceName = keyspaceName private var _columnFamily: String = "log_entries" def setColumnFamily(columnFamily: String): Unit = _columnFamily = columnFamily private var _appName: String = "default" def setAppName(appName: String): Unit = _appName = appName private var _replication: String = "{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }" def setReplication(replication: String): Unit = _replication = replication private var _consistencyLevelWrite: ConsistencyLevel = ConsistencyLevel.ONE def setConsistencyLevelWrite(consistencyLevelWrite: String): Unit = { try { _consistencyLevelWrite = ConsistencyLevel.valueOf(consistencyLevelWrite.trim) } catch { case e: Throwable =>
         throw new IllegalArgumentException("Consistency level " + consistencyLevelWrite + " wasn't found.") } } private var optCluster: Option[Cluster] = None private var optSession: Option[Session] = None def connectDB(): Unit = { try { val cluster = new Cluster .Builder() .addContactPoints(_hosts) .withPort(_port) .build() val session = cluster.connect() optCluster = Some(cluster) optSession = Some(session) } catch { case e: Throwable => optCluster = None optSession = None println(s"error when logger connecting to cassandra [${_hosts}:${_port}]") } } override def append(eventObject: ILoggingEvent): Unit = { if(optSession.isDefined) { try { writeLog(eventObject)(optSession.get, _keyspaceName, _columnFamily)(_appName,ip,hostname,_defaultFieldValues) } catch { case e: Throwable => } } } override def start(): Unit = { if(! _hosts.isEmpty) { connectDB() super.start() } } override def stop(): Unit = { super.stop() if(optSession.isDefined) { optSession.get.closeAsync() optCluster.get.closeAsync() } } } object CassandraAppender extends JsonConverter { // CF column names
  val HOST_IP: String = "host_ip" val HOST_NAME: String = "host_name" val APP_NAME: String = "app_name" val LOGGER_NAME: String = "logger_name" val LEVEL: String = "level" val CLASS_NAME: String = "class_name" val FILE_NAME: String = "file_name" val LINE_NUMBER: String = "line_number" val METHOD_NAME: String = "method_name" val THREAD_NAME: String = "thread_name" val THROWABLE_STR: String = "throwable_str_rep" val LOG_DATE: String = "log_date" val LOG_TIME: String = "log_time" val MESSAGE: String = "log_msg" val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.US) def logDate: String = java.time.LocalDate.now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")) def logTime: String = LocalDateTime.now.format(dateTimeFormatter).substring(11) def writeLog(event: ILoggingEvent)(session: Session, kspc: String, tbl: String)(appName: String, ip: String, hostName: String, default: String): ResultSet = { var qryInsert = QueryBuilder.insertInto(kspc,tbl) .value(APP_NAME,appName) .value(HOST_IP,ip) .value(HOST_NAME,hostName) .value(LOGGER_NAME,event.getLoggerName()) .value(LEVEL,event.getLevel().toString) .value(THREAD_NAME,event.getThreadName()) .value(LOG_DATE,logDate) .value(LOG_TIME,logTime) try { val callerData = event.getCallerData() if (callerData.nonEmpty) { qryInsert = qryInsert.value(CLASS_NAME, callerData.head.getClassName()) .value(FILE_NAME, callerData.head.getFileName()) .value(LINE_NUMBER, callerData.head.getLineNumber().toString) .value(METHOD_NAME, callerData.head.getMethodName()) } } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")} try { if (event.getThrowableProxy() != null) { val throwableStrs = event.getThrowableProxy().getSuppressed().asInstanceOf[List[IThrowableProxy]] val throwableStr = throwableStrs.foldLeft("") { case (b, t) => b + "," + t.getMessage() } qryInsert = qryInsert.value(THROWABLE_STR, throwableStr) } } catch {case e: Throwable => println(s"logging event error: ${e.getMessage}")} var msg = event.getMessage() try { val logMap = fromJson[Map[String,String]](msg) logMap.foreach ( m => qryInsert = qryInsert.value(m._1, m._2)) } catch { case e: Throwable => qryInsert = qryInsert.value(MESSAGE, msg) try { val dftMap = fromJson[Map[String,String]](default) dftMap.foreach ( m => qryInsert = qryInsert.value(m._1, m._2)) } catch { case e: Throwable => } } session.execute(qryInsert) } def getHostName(): String = { var hostname = "unknown"
    try { val addr: InetAddress = InetAddress.getLocalHost() hostname = addr.getHostName() } catch { case e: Throwable => hostname = "error"} hostname } def getIP(): String = { var ip: String = "unknown"
    try { val addr: InetAddress = InetAddress.getLocalHost() ip = addr.getHostAddress() } catch { case e: Throwable => ip = "error" } ip } }

下面是測試代碼:

import ch.qos.logback.classic.Logger
import ch.qos.logback.core.{ConsoleAppender, FileAppender}
import com.datatech.logback.{CassandraAppender, JsonConverter}
import ch.qos.logback.classic.spi.ILoggingEvent
import org.slf4j.LoggerFactory
import ch.qos.logback.classic.LoggerContext
import java.time._
import java.time.format._
import java.util.Locale

import scala.io._
import com.datastax.driver.core._


object LoggingDemo extends App with JsonConverter {
  val log: Logger = LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME).asInstanceOf[Logger]
  val cassAppender = log.getAppender("cassandraLogger").asInstanceOf[CassandraAppender]
  val stdoutAppender = log.getAppender("STDOUT").asInstanceOf[ConsoleAppender[ILoggingEvent]]
  val fileAppender = log.getAppender("FILE").asInstanceOf[FileAppender[ILoggingEvent]]


/*
  val cluster = new Cluster
  .Builder()
    .addContactPoints("192.168.0.189")
    .withPort(9042)
    .build()
  val session = cluster.connect()
  val keyspace = getClass.getResource("/logger.schema")
  val table = getClass.getResource("/txnlog.schema")
  val qrykspc = Source.fromFile(keyspace.getPath).getLines.mkString
  session.execute(qrykspc)

  val qrytbl = Source.fromFile(table.getPath).getLines.mkString
  session.execute(qrytbl)

  session.close()
  cluster.close()


  val json = toJson(loggedItems)
  println(s"json = $json")

  val m = fromJson[Map[String,String]](json)
  println(s"map = $m")

  //stop the appenders
  if (stdoutAppender != null)
     stdoutAppender.stop()
  if (fileAppender != null)
      fileAppender.stop()
*/

  if(cassAppender != null) {
    cassAppender.setHosts("192.168.0.189")
    cassAppender.setPort(9042)
    cassAppender.start()
  }

  val  dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS", Locale.US)
  val now = LocalDateTime.now.format(dateTimeFormatter)

  log.info("************this is a info message ..." + now)
  log.debug("***********debugging message here ..." + now)

  var loggedItems = Map[String,String]()
//  loggedItems += ("app_name" -> "test")
  loggedItems = loggedItems ++ Map(
    ("app_customer" -> "logback.com"),
    ("app_device" -> "9101"),
    ("log_msg" -> "specific message for cassandra ..."))

  log.debug(toJson(loggedItems))


//stop the logger
  val loggerContext = LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
  loggerContext.stop()
}
相關文章
相關標籤/搜索