flume + kafka + zookeeper + spark + influxdb + grafana + kapacitor monitoring platform

 

Architecture diagram (kept minimal due to limited resources)

 

Download the required packages (note: kafka and spark have JDK and Scala version requirements; check the official sites)

 

wget https://dl.influxdata.com/influxdb/releases/influxdb-1.5.2.x86_64.rpm
yum localinstall influxdb-1.5.2.x86_64.rpm
wget https://dl.influxdata.com/kapacitor/releases/kapacitor-1.4.1.x86_64.rpm
yum localinstall kapacitor-1.4.1.x86_64.rpm
Because of network restrictions, the packages were downloaded once and then uploaded to the servers.

192.168.10.129  flume influxdb grafana kapacitor

192.168.10.130  kafka spark

Install flume-ng (requires JDK 1.8)

Flume ships as a binary tarball; just extract it.

cd conf; cp flume-env.sh.template flume-env.sh

echo -e 'export JAVA_HOME=/opt/jdk1.8\nexport JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"' >> flume-env.sh

For a detailed introduction to flume-ng see https://blog.csdn.net/zhaodedong/article/details/52541688

Source types

Source type    Description
Avro Source    Supports the Avro protocol (actually Avro RPC); built in
Thrift Source    Supports the Thrift protocol; built in
Exec Source    Produces data from the stdout of a Unix command
JMS Source    Reads from a JMS system (queues, topics); tested with ActiveMQ
Spooling Directory Source    Watches a given directory for new data
Twitter 1% firehose Source    Continuously downloads Twitter data via the API; experimental
Netcat Source    Listens on a port and turns each text line flowing through it into an Event
Sequence Generator Source    Sequence generator source; produces sequence data
Syslog Sources    Read syslog data and produce Events; support both UDP and TCP
HTTP Source    Accepts data via HTTP POST or GET; supports JSON and BLOB representations
Legacy Sources    Compatible with Sources from the old Flume OG (0.9.x)
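As a concrete illustration of the table above, a minimal agent wiring a Netcat source to a Logger sink might look like this (the agent name demo and all component names are illustrative, not part of this setup):

```
demo.sources = nc
demo.channels = mem
demo.sinks = log
# netcat source: every text line sent to the port becomes an Event
demo.sources.nc.type = netcat
demo.sources.nc.bind = 0.0.0.0
demo.sources.nc.port = 44444
demo.channels.mem.type = memory
# logger sink: writes Events to flume's own log, handy for smoke tests
demo.sinks.log.type = logger
demo.sources.nc.channels = mem
demo.sinks.log.channel = mem
```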

Channel types

Channel type    Description
Memory Channel    Events are stored in memory
JDBC Channel    Events are stored in a persistent store; Flume currently ships with Derby support
File Channel    Events are stored in files on disk
Spillable Memory Channel    Events are stored in memory and on disk; when the in-memory queue fills, events spill to disk files (currently experimental; not recommended for production)
Pseudo Transaction Channel    For testing only
Custom Channel    A user-supplied Channel implementation

Sink types

Sink type    Description
HDFS Sink    Writes data to HDFS
Logger Sink    Writes data to a log file
Avro Sink    Data is converted into Avro Events and sent to the configured RPC port
Thrift Sink    Data is converted into Thrift Events and sent to the configured RPC port
IRC Sink    Data is replayed on IRC
File Roll Sink    Stores data on the local filesystem
Null Sink    Discards all data
HBase Sink    Writes data to HBase
Morphline Solr Sink    Sends data to a Solr search server (or cluster)
ElasticSearch Sink    Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink    Writes data to a Kite Dataset; experimental
Custom Sink    A user-supplied Sink implementation

flume.conf configuration

# logser is the name of this flume agent; every agent consists of sources, channels and sinks.
# Multiple sources or sinks may be listed, separated by spaces.
# Sources are where data enters, channels are the intermediate buffer, sinks are where data goes next.
logser.sources = src_dir
logser.sinks = sink_kfk
logser.channels = ch
# source
# source type
logser.sources.src_dir.type = TAILDIR
# file recording the read position of every tailed file
logser.sources.src_dir.positionFile = /opt/flume-1.8.0/logs/taildir_position.json
logser.sources.src_dir.filegroups = f1
logser.sources.src_dir.filegroups.f1 = /opt/logs/.*
logser.sources.src_dir.headers.f1.headerKey1 = tomcatAPP
logser.sources.src_dir.fileHeader = true
# channel
logser.channels.ch.type = memory
logser.channels.ch.capacity = 10000
logser.channels.ch.transactionCapacity = 1000
# kafka sink
# the sink type is Kafka, i.e. the logs end up in Kafka
logser.sinks.sink_kfk.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka servers; separate multiple brokers with commas
logser.sinks.sink_kfk.kafka.bootstrap.servers = 192.168.10.130:9092
logser.sinks.sink_kfk.kafka.topic = tomcatCom
# Bind the source and sink to the channel
logser.sources.src_dir.channels = ch
logser.sinks.sink_kfk.channel = ch

Start flume (start kafka first)

nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume.conf --name logser -Dflume.root.logger=INFO,console >logs/flume-ng.log 2>&1 &

Aside: shipping logs directly from java log4j

log4j.properties

log4j.rootLogger=INFO,stdout,flume
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
# hostname of the remote flume agent's avro source
log4j.appender.flume.Hostname = flume
# port the remote flume agent's avro source listens on
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true

Note: the application needs the log4j appender dependency on its classpath:
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>same as your flume version</version>
</dependency>

flume.conf

agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink

#define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=192.168.10.129
agent1.sources.avro-source.port=41414

#define channel
agent1.channels.logger-channel.type=memory

#define sink
agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
# kafka topic
agent1.sinks.kafka-sink.kafka.topic=mytest
agent1.sinks.kafka-sink.kafka.bootstrap.servers=192.168.10.130:9092
# messages per batch; larger batches improve throughput (default 100)
agent1.sinks.kafka-sink.flumeBatchSize=20
# default is 1
agent1.sinks.kafka-sink.kafka.producer.acks=1

# Bind the source and sink to the channel
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel

 

Install kafka

Install the Scala environment

yum localinstall scala-2.11.7.rpm 

Install the JDK

export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:/opt/jdk1.8/bin

Just extract the kafka tarball

ZooKeeper is the instance bundled with kafka

zookeeper.properties

dataDir=/opt/kafka_2.11/zk
dataLogDir=/opt/kafka_2.11/zk/logs
clientPort=2181
maxClientCnxns=0

server.properties (single-node local test environment)

broker.id=0
#By default a producer that sends a message to a nonexistent topic auto-creates that topic.
#By default deleting a topic only flags it "marked for deletion"; the list command still shows it.
#Disable auto-creation; keep deletion at its default to guard against accidental drops.
auto.create.topics.enable=false
delete.topic.enable=false
port=9092
host.name=192.168.10.130
#Defaults to the hostname; external clients, or hosts without a hostname mapping, fail with
#org.apache.kafka.common.errors.TimeoutException: Batch Expired
advertised.host.name=192.168.10.130
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka_2.11/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0

Producer and consumer configuration

#producer.properties
bootstrap.servers=192.168.10.130:9092
compression.type=none
#consumer.properties
bootstrap.servers=192.168.10.130:9092
group.id=test-consumer-group

Common commands

#start zk
bin/zookeeper-server-start.sh config/zookeeper.properties >> /opt/kafka_2.11/zk/logs/zk.log &
#start kafka
bin/kafka-server-start.sh -daemon config/server.properties &
#create a topic
bin/kafka-topics.sh --create --zookeeper 192.168.10.130:2181 --replication-factor 1 --partitions 1 --topic tomcatCom
#list / describe topics
bin/kafka-topics.sh --list --zookeeper 192.168.10.130:2181
bin/kafka-topics.sh --describe --zookeeper 192.168.10.130:2181 --topic test
#console producer
bin/kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic test
#console consumer
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9092 --topic tomcatCom --from-beginning --consumer.config config/consumer.properties

Install spark

Install scala, jdk and python; check the official site for version requirements.

Resources are limited, so the master runs in local mode.

Install influxdb + grafana + kapacitor

yum localinstall influxdb-1.5.2.x86_64.rpm  grafana-5.1.3-1.x86_64.rpm  kapacitor-1.4.1.x86_64.rpm 

influxdb 

service influxdb start

For the configuration file see http://www.ywnds.com/?p=10763

For basic usage see https://www.linuxdaxue.com/influxdb-study-series-manual.html

For authentication see https://blog.csdn.net/caodanwang/article/details/51967393

influx -host '192.168.10.129' -port '8086'

influx -host '192.168.10.129' -port '8086' -username 'root' -password '123456'

curl -POST http://192.168.10.129:8086/query --data-urlencode "q=CREATE DATABASE mydb"
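The /write endpoint used above expects InfluxDB line protocol: `measurement,tag1=v1,... field1=v1,... <ns-timestamp>`. A minimal Java sketch of assembling such a line (class and method names are mine; the Spark job later in this post emits the same format via string interpolation):

```java
import java.util.Map;
import java.util.TreeMap;

public class LineProtocol {
    // Build one InfluxDB line-protocol point:
    //   measurement,tag1=v1,tag2=v2 field1=v1,field2=v2 <ns-timestamp>
    // Tag keys/values must not contain unescaped spaces or commas.
    public static String point(String measurement,
                               Map<String, String> tags,
                               Map<String, String> fields,
                               long tsNanos) {
        StringBuilder sb = new StringBuilder(measurement);
        // TreeMap gives a stable, sorted tag order
        for (Map.Entry<String, String> t : new TreeMap<>(tags).entrySet()) {
            sb.append(',').append(t.getKey()).append('=').append(t.getValue());
        }
        sb.append(' ');
        boolean first = true;
        for (Map.Entry<String, String> f : new TreeMap<>(fields).entrySet()) {
            if (!first) sb.append(',');
            sb.append(f.getKey()).append('=').append(f.getValue());
            first = false;
        }
        sb.append(' ').append(tsNanos);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(point("nginxLog",
                Map.of("ip", "1.2.3.4", "status", "404"),
                Map.of("count", "1"), 1525424525000000000L));
    }
}
```

The resulting string is what gets POSTed to `http://<host>:8086/write?db=<db>`.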

Retention policies

name: policy name; default in this example
duration: how long data is kept; 0 means forever
shardGroupDuration: time span of each shardGroup, a basic InfluxDB storage unit; querying data older than this window is typically slower
replicaN: REPLICATION, the number of replicas
default: whether this is the database's default policy
SHOW RETENTION POLICIES ON nginxlog_clear
CREATE RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 2h REPLICATION 1 DEFAULT -- create a default policy named 2_hours that keeps data for 2 hours --
ALTER RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 4h DEFAULT -- change the retention to 4 hours --
DROP RETENTION POLICY "2_hours" ON "nginxlog_clear" -- after dropping, anything still using 2_hours fails with "policy not found"; set autogen's default back to true --

Continuous queries

show continuous queries
CREATE CONTINUOUS QUERY cq_1m ON nginxlog_clear BEGIN SELECT count(count) AS count_404 INTO current_data.two_year.ten_min_count FROM nginxlog_clear.autogen.nginx WHERE status = '404' GROUP BY time(1m) END
-- cq_1m: name of the continuous query; nginxlog_clear and current_data: databases --
-- two_year and autogen: retention policies --
-- ten_min_count and nginx: measurements --
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
-- continuous queries cannot be modified; drop and recreate them --

Selected influxdb.conf settings

bind-address = "192.168.10.129:8088"
[meta]
dir = "/opt/influxdb/meta"
[data]
dir = "/opt/influxdb/data"
wal-dir = "/opt/influxdb/wal"
[http]
enabled = true
bind-address = "192.168.10.129:8086"
access-log-path = "/opt/influxdb/data/logs"
[continuous_queries]
enabled = true
log-enabled = true
run-interval = "1m"  # set the check interval to match the shortest GROUP BY time(1m) among the continuous queries

grafana configuration

[server]
domain = 192.168.10.129
[smtp]
enabled = true 
host = 192.168.10.129:25  # note: set postfix's inet_interfaces = 192.168.10.129
password = xian6630753  
from_address = admin@grafana.com
from_name = Grafana

Configure dashboards

 

Configure the alert email

 spark

NginxClear  // combining the regex with Scala pattern matching would be cleaner here

 

package com.sgm.spark
import scala.util
import scalaj.http._
import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object NginxClear {
  def main(args: Array[String]) {
    if (args.length !=2) {
      System.err.println(s"""
                            |Usage: DirectKafkaWordCount <brokers> <topics>
                            |  <bootstrap.servers> is a list of one or more Kafka brokers
                            |  <topics> is a list of one or more kafka topics to consume from
                            |
        """.stripMargin)
      System.exit(1)
    }


    val Array(brokers, topics) = args

    // Create context with 30 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("C:\\Users\\itlocal\\IdeaProjects\\nginxlog\\checkpoint")
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)  // offsets recorded in the checkpoint are used when present; otherwise auto.offset.reset applies
    )
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
    val logs=messages.map(_.value)
    /* Alternative: regex combined with pattern matching
    val clearDate=logs.map(line=> {
      val info = line.split("\n")
      val infoPattern ="""(\d+\.\d+\.\d+\.\d+)(.*)(\d+/[A-Z,a-z]+/\d+\:\d+\:\d+\:\d+)(.*)([1-5]{1}[0-9]{2}) (\d+) (\"http.*\") (\".*\") (.*)""".r
      info.foreach(x=>x match {
        case infoPattern(ip,none1,currentTime,none2,respCode,requestTime,url,none3,upstreamTIME)=>
          println(ip+"\t"+currentTime+"\t"+respCode+"\t"+requestTime+"\t"+url+"\t"+upstreamTIME+"\t")
        case _=>println("none")
      })
    }) */
    val cleanDate=logs.map(mapFunc = line => {
      val influxdb_url = "http://192.168.10.129:8086/write?db=nginxlog_clear"
      val infos = line.split(" ")
      if (infos.length>10) {
        val actime = infos(3).split(" ")(0).split("\\[")(1)
        val random_num = (new util.Random).nextInt(999999)
        val curent_timestamp = (DateUtil.getTime(actime).toString + "000000").toLong + random_num  // influxdb timestamps are nanoseconds; pad the millisecond epoch, and add jitter so events in the same millisecond get distinct timestamps
        val urlPattern="\"http.*".r
        val ipPattern="^[1-5]{1}[0-9]{2}$".r
        //infos.foreach(println)
        var initUrl="\"none\""
        var initStatus="\"none\""
        infos.foreach(info=>urlPattern.findAllIn(info).foreach(x=>initUrl=Some(x).get))
        infos.foreach(info=>ipPattern.findAllIn(info).foreach(x=>initStatus=x))
        //println(initStatus)
        val date = s"nginxLog,ip=${infos(0)},acess_time=${DateUtil.parseTime(actime)},status=$initStatus,upstream_time=${infos.last} send_bytes=${infos(9)},url=$initUrl,count=1 $curent_timestamp"
        println(date)
        Http(influxdb_url).postData(date).header("content-type", "application/json").asString.code
      }
    })//.filter(clearlog=>clearlog.statusCode != 200)
    cleanDate.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}    
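The job above turns the millisecond epoch into nanoseconds by appending six zeros, and adds a random offset so that several events falling in the same millisecond do not overwrite one another (InfluxDB treats a point with identical measurement, tag set and timestamp as a duplicate). A standalone sketch of those two details plus the 3-digit status-code pattern; the names are mine:

```java
import java.util.Random;
import java.util.regex.Pattern;

public class InfluxTimestamp {
    private static final Random RND = new Random();
    // Matches HTTP status codes 100-599, like the job's "^[1-5]{1}[0-9]{2}$" pattern.
    private static final Pattern STATUS = Pattern.compile("^[1-5][0-9]{2}$");

    // InfluxDB expects nanosecond precision by default; a millisecond epoch
    // must be multiplied by 1_000_000 (the Spark job appends "000000" instead).
    public static long toNanos(long epochMillis) {
        return epochMillis * 1_000_000L;
    }

    // Add random sub-millisecond jitter so two events in the same millisecond
    // land on distinct timestamps instead of overwriting one another.
    public static long toNanosJittered(long epochMillis) {
        return toNanos(epochMillis) + RND.nextInt(999999);
    }

    public static boolean isStatusCode(String token) {
        return STATUS.matcher(token).matches();
    }
}
```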

 

Time format conversion

DateUtil

package com.sgm.spark


import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

object DateUtil {
  val curent_day=FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
  val target_format=FastDateFormat.getInstance("yyyyMMddHHmmss")
  def getTime(time:String)={    // convert to an epoch timestamp in milliseconds
    curent_day.parse(time).getTime
  }
  def parseTime(time:String)={
    target_format.format(new Date(getTime(time)))
  }
  def main(args: Array[String]): Unit = {
    println(parseTime("04/MAY/2017:09:22:05"))
  }
}
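DateUtil leans on commons-lang3's FastDateFormat. An equivalent sketch using only the JDK's java.time, with case-insensitive month parsing so inputs like 04/MAY/2017 still work (class name is mine, not from the original):

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.util.Locale;

public class DateUtilJdk {
    // nginx access-log time, e.g. "04/May/2017:09:22:05";
    // parseCaseInsensitive also accepts "04/MAY/2017:09:22:05"
    private static final DateTimeFormatter NGINX = new DateTimeFormatterBuilder()
            .parseCaseInsensitive()
            .appendPattern("dd/MMM/yyyy:HH:mm:ss")
            .toFormatter(Locale.ENGLISH);
    private static final DateTimeFormatter TARGET =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    // Epoch milliseconds, read as UTC (FastDateFormat uses the JVM default zone,
    // so absolute values can differ by the zone offset).
    public static long getTime(String time) {
        return LocalDateTime.parse(time, NGINX).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    public static String parseTime(String time) {
        return LocalDateTime.parse(time, NGINX).format(TARGET);
    }

    public static void main(String[] args) {
        System.out.println(parseTime("04/MAY/2017:09:22:05")); // 20170504092205
    }
}
```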

Build and package with maven; see the scala development environment setup notes.

Submitting the job

bin/spark-submit --master local[2] --class com.sgm.spark.NginxClear --name NginxClear /root/bigdata-1.0-SNAPSHOT.jar