Anti-Crawler Project Development
Project Introduction
Project Background
Why have an anti-crawler project?
Crawlers consume large amounts of our system resources, such as bandwidth and computing capacity.
Crawlers make bookings / grab tickets, which disrupts our normal business.
Importing the anti-crawler web project
Data flow through the anti-crawler project
Logical architecture
Functional description
Data management module
Flow management
Non-functional description
What the data consists of:
Clickstream (collection servers) / business logs (business servers) / third-party feeds
Estimating the data volume:
How large is the peak data volume?
Spread the 800 million daily records evenly across each second to get the average; the peak is about 200,000 per second.
5 million registered users with 200,000 daily active; if each active user clicks 30 times, that is 6 million click events, plus 60 million business-log records, for a total of about 66 million records per day.
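A quick worked check of these figures (the per-second average is derived here, not stated in the notes):

$$\frac{8\times10^{8}\ \text{records}}{86400\ \text{s}} \approx 9.3\times10^{3}\ \text{records/s on average},\qquad \text{peak} \approx 2\times10^{5}\ \text{records/s}$$

$$2\times10^{5}\ \text{DAU}\times 30\ \text{clicks} = 6\times10^{6};\qquad 6\times10^{6}+6\times10^{7}\ \text{log records} = 6.6\times10^{7}\ \text{records/day}$$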
Company cluster categories:
Database ER diagram
Using PowerDesigner
Create a new model
Export the SQL file
Anti-crawling rules
Data sources
The data sources feed the various anti-crawler metrics. They consist mainly of the parameters carried in user requests, e.g. the referrer URL, the requested URL, the user's SessionID, and query details such as departure city / destination / departure date.
Anti-crawling rules
Characteristics of crawler programs
Data Collection
Installing OpenResty
Configuring OpenResty
./configure --prefix=/usr/local/openresty --with-http_stub_status_module
If dependencies are missing, install them:
yum install readline-devel pcre-devel openssl-devel perl gcc
If you don't want to compile it yourself, 反扒參考資料\OpenResty\編譯後\openresty in the course materials is a pre-built copy; drop it onto the Linux machine and use it directly.
Lua Syntax Primer
Ways to run Lua
Interactive mode
Script file mode
Data types
Data types in Java
Numeric: integer types byte/int/short/long, floating-point types float/double; boolean type: boolean; character type: char
Lua data types
nil: the equivalent of Java's null
boolean: boolean type
number: numbers, with no integer/float distinction
string: strings
userdata: a C data structure
function: functions (methods)
thread: threads
table: array / list / map
Lua operators
Assignment operator
-- assignment operator
a = 3
b = "hello"
print(a)
print(b)
c,d = 1,2
print(c,d)
e,f = 1,2,3
print(e,f)
g,h = 1
print(g,h)
str = "hello" .. "world"
print(str)

String concatenation must use "..", not "+".
Arithmetic operators

-- arithmetic operators
-- add, subtract, multiply, divide, modulo
a,b = 10,20
print("a+b=" .. a + b)
print("a-b=" .. a - b)
print("a*b=" .. a * b)
print("a/b=" .. a / b)
print("a%b=" .. a % b)
Relational operators

-- relational operators
print("========= relational operators =========")
a,b = 1,2
-- a boolean cannot be concatenated with "..", so wrap the comparison in tostring()
print("a == b: " .. tostring(a == b))
print("a ~= b: " .. tostring(a ~= b))
print(a > b)
print(a >= b)
print(a < b)
print(a <= b)
Logical operators

-- logical operators
print("========= logical operators =========")
a,b = true,false
print(a and b)
print(a or b)
print(not b)
Other operators

"#" returns the length of a string or a table.

-- other operators
print("======== other operators ===========")
str = "hello java"
print(#str)
Lua control flow

if statements

-- control flow
-- if
a = 10
if a > 5 then
    print("a>5")
end
-- if-else
if a > 11 then
    print("a>11")
else
    print("a<=11")
end
-- nested if
if a > 5 then
    print("a>5")
    if a < 12 then
        print("a<12")
    end
end
Loops

while loop

-- while loop
print("======= while loop =======")
while a > 0 do
    print(a)
    a = a - 1
end

repeat loop

-- repeat loop
print("======= repeat loop =======")
repeat
    print(a)
    a = a + 1
until a > 10

A repeat loop runs at least once.

Exercise: a sheet of paper is 0.04 thick; how many folds (doubling the thickness each time) until it exceeds the height of Mount Everest, 8847?
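A worked answer to the exercise (assuming the thickness doubles with each fold and both figures share the same unit):

$$0.04\times 2^{n} > 8847 \;\Rightarrow\; 2^{n} > 221175 \;\Rightarrow\; n > \log_{2} 221175 \approx 17.75$$

so 18 folds are needed: $0.04\times 2^{18} = 10485.76 > 8847$, while $0.04\times 2^{17} = 5242.88$ still falls short.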
for loop

-- for loop
print("======= for loop =======")
for b = 10, 1, -1 do
    print(b)
end

The for loop takes three parameters: the start value, the end value, and the step.
Lua arrays

-- Lua arrays
arr = {"zhangsan","lisi","wangwu"}
print(#arr)
for a = 1,#arr do
    print(arr[a])
end
-- generic for loop
-- i is the index
-- name is the value at that index
for i,name in ipairs(arr) do
    print(i,name)
end

Note:

ipairs only works on array-style tables; to iterate a map-style table, use the pairs keyword.
Lua type conversion

Converting other types to string

tostring()

-- converting other types to string
-- tostring()
-- boolean to string
boo = true
print(type(boo))
print(type(tostring(boo)))
print(tostring(boo))
-- number to string
num = 20
print(type(num))
print(type(tostring(num)))
print(tostring(num))
-- table to string
tbl = {"tom","cat"}
print(type(tbl))
print(type(tostring(tbl)))
print(tostring(tbl))

Usually it is numbers that get converted to strings.

function and table values have no meaningful string conversion by default.
Converting other types to number

-- converting other types to number:
-- tonumber()
num = "12"
print(type(num))
print(type(tonumber(num)))
print(tonumber(num))
num = "AF"
print(type(num))
print(type(tonumber(num,16)))
print(tonumber(num,16))
tbl = {"tom","cat"}
print(tonumber(tbl))
boo = false
print(tonumber(boo))

Anything not in numeric form cannot be converted — e.g. booleans, tables, or "hello"; tonumber returns nil for them.
Lua functions

How a Lua function is defined:

[scope] function name(arg1, arg2, ...)
    body
    return result1, result2, ...
end

-- defining Lua functions
function f1(a,b)
    print("hello function")
    return a+b
end
result = f1(3,4)
print(result)
-- multiple return values
local function f2(a,b)
    return a,b
end
c,d = f2(1,2)
print(c,d)
Scope of Lua variables

Lua variables are global by default;
the local keyword makes them local.
When using globals, take care not to reuse a variable name, or the original value will be overwritten.

-- variable scope
a = 10
if a>3 then
    b = 20
    local c = 30
    print(a)
    print(b)
    print(c)
end
a = "hello"
print(a)
print(b)
print(c) -- nil
Lua tables

A Lua table can stand in for a Java array, List, or Map.

If a table holds array-style data, iterate it with ipairs; for map-style data, use the pairs keyword.

-- define an array-style table
local arr = {"zhangsan","lisi","wangwu"}
print(arr[1])
-- iterate the table by index
for i = 1, #arr do
    print(arr[i])
end
print("======== generic for loop ========")
for index, value in ipairs(arr) do
    print(index, value)
end
print("======== map-style table ========")
map = {name="zhangsan", sex="男", age = 13}
print(map["name"])
print(map.name)
print(map.age)
-- values can be set or read through "." notation
map.address = "深圳"
print(map.address)
print("======== iterating a map-style table ========")
for key, value in pairs(map) do
    print(key, value)
end
Lua modules

Lua's module mechanism is built on tables: first define an empty table to hold the module's variables and functions.

Import a module with the require keyword: require "module-name" — note there is no ".lua" suffix.

Simulating sending messages to Kafka

kafka.lua

-- simulate sending a message to Kafka
_M = {}
-- default number of partitions
_M.default_partition_num = 5
function _M.new(props)
    -- create a client from the given props
    return "Kafka client ..."
end
-- send a message to Kafka
function _M.send(topic, key, message)
    print("Sending to Kafka, topic: "..topic..", body: "..message)
    -- return a status so the caller can check the result
    return nil,"error"
end

testKafka.lua

-- test the hand-rolled Kafka module
require "kafka"
dpn = _M.default_partition_num
print("default partition count: "..dpn)
-- create the client object
-- props must be passed in
props = {{hosts="192.168.80.81", port="9092"},{hosts="192.168.80.81", port="9092"}}
_M.new(props)
-- send a message
ok, err = _M.send("sz07", "1", "test message to Kafka")
if not ok then
    -- on a bad result, print the error
    print(err)
    return
end
Integrating Lua with Nginx

Two ways to combine Lua with Nginx

Lua code block

location / {
    #root html;
    #index index.html index.htm;
    default_type text/html;
    content_by_lua_block {
        -- Lua code goes here
        print("hello")
        ngx.say("hello openresty")
    }
}
Lua script file

location / {
    #root html;
    #index index.html index.htm;
    default_type text/html;
    content_by_lua_file /export/servers/openresty/test.lua;
}
content_by_lua_file /export/servers/openresty/test.lua;
Don't forget the trailing ";" at the end.
Getting HTTP request parameters with Lua

Getting GET parameters

-- fetching HTTP request parameters with Lua
-- GET parameters
getArgs = ngx.req.get_uri_args()
-- print the parameters
for k,v in pairs(getArgs) do
    ngx.say("name: "..k.."  value: "..v)
    ngx.say("<br>")
end
Getting POST parameters

ngx.say("======= POST parameters ========")
-- POST parameters
-- the request body must be read first by calling read_body()
ngx.req.read_body()
postArgs = ngx.req.get_post_args()
-- print the parameters
for k,v in pairs(postArgs) do
    ngx.say("name: "..k.."  value: "..v)
    ngx.say("<br>")
end

Any operation that touches the request body must call ngx.req.read_body() first.
Getting request headers

ngx.say("======= request headers ========")
headerArgs = ngx.req.get_headers()
for k,v in pairs(headerArgs) do
    ngx.say("name: "..k.."  value: "..v)
    ngx.say("<br>")
end

Getting the request body (for JSON payloads)

ngx.say("======= request body ========")
-- read_body() must be called first
ngx.req.read_body()
bodyData = ngx.req.get_body_data()
-- a JSON body cannot be iterated directly, so print it as-is
ngx.say(bodyData)
Connecting to MySQL from Lua

First require the MySQL module, located at openresty/lualib/resty/mysql.lua.

-- MySQL access
-- require the MySQL module
local restyMysql = require "resty.mysql"
-- Lua methods are normally called with "."; when the first parameter is self, calling with ":" lets you omit that self parameter
local mysql = restyMysql:new()
-- set the connection timeout
mysql:set_timeout(20000)
-- connect to MySQL
-- connection settings
local opts = {}
opts.host = "192.168.80.81"
opts.port = 3306
opts.database = "test"
opts.user = "root"
opts.password = "root"
local ok, err = mysql:connect(opts)
if not ok then
    ngx.say("MySQL connection failed: " .. err)
    return
end
-- define the SQL
local sql = "select * from user"
local result, err = mysql:query(sql)
if not result then
    ngx.say("query failed: " .. err)
    return
end
-- walk the result set
for i,row in ipairs(result) do
    for key,value in pairs(row) do
        ngx.say("column: "..key.."  value: " .. value)
    end
    ngx.say("<br>")
end
ngx.say("all rows printed")
INSERT / DELETE / UPDATE against MySQL

-- insert
local sql = "insert into user values('lisi','123','深圳','0','2019-01-01')"
local result, err = mysql:query(sql)
if not result then
    ngx.say("insert failed: " .. err)
    return
end
ngx.say("insert succeeded")
-- delete
local sql = "delete from user where username='lisi'"
local result, err = mysql:query(sql)
if not result then
    ngx.say("delete failed: " .. err)
    return
end
ngx.say("delete succeeded")
-- update
local sql = "update user set username = 'lisi' where username='zhangsan'"
local result, err = mysql:query(sql)
if not result then
    ngx.say("update failed: " .. err)
    return
end
ngx.say("update succeeded")
Connecting to Redis from Lua

Standalone Redis installation

Redis is an in-memory NoSQL database that stores key-value pairs.

If you don't want to build from source, copy 反扒參考資料\Redis\redis-5.0.4 from the course materials into the VM and use it directly.

redis.conf settings

# address to bind
bind 0.0.0.0
# port to bind
port 6379
# run in the background; by default the redis server occupies its own foreground window
daemonize yes
# location of the redis pid file
pidfile /var/run/redis_6379.pid
# redis dump file
dbfilename dump.rdb
Start the Redis server
./redis-server redis.conf
Check the Redis process status
ps -ef | grep redis
Connect to Redis
./redis-cli
Connecting from Lua

-- connect to Redis from Lua
-- require the Redis module
local restyRedis = require "resty.redis"
-- create the Redis client with new()
local redis = restyRedis:new()
-- set the timeout
redis:set_timeout(20000)
-- open the connection
ok,err = redis:connect("192.168.80.83", 6379)
if not ok then
    ngx.say("connection failed: "..err)
    return
end
-- connected
ok, err = redis:set("username", "zhangsan")
if not ok then
    ngx.say("set failed: "..err)
    return
end
ngx.say("set succeeded")
-- read the value back
ok, err = redis:get("username")
if not ok then
    ngx.say("get failed: "..err)
    return
end
ngx.say(ok)
Redis Cluster

How it works

Cluster setup

See 反扒參考資料\Redis\Redis集羣搭建步驟.md.

Each node's directory contains a 700x.conf.

Each config file contains path-dependent settings, so keep the directory layout used in the course; otherwise the paths must be edited by hand.

7001.conf:

port 7001
dir /export/servers/redis-5.0.4/cluster/7001/data
cluster-enabled yes
cluster-config-file /export/servers/redis-5.0.4/cluster/7001/data/nodes.conf
Start the cluster:

bin/redis-server cluster/7001/7001.conf
bin/redis-server cluster/7002/7002.conf
bin/redis-server cluster/7003/7003.conf
bin/redis-server cluster/7004/7004.conf
bin/redis-server cluster/7005/7005.conf
bin/redis-server cluster/7006/7006.conf

Check the listening processes with netstat -nltp.
Initialization:

If a client connects right after the servers' first start and tries to store data, it gets a "slots not assigned" error.

The initialization below only needs to run once, on first startup; it never needs to be repeated.

# replace the IPs below with your own addresses
bin/redis-cli --cluster create --cluster-replicas 1 <your-ip>:7001 192.168.80.83:7002 192.168.80.83:7003 192.168.80.83:7004 192.168.80.83:7005 192.168.80.83:7006

--cluster-replicas 1 sets the replica count to 1.
Connect to the cluster:

bin/redis-cli -c -p 7001
set hello world
get hello

-c connects in cluster mode; without this flag, slot redirection fails.

-p specifies the port to connect to.
Connecting to Kafka from Lua

Write the Lua script

-- send a message to Kafka
-- require the Kafka module
local kafka = require "resty.kafka.producer"
-- create the producer
local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}}
local producer = kafka:new(broker_list)
-- send the data
local ok, err = producer:send("test", "1", "hello openresty")
if not ok then
    ngx.say("Kafka send failed: "..err)
    return
end
ngx.say("message sent")
Start the Kafka cluster

Start ZooKeeper first
zkServer.sh start
Start Kafka
nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &
/dev/null is where the standard output is sent
2>&1 redirects error output to standard output
& runs the process in the background
List all topics
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --list
Start a console consumer
/export/servers/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic test
Writing the Lua collection script

Modify nginx.conf

http {
    include       mime.types;
    default_type  application/octet-stream;
    #log_format main '$remote_addr - $remote_user [$time_local] "$request" '
    #                '$status $body_bytes_sent "$http_referer" '
    #                '"$http_user_agent" "$http_x_forwarded_for"';
    #access_log logs/access.log main;
    sendfile on;
    #tcp_nopush on;
    #keepalive_timeout 0;
    keepalive_timeout 65;
    #gzip on;
    # enable the shared dictionary with 10MB of space; we only store a few numbers, so 10MB is plenty
    lua_shared_dict shared_data 10m;
    # local name resolution
    resolver 127.0.0.1;
    server {
        listen 80;
        server_name localhost;
        #charset koi8-r;
        #access_log logs/host.access.log main;
        location / {
            #root html;
            #index index.html index.htm;
            # enable nginx status monitoring
            stub_status on;
            default_type text/html;
            #content_by_lua_block {
            #    print("hello")
            #    ngx.say("hello openresty")
            #}
            content_by_lua_file /export/servers/openresty/mylua/controller.lua;
        }
Write controller.lua

-- overload protection: stop collecting once the connection count passes a threshold
-- maximum allowed connections
local maxConnectNum = 10000
-- current number of active connections
local currentConnect = tonumber(ngx.var.connections_active)
-- if the current count exceeds the threshold, skip collection
if currentConnect > maxConnectNum then
    return
end

-- balanced partitioning
-- number of Kafka partitions
local partition_num = 6
-- name of the counter in the shared dictionary
local sharedKey = "publicValue"
-- handle to the shared dictionary
local shared_data = ngx.shared.shared_data
-- read the counter from the shared dictionary
local num = shared_data:get(sharedKey)
-- on the first run, num has no value yet
if not num then
    -- initialise the counter in the dictionary
    num = 0
    shared_data:set(sharedKey, 0)
end
-- take the modulo to pick a partition ID
local patitionID = num % partition_num
-- use the dictionary's built-in atomic increment
shared_data:incr(sharedKey, 1)

-- data collection
-- current system time
local time_local = ngx.var.time_local
if time_local == nil then
    time_local = ""
end
-- requested URL
local request = ngx.var.request
if request == nil then
    request = ""
end
-- request method
local request_method = ngx.var.request_method
if request_method == nil then
    request_method = ""
end
-- content type of the request, e.g. text/html, application/json
local content_type = ngx.var.content_type
if content_type == nil then
    content_type = ""
end
-- read the request body
ngx.req.read_body()
-- request body data
local request_body = ngx.var.request_body
if request_body == nil then
    request_body = ""
end
-- referrer URL
local http_referer = ngx.var.http_referer
if http_referer == nil then
    http_referer = ""
end
-- client IP address
local remote_addr = ngx.var.remote_addr
if remote_addr == nil then
    remote_addr = ""
end
-- UA carried by the request
local http_user_agent = ngx.var.http_user_agent
if http_user_agent == nil then
    http_user_agent = ""
end
-- request time
local time_iso8601 = ngx.var.time_iso8601
if time_iso8601 == nil then
    time_iso8601 = ""
end
-- requested (server) IP address
local server_addr = ngx.var.server_addr
if server_addr == nil then
    server_addr = ""
end
-- the user's cookie
local http_cookie = ngx.var.http_cookie
if http_cookie == nil then
    http_cookie = ""
end
-- assemble the record
local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;

-- connect to Kafka and send the message
-- require the Kafka module
local kafka = require "resty.kafka.producer"
-- create the producer
local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}}
local producer = kafka:new(broker_list)
-- send the data (topic, key (the partition id 0-5 is used as the key), message)
local ok, err = producer:send("sz07", tostring(patitionID), message)
if not ok then
    ngx.say("Kafka send failed: "..err)
    return
end
Note:

The partition count cannot be set from Lua; change it by hand with the Kafka scripts.
Show the help for topic operations:
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --help
Change the partition count to 6:
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --alter --partitions 6 --topic sz07
Data Preprocessing

Getting the messages from Kafka

Create the project

Import the pom.xml configuration

Import the config files

Copy the files under 反扒參考資料\配置文件 into the project's resources directory.

Update the IP-related settings in the config files.

Import the entity and utility classes the project needs

Copy the classes under 反扒參考資料\工具包 into the project.
Two ways to consume Kafka data (with the old 0.8 API: a receiver-based stream, or the Direct approach used below)

With the Direct approach, the consumed offsets are kept by Spark in its checkpoint (see the sketch below).

Pros:

1. No duplicate consumption; exactly-once semantics can be guaranteed.
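For the checkpoint variant, a minimal sketch (not the project's code; the checkpoint path is a placeholder) looks like this — StreamingContext.getOrCreate rebuilds the context, and with it the stored offsets, from the checkpoint directory after a restart:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// assumed path; replace with a real HDFS or local directory
val checkpointDir = "hdfs://node01:8020/antispider/checkpoint"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setMaster("local[*]").setAppName("DataProcessApp")
  val ssc = new StreamingContext(conf, Seconds(2))
  // offsets and DStream lineage are persisted here
  ssc.checkpoint(checkpointDir)
  // ... build the Kafka DStream and the processing graph here ...
  ssc
}

// on restart the context (and the consumed offsets) is recovered from the checkpoint
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)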
Link statistics

Write the main APP

package com.air.antispider.stream.dataprocess

import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.dataprocess.businessprocess.BusinessProcess
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Main program for data preprocessing
 */
object DataProcessApp {

  def main(args: Array[String]): Unit = {
    // create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DataProcessApp")
    // create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // consume the Kafka messages; how many ways are there? Two.
    var kafkaParams = Map[String, String]()
    // read the broker list from kafkaConfig.properties
    val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties")
    kafkaParams += ("metadata.broker.list" -> brokerList)
    val topics = Set[String]("sz07")
    // consume Kafka with the Direct approach
    // StringDecoder: plain Java serialization is slow, so Kafka ships its own serialization mechanism
    val inputDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    // messages arrive as (key, message) pairs
    val messageDStream: DStream[String] = inputDStream.map(_._2)
    messageDStream.foreachRDD(messageRDD => {
      // run the link statistics
      BusinessProcess.linkCount(messageRDD)
      messageRDD.foreach(println)
    })

    // start the Spark program
    ssc.start()
    ssc.awaitTermination()
  }
}
Link statistics code

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.Date

import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil}
import org.apache.spark.rdd.RDD
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import redis.clients.jedis.JedisCluster

/**
 * Link statistics
 */
object BusinessProcess {

  def linkCount(messageRDD: RDD[String]) = {
    // collection volume per server
    val serverCountRDD: RDD[(String, Int)] = messageRDD.map(message => {
      val arr: Array[String] = message.split("#CS#")
      if (arr.length > 9) {
        // the record is complete
        val serverIP = arr(9)
        // (ip, 1 hit)
        (serverIP, 1)
      } else {
        ("", 1)
      }
    })
      // sum by key
      .reduceByKey(_ + _)

    // current active connections
    val activeNumRDD: RDD[(String, Int)] = messageRDD.map(message => {
      val arr: Array[String] = message.split("#CS#")
      if (arr.length > 11) {
        // server IP
        val serverIP = arr(9)
        // number of active connections on that server
        val activeNum = arr(11)
        // (ip, active connections)
        (serverIP, activeNum.toInt)
      } else {
        ("", 1)
      }
    })
      // discard one of the two values; a single active-connection count is enough
      .reduceByKey((x, y) => y)

    // present the results
    // following the Java web code, we need to build a JSON document and store it in Redis for the front end to display
    if (!serverCountRDD.isEmpty() && !activeNumRDD.isEmpty()) {
      // with non-empty data, start processing
      // collect the RDD results as maps
      val serversCountMap: collection.Map[String, Int] = serverCountRDD.collectAsMap()
      val activeNumMap: collection.Map[String, Int] = activeNumRDD.collectAsMap()
      val map = Map[String, collection.Map[String, Int]](
        "serversCountMap" -> serversCountMap,
        "activeNumMap" -> activeNumMap
      )
      // serialize the map to JSON
      val jsonData: String = Json(DefaultFormats).write(map)
      // store jsonData in Redis
      // get a Redis connection
      val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster
      // the key is CSANTI_MONITOR_LP + timestamp
      val key: String = PropertiesUtil.getStringByKey("cluster.key.monitor.linkProcess", "jedisConfig.properties") + new Date().getTime
      val ex: Int = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt
      // the data is stored per day, so a TTL of one day would be enough
      // jedis.set(key, jsonData)
      // here the TTL is set to 2 minutes
      jedis.setex(key, ex, jsonData)
    }
  }
}
URL filtering

Flow:

1. First load the URL filter rules from the MySQL database.

Writing the code

Code:

import com.air.antispider.stream.common.util.database.QueryDB

import scala.collection.mutable.ArrayBuffer

/**
 * Loads the rules from MySQL so Spark can use them
 */
object AnalyzeRuleDB {

  /**
   * Fetch the URL filter rules from MySQL
   */
  def getFilterRule(): ArrayBuffer[String] = {
    val sql = "select value from nh_filter_rule"
    val field = "value" // read the "value" column
    val filterRule: ArrayBuffer[String] = QueryDB.queryData(sql, field)
    filterRule
  }
}
After creating the SparkContext and before consuming the Kafka data, load the database rules into a broadcast variable.

// load the database rules into a broadcast variable
val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule()
// put the filter-rule list into the broadcast variable
// @volatile lets multiple threads safely swap the broadcast reference
@volatile var filterRuleBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(filterRuleList)
The @volatile annotation: it marks the variable so that when one thread replaces the broadcast reference, other threads immediately see the new value.
Updating the broadcast variable

// first check the database flag, then update the broadcast variable
var filterRuleChangeFlag = jedis.get("FilterRuleChangeFlag")
// check whether the flag exists
if (StringUtils.isBlank(filterRuleChangeFlag)) {
  filterRuleChangeFlag = "true"
  // write it back to Redis
  jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
}
// update the broadcast variable
if (filterRuleChangeFlag.toBoolean) {
  // FilterRuleChangeFlag == true means the broadcast variable must be refreshed
  // load the database rules
  val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule()
  // put the filter-rule list into the broadcast variable
  // @volatile lets multiple threads safely swap the broadcast reference
  filterRuleBroadcast = sc.broadcast(filterRuleList)
  filterRuleChangeFlag = "false"
  jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
}
Create the URLFilter class
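The class body is not preserved in the notes; below is a minimal sketch consistent with the call in the main program (URLFilter.filterURL(message, rules) returning false for records whose URL matches a filter rule). The index-1 field position follows the #CS# layout used by the collection script and is an assumption.

package com.air.antispider.stream.dataprocess.businessprocess

import scala.collection.mutable.ArrayBuffer

object URLFilter {
  /**
   * Return false when the requested URL matches one of the filter rules
   * (typically static resources such as .js/.css/.png), so that
   * messageRDD.filter(...) drops the record.
   */
  def filterURL(message: String, filterRuleList: ArrayBuffer[String]): Boolean = {
    val arr = message.split("#CS#", -1)
    // assumption: the request line sits at index 1, as in the collection script
    val request = if (arr.length > 1) arr(1) else ""
    for (rule <- filterRuleList) {
      if (request.matches(rule)) {
        // matched a filter rule: discard this record
        return false
      }
    }
    true
  }
}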
Reference the URLFilter class in the main program

// URL filtering
val filterRDD: RDD[String] = messageRDD.filter(message => URLFilter.filterURL(message, filterRuleBroadcast.value))
Data masking

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.{Matcher, Pattern}

import com.air.antispider.stream.common.util.decode.MD5
import org.apache.spark.rdd.RDD

/**
 * Encrypts the user's sensitive fields
 */
object EncryptedData {

  /**
   * Encrypt ID-card numbers
   * @param encryptedPhoneRDD
   * @return
   */
  def encryptedID(encryptedPhoneRDD: RDD[String]): RDD[String] = {
    encryptedPhoneRDD.map(message => {
      // create the digest object
      val md5 = new MD5
      // find ID-card numbers in the message with a regex
      val pattern: Pattern = Pattern.compile("(\\d{18})|(\\d{17}(\\d|X|x))|(\\d{15})")
      // run the regex against the message; matcher holds the matches
      val matcher: Matcher = pattern.matcher(message)
      var tempMessage = message
      // walk the matches
      while (matcher.find()) {
        // the matched ID number
        val id: String = matcher.group()
        // hash and substitute
        val encryptedID: String = md5.getMD5ofStr(id)
        tempMessage = tempMessage.replace(id, encryptedID)
      }
      // return the masked record
      tempMessage
    })
  }

  // encrypt phone numbers
  def encryptedPhone(filterRDD: RDD[String]): RDD[String] = {
    filterRDD.map(message => {
      // create the digest object
      val md5 = new MD5
      // find phone numbers in the message with a regex
      val pattern: Pattern = Pattern.compile("((13[0-9])|(14[5|7])|(15([0-3]|[5-9]))|(17[0-9])|(18[0,5-9]))\\d{8}")
      val matcher: Matcher = pattern.matcher(message)
      var tempMessage = message
      // walk the matches
      while (matcher.find()) {
        // the matched phone number
        val phone: String = matcher.group()
        // hash and substitute
        val encryptedPhone: String = md5.getMD5ofStr(phone)
        tempMessage = tempMessage.replace(phone, encryptedPhone)
      }
      // return the masked record
      tempMessage
    })
  }
}
Main program:

// data masking
// encrypt phone numbers
val encryptedPhoneRDD: RDD[String] = EncryptedData.encryptedPhone(filterRDD)
// encrypt ID-card numbers
val encryptedRDD: RDD[String] = EncryptedData.encryptedID(encryptedPhoneRDD)
Data splitting

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.Pattern

import com.air.antispider.stream.common.util.decode.{EscapeToolBox, RequestDecoder}
import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.common.util.string.CsairStringUtils

/**
 * Data splitting
 */
object DataSplit {

  /**
   * Split the raw record into its individual fields
   * @param message
   * @return
   */
  def split(message: String): (String, String, String, String, String, String, String, String, String, String, String, String) = {
    val values: Array[String] = message.split("#CS#")
    // record the number of fields
    val valuesLength = values.length
    // raw request data
    val regionalRequest = if (valuesLength > 1) values(1) else ""
    // extract the URL from the request line
    val request = if (regionalRequest.split(" ").length > 1) {
      regionalRequest.split(" ")(1)
    } else {
      ""
    }
    // request method GET/POST
    val requestMethod = if (valuesLength > 2) values(2) else ""
    // content_type
    val contentType = if (valuesLength > 3) values(3) else ""
    // POST body
    val requestBody = if (valuesLength > 4) values(4) else ""
    // http_referrer
    val httpReferrer = if (valuesLength > 5) values(5) else ""
    // client IP
    val remoteAddr = if (valuesLength > 6) values(6) else ""
    // client UA
    val httpUserAgent = if (valuesLength > 7) values(7) else ""
    // server time in ISO8601 format
    val timeIso8601 = if (valuesLength > 8) values(8) else ""
    // server address
    val serverAddr = if (valuesLength > 9) values(9) else ""
    // cookie string: strip spaces and tab characters from the raw record
    val cookiesStr = CsairStringUtils.trimSpacesChars(if (valuesLength > 10) values(10) else "")
    // parse the cookies and keep them as K-V pairs
    val cookieMap = {
      var tempMap = new scala.collection.mutable.HashMap[String, String]
      if (!cookiesStr.equals("")) {
        cookiesStr.split(";").foreach { s =>
          val kv = s.split("=")
          // UTF8 decode
          if (kv.length > 1) {
            try {
              val chPattern = Pattern.compile("u([0-9a-fA-F]{4})")
              val chMatcher = chPattern.matcher(kv(1))
              var isUnicode = false
              while (chMatcher.find()) {
                isUnicode = true
              }
              if (isUnicode) {
                tempMap += (kv(0) -> EscapeToolBox.unescape(kv(1)))
              } else {
                tempMap += (kv(0) -> RequestDecoder.decodePostRequest(kv(1)))
              }
            } catch {
              case e: Exception => e.printStackTrace()
            }
          }
        }
      }
      tempMap
    }
    // extract the key cookie values
    // read the cookie key names from the config file
    val cookieKey_JSESSIONID = PropertiesUtil.getStringByKey("cookie.JSESSIONID.key", "cookieConfig.properties")
    val cookieKey_userId4logCookie = PropertiesUtil.getStringByKey("cookie.userId.key", "cookieConfig.properties")
    // Cookie-JSESSIONID
    val cookieValue_JSESSIONID = cookieMap.getOrElse(cookieKey_JSESSIONID, "NULL")
    // Cookie-USERID: the user ID
    val cookieValue_USERID = cookieMap.getOrElse(cookieKey_userId4logCookie, "NULL")
    (request, requestMethod, contentType, requestBody, httpReferrer, remoteAddr, httpUserAgent, timeIso8601, serverAddr, cookiesStr, cookieValue_JSESSIONID, cookieValue_USERID)
  }
}
Main program:

encryptedRDD.map(message => {
  // after receiving the message, split it and tag it
  // data splitting
  val (request, requestMethod, contentType, requestBody, httpReferrer, remoteAddr,
    httpUserAgent, timeIso8601, serverAddr, cookiesStr,
    cookieValue_JSESSIONID, cookieValue_USERID) = DataSplit.split(message)
})
Data tagging

So that downstream business logic can parse the data, it must know what kind of request each record is — for example domestic/query/one-way versus international/query/round-trip.

Classification tagging

Query the classification rules from the database
/**
 * Query the tagging rules
 */
def getClassifyRule(): Map[String, ArrayBuffer[String]] = {
  // all URLs for "domestic query"
  val nationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Query.id
  val nationalQueryList: ArrayBuffer[String] = QueryDB.queryData(nationalQuerySQL, "expression")
  // all URLs for "domestic booking"
  val nationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Book.id
  val nationalBookList: ArrayBuffer[String] = QueryDB.queryData(nationalBookSQL, "expression")
  // all URLs for "international query"
  val internationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Query.id
  val internationalQueryList: ArrayBuffer[String] = QueryDB.queryData(internationalQuerySQL, "expression")
  // all URLs for "international booking"
  val internationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Book.id
  val internationalBookList: ArrayBuffer[String] = QueryDB.queryData(internationalBookSQL, "expression")
  // wrap the four lists in a map
  val map = Map[String, ArrayBuffer[String]](
    "nationalQueryList" -> nationalQueryList,
    "nationalBookList" -> nationalBookList,
    "internationalQueryList" -> internationalQueryList,
    "internationalBookList" -> internationalBookList
  )
  map
}
Load the classification rules into a broadcast variable

// load the classification rules into a broadcast variable
val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule()
@volatile var classifyRuleBroadcast: Broadcast[Map[String, ArrayBuffer[String]]] = sc.broadcast(classifyRuleMap)
Updating the broadcast variable

// refresh the classification rules
var classifyRuleChangeFlag: String = jedis.get("ClassifyRuleChangeFlag")
// first check whether classifyRuleChangeFlag is blank
if (StringUtils.isBlank(classifyRuleChangeFlag)){
  classifyRuleChangeFlag = "true"
  // write it back to Redis
  jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
}
if (classifyRuleChangeFlag.toBoolean) {
  classifyRuleBroadcast.unpersist()
  // reload the classification rules into the broadcast variable
  val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule()
  classifyRuleBroadcast = sc.broadcast(classifyRuleMap)
  classifyRuleChangeFlag = "false"
  // write it back to Redis
  jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
}
Tag the request against the rules in the broadcast variable

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.RequestType
import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum}
import com.air.antispider.stream.dataprocess.constants.FlightTypeEnum.FlightTypeEnum

import scala.collection.mutable.ArrayBuffer

object RequestTypeClassifier {

  /**
   * Classify the request
   * @param request
   * @param classifyRuleMap
   * @return the request classification (e.g. domestic, query)
   */
  def requestTypeClassifier(request: String, classifyRuleMap: Map[String, ArrayBuffer[String]]): RequestType = {
    // pull the rule lists out of the classification map
    val nationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalQueryList", null)
    val nationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalBookList", null)
    val internationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalQueryList", null)
    val internationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalBookList", null)

    // walk the four lists and see which one the current request matches
    // domestic query
    if (nationalQueryList != null) {
      for (expression <- nationalQueryList) {
        // does the requested URL match this regex?
        if (request.matches(expression)) {
          return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Query)
        }
      }
    }
    // domestic booking
    if (nationalBookList != null) {
      for (expression <- nationalBookList) {
        // does the requested URL match this regex?
        if (request.matches(expression)) {
          return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Book)
        }
      }
    }
    // international query
    if (internationalQueryList != null) {
      for (expression <- internationalQueryList) {
        // does the requested URL match this regex?
        if (request.matches(expression)) {
          return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Query)
        }
      }
    }
    // international booking
    if (internationalBookList != null) {
      for (expression <- internationalBookList) {
        // does the requested URL match this regex?
        if (request.matches(expression)) {
          return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Book)
        }
      }
    }
    // if nothing matched, return a default value
    RequestType(FlightTypeEnum.Other, BehaviorTypeEnum.Other)
  }
}
Reference the tagging method in the main program

// tag the request with its classification
val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value)
Round-trip tagging

The request itself does not carry the trip type, so we infer it from the number of dates in the HttpReferrer: one date means one-way, two dates mean round-trip.

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.{Matcher, Pattern}

import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum

/**
 * Round-trip tagging
 */
object TravelTypeClassifier {

  def travelTypeClassifier(httpReferrer: String): TravelTypeEnum = {
    val pattern: Pattern = Pattern.compile("(\\d{4})-(0\\d{1}|1[0-2])-(0\\d{1}|[12]\\d{1}|3[01])")
    val matcher: Matcher = pattern.matcher(httpReferrer)
    // date counter
    var num = 0
    // find() advances the cursor on every call
    while (matcher.find()) {
      num = num + 1
    }
    if (num == 1) {
      // one-way
      return TravelTypeEnum.OneWay
    } else if (num == 2) {
      // round trip
      return TravelTypeEnum.RoundTrip
    } else {
      // unknown
      return TravelTypeEnum.Unknown
    }
  }
}
Main program:

// tag the trip type
val travelTypeEnum: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer)
Data parsing

The Southern Airlines system predates this project, and because it has been developed over a long time, request parameters and formats are not uniform across modules. Based on the flight type / operation type / requested URL / request method, we look up the analyzerule table and parse each record according to the parsing rule configured in the database.

Two things are decided here: 1. the parsing method, e.g. JSON versus XML; 2. which fields need to be parsed.

Load the parsing rules from the database

Find the queryRule method in 反扒參考資料\工具包\解析類\AnalyzeRuleDB.scala:

/**
 * Query the "query rule" or "booking rule" regexes for the broadcast variable
 *
 * @return
 */
def queryRule(behaviorType: Int): List[AnalyzeRule] = {
  // parsing rules in MySQL (0 = query, 1 = booking)
  var analyzeRuleList = new ArrayBuffer[AnalyzeRule]()
  val sql: String = "select * from analyzerule where behavior_type =" + behaviorType
  var conn: Connection = null
  var ps: PreparedStatement = null
  var rs: ResultSet = null
  try {
    conn = c3p0Util.getConnection
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    while (rs.next()) {
      val analyzeRule = new AnalyzeRule()
      analyzeRule.id = rs.getString("id")
      analyzeRule.flightType = rs.getString("flight_type").toInt
      analyzeRule.BehaviorType = rs.getString("behavior_type").toInt
      analyzeRule.requestMatchExpression = rs.getString("requestMatchExpression")
      analyzeRule.requestMethod = rs.getString("requestMethod")
      analyzeRule.isNormalGet = rs.getString("isNormalGet").toBoolean
      analyzeRule.isNormalForm = rs.getString("isNormalForm").toBoolean
      analyzeRule.isApplicationJson = rs.getString("isApplicationJson").toBoolean
      analyzeRule.isTextXml = rs.getString("isTextXml").toBoolean
      analyzeRule.isJson = rs.getString("isJson").toBoolean
      analyzeRule.isXML = rs.getString("isXML").toBoolean
      analyzeRule.formDataField = rs.getString("formDataField")
      analyzeRule.book_bookUserId = rs.getString("book_bookUserId")
      analyzeRule.book_bookUnUserId = rs.getString("book_bookUnUserId")
      analyzeRule.book_psgName = rs.getString("book_psgName")
      analyzeRule.book_psgType = rs.getString("book_psgType")
      analyzeRule.book_idType = rs.getString("book_idType")
      analyzeRule.book_idCard = rs.getString("book_idCard")
      analyzeRule.book_contractName = rs.getString("book_contractName")
      analyzeRule.book_contractPhone = rs.getString("book_contractPhone")
      analyzeRule.book_depCity = rs.getString("book_depCity")
      analyzeRule.book_arrCity = rs.getString("book_arrCity")
      analyzeRule.book_flightDate = rs.getString("book_flightDate")
      analyzeRule.book_cabin = rs.getString("book_cabin")
      analyzeRule.book_flightNo = rs.getString("book_flightNo")
      analyzeRule.query_depCity = rs.getString("query_depCity")
      analyzeRule.query_arrCity = rs.getString("query_arrCity")
      analyzeRule.query_flightDate = rs.getString("query_flightDate")
      analyzeRule.query_adultNum = rs.getString("query_adultNum")
      analyzeRule.query_childNum = rs.getString("query_childNum")
      analyzeRule.query_infantNum = rs.getString("query_infantNum")
      analyzeRule.query_country = rs.getString("query_country")
      analyzeRule.query_travelType = rs.getString("query_travelType")
      analyzeRule.book_psgFirName = rs.getString("book_psgFirName")
      analyzeRuleList += analyzeRule
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    c3p0Util.close(conn, ps, rs)
  }
  analyzeRuleList.toList
}
Put the rules into broadcast variables

// load the parsing rules into broadcast variables
val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0)
val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1)
@volatile var queryRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(queryRuleList)
@volatile var bookRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(bookRuleList)
Updating the broadcast variables

// refresh the parsing rules
var analyzeRuleChangeFlag: String = jedis.get("AnalyzeRuleChangeFlag")
// first check whether analyzeRuleChangeFlag is blank
if (StringUtils.isBlank(analyzeRuleChangeFlag)){
  analyzeRuleChangeFlag = "true"
  // write it back to Redis
  jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag)
}
if (analyzeRuleChangeFlag.toBoolean) {
  queryRuleBroadcast.unpersist()
  bookRuleBroadcast.unpersist()
  // reload the parsing rules into the broadcast variables
  val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0)
  val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1)
  queryRuleBroadcast = sc.broadcast(queryRuleList)
  bookRuleBroadcast = sc.broadcast(bookRuleList)
  analyzeRuleChangeFlag = "false"
  // write it back to Redis
  jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag)
}
Write the parsing code

Copy the AnalyzeBookRequest and AnalyzeRequest classes from 反扒參考資料\工具包\解析類 into the com.air.antispider.stream.dataprocess.businessprocess package.

Call them from the main program

// parse the data
// parse query records
val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest(
  requestType, requestMethod, contentType, request, requestBody, travelTypeEnum, queryRuleBroadcast.value)
// parse booking records
val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest(
  requestType, requestMethod, contentType, request, requestBody, travelTypeEnum, bookRuleBroadcast.value)
Data enrichment

Compare the request's IP against the blacklist in MySQL up front to decide whether it is a high-frequency IP; if it is, mark the record so later business logic can use the flag.
Load the blacklist from MySQL

/**
 * Query the IP blacklist from MySQL
 * @return
 */
def getIpBlackList(): ArrayBuffer[String] = {
  val sql = "select ip_name from nh_ip_blacklist"
  val blackIPList: ArrayBuffer[String] = QueryDB.queryData(sql, "ip_name")
  blackIPList
}
Put the blacklist into a broadcast variable

// load the blacklist into a broadcast variable
val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList()
@volatile var blackIPBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(blackIPList)
Update the blacklist broadcast variable

// refresh the blacklist
var blackIPChangeFlag: String = jedis.get("BlackIPChangeFlag")
// first check whether blackIPChangeFlag is blank
if (StringUtils.isBlank(blackIPChangeFlag)){
  blackIPChangeFlag = "true"
  // write it back to Redis
  jedis.set("BlackIPChangeFlag", blackIPChangeFlag)
}
if (blackIPChangeFlag.toBoolean) {
  blackIPBroadcast.unpersist()
  // reload the blacklist into the broadcast variable
  val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList()
  blackIPBroadcast = sc.broadcast(blackIPList)
  blackIPChangeFlag = "false"
  // write it back to Redis
  jedis.set("BlackIPChangeFlag", blackIPChangeFlag)
}
Write the high-frequency IP check

package com.air.antispider.stream.dataprocess.businessprocess

import scala.collection.mutable.ArrayBuffer

object IpOperation {

  /**
   * Decide whether the client IP is a high-frequency IP
   * @param remoteAddr
   * @param blackIPList
   * @return
   */
  def operationIP(remoteAddr: String, blackIPList: ArrayBuffer[String]): Boolean = {
    // walk blackIPList and check whether remoteAddr appears in it
    for (blackIP <- blackIPList) {
      if (blackIP.equals(remoteAddr)) {
        // equal: this is a high-frequency IP
        return true
      }
    }
    false
  }
}
Main program code

// data enrichment
val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value)
Structuring the data

The values gathered and computed so far are scattered and cannot be handed to downstream business logic as-is, so they are packaged into a structured record.

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.{BookRequestData, CoreRequestParams, ProcessedData, QueryRequestData, RequestType}
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum

object DataPackage {

  /**
   * Packages the scattered values below into a ProcessedData
   */
  def dataPackage(sourceData: String, requestMethod: String, request: String,
                  remoteAddr: String, httpUserAgent: String, timeIso8601: String,
                  serverAddr: String, highFrqIPGroup: Boolean,
                  requestType: RequestType, travelType: TravelTypeEnum,
                  cookieValue_JSESSIONID: String, cookieValue_USERID: String,
                  queryParams: Option[QueryRequestData], bookParams: Option[BookRequestData],
                  httpReferrer: String): ProcessedData = {
    // building a ProcessedData also needs the core request parameters,
    // but those live inside queryParams/bookParams
    // flight date
    var flightDate: String = ""
    // departure city
    var depcity: String = ""
    // destination city
    var arrcity: String = ""
    // check the query request parameters first
    queryParams match {
      // the Option has a value; use x to read it
      case Some(x) =>
        flightDate = x.flightDate
        depcity = x.depCity
        arrcity = x.arrCity
      // None: no value
      case None =>
        // if the query parameters are empty, fall back to the booking parameters
        bookParams match {
          // the Option has a value; use bookData to read it
          case Some(bookData) =>
            // to be safe these should also be length-checked before extracting
            flightDate = bookData.flightDate.mkString
            depcity = bookData.depCity.mkString
            arrcity = bookData.arrCity.mkString
          // None: no value
          case None =>
        }
    }
    // build the core request parameters
    val requestParams = CoreRequestParams(flightDate, depcity, arrcity)
    ProcessedData(
      sourceData, requestMethod, request, remoteAddr, httpUserAgent, timeIso8601,
      serverAddr, highFrqIPGroup, requestType, travelType, requestParams,
      cookieValue_JSESSIONID, cookieValue_USERID, queryParams, bookParams, httpReferrer)
  }
}
Main program code

// extract and transform the fields to get the ProcessedData RDD
val processedDataRDD: RDD[ProcessedData] = encryptedRDD.map(message => {
  // after receiving the message, split it and tag it
  // data splitting
  val (request, // requested URL
  requestMethod,
  contentType,
  requestBody, // request body
  httpReferrer, // referrer URL
  remoteAddr, // client IP
  httpUserAgent,
  timeIso8601,
  serverAddr,
  cookiesStr,
  cookieValue_JSESSIONID,
  cookieValue_USERID) = DataSplit.split(message)
  // tag the request with its classification
  val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value)
  // tag the trip type
  val travelType: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer)
  // parse the data
  // parse query records
  val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest(
    requestType, requestMethod, contentType, request, requestBody, travelType, queryRuleBroadcast.value)
  // parse booking records
  val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest(
    requestType, requestMethod, contentType, request, requestBody, travelType, bookRuleBroadcast.value)
  // data enrichment
  val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value)
  // package the scattered values above
  val processedData: ProcessedData = DataPackage.dataPackage(
    "", // raw data; left empty here
    requestMethod, request, remoteAddr, httpUserAgent, timeIso8601, serverAddr,
    highFrqIPGroup, requestType, travelType, cookieValue_JSESSIONID, cookieValue_USERID,
    queryParams, bookParams, httpReferrer)
  processedData
})
Data push module

For better decoupling, records are pushed to different topics depending on the concrete request type (query vs booking); downstream jobs then pull only the messages they need.

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum
import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum.BehaviorTypeEnum
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.rdd.RDD

object SendData {

  /**
   * Send booking records to Kafka
   *
   * @param processedDataRDD
   */
  def sendBookDataKafka(processedDataRDD: RDD[ProcessedData]) = {
    sendToKafka(processedDataRDD, 1)
  }

  /**
   * Send query records to Kafka
   *
   * @param processedDataRDD
   */
  def sendQueryDataKafka(processedDataRDD: RDD[ProcessedData]) = {
    sendToKafka(processedDataRDD, 0)
  }

  /**
   * Send to Kafka according to the given type
   *
   * @param processedDataRDD
   * @param topicType 0: query, 1: booking
   */
  def sendToKafka(processedDataRDD: RDD[ProcessedData], topicType: Int) = {
    // push the ProcessedData records to Kafka
    val messageRDD: RDD[String] = processedDataRDD
      // keep only records of the requested type
      .filter(processedData => processedData.requestType.behaviorType.id == topicType)
      // serialize each record to a string
      .map(processedData => processedData.toKafkaString())
    // send only if anything survived the filter
    if (!messageRDD.isEmpty()) {
      // Kafka settings
      // topic for query records: target.query.topic = processedQuery
      var topicKey = ""
      if (topicType == 0) {
        topicKey = "target.query.topic"
      } else if (topicType == 1) {
        topicKey = "target.book.topic"
      }
      val queryTopic = PropertiesUtil.getStringByKey(topicKey, "kafkaConfig.properties")
      // build a map of Kafka parameters
      val props = new java.util.HashMap[String, Object]()
      // brokers
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties"))
      // key serializer
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.key_serializer_class_config", "kafkaConfig.properties"))
      // value serializer
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.value_serializer_class_config", "kafkaConfig.properties"))
      // batching: send every 32KB or every 10ms
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, PropertiesUtil.getStringByKey("default.batch_size_config", "kafkaConfig.properties"))
      props.put(ProducerConfig.LINGER_MS_CONFIG, PropertiesUtil.getStringByKey("default.linger_ms_config", "kafkaConfig.properties"))
      messageRDD.foreachPartition(iter => {
        // open one Kafka connection per partition
        val producer = new KafkaProducer[String, String](props)
        // send the records
        iter.foreach(message => {
          producer.send(new ProducerRecord[String, String](queryTopic, message))
        })
        // close the Kafka connection
        producer.close()
      })
    }
  }
}
Main program:

// send the structured ProcessedData to different topics by request type
// query records
SendData.sendQueryDataKafka(processedDataRDD)
// booking records
SendData.sendBookDataKafka(processedDataRDD)
Real-time task monitoring

Spark ships with a metrics facility; enable it when creating the SparkConf:

// when the application is stopped, this setting lets the current batch finish before shutdown
System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
// create the Spark configuration
val sparkConf: SparkConf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("DataProcessApp")
  // enable Spark metrics monitoring
  .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

The metrics can be viewed in a browser at http://localhost:4040/metrics/json/.
Code

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil}
import com.air.antispider.stream.common.util.spark.SparkMetricsUtils
import com.alibaba.fastjson.JSONObject
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import redis.clients.jedis.JedisCluster

object SparkStreamingMonitor {

  /**
   * Spark performance monitoring
   *
   * @param sc
   * @param processedDataRDD
   * @param serversCountMap
   */
  def streamMonitor(sc: SparkContext, processedDataRDD: RDD[ProcessedData], serversCountMap: collection.Map[String, Int]) = {
    // 1. fetch Spark's status information
    /*
    // once the project is deployed, build the URL this way instead
    val sparkDriverHost = sc.getConf.get("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES")
    // path of the monitoring JSON when running on YARN
    val url = s"${sparkDriverHost}/metrics/json"
    */
    val url = "http://localhost:4040/metrics/json/"
    val sparkDataInfo: JSONObject = SparkMetricsUtils.getMetricsJson(url)
    val gaugesObj: JSONObject = sparkDataInfo.getJSONObject("gauges")
    // application ID and name, used to build the JSON keys
    val id: String = sc.applicationId
    val appName: String = sc.appName
    // e.g. local-1561617727065.driver.DataProcessApp.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime
    val startKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingStartTime"
    val endKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingEndTime"
    val startTime = gaugesObj.getJSONObject(startKey) // {"value": 1561617812011}
      .getLong("value")
    val endTime = gaugesObj.getJSONObject(endKey) // {"value": 1561617812011}
      .getLong("value")
    // format the end time as yyyy-MM-dd HH:mm:ss; the web platform uses a 24-hour clock, hence HH
    val endTimeStr: String = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").format(endTime)
    // 2. compute the batch duration
    val costTime = endTime - startTime
    // 3. derive the processing speed: speed = count / time
    // number of processed records
    val count: Long = processedDataRDD.count()
    // processing speed
    var countPer = 0.0
    if (costTime != 0) {
      countPer = count / costTime
    }
    // 4. hand the results to the Java web app for display
    // serialize serversCountMap to JSON
    val serversCountMapJson: String = Json(DefaultFormats).write(serversCountMap)
    // following the web platform's code, the result is stored in Redis
    val message = Map[String, Any](
      "costTime" -> costTime.toString, // batch duration
      "applicationId" -> id, // application ID
      "applicationUniqueName" -> appName, // application name
      "countPerMillis" -> countPer.toString, // processing speed
      "endTime" -> endTimeStr, // end time, e.g. 2019-06-27 15:44:32
      "sourceCount" -> count.toString, // record count
      "serversCountMap" -> serversCountMap // collection statistics
    )
    // serialize message to JSON
    val messageJson: String = Json(DefaultFormats).write(message)
    // store messageJson in Redis
    val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster
    // the Redis key is CSANTI_MONITOR_DP + timestamp
    val key = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + System.currentTimeMillis()
    val ex = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt
    jedis.setex(key, ex, messageJson)
    // if the latest batch is also needed, it can be stored under a fixed key
    val lastKey = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + "_LAST"
    jedis.set(lastKey, messageJson)
  }
}
Main program code:

Because the third parameter, serversCountMap, comes from the earlier link statistics, linkCount has to be changed to return that map (a sketch of the change follows the snippet below).

// enable Spark performance monitoring
// SparkContext, the RDD of records, the collection statistics
SparkStreamingMonitor.streamMonitor(sc, processedDataRDD, serversCountMap)
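A minimal sketch of that change (assuming the linkCount shown earlier; for this to compile, the map must be collected outside the if block):

// in BusinessProcess: declare a return type and hand the collected map back
def linkCount(messageRDD: RDD[String]): collection.Map[String, Int] = {
  // ... the statistics code shown earlier ...
  // serversCountMap is the map collected from serverCountRDD
  serversCountMap
}

// in the main program:
val serversCountMap: collection.Map[String, Int] = BusinessProcess.linkCount(messageRDD)
SparkStreamingMonitor.streamMonitor(sc, processedDataRDD, serversCountMap)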
Real-Time Computation

Maintaining offsets by hand

Code to read the offsets:

package com.air.antispider.stream.rulecompute

import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.common.util.kafka.KafkaOffsetUtil
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Main program for real-time blacklist computation
 */
object RuleComputeApp {

  def main(args: Array[String]): Unit = {
    // build the Spark environment
    // when the application is stopped, this setting lets the current batch finish before shutdown
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    // create the Spark configuration
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RuleComputeApp")
      // enable Spark metrics monitoring
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
    // create the SparkContext
    val sc = new SparkContext(sparkConf)
    // create the StreamingContext
    val ssc = new StreamingContext(sc, Seconds(2))

    val inputStream: InputDStream[(String, String)] = createKafkaStream(ssc)
    inputStream.print()

    // start the program
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Consume Kafka and build the InputDStream
   * @param ssc
   * @return
   */
  def createKafkaStream(ssc: StreamingContext): InputDStream[(String, String)] = {
    // connect to Kafka
    // Kafka parameters
    var kafkaParams = Map[String, String]()
    // read the broker list from kafkaConfig.properties
    val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties")
    kafkaParams += ("metadata.broker.list" -> brokerList)
    // zookeeper hosts
    val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties")
    // where the topic offsets are stored
    val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties")
    // topic
    val topic: String = PropertiesUtil.getStringByKey("source.query.topic", "kafkaConfig.properties")
    // topic set
    val topics = Set[String](topic)
    // zookeeper client
    val zkClient = new ZkClient(zkHosts)
    // read the stored TopicAndPartition offsets via KafkaOffsetUtil
    val topicAndPartitionOption: Option[Map[TopicAndPartition, Long]] = KafkaOffsetUtil.readOffsets(zkClient, zkHosts, zkPath, topic)
    val inputStream: InputDStream[(String, String)] = topicAndPartitionOption match {
      // offsets found: resume from the offsets stored in Zookeeper
      case Some(topicAndPartition) =>
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, topicAndPartition, messageHandler)
      // no offsets yet: read the old way
      case None =>
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }
    inputStream
  }
}
Code to save the offsets:

// save the offsets to zookeeper
inputStream.foreachRDD(rdd => {
  // persist the offsets
  saveOffsets(rdd)
})

/**
 * Persist the offset information
 * @param rdd
 */
def saveOffsets(rdd: RDD[(String, String)]): Unit = {
  // zookeeper hosts
  val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties")
  // zookeeper client
  val zkClient = new ZkClient(zkHosts)
  // where the topic offsets are stored
  val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties")
  KafkaOffsetUtil.saveOffsets(zkClient, zkHosts, zkPath, rdd)
}
Data packaging

Convert the received strings back into ProcessedData objects; this can be copied straight from the course notes.

Code:

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean._
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum
import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum, TravelTypeEnum}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.streaming.dstream.DStream

/**
 * Splits and packages the records
 */
object QueryDataPackage {

  /**
   * Split and package
   *
   * @param lines
   * @return
   */
  def queryDataLoadAndPackage(lines: DStream[String]): DStream[ProcessedData] = {
    // use mapPartitions to cut down on object-creation overhead
    lines.mapPartitions { partitionsIterator =>
      // create the JSON parser
      val mapper = new ObjectMapper
      mapper.registerModule(DefaultScalaModule)
      // map the records one by one
      partitionsIterator.map { sourceLine =>
        // split the record
        val dataArray = sourceLine.split("#CS#", -1)
        // raw data: a placeholder, carries no data
        val sourceData = dataArray(0)
        val requestMethod = dataArray(1)
        val request = dataArray(2)
        val remoteAddr = dataArray(3)
        val httpUserAgent = dataArray(4)
        val timeIso8601 = dataArray(5)
        val serverAddr = dataArray(6)
        val highFrqIPGroup: Boolean = dataArray(7).equalsIgnoreCase("true")
        val requestType: RequestType = RequestType(FlightTypeEnum.withName(dataArray(8)), BehaviorTypeEnum.withName(dataArray(9)))
        val travelType: TravelTypeEnum = TravelTypeEnum.withName(dataArray(10))
        val requestParams: CoreRequestParams = CoreRequestParams(dataArray(11), dataArray(12), dataArray(13))
        val cookieValue_JSESSIONID: String = dataArray(14)
        val cookieValue_USERID: String = dataArray(15)
        // query analysis does not need the booking data
        val bookRequestData: Option[BookRequestData] = None
        // package the query data
        val queryRequestData = if (!dataArray(16).equalsIgnoreCase("NULL")) {
          mapper.readValue(dataArray(16), classOf[QueryRequestData]) match {
            case value if value != null => Some(value)
            case _ => None
          }
        } else {
          None
        }
        val httpReferrer = dataArray(18)
        // package the record and return it
        ProcessedData("", requestMethod, request, remoteAddr, httpUserAgent, timeIso8601,
          serverAddr, highFrqIPGroup, requestType, travelType, requestParams,
          cookieValue_JSESSIONID, cookieValue_USERID, queryRequestData, bookRequestData, httpReferrer)
      }
    }
  }
}
Main program:

// pull the message payloads out of the input stream
val dStream: DStream[String] = inputStream.map(_._2)
// convert the messages into ProcessedData objects
val processedDataDStream: DStream[ProcessedData] = QueryDataPackage.queryDataLoadAndPackage(dStream)
Loading the rules

Load from MySQL: 1. critical pages, 2. blacklisted IPs, 3. flow rules.

Query each rule's real name
/**
 * Fetch the flow list.
 * n == 0: anti-crawler flow; n == 1: anti-seat-occupying flow.
 * @return ArrayBuffer[FlowCollocation]
 */
def createFlow(n: Int): ArrayBuffer[FlowCollocation] = {
  var array = new ArrayBuffer[FlowCollocation]
  var sql: String = ""
  if (n == 0) {
    sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.crawler_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=0"
  } else if (n == 1) {
    sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.occ_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=1"
  }
  var conn: Connection = null
  var ps: PreparedStatement = null
  var rs: ResultSet = null
  try {
    conn = c3p0Util.getConnection
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    while (rs.next()) {
      val flowId = rs.getString("id")
      val flowName = rs.getString("process_name")
      if (n == 0) {
        val flowLimitScore = rs.getDouble("crawler_blacklist_thresholds")
        array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
      } else if (n == 1) {
        val flowLimitScore = rs.getDouble("occ_blacklist_thresholds")
        array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    c3p0Util.close(conn, ps, rs)
  }
  array
}
/**
 * Fetch the rule list.
 * @param process_id query the rules belonging to this flow ID
 * @return the rule list
 */
def createRuleList(process_id: String, n: Int): List[RuleCollocation] = {
  var list = new ListBuffer[RuleCollocation]
  val sql = "select * from (select nh_rule.id,nh_rule.process_id,nh_rules_maintenance_table.rule_real_name,nh_rule.rule_type,nh_rule.crawler_type," +
    "nh_rule.status,nh_rule.arg0,nh_rule.arg1,nh_rule.score from nh_rule,nh_rules_maintenance_table where nh_rules_maintenance_table." +
    "rule_name=nh_rule.rule_name) as tab where process_id = '" + process_id + "' and crawler_type=" + n
  // and status=" + n
  var conn: Connection = null
  var ps: PreparedStatement = null
  var rs: ResultSet = null
  try {
    conn = c3p0Util.getConnection
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    while (rs.next()) {
      val ruleId = rs.getString("id")
      val flowId = rs.getString("process_id")
      val ruleName = rs.getString("rule_real_name")
      val ruleType = rs.getString("rule_type")
      val ruleStatus = rs.getInt("status")
      val ruleCrawlerType = rs.getInt("crawler_type")
      val ruleValue0 = rs.getDouble("arg0")
      val ruleValue1 = rs.getDouble("arg1")
      val ruleScore = rs.getInt("score")
      val ruleCollocation = new RuleCollocation(ruleId, flowId, ruleName, ruleType, ruleStatus, ruleCrawlerType, ruleValue0, ruleValue1, ruleScore)
      list += ruleCollocation
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    c3p0Util.close(conn, ps, rs)
  }
  list.toList
}
The FlowCollocation and RuleCollocation classes need to be copied into the project from 反扒參考資料\工具包\ruleComputeBean.
Put the flow information into a broadcast variable

// load the flow data into a broadcast variable (0 = the anti-crawler flow)
val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow(0)
@volatile var flowCollocationsBroadcast: Broadcast[ArrayBuffer[FlowCollocation]] = sc.broadcast(flowCollocations)
Update the broadcast variable

// refresh the flow broadcast variable flowCollocationsBroadcast
var flowCollocationChangeFlag: String = jedis.get("flowCollocationChangeFlag")
// first check whether flowCollocationChangeFlag is blank
if (StringUtils.isBlank(flowCollocationChangeFlag)){
  flowCollocationChangeFlag = "true"
  // write it back to Redis
  jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag)
}
if (flowCollocationChangeFlag.toBoolean) {
  flowCollocationsBroadcast.unpersist()
  // reload the flow data into the broadcast variable
  val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow(0)
  flowCollocationsBroadcast = sc.broadcast(flowCollocations)
  flowCollocationChangeFlag = "false"
  // write it back to Redis
  jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag)
}
Rule computation

IP-segment metric

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import org.apache.spark.rdd.RDD

/**
 * Utilities that compute the metrics along the different dimensions
 */
object CoreRule {

  /**
   * IP-segment metric
   * @param processedDataRDD
   */
  def ipBlockCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    val mapRDD: RDD[(String, Int)] = processedDataRDD.map(processedData => {
      // client IP, e.g. 192.168.80.81
      val ip: String = processedData.remoteAddr
      // take the first two octets, e.g. 192.168
      val arr: Array[String] = ip.split("\\.")
      if (arr.length == 4) {
        // this is a complete IP
        val ipBlock = arr(0) + "." + arr(1)
        // (ip segment, 1)
        (ipBlock, 1)
      } else {
        ("", 1)
      }
    })
    // group by IP segment and aggregate
    val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey((x, y) => x + y)
    // collect the result as a map and return it
    resultRDD.collectAsMap()
  }
}
Main program:

// compute along the metric dimensions
// per-IP-segment access count
val ipBlockCountMap: collection.Map[String, Int] = CoreRule.ipBlockCount(processedDataRDD)
IP access count

Code:

/**
 * Per-IP access count over 5 minutes
 * @param processedDataRDD
 * @return
 */
def ipCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
  processedDataRDD.map(processedData => {
    val ip: String = processedData.remoteAddr
    // (ip, count)
    (ip, 1)
  })
    // sum
    .reduceByKey(_ + _)
    // collect
    .collectAsMap()
}
Per-IP accesses to critical pages

/**
 * Count each IP's accesses to the critical pages
 * @param processedDataRDD
 * @param criticalPagesList
 * @return
 */
def ipCriticalPagesCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = {
  processedDataRDD.map(processedData => {
    val ip: String = processedData.remoteAddr
    val url: String = processedData.request
    // access count; 0 by default
    var count = 0
    for (criticalPages <- criticalPagesList) {
      if (url.matches(criticalPages)) {
        // a match means one access to a critical page
        count = 1
      }
    }
    (ip, count)
  })
    // sum
    .reduceByKey(_ + _)
    // collect
    .collectAsMap()
}
Number of distinct UAs carried by an IP

/**
  * Compute the number of distinct UAs each IP carries over the 5-minute window
  * @param processedDataRDD
  * @return
  */
def ipUACount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
  //Transform processedData into the (ip, ua) shape
  val mapData: RDD[(String, String)] = processedDataRDD.map(processedData => {
    val ip: String = processedData.remoteAddr
    val ua: String = processedData.httpUserAgent
    (ip, ua)
  })
  //(ip, ua) => (ip, (ua1, ua2, ua1))
  val groupRDD: RDD[(String, Iterable[String])] = mapData.groupByKey()
  //Turn (ip, (ua1, ua2, ua1)) into (ip, count)
  groupRDD.map(line => {
    val ip: String = line._1
    val sourceData: Iterable[String] = line._2
    //Use a Set to deduplicate the raw values
    var set = Set[String]()
    for (ua <- sourceData) {
      //Add the UA to the set
      set += ua
    }
    (ip, set.size)
  })
    .collectAsMap()
}
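The groupByKey-plus-Set pattern shuffles every raw (ip, ua) pair across the cluster; deduplicating with distinct before counting is a common lighter-weight alternative. A sketch under the same input shape:

import org.apache.spark.rdd.RDD

//Sketch: count distinct (ip, ua) pairs first, then sum per IP.
def ipUACountDistinct(mapData: RDD[(String, String)]): collection.Map[String, Int] =
  mapData.distinct().map { case (ip, _) => (ip, 1) }.reduceByKey(_ + _).collectAsMap()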
Minimum interval between an IP's critical-page visits

/**
  * Compute each IP's minimum interval between critical-page visits over the 5-minute window
  *
  * @param processedDataRDD
  * @param criticalPagesList
  * @return
  */
def ipCriticalPagesMinTimeCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Long] = {
  //First filter out the critical pages
  processedDataRDD
    //Filter
    .filter(processedData => {
      val url: String = processedData.request
      //Access indicator, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //A match means this record is one visit to a critical page
          count = 1
        }
      }
      //count == 1 means the current record hits a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
    //Transform to (ip, timestamp)
    .map(processedData => {
      val ip: String = processedData.remoteAddr
      val time: String = processedData.timeIso8601
      //time looks like 2019-06-29T08:46:56+08:00
      val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
      (ip, timeStamp)
    })
    //Group to (ip, (time1, time2, time3...))
    .groupByKey()
    //Transform to (ip, minimum gap)
    .map(line => {
      val ip: String = line._1
      //Iterator over all timestamps of this IP
      val sourceData: Iterable[Long] = line._2
      //Convert the iterator to an Array
      val sourceArray: Array[Long] = sourceData.toArray
      //Sort the raw timestamps
      util.Arrays.sort(sourceArray)
      //Collect the gaps between consecutive timestamps
      var resultArray = new ArrayBuffer[Long]()
      for (i <- 0 until sourceArray.size - 1) {
        //Current element
        val currentTime: Long = sourceArray(i)
        //Next element
        val nexTime: Long = sourceArray(i + 1)
        val result = nexTime - currentTime
        //Store the gap
        resultArray += result
      }
      //Sort the gaps
      val array: Array[Long] = resultArray.toArray
      util.Arrays.sort(array)
      //Guard added: an IP with a single visit produces no gaps, and array(0)
      //would throw; fall back to Int.MaxValue so callers still see a large value
      if (array.isEmpty) {
        (ip, Int.MaxValue.toLong)
      } else {
        (ip, array(0))
      }
    })
    //Collect the result
    .collectAsMap()
}
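The sort-and-diff passage above can also be expressed with the collections API's sliding, which pairs each element with its successor; a sketch of the same minimum-gap computation (unlike the guarded version above, it assumes at least two timestamps per IP):

//Sketch: minimum gap between consecutive timestamps via sorted + sliding(2).
def minGap(timestamps: Array[Long]): Long =
  timestamps.sorted.sliding(2).map(pair => pair(1) - pair(0)).min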
Number of critical-page intervals shorter than the preset time

Code:

/**
  * Compute, per IP over the 5-minute window, how many critical-page intervals fall below the preset value
  * @param processedDataRDD
  * @param criticalPagesList
  * @return
  */
def ipCriticalPagesMinNumCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[(String, String), Int] = {
  //First filter out the critical pages
  processedDataRDD
    //Filter
    .filter(processedData => {
      val url: String = processedData.request
      //Access indicator, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          count = 1
        }
      }
      //count == 1 means the current record hits a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
    //Transform to ((ip, url), timestamp)
    .map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      val time: String = processedData.timeIso8601
      //time looks like 2019-06-29T08:46:56+08:00
      val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
      ((ip, url), timeStamp)
    })
    //Group to ((ip, url), (time1, time2, time3...))
    .groupByKey()
    //Transform to ((ip, url), count of small gaps)
    .map(line => {
      val key: (String, String) = line._1
      //Iterator over all timestamps of this key
      val sourceData: Iterable[Long] = line._2
      //Convert the iterator to an Array
      val sourceArray: Array[Long] = sourceData.toArray
      //Sort the raw timestamps
      util.Arrays.sort(sourceArray)
      //Collect the gaps that fall below the preset value
      var resultArray = new ArrayBuffer[Long]()
      for (i <- 0 until sourceArray.size - 1) {
        //Current element
        val currentTime: Long = sourceArray(i)
        //Next element
        val nexTime: Long = sourceArray(i + 1)
        val result = nexTime - currentTime
        //Keep only gaps below the preset value (hard-coded to 5 seconds here)
        if (result < 5000) {
          resultArray += result
        }
      }
      //Return ((ip, url), count)
      (key, resultArray.size)
    })
    .collectAsMap()
}
Number of distinct flights queried by an IP in 5 minutes

/**
  * Compute the number of distinct flights each IP queries over the 5-minute window
  * @param processedDataRDD
  * @return
  */
def ipCityCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
  //(ip, departure->arrival)
  processedDataRDD.map(line => {
    val ip: String = line.remoteAddr
    //Departure city
    val depcity: String = line.requestParams.depcity
    //Arrival city
    val arrcity: String = line.requestParams.arrcity
    (ip, depcity + "->" + arrcity)
  })
    .groupByKey()
    //(ip, number of distinct city pairs)
    .map(line => {
      val ip: String = line._1
      val sourceCitys: Iterable[String] = line._2
      //Use a Set to deduplicate
      var set = Set[String]()
      //Loop and deduplicate
      for (city <- sourceCitys) {
        set += city
      }
      (ip, set.size)
    })
    .collectAsMap()
}
Number of distinct Cookies carried by an IP in 5 minutes

/**
  * Compute the number of distinct Cookies each IP carries over the 5-minute window
  * @param processedDataRDD
  * @param criticalPagesList
  * @return
  */
def ipCookieCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = {
  //First filter out the critical pages
  processedDataRDD
    //Filter
    .filter(processedData => {
      val url: String = processedData.request
      //Access indicator, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          count = 1
        }
      }
      //count == 1 means the current record hits a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
    //(ip, jSessionID)
    .map(line => {
      val ip: String = line.remoteAddr
      //SessionID
      val sessionID: String = line.cookieValue_JSESSIONID
      (ip, sessionID)
    })
    .groupByKey()
    //(ip, (sID1, sID2, sID1))
    .map(line => {
      val ip: String = line._1
      val sourceSessionIDs: Iterable[String] = line._2
      //Use a Set to deduplicate
      var set = Set[String]()
      //Loop and deduplicate
      for (sessionID <- sourceSessionIDs) {
        set += sessionID
      }
      (ip, set.size)
    })
    .collectAsMap()
}
Blacklist score computation

The flow information loaded from the database contains each flow's own rule list. Since the count for every metric has already been computed above, we only need to compare those counts against the database rules to obtain two lists: the scores of all rules whose thresholds were exceeded, and the scores of the exceeded rules that are currently enabled. For example, if a rule's threshold (arg0) is 10 and an IP's computed 5-minute count is 15, the rule is hit: its score joins the first list, and also the second list if the rule's status is enabled.

Code:
package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean.{FlowCollocation, ProcessedData, RuleCollocation}
import com.air.antispider.stream.rulecompute.bean.{AntiCalculateResult, FlowScoreResult}
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

object RuleUtil {

  /**
    * Compute the final flow results from the individual rule metrics
    *
    * @param processedDataRDD
    * @param ipBlockCountMap
    * @param ipCountMap
    * @param ipCriticalPagesMap
    * @param ipUAMap
    * @param ipCriticalPagesMinTimeMap
    * @param ipCriticalPagesMinNumMap
    * @param ipCityCountMap
    * @param ipCookieCountMap
    * @param flowCollocationList
    */
  def calculateAntiResult(
      processedDataRDD: RDD[ProcessedData],
      ipBlockCountMap: collection.Map[String, Int],
      ipCountMap: collection.Map[String, Int],
      ipCriticalPagesMap: collection.Map[String, Int],
      ipUAMap: collection.Map[String, Int],
      ipCriticalPagesMinTimeMap: collection.Map[String, Long],
      ipCriticalPagesMinNumMap: collection.Map[(String, String), Int],
      ipCityCountMap: collection.Map[String, Int],
      ipCookieCountMap: collection.Map[String, Int],
      flowCollocationList: ArrayBuffer[FlowCollocation]
  ): RDD[AntiCalculateResult] = {
    //Look up each record's metric values in the maps
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      //Take the first 2 octets of the IP, e.g. 192.168
      val arr: Array[String] = ip.split("\\.")
      var ipBlock = ""
      if (arr.length == 4) {
        //This is a complete IP
        ipBlock = arr(0) + "." + arr(1)
      }
      //IP block count
      val ipBlockCounts: Int = ipBlockCountMap.getOrElse(ipBlock, 0)
      //IP count
      val ipCounts: Int = ipCountMap.getOrElse(ip, 0)
      //Critical-page count
      val ipCriticalPagesCounts: Int = ipCriticalPagesMap.getOrElse(ip, 0)
      val ipUACounts: Int = ipUAMap.getOrElse(ip, 0)
      //Minimum visit interval; when the IP is absent use Int.MaxValue, never 0
      //(widened to Long so the default matches the map's value type)
      val ipCriticalPagesMinTimeCounts: Int = ipCriticalPagesMinTimeMap.getOrElse(ip, Integer.MAX_VALUE.toLong).toInt
      val ipCriticalPagesMinNumCounts: Int = ipCriticalPagesMinNumMap.getOrElse((ip, url), 0)
      val ipCityCounts: Int = ipCityCountMap.getOrElse(ip, 0)
      val ipCookieCounts: Int = ipCookieCountMap.getOrElse(ip, 0)
      //Wrap the metric values in a map keyed by the rule names from the database
      val map = Map[String, Int](
        "ipBlock" -> ipBlockCounts,
        "ip" -> ipCounts,
        "criticalPages" -> ipCriticalPagesCounts,
        "userAgent" -> ipUACounts,
        "criticalPagesAccTime" -> ipCriticalPagesMinTimeCounts,
        "criticalPagesLessThanDefault" -> ipCriticalPagesMinNumCounts,
        "flightQuery" -> ipCityCounts,
        "criticalCookies" -> ipCookieCounts
      )
      val flowsScore: Array[FlowScoreResult] = computeFlowScore(map, flowCollocationList)
      AntiCalculateResult(
        processedData,
        ip,
        ipBlockCounts,
        ipCounts,
        ipCriticalPagesCounts,
        ipUACounts,
        ipCriticalPagesMinTimeCounts,
        ipCriticalPagesMinNumCounts,
        ipCityCounts,
        ipCookieCounts,
        flowsScore //was null; pass the computed flow scores through
      )
    })
  }

  /**
    * Run the comparison and produce the final score result
    * @param map
    * @param flowCollocationList
    * @return
    */
  def computeFlowScore(map: Map[String, Int], flowCollocationList: ArrayBuffer[FlowCollocation]): Array[FlowScoreResult] = {
    //flowCollocationList holds several flows, so loop over the flows first
    for (flow <- flowCollocationList) {
      //Get this flow's rule list
      val rules: List[RuleCollocation] = flow.rules
      //Scores of all rules whose threshold was exceeded
      var array1 = new ArrayBuffer[Int]()
      //Scores of exceeded rules that are also in the enabled state
      var array2 = new ArrayBuffer[Int]()
      for (rule <- rules) {
        val ruleName: String = rule.ruleName
        //Look up the computed metric by the rule's name (0 if absent)
        val num: Int = map.getOrElse(ruleName, 0)
        if (num > rule.ruleValue0) {
          //The computed value exceeds the configured threshold, so the rule is hit
          //Record its score
          array1 += rule.ruleScore
          if (rule.ruleStatus == 0) {
            //The rule is in the enabled state
            array2 += rule.ruleScore
          }
        }
      }
      //The combination of the two score lists is still a placeholder at this stage
      // val result = xxx(array1, array2)
    }
    null
  }
}
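In the main program, the eight metric maps computed earlier and the flow list from the broadcast variable are expected to feed this method. A sketch of that call; only ipBlockCountMap appears in the snippets above, so the other map variable names are assumptions that follow the same naming pattern:

//Sketch: wiring the computed metric maps into the final rule computation.
//flowCollocationsBroadcast.value reads the flow list loaded earlier.
val antiCalculateResults: RDD[AntiCalculateResult] = RuleUtil.calculateAntiResult(
  processedDataRDD,
  ipBlockCountMap,
  ipCountMap,
  ipCriticalPagesCountMap,
  ipUACountMap,
  ipCriticalPagesMinTimeCountMap,
  ipCriticalPagesMinNumCountMap,
  ipCityCountMap,
  ipCookieCountMap,
  flowCollocationsBroadcast.value
)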