最近在作企業安全建設,企業安全建設中最多見的一項就是作監控,監控的種類多種多樣,可是底層的技術棧卻基本是一致的————大數據技術,下面我記錄一下我最近學習到的一些大數據技術,下文只是描述個脈絡而已。html
大數據的技術棧,以及對應的上下依賴圖以下:
sql
看完這個圖,是否是以爲和以前學習過的網絡協議、框架都很是相識,無非就是把裏面的名詞替換了一下而已。我感受軟件產品的設計思路都是要分模塊化、解耦合,你看TCP/IP協議層,每層都各司其職,每層裏面的每一個功能也是按照這個整體思路繼續向下設計。解耦合的好處不少,建議自行百度。apache
我我的以爲,裏面比較有難度的就是Flink那塊,由於對數據的分析、計算處理都是在這一塊中完成的,Flink也能夠用storm替換,不過性能沒有flink好。
當將計算結果存儲到ES以後,就能夠作不少事了,好比作自動告警功能了。安全
數據源能夠是任何數據,不過如今採集最多的應該就是日誌類數據服務器
採集器是最容易理解的,主要是用來彙總日誌而後轉發的,採集器的技術方案也有不少,這裏舉例filebeat。網絡
Filebeat主要由兩個組件構成:prospector(探測器)
和harvester(收集器)
,這兩類組件一塊兒協做完成Filebeat的工做。併發
Filebeat的工做流程以下:
當開啓Filebeat程序的時候,它會啓動一個或多個探測器去檢測指定的日誌目錄或文件,對於探測器找出的每個日誌文件,Filebeat會啓動收集進程,每個收集進程讀取一個日誌文件的內容,而後將這些日誌數據發送到後臺處理程序,後臺處理程序會集合這些事件,最後發送集合的數據到output指定的目的地。app
Filebeat在有數據源的機器安裝好以後,要作的就是寫一下配置,
主要配置讀取文件的路徑,以及輸出流的位置以及相應的性能參數等,以Kafka消息中間件做爲緩衝,全部的日誌收集器都向Kafka輸送日誌流。框架
定義日誌信息輸出格式:分佈式
<Properties> //存放日誌的文件夾名稱 <Property name="LOG_HOME">logs</Property> //日誌文件名稱 <property name="FILE_NAME">collector</property> //日誌格式 //[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] 日誌輸入時間,東八區 //[%level{length=5}] 日誌級別,debug、info、warn、error //[%thread-%tid] 當前線程信息 //[%logger] 當前日誌信息所屬類全路徑 //[%X{hostName}] 當前節點主機名。須要經過MDC來自定義。 //[%X{ip}] 當前節點ip。須要經過MDC來自定義。 //[%X{applicationName}] 當前應用程序名。須要經過MDC來自定義。 //[%F,%L,%C,%M] %F:當前日誌信息所屬的文件(類)名,%L:日誌信息在所屬文件中的行號,%C:當前日誌所屬文件的全類名,%M:當前日誌所屬的方法名 //[%m] 日誌詳情 //%ex 異常信息 //%n 換行 <property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n </property>
Filebeat配置參考信息:
paths: - /usr/local/logs/error-collector.log document_type: "error-log" multiline: # pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})' # 指定匹配的表達式(匹配以 2017-11-15 08:04:23:889 時間格式開頭的字符串) pattern: '^\[' # 指定匹配的表達式(匹配以 "{ 開頭的字符串) negate: true # 是否匹配到 match: after # 合併到上一行的末尾 max_lines: 2000 # 最大的行數 timeout: 2s # 若是在規定時間沒有新的日誌事件就不等待後面的日誌 fields: logbiz: collector logtopic: error-log-collector ## 按服務劃分用做kafka topic evn: dev output.kafka: enabled: true hosts: ["192.168.204.139:9092"] topic: '%{[fields.logtopic]}' partition.hash: reachable_only: true compression: gzip max_message_bytes: 1000000 required_acks: 1 logging.to_files: true
Apache kafka是消息中間件的一種,功能是高吞吐量的分佈式發佈訂閱消息系統
Kafka特色:
kafka中的消息不是kafka主動去拉去的,而必須有生產者往kafka寫消息。
kafka是不會主動往消費者發佈消息的,而必須有消費者主動從kafka拉取消息。
kafka名詞解釋:
kafka的幾個名詞須要知道一下,好比topic、producer、consumer、broker,下面用最俗的方式解釋
kafka的單節點基本操做:
生產者
# 建立一個主題(標籤),Hello-Kafka bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka # 生產者將等待來自stdin的輸入併發布到Kafka集羣。 默認狀況下,每一個新行都做爲新消息發佈,而後在 config / producer.properties 文件中指定默認生產者屬性。 # 在終端中鍵入幾行消息 egg1 egg2
消費者
# 與生產者相似,在 config / consumer.proper-ties 文件中指定了缺省使用者屬性。 打開一個新終端並鍵入如下消息消息語法。 bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning # 自動出現 egg1 egg2
Flink核心是一個流式的數據流執行引擎,其針對數據流的分佈式計算提供了數據分佈、數據通訊以及容錯機制等功。
簡單的說就是,Flink能夠對數據流進行轉換、計算、聚合等功能。若是你採集的數據須要作告警功能,那麼就須要用Flink或者storm,若是隻是將採集的數據進行存儲,而後展現,那麼就不須要用到Flink這種技術。
好比在企業安全建設中,作監控平臺就須要有告警功能,採集到的監控數據會直接往 kafka 裏塞,而後告警這邊須要從 kafka topic 裏面實時讀取到監控數據,並將讀取到的監控數據作一些 轉換、計算、聚合等操做,而後將計算後的結果與告警規則的閾值進行比較,而後作出相應的告警措施(釘釘羣、郵件、短信、電話等)。畫了個簡單的圖以下:
flink處理靜態sql的代理流程:
這個sql只能是寫死在代碼裏面,若是是想要動態的修改sql,那麼就要重啓flink服務才能生效。
可是有個需求,就像下圖這樣,sql語句來以外部,由於須要讓安全人員來描述規則,他們跟進安全態勢來修改,而且須要經常更新規則來挖掘出最新安全事件,
那麼就出現一個問題了,像上面的flink只能處理靜態sql,想動態處理怎麼辦?
使用 flink-siddhi 來處理動態sql:
SIDDHI 是一款功能強大的open source CEP(Complex Event Processing)引擎引擎,具備本身的DSL,豐富的模式匹配功能和可擴展性,
使用Siddhi 引擎的好處就是,裏面的sql語句能夠任意修改,修改sql後,也不須要重啓flink服務。
siddhi引擎我最近也是剛開始學習,這裏就不過多筆墨了,後面會出siddhi的專項文章。
ES太常見了,之後有空在補充吧。
Kibana也很常見,之後有空在補充吧。但願讀者給個評論或者推薦,讓我有動力更新完。
http://www.javashuo.com/article/p-ubrmippf-nd.html
https://www.jianshu.com/p/a8b66f586fd4
http://kafka.apachecn.org/
https://www.w3cschool.cn/apache_kafka/apache_kafka_introduction.html
http://www.javashuo.com/article/p-txmadmte-mc.html
https://ci.apache.org/projects/flink/flink-docs-release-1.4/
http://www.javashuo.com/article/p-hfnxkmtc-nd.html
https://baijiahao.baidu.com/s?id=1623279487849430246&wfr=spider&for=pc