Architecture (kept as simple as possible because of limited resources)
Download the required packages (note that Kafka and Spark have specific JDK/Scala version requirements; check the official sites)
192.168.10.129  flume, influxdb, grafana, kapacitor
192.168.10.130  kafka, spark
Install flume-ng (JDK 1.8 required)
Flume ships as a binary tarball, so simply extract it.
cd conf; cp flume-env.sh.template flume-env.sh
echo -e 'export JAVA_HOME=/opt/jdk1.8\nexport JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"' >> flume-env.sh
For a detailed introduction to flume-ng, see https://blog.csdn.net/zhaodedong/article/details/52541688
Sources in detail
Source type  Description
Avro Source  Supports the Avro protocol (actually Avro RPC); built in
Thrift Source  Supports the Thrift protocol; built in
Exec Source  Produces data from the standard output of a Unix command
JMS Source  Reads data from a JMS system (queues/topics); tested with ActiveMQ
Spooling Directory Source  Watches a given directory for new data
Twitter 1% firehose Source  Continuously downloads Twitter data through the API; experimental
Netcat Source  Listens on a port and turns every text line passing through it into an Event
Sequence Generator Source  Sequence generator source; produces sequence data
Syslog Sources  Read syslog data and produce Events; both UDP and TCP are supported
HTTP Source  Accepts data via HTTP POST or GET; supports JSON and BLOB representations
Legacy Sources  Compatible with Sources from the old Flume OG (0.9.x)
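For instance, a minimal sketch of an Exec source that tails a log file (the agent, source and channel names here are made up, and the sink wiring is omitted):

a1.sources = r1
a1.channels = c1
a1.channels.c1.type = memory
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/app.log
a1.sources.r1.channels = c1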
Channels in detail
Channel type  Description
Memory Channel  Events are stored in memory
JDBC Channel  Events are stored in a persistent store; currently Derby is supported out of the box
File Channel  Events are stored in files on disk
Spillable Memory Channel  Events are stored in memory and on disk; when the in-memory queue fills up, events spill over to files on disk (currently experimental, not recommended for production)
Pseudo Transaction Channel  For testing only
Custom Channel  A custom Channel implementation
Sinks in detail
Sink type  Description
HDFS Sink  Writes data to HDFS
Logger Sink  Writes data to the log
Avro Sink  Data is converted into Avro Events and sent to the configured RPC port
Thrift Sink  Data is converted into Thrift Events and sent to the configured RPC port
IRC Sink  Data is replayed over IRC
File Roll Sink  Stores data on the local file system
Null Sink  Discards all data
HBase Sink  Writes data to HBase
Morphline Solr Sink  Sends data to a Solr search server (or cluster)
ElasticSearch Sink  Sends data to an Elasticsearch search server (or cluster)
Kite Dataset Sink  Writes data to a Kite Dataset; experimental
Custom Sink  A custom Sink implementation
flume.conf configuration
# "logser" is the name of this flume agent; every flume agent consists of sources, channels and sinks,
# and multiple sources/sinks can be listed separated by spaces.
# sources are where the data comes from, channels are the staging area in between, sinks are where the data goes.
logser.sources = src_dir
logser.sinks = sink_kfk
logser.channels = ch

# source
# source type
logser.sources.src_dir.type = TAILDIR
# file that records the read position of every tailed file
logser.sources.src_dir.positionFile = /opt/flume-1.8.0/logs/taildir_position.json
logser.sources.src_dir.filegroups = f1
logser.sources.src_dir.filegroups.f1 = /opt/logs/.*
logser.sources.src_dir.headers.f1.headerKey1 = tomcatAPP
logser.sources.src_dir.fileHeader = true

# channel
logser.channels.ch.type = memory
logser.channels.ch.capacity = 10000
logser.channels.ch.transactionCapacity = 1000

# kafka sink
# the sink type is Kafka, i.e. the logs are ultimately shipped to Kafka
logser.sinks.sink_kfk.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka servers, comma-separated if there is more than one
logser.sinks.sink_kfk.kafka.bootstrap.servers = 192.168.10.130:9092
logser.sinks.sink_kfk.kafka.topic = tomcatCom

# Bind the source and sink to the channel
logser.sources.src_dir.channels = ch
logser.sinks.sink_kfk.channel = ch
Start flume (start Kafka first)
nohup bin/flume-ng agent --conf conf/ --conf-file conf/flume.conf --name logser -Dflume.root.logger=INFO,console >logs/flume-ng.log 2>&1 &
Going further: shipping Java log4j output to Flume
log4j.properties
log4j.rootLogger=INFO,stdout,flume

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] %m%n

log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
# hostname of the remote flume agent's avro source
log4j.appender.flume.Hostname=flume
# port the remote flume agent's avro source listens on
log4j.appender.flume.Port=41414
log4j.appender.flume.UnsafeMode=true

# Note: the application needs this dependency on its classpath
# <dependency>
#     <groupId>org.apache.flume.flume-ng-clients</groupId>
#     <artifactId>flume-ng-log4jappender</artifactId>
#     <version>same as your flume version</version>
# </dependency>
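With this configuration in place, anything an application logs through log4j 1.x is forwarded to the avro source. A minimal sketch (the object name and messages are made up for illustration):

import org.apache.log4j.Logger

object FlumeLogDemo {
  private val logger = Logger.getLogger(getClass)

  def main(args: Array[String]): Unit = {
    // each call below is forwarded by the Log4jAppender to the avro source at flume:41414
    (1 to 10).foreach { i =>
      logger.info(s"test message $i")
      Thread.sleep(1000)
    }
  }
}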
flume.conf
agent1.sources = avro-source
agent1.channels = logger-channel
agent1.sinks = kafka-sink

# define source
agent1.sources.avro-source.type = avro
agent1.sources.avro-source.bind = 192.168.10.129
agent1.sources.avro-source.port = 41414

# define channel
agent1.channels.logger-channel.type = memory

# define sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
# kafka topic
agent1.sinks.kafka-sink.kafka.topic = mytest
agent1.sinks.kafka-sink.kafka.bootstrap.servers = 192.168.10.130:9092
# how many messages to send in one batch; larger batches improve throughput (default 100)
agent1.sinks.kafka-sink.flumeBatchSize = 20
# default is 1
agent1.sinks.kafka-sink.kafka.producer.acks = 1

# Bind the source and sink to the channel
agent1.sources.avro-source.channels = logger-channel
agent1.sinks.kafka-sink.channel = logger-channel
Install Kafka
Install the Scala environment
yum localinstall scala-2.11.7.rpm
Install the JDK
export JAVA_HOME=/opt/jdk1.8
export PATH=$PATH:/opt/jdk1.8/bin
Simply extract the Kafka tarball.
ZooKeeper here is the one bundled with Kafka.
zookeeper.properties
dataDir=/opt/kafka_2.11/zk
dataLogDir=/opt/kafka_2.11/zk/logs
clientPort=2181
maxClientCnxns=0
server.properties (single node, local test environment)
broker.id=0
# By default a Producer that sends a message to a non-existent topic creates the topic automatically.
# By default deleting a topic only shows "marked for deletion"; the list command still shows the topic.
# Auto-creation is turned off here; deletion is left at the default for fear of accidental deletes.
auto.create.topics.enable=false
delete.topic.enable=false
port=9092
host.name=192.168.10.130
# defaults to the hostname; external clients or a missing hostname mapping fail with
# org.apache.kafka.common.errors.TimeoutException: Batch Expired
advertised.host.name=192.168.10.130
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka_2.11/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.10.130:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
Producer/consumer configuration
# producer.properties
bootstrap.servers=192.168.10.130:9092
compression.type=none

# consumer.properties
bootstrap.servers=192.168.10.130:9092
group.id=test-consumer-group
Common commands
# start zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties >> /opt/kafka_2.11/zk/logs/zk.log &
# start kafka
bin/kafka-server-start.sh -daemon config/server.properties &
# create a topic
bin/kafka-topics.sh --create --zookeeper 192.168.10.130:2181 --replication-factor 1 --partitions 1 --topic tomcatCom
# list/describe topics
bin/kafka-topics.sh --list --zookeeper 192.168.10.130:2181
bin/kafka-topics.sh --describe --zookeeper 192.168.10.130:2181 --topic test
# console producer (for testing)
bin/kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic test
# console consumer (for testing)
bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9092 --topic tomcatCom --from-beginning --consumer.config config/consumer.properties
Install Spark
Install Scala, the JDK and Python; check the official site for version requirements.
Because resources are limited, the Spark master runs in local mode.
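A quick way to sanity-check the installation is to start the shell in the same local mode (assuming you run it from the Spark install directory):

bin/spark-shell --master local[2]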
Install influxdb + grafana + kapacitor
yum localinstall influxdb-1.5.2.x86_64.rpm grafana-5.1.3-1.x86_64.rpm kapacitor-1.4.1.x86_64.rpm
influxdb
service influxdb start
For the configuration file, see http://www.ywnds.com/?p=10763
For basic usage, see https://www.linuxdaxue.com/influxdb-study-series-manual.html
For authentication, see https://blog.csdn.net/caodanwang/article/details/51967393
influx -host '192.168.10.129' -port '8086'
influx -host '192.168.10.129' -port '8086' -username 'root' -password '123456'
curl -POST http://192.168.10.129:8086/query --data-urlencode "q=CREATE DATABASE mydb"
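Points are written through the same HTTP API using the line protocol. A minimal sketch with scalaj.http (the same client the Spark job below uses); the measurement, tag and field names here are made up for illustration:

import scalaj.http._

object InfluxWriteDemo {
  def main(args: Array[String]): Unit = {
    val url = "http://192.168.10.129:8086/write?db=mydb"
    // line protocol: measurement[,tag=value] field=value [timestamp]
    val point = "cpu,host=server01 usage=0.64"
    val resp = Http(url).postData(point).asString
    println(resp.code) // 204 means the write was accepted
  }
}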
Data retention policy
name -- the policy name; "default" in this example
duration -- how long data is kept; 0 means forever
shardGroupDuration -- the time span covered by each shardGroup; a shardGroup is a basic InfluxDB storage structure, and querying data older than this span is expected to be less efficient
replicaN -- short for REPLICATION, the number of copies
default -- whether this is the default policy
SHOW RETENTION POLICIES ON nginxlog_clear
-- create a policy named 2_hours that keeps data for 2 hours and make it the default
CREATE RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 2h REPLICATION 1 DEFAULT
-- change it to 4 hours
ALTER RETENTION POLICY "2_hours" ON "nginxlog_clear" DURATION 4h DEFAULT
-- drop it; writes will still target this policy and fail with "retention policy not found: 2_hours"
-- until autogen's default is set back to true
DROP RETENTION POLICY "2_hours" ON "nginxlog_clear"
Continuous queries
SHOW CONTINUOUS QUERIES
-- cq_1m is the continuous query name; nginxlog_clear and current_data are databases,
-- autogen and two_year are retention policies, nginx and ten_min_count are measurements
CREATE CONTINUOUS QUERY cq_1m ON nginxlog_clear
BEGIN
  SELECT count(count) AS count_404
  INTO current_data.two_year.ten_min_count
  FROM nginxlog_clear.autogen.nginx
  WHERE status = '404'
  GROUP BY time(1m)
END
-- a continuous query cannot be modified; drop it and recreate it instead
DROP CONTINUOUS QUERY <cq_name> ON <database_name>
Some influxdb.conf settings
bind-address = "192.168.10.129:8088" [meta] dir = "/opt/influxdb/meta" [data] dir = "/opt/influxdb/data" wal-dir = "/opt/influxdb/wal" [http] enabled = true bind-address = "192.168.10.129:8086" access-log-path = "/opt/influxdb/data/logs" [continuous_queries] enabled = true log-enabled = true run-interval = "1m" #能夠根據最短的持續查詢group by time(1m) 設定檢測時間
grafana configuration
[server]
domain = 192.168.10.129

[smtp]
enabled = true
# note: set inet_interfaces = 192.168.10.129 in postfix
host = 192.168.10.129:25
password = xian6630753
from_address = admin@grafana.com
from_name = Grafana
Configure the graphs
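As a rough example, a Grafana panel on this data source could chart 404 counts from the measurement written by the Spark job below with a query along these lines ($timeFilter is Grafana's time-range macro; adjust the names to your own schema):

SELECT sum("count") FROM "nginxLog" WHERE "status" = '404' AND $timeFilter GROUP BY time(1m)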
Configure the alert email
spark
NginxClear // regex combined with pattern matching would be cleaner here
package com.sgm.spark

import scala.util
import scalaj.http._
import org.apache.spark.SparkConf
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._

object NginxClear {
  def main(args: Array[String]) {
    if (args.length != 2) {
      System.err.println(
        s"""
           |Usage: NginxClear <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |
        """.stripMargin)
      System.exit(1)
    }
    val Array(brokers, topics) = args

    // Create context with a 30 second batch interval
    val sparkConf = new SparkConf().setAppName("NginxClear").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(30))
    ssc.checkpoint("C:\\Users\\itlocal\\IdeaProjects\\nginxlog\\checkpoint")

    // Create a direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      // offsets recorded in the checkpoint are used on restart; without a checkpoint, auto.offset.reset applies
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

    val logs = messages.map(_.value)

    /* alternative: regex combined with pattern matching
    val clearDate = logs.map(line => {
      val info = line.split("\n")
      val infoPattern = """(\d+\.\d+\.\d+\.\d+)(.*)(\d+/[A-Z,a-z]+/\d+\:\d+\:\d+\:\d+)(.*)([1-5]{1}[0-9]{2}) (\d+) (\"http.*\") (\".*\") (.*)""".r
      info.foreach {
        case infoPattern(ip, none1, currentTime, none2, respCode, requestTime, url, none3, upstreamTIME) =>
          println(ip + "\t" + currentTime + "\t" + respCode + "\t" + requestTime + "\t" + url + "\t" + upstreamTIME + "\t")
        case _ => println("none")
      }
    })
    */

    val cleanDate = logs.map(line => {
      val influxdb_url = "http://192.168.10.129:8086/write?db=nginxlog_clear"
      val infos = line.split(" ")
      if (infos.length > 10) {
        val actime = infos(3).split(" ")(0).split("\\[")(1)
        val random_num = (new util.Random).nextInt(999999)
        // InfluxDB expects nanosecond timestamps while ours are in milliseconds;
        // without this conversion InfluxDB will not accept the timestamp
        val curent_timestamp = (DateUtil.getTime(actime).toString + "000000").toLong + random_num
        val urlPattern = "\"http.*".r
        val ipPattern = "^[1-5]{1}[0-9]{2}$".r
        var initUrl = "\"none\""
        var initStatus = "\"none\""
        infos.foreach(info => urlPattern.findAllIn(info).foreach(x => initUrl = x))
        infos.foreach(info => ipPattern.findAllIn(info).foreach(x => initStatus = x))
        val date = s"nginxLog,ip=${infos(0)},acess_time=${DateUtil.parseTime(actime)},status=$initStatus,upstream_time=${infos.last} send_bytes=${infos(9)},url=$initUrl,count=1 $curent_timestamp"
        println(date)
        Http(influxdb_url).postData(date).header("content-type", "application/json").asString.code
      }
    }) //.filter(clearlog => clearlog.statusCode != 200)

    cleanDate.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
Time format conversion
DateUtil
package com.sgm.spark

import java.util.{Date, Locale}
import org.apache.commons.lang3.time.FastDateFormat

object DateUtil {
  val curent_day = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss", Locale.ENGLISH)
  val targe_fomat = FastDateFormat.getInstance("yyyyMMddHHmmss")

  // convert an nginx access-log time to a millisecond timestamp
  def getTime(time: String) = {
    curent_day.parse(time).getTime
  }

  def parseTime(time: String) = {
    targe_fomat.format(new Date(getTime(time)))
  }

  def main(args: Array[String]): Unit = {
    println(parseTime("04/MAY/2017:09:22:05"))
  }
}
Install Maven and build the package; refer to the Scala development environment setup notes.
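The job needs at least the Spark Streaming Kafka 0-10 integration, scalaj-http and commons-lang3 on the classpath. A sketch of the pom.xml dependencies; the versions are assumptions and should be matched to your Spark/Scala install:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scalaj</groupId>
        <artifactId>scalaj-http_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.5</version>
    </dependency>
</dependencies>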
Submit with spark-submit
bin/spark-submit --master local[2] --class com.sgm.spark.NginxClear --name NginxClear /root/bigdata-1.0-SNAPSHOT.jar