Anti-Crawler

Anti-Crawler Project Development

Project Introduction

Project Background

Why build an anti-crawler project?

Crawlers consume large amounts of our system resources, such as bandwidth and computing power.

Crawlers that make bookings or grab tickets interfere with our normal business.

Importing the anti-crawler web project

  1. Create a Maven project
  2. Create a module
  3. Copy the code from 反扒參考資料\項目代碼\WebProject into the new module
  4. Run the project
  5. If startup fails with an error:
  6. Use 反扒參考資料\DB\gciantispider.sql to create the database tables
    • If it complains that the getChildLst function cannot be found, delete the statements below it in the SQL file
  7. Start the project and open http://localhost:8081
  8. Username root, password 000000

Data flow through the anti-crawler project

Logical architecture

Feature descriptions

Data management module

  1. Data collection
    Displays status information for each collection server, e.g. server name, current active connections, and collection volume over the last 3 days
  2. Data processing
    Configures the parsing rules for the different request types: query vs. booking, domestic vs. international

Flow management

  1. Flow management: create/delete/update/query/start flows
  2. Rule configuration: whether a rule is enabled, its criteria, and its score
  3. Control over strategy score thresholds

Non-functional description

Data sources:

Clickstream (collection servers) / business logs (business servers) / third-party feeds

Estimating the data volume:

  1. Estimate the size of each record (1-2 KB is typical), e.g. 1.5 KB
  2. Estimate the daily record count, e.g. 800 million
  3. 800 million * 1.5 KB ≈ 1 TB of raw data per day (see the check after this list)
  4. With full replication at 3 copies, that is about 3 TB per day
  5. With 15 servers at 100 TB of disk each, the cluster can hold about 500 days of data
  6. At 500 GB of raw data per day, 7 machines would suffice (32-core CPU / 128 GB RAM / 32 TB disk)
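
A quick sanity check of these figures, written as a small Lua calculation (all inputs are the assumed values from the list above; this is an illustration, not project code):

-- Back-of-the-envelope sizing check (inputs are the assumed values above)
local record_size_kb = 1.5          -- estimated size per record, in KB
local records_per_day = 8e8         -- estimated daily record count
local daily_tb = records_per_day * record_size_kb / 1024 / 1024 / 1024
print(string.format("raw per day: %.2f TB", daily_tb))          -- ~1.12 TB
print(string.format("with 3 replicas: %.2f TB", daily_tb * 3))  -- ~3.35 TB
-- 15 servers * 100 TB each = 1500 TB total capacity; days of retention:
print(string.format("retention: %.0f days", 15 * 100 / (daily_tb * 3)))
-- ~447 days, which matches the ~500-day figure above once 3.35 TB/day
-- is rounded down to 3 TB/day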

How large is the peak volume?

Spreading 800 million records evenly over a day gives 800,000,000 / 86,400 ≈ 9,000+ per second on average; the estimated peak is about 200,000 per second.

With 5 million users and 200,000 daily actives, 30 clicks per user gives 6 million clicks; adding 60 million business-log records gives about 66 million records in total.

Company cluster categories:

Database ER diagram

Using PowerDesigner

  1. Install: find the installer under 反扒參考資料\PowerDesigner
  2. Create a new model

  3. Export the SQL file

Anti-crawl rules

Data source

The data source feeds the various anti-crawl metrics. It mainly consists of the parameters carried by user requests, e.g. the referrer URL, the requested URL, the user's session ID, and query-related fields such as departure city / destination / departure date.

Anti-crawl rules

  1. Number of distinct departure/destination pairs queried within each 5-minute window
  2. Minimum interval between queries within each 5-minute window below 2 seconds
  3. More than 30 queries within each 5-minute window (see the sketch below)
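
As an illustration of rule 3, a minimal OpenResty sketch that counts queries per client IP in 5-minute windows. The shared dict name `rate_limit` is an assumption (it would need `lua_shared_dict rate_limit 10m;` in nginx.conf), and the `incr` init/expiry arguments assume a reasonably recent OpenResty:

-- Minimal sketch of rule 3: more than 30 queries within 5 minutes
-- Assumes nginx.conf contains: lua_shared_dict rate_limit 10m;
local dict = ngx.shared.rate_limit
local window = 300  -- 5 minutes, in seconds
local limit = 30
-- one counter per client IP per 5-minute window
local key = ngx.var.remote_addr .. ":" .. math.floor(ngx.now() / window)
local count, err = dict:incr(key, 1, 0, window)  -- init to 0, expire with the window
if count and count > limit then
    ngx.log(ngx.WARN, "possible crawler: ", ngx.var.remote_addr)
end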

爬蟲程序的特色

  1. 爬蟲程序常常在節假日等高峯期進行爬取
  2. 爬蟲比較穩定,不區分白天和晚上
  3. 爬蟲常常切換出發地/目的地
  4. 爬蟲爬取頻次比較高
  5. 爬蟲攜帶的瀏覽路徑信息不完整

Data collection

Installing OpenResty

  1. Find openresty-1.13.6.1.tar.gz under 反扒參考資料\OpenResty
  2. Unpack: tar -xzvf openresty-1.13.6.1.tar.gz
  3. Configure OpenResty

    ./configure --prefix=/usr/local/openresty --with-http_stub_status_module

  4. make && make install
  5. If dependencies are missing, install them

    yum install readline-devel pcre-devel openssl-devel perl gcc

If you don't want to compile it yourself, 反扒參考資料\OpenResty\編譯後\openresty is a prebuilt copy; drop it into Linux and use it directly.

  1. Go to /export/servers/openresty/nginx
  2. Run sbin/nginx (grant execute permission first with chmod u+x nginx)
  3. Open it in a browser:

Lua basics

How to run Lua

Interactively

  1. Start an interactive session with the lua command
  2. Enter a statement, e.g. print("hello world")

From a file

  1. Create a Lua script file
    vim helloworld.lua
  2. Run the script
    lua helloworld.lua

Data types

Data types in Java

Numeric:
    Integer types:
        byte/short/int/long
    Floating-point types:
        float/double
Boolean: boolean
Character: char

Lua data types

nil: like null in Java
boolean: boolean
number: numeric type; no distinction between integer and float
string: string
userdata: a C data structure
function: a function (method)
thread: a thread (coroutine)
table: array / list / map

Lua operators

Assignment

--assignment
a = 3
b = "hello"
print(a)
print(b)

c,d = 1,2
print(c,d)

e,f = 1,2,3   -- the extra value 3 is discarded
print(e,f)

g,h = 1       -- h gets nil
print(g,h)

str = "hello" .. "world"
print(str)

Strings are concatenated with "..", not "+".

Arithmetic operators

-- arithmetic operators
-- add, subtract, multiply, divide, modulo
a,b = 10,20

print("a+b=" .. a + b)
print("a-b=" .. a - b)
print("a*b=" .. a * b)
print("a/b=" .. a / b)
print("a%b=" .. a % b)

Relational operators

-- relational operators
print("========= relational operators =========")
a,b = 1,2

-- a comparison yields a boolean, which cannot be concatenated to a
-- string directly; wrap it in tostring() first
print("a == b: " .. tostring(a == b))
print("a ~= b: " .. tostring(a ~= b))
print(a > b)
print(a >= b)
print(a < b)
print(a <= b)
  1. "Not equal" is written ~=
  2. A boolean comparison result cannot be concatenated to a string directly; use tostring()

Logical operators

-- logical operators
print("========= logical operators =========")
a,b = true,false

print(a and b)
print(a or b)
print(not b)

Other operators

  1. ".." concatenates strings
  2. "#" returns the length of a string or a table

    --other operators
    print("========other operators===========")

    str = "hello java"
    print(#str)

Lua control flow

if

-- control flow
-- if
a = 10
if a > 5
then
        print("a>5")
end

-- if-else
if a > 11
then
        print("a>11")
else
        print("a<=11")
end

-- nested if
if a > 5
then
        print("a>5")
        if a < 12
        then
                print("a<12")
        end
end

Loops

while

--while loop
print("=======while loop=======")
while a > 0
do
        print(a)
        a = a - 1
end

repeat

-- repeat loop
print("=======repeat loop=======")

repeat
        print(a)
        a = a + 1
until a > 10

A repeat loop always executes at least once.

Exercise: a sheet of paper is 0.04 thick; how many folds until the stack is taller than Mount Everest at 8847?
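
The exercise fits a repeat loop. One possible solution, taking the two numbers exactly as stated (the exercise gives no units, so both are assumed to be in the same unit):

-- Exercise: how many folds until 0.04 (paper thickness) exceeds 8847 (Everest)?
-- Assumes both numbers are in the same unit, as stated in the exercise.
local thickness = 0.04
local height = 8847
local folds = 0
repeat
        thickness = thickness * 2   -- each fold doubles the thickness
        folds = folds + 1
until thickness > height
print(folds)  -- 18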

for

--for loop
print("=======for loop=======")
for b = 10, 1, -1
do
        print(b)
end

The for loop takes 3 parameters:

  1. The initial value of the loop variable
  2. The termination condition (inclusive)
  3. The step; it can be omitted and defaults to 1

Lua arrays

--Lua arrays
arr = {"zhangsan","lisi","wangwu"}
print(#arr)

for a = 1, #arr
do
        print(arr[a])
end

--generic for loop
-- i is the index
-- name is the value at that index
for i,name in ipairs(arr)
do
        print(i,name)
end

Note:

ipairs only works for array-style tables.

Use pairs to iterate over map-style tables. A small demo of the difference follows.
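
-- ipairs walks consecutive integer keys 1..n and stops at the first nil;
-- pairs walks every key, including string keys, in no guaranteed order.
local t = {"a", "b", nil, "d", name = "tom"}
for i, v in ipairs(t) do print(i, v) end  -- prints 1 a, 2 b, then stops
for k, v in pairs(t) do print(k, v) end   -- prints all entries, incl. name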

Lua type conversion

Converting other types to string

  • tostring()
  • tonumber()

    -- converting other types to string
    -- tostring()
    -- boolean to string
    boo = true
    print(type(boo))
    print(type(tostring(boo)))
    print(tostring(boo))
    -- number to string
    num = 20
    print(type(num))
    print(type(tostring(num)))
    print(tostring(num))
    -- table to string

    tbl = {"tom","cat"}
    print(type(tbl))
    print(type(tostring(tbl)))
    print(tostring(tbl))

Usually it is numbers that get converted to strings.

tostring() on a function or table does not produce its contents; it yields a type tag plus an address (e.g. "table: 0x...").

Converting other types to number

-- converting other types to number:
-- tonumber()
num = "12"
print(type(num))
print(type(tonumber(num)))
print(tonumber(num))

num = "AF"
print(type(num))
print(type(tonumber(num,16)))   -- parse as base 16
print(tonumber(num,16))

tbl = {"tom","cat"}
print(tonumber(tbl))   -- nil

boo = false
print(tonumber(boo))   -- nil

Anything not in a numeric format cannot be converted and yields nil, e.g. booleans, tables, or "hello".

Lua functions

Function definition syntax:

scope function name(param1, param2, ...)

        function body

        return result1, result2, ...

end

--defining a Lua function
function f1(a,b)
        print("hello function")
        return a+b
end
result = f1(3,4)
print(result)

--multiple return values
local function f2(a,b)
        return a,b
end
c,d = f2(1,2)
print(c,d)

Variable scope in Lua

Lua variables are global by default.

The local keyword makes a variable local.

When using globals, be careful not to reuse a name: the old value gets overwritten.

-- variable scope
a = 10
if a>3
then
        b = 20          -- global, visible after the block
        local c = 30    -- local to this block
        print(a)
        print(b)
        print(c)
end
a = "hello"
print(a)
print(b)
print(c)  -- nil

Lua tables

A Lua table can play the role of a Java array, List, or Map.

If a table holds array-style data, iterate it with ipairs; for map-style data, use pairs.

--define an array-style table
local arr = {"zhangsan","lisi","wangwu"}
print(arr[1])
--iterate the table by index
for i = 1, #arr
do
        print(arr[i])
end
print("========generic iteration=========")
for index, value in ipairs(arr)
do
        print(index, value)
end

print("========map-style table=========")
map = {name="zhangsan", sex="male", age = 13}
print(map["name"])
print(map.name)
print(map.age)
-- values can be set or read with the "." syntax
map.address = "Shenzhen"
print(map.address)

print("========iterating a map-style table=========")
for key, value in pairs(map)
do
        print(key, value)
end

Lua modules

Lua's module mechanism is built on tables: first define an empty table to hold the module's variables and functions.

Import a module with the require keyword: require "modulename" -- note, without the ".lua" suffix.

Simulating sending messages to Kafka

kafka.lua

-- simulate sending messages to Kafka
_M = {}
--default number of partitions
_M.default_partition_num = 5
function _M.new(props)
        -- create a client from the given props
        return "Kafka client ..."
end
-- send a message to Kafka
function _M.send(topic, key, message)
        print("sending to Kafka, topic: "..topic..", body: "..message)
        -- return status information so the caller can check the result
        return nil,"error"
end

testKafka.lua

-- test driver for the custom Kafka module
require "kafka"
dpn = _M.default_partition_num
print("default partition count: "..dpn)
--create the client object
--props must be passed in
props = {{hosts="192.168.80.81", port="9092"},{hosts="192.168.80.81", port="9092"}}
_M.new(props)
--send a message
ok, err = _M.send("sz07", "1", "test message to Kafka")
if not ok
then
        --on failure, print the error
        print(err)
        return
end

Integrating Lua with Nginx

Two ways to combine Lua with Nginx

Inline Lua block

location / {
            #root   html;
            #index  index.html index.htm;
            default_type text/html;
            content_by_lua_block {
               -- Lua code goes here
               print("hello")
               ngx.say("hello openresty")
            }
        }

Lua script file

location / {
            #root   html;
            #index  index.html index.htm;
            default_type text/html;
            content_by_lua_file /export/servers/openresty/test.lua;
        }

content_by_lua_file /export/servers/openresty/test.lua;

Don't forget the trailing ";".

Reading HTTP request parameters from Lua

GET parameters

-- reading HTTP request parameters from Lua
-- GET parameters
getArgs = ngx.req.get_uri_args()
--print each parameter
for k,v in pairs(getArgs)
do
        ngx.say("name: "..k.." value: "..v)
        ngx.say("<br>")
end

POST parameters

ngx.say("=======POST parameters========")
-- POST parameters
-- the request body must be read first via read_body()
ngx.req.read_body()

postArgs = ngx.req.get_post_args()
--print each parameter
for k,v in pairs(postArgs)
do
        ngx.say("name: "..k.." value: "..v)
        ngx.say("<br>")
end

Any operation that touches the request body must call ngx.req.read_body() first.

Request headers

ngx.say("=======request headers========")

headerArgs = ngx.req.get_headers()
for k,v in pairs(headerArgs)
do
        ngx.say("name: "..k.." value: "..v)
        ngx.say("<br>")
end

Raw request body (for JSON request payloads)

ngx.say("=======request body========")
-- read_body() must be called first
ngx.req.read_body()
bodyData = ngx.req.get_body_data()
-- a JSON body cannot be iterated directly, so just print it
ngx.say(bodyData)

Connecting to MySQL from Lua

First require the MySQL module, located at openresty/lualib/resty/mysql.lua

-- connecting to MySQL
-- require the MySQL module
local restyMysql = require "resty.mysql"
-- Lua calls normally use "."; when a function's first parameter is self, it can be called with ":" instead, which passes self implicitly
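-- Illustration of that rule: for a module table `m` with `function m.new(self) ... end`,
-- the call m:new() is syntactic sugar for m.new(m) -- the colon passes `m` as self.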
local mysql = restyMysql:new()
--set the connect timeout (ms)
mysql:set_timeout(20000)
--connect to MySQL
--connection options
local opts = {}
opts.host = "192.168.80.81"
opts.port = 3306
opts.database = "test"
opts.user = "root"
opts.password = "root"
local ok, err = mysql:connect(opts)
if not ok
then
        ngx.say("failed to connect to MySQL: " .. err)
        return
end
--the SQL to run
local sql = "select * from user"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("query failed: " .. err)
        return
end
-- read rows from the result
for i,row in ipairs(result)
do
        for key,value in pairs(row)
        do
                ngx.say("column: "..key.." value: " .. value)
        end
        ngx.say("<br>")
end

ngx.say("all rows printed")

Insert, delete, and update against MySQL

--insert a row
local sql = "insert into user values('lisi','123','Shenzhen','0','2019-01-01')"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("insert failed: " .. err)
        return
end
ngx.say("insert succeeded")

--delete a row
local sql = "delete from user where username='lisi'"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("delete failed: " .. err)
        return
end

ngx.say("delete succeeded")
-- note: for write statements the result holds fields such as affected_rows
-- rather than rows, so there is nothing to iterate

--update a row
local sql = "update user set username = 'lisi' where username='zhangsan'"
local result, err = mysql:query(sql)
if not result
then
        ngx.say("update failed: " .. err)
        return
end

ngx.say("update succeeded")

Connecting to Redis from Lua

Installing standalone Redis

Redis is an in-memory NoSQL database that stores key-value pairs.

If you don't want to compile it, copy 反扒參考資料\Redis\redis-5.0.4 straight into the VM.

  1. Copy redis-5.0.4 to /export/servers
  2. Go into the bin directory and run chmod u+x ./* to grant execute permissions

redis.conf

#address to bind
bind 0.0.0.0
#port to bind
port 6379
#run in the background; by default redis-server occupies the foreground
daemonize yes
#pid file location
pidfile /var/run/redis_6379.pid
#dump file name
dbfilename dump.rdb

  1. Start the Redis server

    ./redis-server redis.conf

  2. Check the Redis process

    ps -ef | grep redis

  3. Connect with the CLI

    ./redis-cli

Connecting to Redis from Lua

--connect to Redis from Lua
--require the Redis module
local restyRedis = require "resty.redis"
--create the Redis client via new
local redis = restyRedis:new()
--set the timeout (ms)
redis:set_timeout(20000)
--connect
local ok, err = redis:connect("192.168.80.83", 6379)
if not ok
then
        ngx.say("connect failed: "..err)
        return
end

-- connected
ok, err = redis:set("username", "zhangsan")
if not ok
then
        ngx.say("set failed: "..err)
        return
end
ngx.say("set succeeded")

--read the value back
local res, err = redis:get("username")
if not res
then
        ngx.say("get failed: "..err)
        return
end
ngx.say(res)

Redis Cluster

How it works

  1. Redis nodes talk to each other. Each node opens 2 ports: one for clients and one for the internal cluster bus, whose port number is the client port plus 10000.
  2. Each node is assigned a share of the slots; there are 16384 slots in total.
  3. When a client stores a key, the connected node first hashes the key with a fixed algorithm, obtaining e.g. 155533, then computes 155533 % 16384 to decide which node owns it (see the sketch below),
  4. and the data is stored on the node that owns that slot.
  5. Reads are computed the same way; if the data does not live on the connected node, the connection is redirected to the node that holds it.
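
A sketch of the slot computation. Redis actually hashes the key with CRC16; the `crc16` function below is an assumed helper (e.g. from a Lua CRC library), not something OpenResty ships by default:

-- Sketch of Redis Cluster key routing.
-- Assumes a crc16(key) function is available (Redis itself uses CRC16).
local TOTAL_SLOTS = 16384

local function slot_for_key(key)
        return crc16(key) % TOTAL_SLOTS  -- e.g. 155533 % 16384 = 8077
end
-- The node whose slot range contains slot_for_key(key) stores the key;
-- other nodes answer with a MOVED redirect to that node.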

Cluster setup

Follow 反扒參考資料\Redis\Redis集羣搭建步驟.md

Each node's directory contains a 700x.conf.

Each config file contains path-related settings, so keep the directory layout used in the course, or adjust the paths by hand.

7001.conf:

port 7001
dir /export/servers/redis-5.0.4/cluster/7001/data
cluster-enabled yes
cluster-config-file /export/servers/redis-5.0.4/cluster/7001/data/nodes.conf

Starting the cluster:

bin/redis-server cluster/7001/7001.conf
bin/redis-server cluster/7002/7002.conf
bin/redis-server cluster/7003/7003.conf
bin/redis-server cluster/7004/7004.conf
bin/redis-server cluster/7005/7005.conf
bin/redis-server cluster/7006/7006.conf

Check the cluster state with netstat -nltp

Initialization:

If you connect a client right after the servers' first start and try to store data, it fails with a "slots not assigned" error.

The initialization below only needs to run once, on first startup:

-- replace the IP addresses below with your own
bin/redis-cli --cluster create --cluster-replicas 1 YOUR_MACHINE_IP:7001 192.168.80.83:7002 192.168.80.83:7003 192.168.80.83:7004 192.168.80.83:7005 192.168.80.83:7006

--cluster-replicas 1 sets the replica count to 1

Connecting to the cluster:

bin/redis-cli -c -p 7001
set hello world
get hello

-c connects in cluster mode; without it, redirects fail

-p sets the port to connect to

Connecting to Kafka from Lua

The Lua script

-- send a message to Kafka
-- require the Kafka module
local kafka = require "resty.kafka.producer"
--create the producer
local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}}
local producer = kafka:new(broker_list)
--send data
local ok, err = producer:send("test", "1", "hello openresty")
if not ok
then
        ngx.say("Kafka send failed: "..err)
        return
end

ngx.say("message sent")

Starting the Kafka cluster

  1. Start ZooKeeper first

    zkServer.sh start

  2. Start Kafka

    nohup /export/servers/kafka_2.11-1.0.0/bin/kafka-server-start.sh /export/servers/kafka_2.11-1.0.0/config/server.properties > /dev/null 2>&1 &

/dev/null is where stdout is sent

2>&1 redirects stderr into stdout

& runs the process in the background

  1. List all topics

    /export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --list

  2. Start a console consumer

    /export/servers/kafka_2.11-1.0.0/bin/kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic test

Writing the collection Lua script

Edit nginx.conf

http {
    include       mime.types;
    default_type  application/octet-stream;

    #log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
    #                  '$status $body_bytes_sent "$http_referer" '
    #                  '"$http_user_agent" "$http_x_forwarded_for"';

    #access_log  logs/access.log  main;

    sendfile        on;
    #tcp_nopush     on;

    #keepalive_timeout  0;
    keepalive_timeout  65;

    #gzip  on;


    #enable the shared dict with 10 MB of space; we only store a few numbers, so 10 MB is plenty
    lua_shared_dict shared_data 10m;
    #local DNS resolver
    resolver 127.0.0.1;


        server {
                listen       80;
                server_name  localhost;

                #charset koi8-r;

                #access_log  logs/host.access.log  main;

                location / {
                    #root   html;
                    #index  index.html index.htm;


                    #enable nginx stub status monitoring
                    stub_status on;
                    default_type text/html;
                    #content_by_lua_block{
                    #   print("hello")
                    #   ngx.say("hello openresty")
                    #}
                    content_by_lua_file /export/servers/openresty/mylua/controller.lua;


                }
        }
}

Writing controller.lua

--overload protection: stop collecting once the connection count exceeds a limit
--maximum allowed connections
local maxConnectNum = 10000
--current active connections
local currentConnect = tonumber(ngx.var.connections_active)
--above the limit, skip collection
if currentConnect > maxConnectNum
then
        return
end


-- balance messages across partitions
--number of Kafka partitions
local partition_num = 6
--key used inside the shared dict
local sharedKey = "publicValue"
--shared dict handle
local shared_data = ngx.shared.shared_data
--read the counter from the shared dict
local num = shared_data:get(sharedKey)
--on the first run, num has no value yet
if not num
then
        --initialize the counter in the shared dict
        num = 0
        shared_data:set(sharedKey, 0)
end

--take the remainder to pick the partition ID
local partitionID = num % partition_num
--use the shared dict's built-in atomic increment
shared_data:incr(sharedKey, 1)

-- data collection
-- current system time
local time_local = ngx.var.time_local
if time_local == nil then
time_local = "" end
-- requested URL
local request = ngx.var.request
if request == nil then
request = "" end
-- request method
local request_method = ngx.var.request_method
if request_method == nil then
request_method = "" end
-- content type of the request, e.g. text/html, application/json
local content_type = ngx.var.content_type
if content_type == nil then
content_type = "" end
-- read the request body
ngx.req.read_body()
--request body data
local request_body = ngx.var.request_body
if request_body == nil then
request_body = "" end
-- referrer URL
local http_referer = ngx.var.http_referer
if http_referer == nil then
http_referer = "" end
-- client IP address
local remote_addr = ngx.var.remote_addr
if remote_addr == nil then
remote_addr = "" end
-- the request's User-Agent
local http_user_agent = ngx.var.http_user_agent
if http_user_agent == nil then
http_user_agent = "" end
-- request time
local time_iso8601 = ngx.var.time_iso8601
if time_iso8601 == nil then
time_iso8601 = "" end
-- server address being requested
local server_addr = ngx.var.server_addr
if server_addr == nil then
server_addr = "" end
--the user's cookies
local http_cookie = ngx.var.http_cookie
if http_cookie == nil then
http_cookie = "" end
--assemble the message
local message = time_local .."#CS#".. request .."#CS#".. request_method .."#CS#".. content_type .."#CS#".. request_body .."#CS#".. http_referer .."#CS#".. remote_addr .."#CS#".. http_user_agent .."#CS#".. time_iso8601 .."#CS#".. server_addr .."#CS#".. http_cookie;

-- connect to Kafka and send the message
-- require the Kafka module
local kafka = require "resty.kafka.producer"
--create the producer
local broker_list = {{host="192.168.80.81",port=9092},{host="192.168.80.82",port=9092},{host="192.168.80.83",port=9092}}
local producer = kafka:new(broker_list)
--send (topic, key (the partition id 0-5 is used as the key), message)
local ok, err = producer:send("sz07", tostring(partitionID), message)
if not ok
then
        ngx.say("Kafka send failed: "..err)
        return
end

Note:

The partition count cannot be set from Lua; set it manually with the Kafka scripts.

Show the help for topic operations:
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --help
Change the partition count to 6:
/export/servers/kafka_2.11-1.0.0/bin/kafka-topics.sh --zookeeper node01:2181 --alter --partitions 6 --topic sz07

Data preprocessing

Consuming the messages from Kafka

Create the project

Import the pom.xml configuration

Import the config files

Copy the files under 反扒參考資料\配置文件 into the project's resources directory

Adjust the IP-related settings in the config files

Import the entity and utility classes the project needs

Copy the classes under 反扒參考資料\工具包 into the project

Two ways to consume Kafka data

  • receiver mode
    1. Consumes via Kafka's high-level consumer API
    2. To prevent data loss, the WAL must be enabled, which is stored on HDFS
      Drawbacks:
      1. Spark consumes with multiple threads, which does not increase processing speed and cannot exploit Spark's RDD partitioning
      2. With WAL enabled, data is replicated once in Kafka and again on HDFS, wasting resources
      3. Offsets are kept in ZooKeeper, which can fail to sync and cause duplicate consumption
  • direct mode
    1. Consumes via Kafka's low-level API
    2. Fetches data at specific offsets by topic+partition
    3. Spark keeps the consumed offsets in its checkpoint
      Advantages:
      1. No duplicate consumption; exactly-once semantics can be guaranteed
      2. No WAL; data is replicated only once, by Kafka, so performance is better
      3. RDD partitions can match the Kafka partition count, improving consumption throughput
Link statistics

The main application

package com.air.antispider.stream.dataprocess

import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.dataprocess.businessprocess.BusinessProcess
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
  * Main program for data preprocessing
  */
object DataProcessApp {

  def main(args: Array[String]): Unit = {

    //create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DataProcessApp")

    //create the StreamingContext
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    //consume Kafka messages; there are 2 ways to do it

    var kafkaParams = Map[String, String]()

    //read the broker list from kafkaConfig.properties
    val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties")

    kafkaParams += ("metadata.broker.list" -> brokerList)

    val topics = Set[String]("sz07")

    //consume from Kafka in direct mode
    //StringDecoder: plain Java serialization is slow; Kafka uses its own serialization mechanism for performance
    val inputDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    //messages arrive as (key, message) pairs
    val messageDStream: DStream[String] = inputDStream.map(_._2)

    messageDStream.foreachRDD(messageRDD =>{
      //run the link statistics
      BusinessProcess.linkCount(messageRDD)
      messageRDD.foreach(println)
    })

    //start the Spark program
    ssc.start()
    ssc.awaitTermination()
  }
}

The link-statistics code

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.Date

import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil}
import org.apache.spark.rdd.RDD
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import redis.clients.jedis.JedisCluster

/**
  * Link statistics
  */
object BusinessProcess {
  def linkCount(messageRDD: RDD[String]) = {
    //collection volume per server
    val serverCountRDD: RDD[(String, Int)] = messageRDD.map(message => {
      val arr: Array[String] = message.split("#CS#")
      if (arr.length > 9) {
        //the record has data
        val serverIP = arr(9)
        //(ip, 1 occurrence)
        (serverIP, 1)
      } else {
        ("", 1)
      }
    })
      //sum the counts by key
      .reduceByKey(_ + _)

    //current active connections
    val activeNumRDD: RDD[(String, Int)] = messageRDD.map(message => {
      val arr: Array[String] = message.split("#CS#")
      if (arr.length > 11) {
        //the server IP
        val serverIP = arr(9)
        //this IP's active-connection count
        val activeNum = arr(11)
        //(ip, active connections)
        (serverIP, activeNum.toInt)
      } else {
        ("", 1)
      }
    })
      //discard one of the two values; only one active-connection count is needed
      .reduceByKey((x, y) => y)

    //present the results
    //following the Java web code, we package a JSON document into Redis for the front end to display
    if (!serverCountRDD.isEmpty() && !activeNumRDD.isEmpty()) {
      //data is present, start processing
      //turn the RDD results into Maps
      val serversCountMap: collection.Map[String, Int] = serverCountRDD.collectAsMap()
      val activeNumMap: collection.Map[String, Int] = activeNumRDD.collectAsMap()

      val map = Map[String, collection.Map[String, Int]](
        "serversCountMap" -> serversCountMap,
        "activeNumMap" -> activeNumMap
      )

      //convert the map to JSON
      val jsonData: String = Json(DefaultFormats).write(map)

      //store jsonData in Redis
      //get the Redis connection
      val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster
      //store the data
      //the key is CSANTI_MONITOR_LP + timestamp
      val key: String = PropertiesUtil.getStringByKey("cluster.key.monitor.linkProcess", "jedisConfig.properties") + new Date().getTime
      val ex: Int = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt
      //the data is stored per day, so a 1-day expiry would be enough
//      jedis.set(key, jsonData)
      //set the expiry (the configured value, e.g. 2 minutes)
      jedis.setex(key, ex, jsonData)
    }
  }
}

URL filtering

Flow:

  1. Load the URL filter rules from MySQL
  2. Put the filter rules into a broadcast variable
  3. Refresh the broadcast variable based on a flag in Redis
  4. Use the broadcast rules to do the actual filtering

Writing the code

  1. Load the URL filter rules from MySQL

Code:

import com.air.antispider.stream.common.util.database.QueryDB

import scala.collection.mutable.ArrayBuffer

/**
  * Loads the MySQL rules so Spark can use them
  */
object AnalyzeRuleDB {

  /**
    * Load the URL filter rules from MySQL
    */
  def getFilterRule(): ArrayBuffer[String]  = {
    val sql = "select value from nh_filter_rule"
    val field = "value"
    //query the value column
    val filterRule: ArrayBuffer[String] = QueryDB.queryData(sql, field)
    filterRule
  }
}
  2. Put the filter rules into a broadcast variable

After creating the SparkContext and before reading from Kafka, load the database rules into a broadcast variable

//load the database rules into a broadcast variable
    val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule()
    //put the rule list into a broadcast variable
    //@volatile lets multiple threads safely see updates to the broadcast reference
    @volatile var filterRuleBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(filterRuleList)

The @volatile annotation: it guarantees that when the driver reassigns the broadcast variable, other threads immediately see the new reference.
  3. Refresh the broadcast variable

    //check the flag first, then refresh the broadcast variable
    var filterRuleChangeFlag = jedis.get("FilterRuleChangeFlag")
    //check whether the flag exists
    if (StringUtils.isBlank(filterRuleChangeFlag)) {
      filterRuleChangeFlag = "true"
      //write it back to Redis
      jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
    }
    //refresh the broadcast variable
    if (filterRuleChangeFlag.toBoolean) {
      //FilterRuleChangeFlag == true means the broadcast variable must be reloaded
      //load the database rules into the broadcast variable
      val filterRuleList: ArrayBuffer[String] = AnalyzeRuleDB.getFilterRule()
      //put the rule list into the broadcast variable
      //@volatile lets multiple threads safely see updates to the broadcast reference
      filterRuleBroadcast = sc.broadcast(filterRuleList)
      filterRuleChangeFlag = "false"
      jedis.set("FilterRuleChangeFlag", filterRuleChangeFlag)
    }

  4. Create the URLFilter class

    import org.apache.commons.lang3.StringUtils

    import scala.collection.mutable.ArrayBuffer

    /**
      * URL filtering backed by the broadcast rules
      */
    object URLFilter {
      /**
        * @param message the raw record
        * @param filterRulList the filter rules
        */
      def filterURL(message: String, filterRulList: ArrayBuffer[String]): Boolean = {
        //check whether this message matches anything in filterRulList
        //first pull the URL out of the message
        var url = ""
        val arr: Array[String] = message.split("#CS#")
        if (arr.length > 1) {
          val arrTemp: Array[String] = arr(1).split(" ")
          if (arrTemp.length > 1) {
            url = arrTemp(1)
          }
        }
        //make sure a URL was extracted
        if (StringUtils.isBlank(url)) {
          return false
        }
        //walk the rule list
        for (filterRule <- filterRulList) {
          if (url.matches(filterRule)) {
            return false
          }
        }
        //if the whole list was scanned without returning, nothing matched
        true
      }
    }
  5. Use the URLFilter class in the main program

    //URL filtering
    val filterRDD: RDD[String] = messageRDD.filter(message => URLFilter.filterURL(message, filterRuleBroadcast.value))

Data masking

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.{Matcher, Pattern}
import com.air.antispider.stream.common.util.decode.MD5
import org.apache.spark.rdd.RDD

/**
  * Encrypts the user's sensitive fields
  */
object EncryptedData {
  /**
    * Encrypt ID-card numbers
    * @param encryptedPhoneRDD
    * @return
    */
  def encryptedID(encryptedPhoneRDD: RDD[String]): RDD[String] = {
    //find the ID-card numbers
    encryptedPhoneRDD.map(message => {
      //create the digest object
      val md5 = new MD5
      //look for ID-card numbers in the message
      //a regular expression does the matching
      val pattern: Pattern = Pattern.compile("(\\d{18})|(\\d{17}(\\d|X|x))|(\\d{15})")
      //match the regex against the message; matcher holds the results
      val matcher: Matcher = pattern.matcher(message)
      var tempMessage = message
      //loop over the matches
      while (matcher.find()) {
        //take the match
        val id: String = matcher.group()
        //hash it and substitute it back
        val encryptedID: String = md5.getMD5ofStr(id)
        tempMessage = tempMessage.replace(id, encryptedID)
      }
      //return the masked record
      tempMessage
    })
  }

  //encrypt phone numbers
  def encryptedPhone(filterRDD: RDD[String]): RDD[String] = {

    //find the phone numbers
    filterRDD.map(message => {
      //create the digest object
      val md5 = new MD5
      //look for phone numbers in the message
      //a regular expression does the matching
      val pattern: Pattern = Pattern.compile("((13[0-9])|(14[5|7])|(15([0-3]|[5-9]))|(17[0-9])|(18[0,5-9]))\\d{8}")
      //match the regex against the message; matcher holds the results
      val matcher: Matcher = pattern.matcher(message)
      var tempMessage = message
      //loop over the matches
      while (matcher.find()) {
        //take the match
        val phone: String = matcher.group()
        //hash it and substitute it back
        val encryptedPhone: String = md5.getMD5ofStr(phone)
        tempMessage = tempMessage.replace(phone, encryptedPhone)
      }
      //return the masked record
      tempMessage
    })
  }
}

Main program:

//mask the sensitive data
      //encrypt phone numbers
      val encryptedPhoneRDD: RDD[String] = EncryptedData.encryptedPhone(filterRDD)
      //encrypt ID-card numbers
      val encryptedRDD: RDD[String] = EncryptedData.encryptedID(encryptedPhoneRDD)

Splitting the data

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.Pattern

import com.air.antispider.stream.common.util.decode.{EscapeToolBox, RequestDecoder}
import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.common.util.string.CsairStringUtils

/**
  * Data splitting
  */
object DataSplit {

  /**
    * Split the raw record into its individual fields
    * @param message
    * @return
    */
  def split(message: String):(String,String,String,String,String,String,String,String,String,String,String,String ) = {
    val values: Array[String] = message.split("#CS#")
    //pull the 12 fields out of the array
    //record the array length
    val valuesLength = values.length
    //the raw request field
    val regionalRequest = if (valuesLength > 1) values(1) else ""
    //extract the URL from the request field
    val request = if (regionalRequest.split(" ").length > 1) {
      regionalRequest.split(" ")(1)
    } else { ""
    }
    //request method, GET/POST
    val requestMethod = if (valuesLength > 2) values(2) else ""
    //content_type
    val contentType = if (valuesLength > 3) values(3) else ""
    //POST body
    val requestBody = if (valuesLength > 4) values(4) else ""
    //http_referrer
    val httpReferrer = if (valuesLength > 5) values(5) else ""
    //client IP
    val remoteAddr = if (valuesLength > 6) values(6) else ""
    //client UA
    val httpUserAgent = if (valuesLength > 7) values(7) else ""
    //server time in ISO8601 format
    val timeIso8601 = if (valuesLength > 8) values(8) else ""
    //server address
    val serverAddr = if (valuesLength > 9) values(9) else ""
    //cookie string
    //take the raw cookie string, stripping spaces and tabs
    val cookiesStr = CsairStringUtils.trimSpacesChars(if (valuesLength > 10) values(10) else "")
    //parse the cookies into K-V form
    val cookieMap = {
      var tempMap = new scala.collection.mutable.HashMap[String, String]
      if (!cookiesStr.equals("")) {
        cookiesStr.split(";").foreach { s =>
          val kv = s.split("=")
          //UTF-8 decoding
          if (kv.length > 1) {
            try {
              val chPattern = Pattern.compile("u([0-9a-fA-F]{4})")
              val chMatcher = chPattern.matcher(kv(1))
              var isUnicode = false
              while (chMatcher.find()) {
                isUnicode = true
              }
              if (isUnicode) {
                tempMap += (kv(0) -> EscapeToolBox.unescape(kv(1)))
              } else {
                tempMap += (kv(0) -> RequestDecoder.decodePostRequest(kv(1)))
              }
            } catch {
              case e: Exception => e.printStackTrace()
            }
          }
        }
      }
      tempMap
    }
    //extract the key cookie values
    //read the cookie keys from the config file
    val cookieKey_JSESSIONID = PropertiesUtil.getStringByKey("cookie.JSESSIONID.key", "cookieConfig.properties")
    val cookieKey_userId4logCookie = PropertiesUtil.getStringByKey("cookie.userId.key", "cookieConfig.properties")
    //Cookie-JSESSIONID
    val cookieValue_JSESSIONID = cookieMap.getOrElse(cookieKey_JSESSIONID, "NULL")
    //Cookie-USERID, the user ID
    val cookieValue_USERID = cookieMap.getOrElse(cookieKey_userId4logCookie, "NULL")
    (request,requestMethod,contentType,requestBody,httpReferrer,remoteAddr,httpUserAgent,timeIso8601,serverAddr,cookiesStr,cookieValue_JSESSIONID,cookieValue_USERID)
  }

}

Main program:

encryptedRDD.map(message => {
        //after receiving a message, split it and tag it
        //split the data
        val (request,
        requestMethod,
        contentType,
        requestBody,
        httpReferrer,
        remoteAddr,
        httpUserAgent,
        timeIso8601,
        serverAddr,
        cookiesStr,
        cookieValue_JSESSIONID,
        cookieValue_USERID) = DataSplit.split(message)

      })

Tagging the data

So that later stages can parse the data, we must know what kind of request each record is, e.g. domestic/query/one-way versus international/query/round-trip.

Tagging by category

  1. Load the classification rules from the database

    /**
      * Load the tagging rules
      */
    def getClassifyRule(): Map[String, ArrayBuffer[String]] = {
      //all "domestic query" URLs
      val nationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Query.id
      val nationalQueryList: ArrayBuffer[String] = QueryDB.queryData(nationalQuerySQL, "expression")
      //all "domestic booking" URLs
      val nationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.National.id + " and operation_type = " + BehaviorTypeEnum.Book.id
      val nationalBookList: ArrayBuffer[String] = QueryDB.queryData(nationalBookSQL, "expression")
      //all "international query" URLs
      val internationalQuerySQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Query.id
      val internationalQueryList: ArrayBuffer[String] = QueryDB.queryData(internationalQuerySQL, "expression")
      //all "international booking" URLs
      val internationalBookSQL = "select expression from nh_classify_rule where flight_type = " + FlightTypeEnum.International.id + " and operation_type = " + BehaviorTypeEnum.Book.id
      val internationalBookList: ArrayBuffer[String] = QueryDB.queryData(internationalBookSQL, "expression")

      //a Map holding the 4 lists
      val map = Map[String, ArrayBuffer[String]](
        "nationalQueryList" -> nationalQueryList,
        "nationalBookList" -> nationalBookList,
        "internationalQueryList" -> internationalQueryList,
        "internationalBookList" -> internationalBookList
      )
      map
    }

  2. Load the classification rules into a broadcast variable

    //load the classification rules into a broadcast variable
    val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule()
    @volatile var classifyRuleBroadcast: Broadcast[Map[String, ArrayBuffer[String]]] = sc.broadcast(classifyRuleMap)

  3. Refresh the broadcast variable

    //refresh the classification rules
    var classifyRuleChangeFlag: String = jedis.get("ClassifyRuleChangeFlag")
    //check whether classifyRuleChangeFlag exists
    if (StringUtils.isBlank(classifyRuleChangeFlag)){
      classifyRuleChangeFlag = "true"
      //write it back to Redis
      jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
    }
    if (classifyRuleChangeFlag.toBoolean) {
      classifyRuleBroadcast.unpersist()
      //reload the classification rules into the broadcast variable
      val classifyRuleMap: Map[String, ArrayBuffer[String]] = AnalyzeRuleDB.getClassifyRule()
      classifyRuleBroadcast = sc.broadcast(classifyRuleMap)
      classifyRuleChangeFlag = "false"
      //write it back to Redis
      jedis.set("ClassifyRuleChangeFlag", classifyRuleChangeFlag)
    }

  4. Tag each request using the broadcast rules

    package com.air.antispider.stream.dataprocess.businessprocess

    import com.air.antispider.stream.common.bean.RequestType
    import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum}
    import com.air.antispider.stream.dataprocess.constants.FlightTypeEnum.FlightTypeEnum

    import scala.collection.mutable.ArrayBuffer

    object RequestTypeClassifier {
      /**
        * Determines the request's category
        * @param request
        * @param classifyRuleMap
        * @return the request's classification (e.g. domestic, query)
        */
      def requestTypeClassifier(request: String, classifyRuleMap: Map[String, ArrayBuffer[String]]): RequestType = {
        //take the rule lists out of the map
        val nationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalQueryList", null)
        val nationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("nationalBookList", null)
        val internationalQueryList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalQueryList", null)
        val internationalBookList: ArrayBuffer[String] = classifyRuleMap.getOrElse("internationalBookList", null)

        //walk the 4 lists and see which one the request matches
        //domestic query
        if (nationalQueryList != null) {
          for (expression <- nationalQueryList) {
            //does the request URL match this regex?
            if (request.matches(expression)) {
              return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Query)
            }
          }
        }
        //domestic booking
        if (nationalBookList != null) {
          for (expression <- nationalBookList) {
            //does the request URL match this regex?
            if (request.matches(expression)) {
              return RequestType(FlightTypeEnum.National, BehaviorTypeEnum.Book)
            }
          }
        }
        //international query
        if (internationalQueryList != null) {
          for (expression <- internationalQueryList) {
            //does the request URL match this regex?
            if (request.matches(expression)) {
              return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Query)
            }
          }
        }
        //international booking
        if (internationalBookList != null) {
          for (expression <- internationalBookList) {
            //does the request URL match this regex?
            if (request.matches(expression)) {
              return RequestType(FlightTypeEnum.International, BehaviorTypeEnum.Book)
            }
          }
        }

        //if nothing matched, return a default
        RequestType(FlightTypeEnum.Other, BehaviorTypeEnum.Other)
      }
    }

  5. Use the tagging method in the main program

    //tag the request with its category
    val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value)

Tagging one-way vs. round-trip

The request carries no explicit trip-type field, so we count the dates in the HTTP referrer: 1 date means one-way, 2 dates means round-trip.

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import java.util.regex.{Matcher, Pattern}

import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum

/**
  * Tagging one-way vs. round-trip
  */
object TravelTypeClassifier {

  def travelTypeClassifier(httpReferrer: String): TravelTypeEnum = {
    val pattern: Pattern = Pattern.compile("(\\d{4})-(0\\d{1}|1[0-2])-(0\\d{1}|[12]\\d{1}|3[01])")
    val matcher: Matcher = pattern.matcher(httpReferrer)

    //date counter
    var num = 0

    //each call to find() advances the cursor
    while (matcher.find()) {
      num = num + 1
    }

    if (num == 1) {
      //one-way
      return TravelTypeEnum.OneWay
    } else if (num == 2) {
      //round-trip
      return TravelTypeEnum.RoundTrip
    } else {
      //cannot tell
      return TravelTypeEnum.Unknown
    }
  }
}

Main program:

//tag the record with its trip type
        val travelTypeEnum: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer)

Parsing the data

The existing China Southern Airlines system was developed long ago, and request parameters and formats differ across modules. So, keyed on flight type / operation type / request URL / request method, we look up parsing rules in the analyzerule table and parse each record according to the configured rules.

Two things are decided here: 1. the parsing format, e.g. JSON versus XML; 2. which fields to extract.

Loading the parsing rules from the database

Take the queryRule method from 反扒參考資料\工具包\解析類\AnalyzeRuleDB.scala:

/**
    * Load the "query" or "booking" parsing-rule expressions, to be put into a broadcast variable
    *
    * @return
    */
  def queryRule(behaviorType: Int): List[AnalyzeRule] = {
    //parsing rules in MySQL (0 = query, 1 = booking)
    var analyzeRuleList = new ArrayBuffer[AnalyzeRule]()
    val sql: String = "select * from analyzerule where behavior_type =" + behaviorType
    var conn: Connection = null
    var ps: PreparedStatement = null
    var rs: ResultSet = null
    try {
      conn = c3p0Util.getConnection
      ps = conn.prepareStatement(sql)
      rs = ps.executeQuery()
      while (rs.next()) {
        val analyzeRule = new AnalyzeRule()
        analyzeRule.id = rs.getString("id")
        analyzeRule.flightType = rs.getString("flight_type").toInt
        analyzeRule.BehaviorType = rs.getString("behavior_type").toInt
        analyzeRule.requestMatchExpression = rs.getString("requestMatchExpression")
        analyzeRule.requestMethod = rs.getString("requestMethod")
        analyzeRule.isNormalGet = rs.getString("isNormalGet").toBoolean
        analyzeRule.isNormalForm = rs.getString("isNormalForm").toBoolean
        analyzeRule.isApplicationJson = rs.getString("isApplicationJson").toBoolean
        analyzeRule.isTextXml = rs.getString("isTextXml").toBoolean
        analyzeRule.isJson = rs.getString("isJson").toBoolean
        analyzeRule.isXML = rs.getString("isXML").toBoolean
        analyzeRule.formDataField = rs.getString("formDataField")
        analyzeRule.book_bookUserId = rs.getString("book_bookUserId")
        analyzeRule.book_bookUnUserId = rs.getString("book_bookUnUserId")
        analyzeRule.book_psgName = rs.getString("book_psgName")
        analyzeRule.book_psgType = rs.getString("book_psgType")
        analyzeRule.book_idType = rs.getString("book_idType")
        analyzeRule.book_idCard = rs.getString("book_idCard")
        analyzeRule.book_contractName = rs.getString("book_contractName")
        analyzeRule.book_contractPhone = rs.getString("book_contractPhone")
        analyzeRule.book_depCity = rs.getString("book_depCity")
        analyzeRule.book_arrCity = rs.getString("book_arrCity")
        analyzeRule.book_flightDate = rs.getString("book_flightDate")
        analyzeRule.book_cabin = rs.getString("book_cabin")
        analyzeRule.book_flightNo = rs.getString("book_flightNo")
        analyzeRule.query_depCity = rs.getString("query_depCity")
        analyzeRule.query_arrCity = rs.getString("query_arrCity")
        analyzeRule.query_flightDate = rs.getString("query_flightDate")
        analyzeRule.query_adultNum = rs.getString("query_adultNum")
        analyzeRule.query_childNum = rs.getString("query_childNum")
        analyzeRule.query_infantNum = rs.getString("query_infantNum")
        analyzeRule.query_country = rs.getString("query_country")
        analyzeRule.query_travelType = rs.getString("query_travelType")
        analyzeRule.book_psgFirName = rs.getString("book_psgFirName")
        analyzeRuleList += analyzeRule
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      c3p0Util.close(conn, ps, rs)
    }
    analyzeRuleList.toList
  }

Put the rules into broadcast variables

//load the parsing rules into broadcast variables
    val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0)
    val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1)
    @volatile var queryRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(queryRuleList)
    @volatile var bookRuleBroadcast: Broadcast[List[AnalyzeRule]] = sc.broadcast(bookRuleList)

Refresh the broadcast variables

//refresh the parsing rules
      var analyzeRuleChangeFlag: String = jedis.get("AnalyzeRuleChangeFlag")
      //check whether the flag exists
      if (StringUtils.isBlank(analyzeRuleChangeFlag)){
        analyzeRuleChangeFlag = "true"
        //write it back to Redis
        jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag)
      }
      if (analyzeRuleChangeFlag.toBoolean) {
        queryRuleBroadcast.unpersist()
        bookRuleBroadcast.unpersist()
        //reload the parsing rules into the broadcast variables
        val queryRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(0)
        val bookRuleList: List[AnalyzeRule] = AnalyzeRuleDB.queryRule(1)
        queryRuleBroadcast = sc.broadcast(queryRuleList)
        bookRuleBroadcast = sc.broadcast(bookRuleList)

        analyzeRuleChangeFlag = "false"
        //write it back to Redis
        jedis.set("AnalyzeRuleChangeFlag", analyzeRuleChangeFlag)
      }

Writing the parsing code

Copy the AnalyzeBookRequest and AnalyzeRequest classes from 反扒參考資料\工具包\解析類 into the com.air.antispider.stream.dataprocess.businessprocess package

Calling them from the main program

//parse the data
        //parse query requests
        val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelTypeEnum,
          queryRuleBroadcast.value)
        //parse booking requests
        val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelTypeEnum,
          bookRuleBroadcast.value
        )

Data enrichment

Compare the request's IP against the blacklist in MySQL ahead of time, to decide whether it is a high-frequency IP; if so, mark the record so downstream stages can use the flag.

  1. Load the blacklist from MySQL

    /**
      * Load the blacklist from MySQL
      * @return
      */
    def getIpBlackList (): ArrayBuffer[String] = {
      val sql = "select ip_name from nh_ip_blacklist"
      val blackIPList: ArrayBuffer[String] = QueryDB.queryData(sql, "ip_name")
      blackIPList
    }

  2. Put the blacklist into a broadcast variable

    //load the blacklist into a broadcast variable
    val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList()
    @volatile var blackIPBroadcast: Broadcast[ArrayBuffer[String]] = sc.broadcast(blackIPList)

  3. Refresh the blacklist broadcast variable

    //refresh the blacklist
    var blackIPChangeFlag: String = jedis.get("BlackIPChangeFlag")
    //check whether the flag exists
    if (StringUtils.isBlank(blackIPChangeFlag)){
      blackIPChangeFlag = "true"
      //write it back to Redis
      jedis.set("BlackIPChangeFlag", blackIPChangeFlag)
    }
    if (blackIPChangeFlag.toBoolean) {
      blackIPBroadcast.unpersist()
      //reload the blacklist into the broadcast variable
      val blackIPList: ArrayBuffer[String] = AnalyzeRuleDB.getIpBlackList()
      blackIPBroadcast = sc.broadcast(blackIPList)

      blackIPChangeFlag = "false"
      //write it back to Redis
      jedis.set("BlackIPChangeFlag", blackIPChangeFlag)
    }

  4. The high-frequency-IP check

    package com.air.antispider.stream.dataprocess.businessprocess

    import scala.collection.mutable.ArrayBuffer

    object IpOperation {
      /**
        * Is the client IP a high-frequency IP?
        * @param remoteAddr
        * @param blackIPList
        * @return
        */
      def operationIP(remoteAddr: String, blackIPList: ArrayBuffer[String]): Boolean = {
        //walk blackIPList and check whether remoteAddr is in it
        for (blackIP <- blackIPList) {
          if (blackIP.equals(remoteAddr)){
            //equal means this IP is a high-frequency IP
            return true
          }
        }
        false
      }
    }

  5. Main program code

    //data enrichment
    val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value)

Structuring the data

The values produced so far are loose and scattered; downstream stages cannot consume them directly, so they must be packed into a structured object.

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.{BookRequestData, CoreRequestParams, ProcessedData, QueryRequestData, RequestType}
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum

object DataPackage {
  /**
    * Packs the loose values below into a ProcessedData
    * @param sourceData
    * @param requestMethod
    * @param request
    * @param remoteAddr
    * @param httpUserAgent
    * @param timeIso8601
    * @param serverAddr
    * @param highFrqIPGroup
    * @param requestType
    * @param travelType
    * @param cookieValue_JSESSIONID
    * @param cookieValue_USERID
    * @param queryParams
    * @param bookParams
    * @param httpReferrer
    * @return
    */
  def dataPackage(sourceData: String,
                  requestMethod: String,
                  request: String,
                  remoteAddr: String,
                  httpUserAgent: String,
                  timeIso8601: String,
                  serverAddr: String,
                  highFrqIPGroup: Boolean,
                  requestType: RequestType,
                  travelType: TravelTypeEnum,
                  cookieValue_JSESSIONID: String,
                  cookieValue_USERID: String,
                  queryParams: Option[QueryRequestData],
                  bookParams: Option[BookRequestData],
                  httpReferrer: String): ProcessedData = {

    //building a ProcessedData also needs the core request parameters,
    //which live inside queryParams/bookParams

    //departure date
    var flightDate: String = ""
    //departure city
    var depcity: String = ""
    //destination city
    var arrcity: String = ""

    //check whether the query parameters hold values
    queryParams match {
      //the Option holds a value; x gives access to it
      case Some(x) =>
        flightDate = x.flightDate
        depcity = x.depCity
        arrcity = x.arrCity
      //None: no value
      case None =>
        //no query parameters, so try the booking parameters
        bookParams match {
          //the Option holds a value; bookData gives access to it
          case Some(bookData) =>
            //to be safe, a length check should guard these mkString calls
            flightDate = bookData.flightDate.mkString
            depcity = bookData.depCity.mkString
            arrcity = bookData.arrCity.mkString
          //None: no value
          case None =>
        }
    }

    //build the core request parameters
    val requestParams = CoreRequestParams(flightDate, depcity, arrcity)

    ProcessedData(
      sourceData,
      requestMethod,
      request,
      remoteAddr,
      httpUserAgent,
      timeIso8601,
      serverAddr,
      highFrqIPGroup,
      requestType,
      travelType,
      requestParams,
      cookieValue_JSESSIONID,
      cookieValue_USERID,
      queryParams,
      bookParams,
      httpReferrer)
  }
}

Main program code

//extract and transform the data to get a ProcessedData RDD
      val processedDataRDD: RDD[ProcessedData] = encryptedRDD.map(message => {
        //after receiving a message, split it and tag it
        //split the data
        val (request, //requested URL
        requestMethod,
        contentType,
        requestBody, //request body
        httpReferrer, //referrer URL
        remoteAddr, //client IP
        httpUserAgent,
        timeIso8601,
        serverAddr,
        cookiesStr,
        cookieValue_JSESSIONID,
        cookieValue_USERID) = DataSplit.split(message)

        //tag the request with its category
        val requestType: RequestType = RequestTypeClassifier.requestTypeClassifier(request, classifyRuleBroadcast.value)
        //tag the record with its trip type
        val travelType: TravelTypeEnum = TravelTypeClassifier.travelTypeClassifier(httpReferrer)
        //parse the data
        //parse query requests
        val queryParams: Option[QueryRequestData] = AnalyzeRequest.analyzeQueryRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelType,
          queryRuleBroadcast.value)
        //parse booking requests
        val bookParams: Option[BookRequestData] = AnalyzeBookRequest.analyzeBookRequest(
          requestType,
          requestMethod,
          contentType,
          request,
          requestBody,
          travelType,
          bookRuleBroadcast.value
        )
        //data enrichment
        val highFrqIPGroup: Boolean = IpOperation.operationIP(remoteAddr, blackIPBroadcast.value)
        //pack the loose values together
        val processedData: ProcessedData = DataPackage.dataPackage(
          "", //raw data, left empty here
          requestMethod,
          request,
          remoteAddr,
          httpUserAgent,
          timeIso8601,
          serverAddr,
          highFrqIPGroup,
          requestType,
          travelType,
          cookieValue_JSESSIONID,
          cookieValue_USERID,
          queryParams,
          bookParams,
          httpReferrer)
        processedData
      })

Data push module

For better decoupling, records are pushed to different topics according to their request type, e.g. query vs. booking; downstream jobs then pull only the topics they need.

Code:

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum
import com.air.antispider.stream.dataprocess.constants.BehaviorTypeEnum.BehaviorTypeEnum
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.rdd.RDD

object SendData {
  /**
    * Send booking data to Kafka
    *
    * @param processedDataRDD
    */
  def sendBookDataKafka(processedDataRDD: RDD[ProcessedData]) = {
    sendToKafka(processedDataRDD, 1)
  }

  /**
    * Send query data to Kafka
    *
    * @param processedDataRDD
    */
  def sendQueryDataKafka(processedDataRDD: RDD[ProcessedData]) = {
    sendToKafka(processedDataRDD, 0)
  }

  /**
    * Send to Kafka by type
    *
    * @param processedDataRDD
    * @param topicType 0: query, 1: booking
    */
  def sendToKafka(processedDataRDD: RDD[ProcessedData], topicType: Int) = {
    //send the ProcessedData records to Kafka
    val messageRDD: RDD[String] = processedDataRDD
      //keep only records of the requested type
      .filter(processedData => processedData.requestType.behaviorType.id == topicType)
      //convert each record to a string
      .map(processedData => processedData.toKafkaString())

    //send only if anything is left after filtering
    if (!messageRDD.isEmpty()) {
      //Kafka configuration
      //query topic: target.query.topic = processedQuery
      var topicKey = ""
      if (topicType == 0) {
        topicKey = "target.query.topic"
      } else if (topicType == 1) {
        topicKey = "target.book.topic"
      }
      val queryTopic = PropertiesUtil.getStringByKey(topicKey, "kafkaConfig.properties")
      //a map holding the Kafka parameters
      val props = new java.util.HashMap[String, Object]()
      //brokers
      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties"))
      //key serializer
      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.key_serializer_class_config", "kafkaConfig.properties"))
      //value serializer
      props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, PropertiesUtil.getStringByKey("default.value_serializer_class_config", "kafkaConfig.properties"))
      //batching: send every 32 KB or every 10 ms
      props.put(ProducerConfig.BATCH_SIZE_CONFIG, PropertiesUtil.getStringByKey("default.batch_size_config", "kafkaConfig.properties"))
      props.put(ProducerConfig.LINGER_MS_CONFIG, PropertiesUtil.getStringByKey("default.linger_ms_config", "kafkaConfig.properties"))

      messageRDD.foreachPartition(iter => {
        //create the Kafka producer once per partition
        val producer = new KafkaProducer[String, String](props)
        //send the records
        iter.foreach(message => {
          //send one record
          producer.send(new ProducerRecord[String, String](queryTopic, message))
        })
        //close the producer
        producer.close()
      })
    }
  }
}

Main program:

//send the structured ProcessedData to different topics according to the request type
//send query data to Kafka
SendData.sendQueryDataKafka(processedDataRDD)
//send booking data to Kafka
SendData.sendBookDataKafka(processedDataRDD)

Real-time job monitoring

Spark has built-in performance metrics; enable them when creating the SparkConf:

//with this setting, a stop request lets the current batch finish before the application stops
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    //create the Spark configuration
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("DataProcessApp")
      //enable the Spark JVM metrics source
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")

The metrics are served at http://localhost:4040/metrics/json/

The code

package com.air.antispider.stream.dataprocess.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import com.air.antispider.stream.common.util.jedis.{JedisConnectionUtil, PropertiesUtil}
import com.air.antispider.stream.common.util.spark.SparkMetricsUtils
import com.alibaba.fastjson.JSONObject
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.json4s.DefaultFormats
import org.json4s.jackson.Json
import redis.clients.jedis.JedisCluster

object SparkStreamingMonitor {
  /**
    * Spark performance monitoring
    *
    * @param sc
    * @param processedDataRDD
    * @param serversCountMap
    */
  def streamMonitor(sc: SparkContext, processedDataRDD: RDD[ProcessedData], serversCountMap: collection.Map[String, Int]) = {

    //1. fetch the Spark metrics
    /*
    //once the project is deployed, build the URL this way instead
        //metrics endpoint
        val sparkDriverHost =
          sc.getConf.get("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY _URI_BASES")
        //path of the metrics JSON when running on YARN
        val url = s"${sparkDriverHost}/metrics/json"
        */
    val url = "http://localhost:4040/metrics/json/"
    val sparkDataInfo: JSONObject = SparkMetricsUtils.getMetricsJson(url)
    val gaugesObj: JSONObject = sparkDataInfo.getJSONObject("gauges")

    //application ID and name, used to build the JSON keys
    val id: String = sc.applicationId
    val appName: String = sc.appName
    //local-1561617727065.driver.DataProcessApp.StreamingMetrics.streaming.lastCompletedBatch_processingStartTime
    val startKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingStartTime"
    val endKey = id + ".driver." + appName + ".StreamingMetrics.streaming.lastCompletedBatch_processingEndTime"

    val startTime = gaugesObj.getJSONObject(startKey) //{"value": 1561617812011}
      .getLong("value")
    val endTime = gaugesObj.getJSONObject(endKey) //{"value": 1561617812011}
      .getLong("value")
    //format the end time as yyyy-MM-dd HH:mm:ss; the web platform uses a 24-hour clock, hence HH
    val endTimeStr: String = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss").format(endTime)

    //2. compute the batch duration
    val costTime = endTime - startTime

    //3. compute the processing rate from the duration: rate = count / time
    //number of processed records
    val count: Long = processedDataRDD.count()
    //processing rate
    var countPer = 0.0
    if (costTime != 0) {
      countPer = count.toDouble / costTime
    }


    //4. hand the results to the Java web app for display
    //convert serversCountMap to JSON
    val serversCountMapJson: String = Json(DefaultFormats).write(serversCountMap)

    //following the web platform's code, the results go into Redis
    val message = Map[String, Any](
      "costTime" -> costTime.toString, //batch duration
      "applicationId" -> id, //application ID
      "applicationUniqueName" -> appName, //application name
      "countPerMillis" -> countPer.toString,//processing rate
      "endTime" -> endTimeStr, //end time, e.g. 2019-06-27 15:44:32
      "sourceCount" -> count.toString, //record count
      "serversCountMap" -> serversCountMap //collection statistics
    )

    //convert message to JSON
    val messageJson: String = Json(DefaultFormats).write(message)

    //store messageJson in Redis

    val jedis: JedisCluster = JedisConnectionUtil.getJedisCluster
    //the Redis key is CSANTI_MONITOR_DP + timestamp
    val key = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + System.currentTimeMillis()
    val ex = PropertiesUtil.getStringByKey("cluster.exptime.monitor", "jedisConfig.properties").toInt
    jedis.setex(key, ex, messageJson)


    //to also keep the latest batch separately, use a fixed key
    val lastKey = PropertiesUtil.getStringByKey("cluster.key.monitor.dataProcess", "jedisConfig.properties") + "_LAST"
    jedis.set(lastKey, messageJson)

  }
}

Main program code:

Because the third parameter serversCountMap comes from the earlier link statistics, the link-statistics method's return value must be changed accordingly

//enable Spark performance monitoring
//SparkContext, the RDD, and the collection statistics
SparkStreamingMonitor.streamMonitor(sc, processedDataRDD, serversCountMap)

實時計算

自定義維護Offset

讀取偏移量代碼:

package com.air.antispider.stream.rulecompute

import com.air.antispider.stream.common.util.jedis.PropertiesUtil
import com.air.antispider.stream.common.util.kafka.KafkaOffsetUtil
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Main program for real-time blacklist computation
  */
object RuleComputeApp {


  def main(args: Array[String]): Unit = {
    //create the Spark execution environment
    //with this setting, a stop request lets the current batch finish before the application shuts down
    System.setProperty("spark.streaming.stopGracefullyOnShutdown", "true")
    //create the Spark configuration object
    val sparkConf: SparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RuleComputeApp")
      //enable Spark performance monitoring
      .set("spark.metrics.conf.executor.source.jvm.class", "org.apache.spark.metrics.source.JvmSource")
    //create the SparkContext
    val sc = new SparkContext(sparkConf)
    //create the StreamingContext
    val ssc = new StreamingContext(sc, Seconds(2))
  
    val inputStream: InputDStream[(String, String)] = createKafkaStream(ssc)

    inputStream.print()

    //start the application
    ssc.start()
    ssc.awaitTermination()
  }


  /**
    * Consume Kafka data and create the InputDStream
    * @param ssc
    * @return
    */
  def createKafkaStream(ssc: StreamingContext): InputDStream[(String, String)] = {
    //connect to Kafka
    //assemble the Kafka parameters
    var kafkaParams = Map[String, String]()
    //read the broker list from kafkaConfig.properties
    val brokerList: String = PropertiesUtil.getStringByKey("default.brokers", "kafkaConfig.properties")
    kafkaParams += ("metadata.broker.list" -> brokerList)

    //ZooKeeper host addresses
    val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties")
    //ZooKeeper path where the topic offsets are stored
    val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties")
    //topic
    val topic: String = PropertiesUtil.getStringByKey("source.query.topic", "kafkaConfig.properties")
    //wrap the topic in a set
    val topics = Set[String](topic)
    //create the ZooKeeper client
    val zkClient = new ZkClient(zkHosts)


    //use KafkaOffsetUtil to read the stored TopicAndPartition offsets
    val topicAndPartitionOption: Option[Map[TopicAndPartition, Long]] = KafkaOffsetUtil.readOffsets(zkClient, zkHosts, zkPath, topic)

    val inputStream: InputDStream[(String, String)] = topicAndPartitionOption match {
      //offsets found: resume from the offsets stored in ZooKeeper
      case Some(topicAndPartition) =>
        val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, topicAndPartition, messageHandler)
      //no stored offsets: consume with the default behavior
      case None => KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    }
    inputStream
  }
}

Code for saving offsets:

//save the data offsets to ZooKeeper
    inputStream.foreachRDD(rdd => {
      //save the offsets
      saveOffsets(rdd)
    })

/**
    * Save the offset information
    * @param rdd
    */
  def saveOffsets(rdd: RDD[(String, String)]): Unit = {
    //ZooKeeper host addresses
    val zkHosts: String = PropertiesUtil.getStringByKey("zkHosts", "zookeeperConfig.properties")
    //create the ZooKeeper client
    val zkClient = new ZkClient(zkHosts)
    //ZooKeeper path where the topic offsets are stored
    val zkPath: String = PropertiesUtil.getStringByKey("rulecompute.antispider.zkPath", "zookeeperConfig.properties")

    KafkaOffsetUtil.saveOffsets(zkClient, zkHosts, zkPath, rdd)
  }
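
The course's KafkaOffsetUtil ships with the project; a minimal sketch of readOffsets/saveOffsets under the signatures used above, assuming offsets are stored one child node per partition beneath zkPath (the real utility's storage layout may differ):

import kafka.common.TopicAndPartition
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.HasOffsetRanges
import scala.collection.JavaConverters._

object KafkaOffsetUtil {
  //read the offsets stored under zkPath; None if nothing has been saved yet
  def readOffsets(zkClient: ZkClient, zkHosts: String, zkPath: String, topic: String): Option[Map[TopicAndPartition, Long]] = {
    if (!zkClient.exists(zkPath)) return None
    val partitions = zkClient.getChildren(zkPath).asScala
    if (partitions.isEmpty) return None
    val offsets = partitions.map { partition =>
      val offset: String = zkClient.readData(s"$zkPath/$partition")
      TopicAndPartition(topic, partition.toInt) -> offset.toLong
    }.toMap
    Some(offsets)
  }

  //write the untilOffset of every partition of the batch back to ZooKeeper
  def saveOffsets(zkClient: ZkClient, zkHosts: String, zkPath: String, rdd: RDD[_]): Unit = {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    offsetRanges.foreach { range =>
      val path = s"$zkPath/${range.partition}"
      if (!zkClient.exists(path)) zkClient.createPersistent(path, true)
      zkClient.writeData(path, range.untilOffset.toString)
    }
  }
}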

Data Packaging

Convert the received strings into ProcessedData objects; this code can be copied straight from the course notes.

Code:

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean._
import com.air.antispider.stream.dataprocess.constants.TravelTypeEnum.TravelTypeEnum
import com.air.antispider.stream.dataprocess.constants.{BehaviorTypeEnum, FlightTypeEnum, TravelTypeEnum}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.streaming.dstream.DStream

/**
  * Split and package the data
  */
object QueryDataPackage {
  /**
    * Split and package the data
    *
    * @param lines
    * @return
    */
  def queryDataLoadAndPackage(lines: DStream[String]): DStream[ProcessedData] = {
    //use mapPartitions to cut down on per-record object-creation overhead
    lines.mapPartitions { partitionsIterator =>
      //create the JSON parser
      val mapper = new ObjectMapper
      mapper.registerModule(DefaultScalaModule)
      //process the records one by one
      partitionsIterator.map { sourceLine =>
        //split the record
        val dataArray = sourceLine.split("#CS#", -1)
        //raw data; a placeholder field with no actual content
        val sourceData = dataArray(0)
        val requestMethod = dataArray(1)
        val request = dataArray(2)
        val remoteAddr = dataArray(3)
        val httpUserAgent = dataArray(4)
        val timeIso8601 = dataArray(5)
        val serverAddr = dataArray(6)
        val highFrqIPGroup: Boolean = dataArray(7).equalsIgnoreCase("true")
        val requestType: RequestType = RequestType(FlightTypeEnum.withName(dataArray(8)), BehaviorTypeEnum.withName(dataArray(9)))
        val travelType: TravelTypeEnum = TravelTypeEnum.withName(dataArray(10))
        val requestParams: CoreRequestParams = CoreRequestParams(dataArray(11), dataArray(12), dataArray(13))
        val cookieValue_JSESSIONID: String = dataArray(14)
        val cookieValue_USERID: String = dataArray(15)
        //booking data is not needed when analyzing query requests
        val bookRequestData: Option[BookRequestData] = None
        //package the query data
        val queryRequestData = if (!dataArray(16).equalsIgnoreCase("NULL")) {
          mapper.readValue(dataArray(16), classOf[QueryRequestData]) match {
            case value if value != null => Some(value)
            case _ => None
          }
        } else {
          None
        }
        val httpReferrer = dataArray(18)
        //package the processed record and return it
        ProcessedData("", requestMethod, request, remoteAddr, httpUserAgent, timeIso8601, serverAddr, highFrqIPGroup, requestType, travelType, requestParams, cookieValue_JSESSIONID, cookieValue_USERID, queryRequestData, bookRequestData, httpReferrer)
      }
    }
  }
}

Main program:

//extract the message payloads from the input stream
    val dStream: DStream[String] = inputStream.map(_._2)
    //convert the messages into ProcessedData objects
    val processedDataDStream: DStream[ProcessedData] = QueryDataPackage.queryDataLoadAndPackage(dStream)

Loading Rules

Load from MySQL: 1. critical pages, 2. blacklisted IPs, 3. flow rules.

  1. Query all flows from the database
  2. Join to obtain each flow's strategy threshold
  3. Query the rule information belonging to each flow
  4. Query the real name of each rule

    /**
     * Get the flow list.
     * n == 0: anti-crawler flows
     * n == 1: anti-seat-occupying flows
     * @return ArrayBuffer[FlowCollocation]
     */
    def createFlow(n: Int): ArrayBuffer[FlowCollocation] = {
      var array = new ArrayBuffer[FlowCollocation]
      var sql: String = ""
      if (n == 0) { sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.crawler_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=0" }
      else if (n == 1) { sql = "select nh_process_info.id,nh_process_info.process_name,nh_strategy.occ_blacklist_thresholds from nh_process_info,nh_strategy where nh_process_info.id=nh_strategy.id and status=1" }

      var conn: Connection = null
      var ps: PreparedStatement = null
      var rs: ResultSet = null
      try {
        conn = c3p0Util.getConnection
        ps = conn.prepareStatement(sql)
        rs = ps.executeQuery()
        while (rs.next()) {
          val flowId = rs.getString("id")
          val flowName = rs.getString("process_name")
          if (n == 0) {
            val flowLimitScore = rs.getDouble("crawler_blacklist_thresholds")
            array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
          } else if (n == 1) {
            val flowLimitScore = rs.getDouble("occ_blacklist_thresholds")
            array += new FlowCollocation(flowId, flowName, createRuleList(flowId, n), flowLimitScore, flowId)
          }
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        c3p0Util.close(conn, ps, rs)
      }
      array
    }

    /**
     * Get the rule list.
     * @param process_id query the rules belonging to this flow ID
     * @return the rule list
     */
    def createRuleList(process_id: String, n: Int): List[RuleCollocation] = {
      var list = new ListBuffer[RuleCollocation]
      val sql = "select * from(select nh_rule.id,nh_rule.process_id,nh_rules_maintenance_table.rule_real_name,nh_rule.rule_type,nh_rule.crawler_type," +
        "nh_rule.status,nh_rule.arg0,nh_rule.arg1,nh_rule.score from nh_rule,nh_rules_maintenance_table where nh_rules_maintenance_table." +
        "rule_name=nh_rule.rule_name) as tab where process_id = '" + process_id + "' and crawler_type=" + n
      //and status="+n
      var conn: Connection = null
      var ps: PreparedStatement = null
      var rs: ResultSet = null
      try {
        conn = c3p0Util.getConnection
        ps = conn.prepareStatement(sql)
        rs = ps.executeQuery()
        while (rs.next()) {
          val ruleId = rs.getString("id")
          val flowId = rs.getString("process_id")
          val ruleName = rs.getString("rule_real_name")
          val ruleType = rs.getString("rule_type")
          val ruleStatus = rs.getInt("status")
          val ruleCrawlerType = rs.getInt("crawler_type")
          val ruleValue0 = rs.getDouble("arg0")
          val ruleValue1 = rs.getDouble("arg1")
          val ruleScore = rs.getInt("score")
          val ruleCollocation = new RuleCollocation(ruleId, flowId, ruleName, ruleType, ruleStatus, ruleCrawlerType, ruleValue0, ruleValue1, ruleScore)
          list += ruleCollocation
        }
      } catch {
        case e: Exception => e.printStackTrace()
      } finally {
        c3p0Util.close(conn, ps, rs)
      }
      list.toList
    }
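
The rule query above splices process_id straight into the SQL string. A safer variant, sketched here against the same schema, binds the values through PreparedStatement parameters instead (hypothetical helper, not part of the course code):

import java.sql.{Connection, PreparedStatement}

//build the rule query with bound parameters instead of string concatenation
def prepareRuleQuery(conn: Connection, processId: String, n: Int): PreparedStatement = {
  val sql = "select * from(select nh_rule.id,nh_rule.process_id,nh_rules_maintenance_table.rule_real_name," +
    "nh_rule.rule_type,nh_rule.crawler_type,nh_rule.status,nh_rule.arg0,nh_rule.arg1,nh_rule.score " +
    "from nh_rule,nh_rules_maintenance_table where nh_rules_maintenance_table.rule_name=nh_rule.rule_name) as tab " +
    "where process_id = ? and crawler_type = ?"
  val ps = conn.prepareStatement(sql)
  ps.setString(1, processId)
  ps.setInt(2, n)
  ps
}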

`FlowCollocation` and `RuleCollocation` need to be copied into the project from 反扒參考資料\工具包\ruleComputeBean.
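
A rough sketch of the two beans, inferred from the fields the code above reads (the authoritative definitions are the ones in ruleComputeBean; field names here are assumptions):

case class FlowCollocation(
  flowId: String,                //flow ID
  flowName: String,              //flow name
  rules: List[RuleCollocation],  //rules belonging to this flow
  flowLimitScore: Double,        //blacklist threshold of the flow
  strategyCode: String           //strategy code (the code above reuses the flow ID)
)

case class RuleCollocation(
  ruleId: String,       //rule ID
  flowId: String,       //owning flow ID
  ruleName: String,     //real rule name, e.g. "ip" or "criticalPages"
  ruleType: String,     //rule type
  ruleStatus: Int,      //0 = enabled
  ruleCrawlerType: Int, //0 = anti-crawler, 1 = anti-seat-occupying
  ruleValue0: Double,   //threshold arg0
  ruleValue1: Double,   //threshold arg1
  ruleScore: Int        //score awarded when the rule is hit
)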

Putting the flow information into a broadcast variable

//load the flow data into a broadcast variable (0 = the anti-crawler flow)
    val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow(0)
    @volatile var flowCollocationsBroadcast: Broadcast[ArrayBuffer[FlowCollocation]] = sc.broadcast(flowCollocations)

Updating the broadcast variable

//update the flowCollocationsBroadcast broadcast variable
      var flowCollocationChangeFlag: String = jedis.get("flowCollocationChangeFlag")
      //first check whether flowCollocationChangeFlag is empty
      if (StringUtils.isBlank(flowCollocationChangeFlag)){
        flowCollocationChangeFlag = "true"
        //write it back to Redis
        jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag)
      }
      if (flowCollocationChangeFlag.toBoolean) {
        flowCollocationsBroadcast.unpersist()
        //reload the flow data into the broadcast variable (0 = the anti-crawler flow)
        val flowCollocations: ArrayBuffer[FlowCollocation] = AnalyzeRuleDB.createFlow(0)
        flowCollocationsBroadcast = sc.broadcast(flowCollocations)

        flowCollocationChangeFlag = "false"
        //write it back to Redis
        jedis.set("flowCollocationChangeFlag", flowCollocationChangeFlag)
      }
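
The management side only has to flip this flag after an administrator edits the flows; a one-line sketch, assuming the same Redis cluster client:

//run on the web/management side whenever flow rules change
jedis.set("flowCollocationChangeFlag", "true")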

Rule Computation

IP-block metric computation

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean.ProcessedData
import org.apache.spark.rdd.RDD

/**
  * Utility for computing the metrics along each dimension
  */
object CoreRule {
  /**
    * IP-block access count
    * @param processedDataRDD
    */
  def ipBlockCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    val mapRDD: RDD[(String, Int)] = processedDataRDD.map(processedData => {
      //client IP, e.g. 192.168.80.81
      val ip: String = processedData.remoteAddr
      //take the first two octets, e.g. 192.168
      val arr: Array[String] = ip.split("\\.")
      if (arr.length == 4) {
        //this is a well-formed IPv4 address
        val ipBlock = arr(0) + "." + arr(1)
        //(ip block, 1)
        (ipBlock, 1)
      } else {
        ("", 1)
      }
    })
    //group by IP block and aggregate
    val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey((x, y) => x + y)
    //collect the result as a Map and return it
    resultRDD.collectAsMap()
  }
}

Main program:

//start computing along each metric dimension
  //access count per IP block
val ipBlockCountMap: collection.Map[String, Int] = CoreRule.ipBlockCount(processedDataRDD)

IP access count

Code:

/**
    * Access count per IP within the 5-minute window
    * @param processedDataRDD
    * @return
    */
  def ipCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      //(ip, count)
      (ip, 1)
    })
      //sum per IP
      .reduceByKey(_ + _)
      //collect the data
      .collectAsMap()
  }

Critical-page access count per IP

/**
    * Count of critical-page visits per IP
    * @param processedDataRDD
    * @param criticalPagesList
    * @return
    */
  def ipCriticalPagesCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = {
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      //visit count for this record, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)){
          //a match means one visit to a critical page
          count = 1
        }
      }
      (ip, count)
    })
      //sum per IP
      .reduceByKey(_ + _)
      //collect the data
      .collectAsMap()
  }

Number of distinct User-Agents per IP

/**
    * Number of distinct User-Agents per IP within the 5-minute window
    * @param processedDataRDD
    * @return
    */
  def ipUACount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    //map each record to (ip, ua)
    val mapData: RDD[(String, String)] = processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val ua: String = processedData.httpUserAgent
      (ip, ua)
    })
    //(ip, ua) => (ip, (ua1, ua2, ua1))
    val groupRDD: RDD[(String, Iterable[String])] = mapData.groupByKey()
    //turn (ip, (ua1, ua2, ua1)) into (ip, count)
    groupRDD.map(line => {
      val ip: String = line._1
      val sourceData: Iterable[String] = line._2
      //use a Set to deduplicate the raw values
      var set = Set[String]()
      for (ua <- sourceData) {
        //add the ua to the set
        set += ua
      }
      (ip, set.size)
    })
      .collectAsMap()

  }
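
The manual Set-building loop above can be written more compactly with the pair-RDD API; an equivalent formulation (same semantics, assuming the same imports as CoreRule):

//equivalent to the groupByKey + manual Set version above
def ipUACountCompact(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
  processedDataRDD
    .map(pd => (pd.remoteAddr, pd.httpUserAgent))
    .groupByKey()
    //deduplicate the UAs of each IP and count them
    .mapValues(_.toSet.size)
    .collectAsMap()
}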

Minimum interval between critical-page visits per IP

/**
    * Minimum interval between critical-page visits per IP within the 5-minute window
    *
    * @param processedDataRDD
    * @param criticalPagesList
    * @return
    */
  def ipCriticalPagesMinTimeCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Long] = {
    //first keep only the critical pages
    processedDataRDD
      //filter
      .filter(processedData => {
      val url: String = processedData.request
      //visit count for this record, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //a match means one visit to a critical page
          count = 1
        }
      }
      //count == 1 means this record hit a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
      //map to (ip, timestamp)
      .map(processedData => {
        val ip: String = processedData.remoteAddr
        val time: String = processedData.timeIso8601
        //time looks like 2019-06-29T08:46:56+08:00
        val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
        (ip, timeStamp)
      })
      //group to (ip, (t1, t2, t3...))
      .groupByKey()
      //map to (ip, minimum interval)
      .map(line => {
        val ip: String = line._1
        //iterator over all the timestamps
        val sourceData: Iterable[Long] = line._2
        //convert the iterator to an Array
        val sourceArray: Array[Long] = sourceData.toArray
        //sort the raw timestamps
        util.Arrays.sort(sourceArray)
        //collect the differences between adjacent timestamps
        var resultArray = new ArrayBuffer[Long]()
        for (i <- 0 until sourceArray.size - 1) {
          //current element
          val currentTime: Long = sourceArray(i)
          //next element
          val nexTime: Long = sourceArray(i + 1)
          val result = nexTime - currentTime
          //store the difference
          resultArray += result
        }
        //sort the differences
        val array: Array[Long] = resultArray.toArray
        util.Arrays.sort(array)
        //guard: an IP with a single critical-page visit has no interval at all;
        //return Long.MaxValue instead of crashing on an empty array
        if (array.isEmpty) (ip, Long.MaxValue) else (ip, array(0))
      })
      //collect the data
      .collectAsMap()
  }
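
A tiny worked example of the sort-diff-sort logic in isolation (plain Scala, no Spark), to make the minimum-interval computation concrete:

val ts: Array[Long] = Array(9000L, 1000L, 4500L, 4000L).sorted
//adjacent differences: 4000-1000, 4500-4000, 9000-4500
val diffs: Array[Long] = ts.sliding(2).map { case Array(a, b) => b - a }.toArray.sorted
//diffs = Array(500, 3000, 4500), so the minimum interval is diffs(0) = 500 ms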

Count of critical-page intervals shorter than the preset threshold, per IP

Code:

/**
    * Count of critical-page intervals shorter than the preset value, per IP, within the 5-minute window
    * @param processedDataRDD
    * @param criticalPagesList
    * @return
    */
  def ipCriticalPagesMinNumCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[(String, String), Int] = {
    //first keep only the critical pages
    processedDataRDD
      //filter
      .filter(processedData => {
      val url: String = processedData.request
      //visit count for this record, 0 by default
      var count = 0
      for (criticalPages <- criticalPagesList) {
        if (url.matches(criticalPages)) {
          //a match means one visit to a critical page
          count = 1
        }
      }
      //count == 1 means this record hit a critical page, so keep it
      if (count == 0) {
        false
      } else {
        true
      }
    })
      //map to ((ip, url), timestamp)
      .map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request
      val time: String = processedData.timeIso8601
      //time looks like 2019-06-29T08:46:56+08:00
      val timeStamp: Long = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss").parse(time).getTime
      ((ip, url), timeStamp)
    })
      //group to ((ip, url), (t1, t2, t3...))
      .groupByKey()
      //map to ((ip, url), count of short intervals)
      .map(line => {
      val key: (String, String) = line._1
      //iterator over all the timestamps
      val sourceData: Iterable[Long] = line._2
      //convert the iterator to an Array
      val sourceArray: Array[Long] = sourceData.toArray
      //sort the raw timestamps
      util.Arrays.sort(sourceArray)
      //collect the differences that fall below the preset value
      var resultArray = new ArrayBuffer[Long]()
      for (i <- 0 until sourceArray.size - 1) {
        //current element
        val currentTime: Long = sourceArray(i)
        //next element
        val nexTime: Long = sourceArray(i + 1)
        val result = nexTime - currentTime
        //keep only differences below the preset value (hard-coded to 5 seconds here)
        if (result < 5000) {
          resultArray += result
        }
      }
      //return ((ip, url), count)
      (key, resultArray.size)
    })
      .collectAsMap()
  }

Number of distinct flight queries per IP in 5 minutes

/**
    * Number of distinct flight queries per IP within the 5-minute window
    * @param processedDataRDD
    * @return
    */
  def ipCityCount(processedDataRDD: RDD[ProcessedData]): collection.Map[String, Int] = {
    //(ip, departure->arrival)
    processedDataRDD.map(line => {
      val ip: String = line.remoteAddr
      //departure city
      val depcity: String = line.requestParams.depcity
      //arrival city
      val arrcity: String = line.requestParams.arrcity
      (ip, depcity + "->" + arrcity)
    })
      .groupByKey()
      //(ip, number of distinct city pairs)
      .map(line => {
        val ip: String = line._1
        val sourceCitys: Iterable[String] = line._2
        //use a Set to deduplicate
        var set = Set[String]()
        //loop and deduplicate
        for (city <- sourceCitys) {
          set += city
        }
        (ip, set.size)
      })
      .collectAsMap()
  }

Number of distinct cookies per IP in 5 minutes

/**
  * Number of distinct cookies per IP within the 5-minute window
  * @param processedDataRDD
  * @param criticalPagesList
  * @return
  */
def ipCookieCount(processedDataRDD: RDD[ProcessedData], criticalPagesList: ArrayBuffer[String]): collection.Map[String, Int] = {
  //first keep only the critical pages
  processedDataRDD
    //filter
    .filter(processedData => {
    val url: String = processedData.request
    //visit count for this record, 0 by default
    var count = 0
    for (criticalPages <- criticalPagesList) {
      if (url.matches(criticalPages)) {
        //a match means one visit to a critical page
        count = 1
      }
    }
    //count == 1 means this record hit a critical page, so keep it
    if (count == 0) {
      false
    } else {
      true
    }
  })
    //(ip, jSessionID)
    .map(line => {
      val ip: String = line.remoteAddr
      //SessionID
      val sessionID: String = line.cookieValue_JSESSIONID
      (ip, sessionID)
    })
      .groupByKey()
      //(ip, (sID1, sID2, sID1))
      .map(line => {
      val ip: String = line._1
      val sourceSessionIDs: Iterable[String] = line._2
      //use a Set to deduplicate
      var set = Set[String]()
      //loop and deduplicate
      for (sessionID <- sourceSessionIDs) {
        set += sessionID
      }
      (ip, set.size)
    })
      .collectAsMap()
}
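
Putting the dimensions together, the main program presumably calls each metric in turn and feeds everything into the scoring step; a sketch, assuming criticalPagesList is the critical-page rule list loaded from MySQL and flowCollocationsBroadcast is the broadcast variable from above:

//compute every metric for the current batch
val ipBlockCountMap = CoreRule.ipBlockCount(processedDataRDD)
val ipCountMap = CoreRule.ipCount(processedDataRDD)
val ipCriticalPagesMap = CoreRule.ipCriticalPagesCount(processedDataRDD, criticalPagesList)
val ipUAMap = CoreRule.ipUACount(processedDataRDD)
val ipCriticalPagesMinTimeMap = CoreRule.ipCriticalPagesMinTimeCount(processedDataRDD, criticalPagesList)
val ipCriticalPagesMinNumMap = CoreRule.ipCriticalPagesMinNumCount(processedDataRDD, criticalPagesList)
val ipCityCountMap = CoreRule.ipCityCount(processedDataRDD)
val ipCookieCountMap = CoreRule.ipCookieCount(processedDataRDD, criticalPagesList)

//feed the metric maps plus the flow configuration into the scoring step
val antiCalculateResults: RDD[AntiCalculateResult] = RuleUtil.calculateAntiResult(
  processedDataRDD,
  ipBlockCountMap, ipCountMap, ipCriticalPagesMap, ipUAMap,
  ipCriticalPagesMinTimeMap, ipCriticalPagesMinNumMap,
  ipCityCountMap, ipCookieCountMap,
  flowCollocationsBroadcast.value)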

Blacklist Scoring

The flow information loaded from the database contains each flow's own rule list. The count for every rule has already been computed above, so comparing those counts against the database thresholds yields the list of scores for the metrics that exceeded their range, as well as the score list restricted to the rules that are enabled.

Code:

package com.air.antispider.stream.rulecompute.businessprocess

import com.air.antispider.stream.common.bean.{FlowCollocation, ProcessedData, RuleCollocation}
import com.air.antispider.stream.rulecompute.bean.{AntiCalculateResult, FlowScoreResult}
import org.apache.spark.rdd.RDD

import scala.collection.mutable.ArrayBuffer

object RuleUtil {



  /**
    * Compute the final flow result from the individual metric values
    *
    * @param processedDataRDD
    * @param ipBlockCountMap
    * @param ipCountMap
    * @param ipCriticalPagesMap
    * @param ipUAMap
    * @param ipCriticalPagesMinTimeMap
    * @param ipCriticalPagesMinNumMap
    * @param ipCityCountMap
    * @param ipCookieCountMap
    * @param flowCollocationList
    */
  def calculateAntiResult(
                           processedDataRDD: RDD[ProcessedData],
                           ipBlockCountMap: collection.Map[String, Int],
                           ipCountMap: collection.Map[String, Int],
                           ipCriticalPagesMap: collection.Map[String, Int],
                           ipUAMap: collection.Map[String, Int],
                           ipCriticalPagesMinTimeMap: collection.Map[String, Long],
                           ipCriticalPagesMinNumMap: collection.Map[(String, String), Int],
                           ipCityCountMap: collection.Map[String, Int],
                           ipCookieCountMap: collection.Map[String, Int],
                           flowCollocationList: ArrayBuffer[FlowCollocation]
                         ): RDD[AntiCalculateResult] = {

    //look up each metric value for this record
    processedDataRDD.map(processedData => {
      val ip: String = processedData.remoteAddr
      val url: String = processedData.request

      //take the first two octets of the IP, e.g. 192.168
      val arr: Array[String] = ip.split("\\.")
      var ipBlock = ""
      if (arr.length == 4) {
        //this is a well-formed IPv4 address
        ipBlock = arr(0) + "." + arr(1)
      }
      //IP-block count
      val ipBlockCounts: Int = ipBlockCountMap.getOrElse(ipBlock, 0)
      //IP count
      val ipCounts: Int = ipCountMap.getOrElse(ip, 0)
      //critical-page count
      val ipCriticalPagesCounts: Int = ipCriticalPagesMap.getOrElse(ip, 0)
      val ipUACounts: Int = ipUAMap.getOrElse(ip, 0)
      //minimum visit interval; when the IP is absent fall back to Int.MaxValue, never 0
      val ipCriticalPagesMinTimeCounts: Int = ipCriticalPagesMinTimeMap.getOrElse(ip, Integer.MAX_VALUE).toInt
      val ipCriticalPagesMinNumCounts: Int = ipCriticalPagesMinNumMap.getOrElse((ip, url), 0)
      val ipCityCounts: Int = ipCityCountMap.getOrElse(ip, 0)
      val ipCookieCounts: Int = ipCookieCountMap.getOrElse(ip, 0)

      //pack the metric values into a map keyed by rule name
      val map = Map[String, Int](
        "ipBlock" -> ipBlockCounts,
        "ip" -> ipCounts,
        "criticalPages" -> ipCriticalPagesCounts,
        "userAgent" -> ipUACounts,
        "criticalPagesAccTime" -> ipCriticalPagesMinTimeCounts,
        "criticalPagesLessThanDefault" -> ipCriticalPagesMinNumCounts,
        "flightQuery" -> ipCityCounts,
        "criticalCookies" -> ipCookieCounts
      )


      val flowsScore: Array[FlowScoreResult] = computeFlowScore(map, flowCollocationList)



      AntiCalculateResult(
        processedData,
        ip,
        ipBlockCounts,
        ipCounts,
        ipCriticalPagesCounts,
        ipUACounts,
        ipCriticalPagesMinTimeCounts,
        ipCriticalPagesMinNumCounts,
        ipCityCounts,
        ipCookieCounts,
        flowsScore //the per-flow scoring results computed above
      )
    })
  }

  /**
    * Run the comparison and produce the final per-flow scores
    * @param map
    * @param flowCollocationList
    * @return
    */
  def computeFlowScore(map: Map[String, Int], flowCollocationList: ArrayBuffer[FlowCollocation]): Array[FlowScoreResult] = {
    //flowCollocationList holds several flows, so iterate over the flows first
    for (flow <- flowCollocationList) {
      //get the rules configured for this flow
      val rules: List[RuleCollocation] = flow.rules

      //scores of every rule whose threshold was exceeded
      var array1 = new ArrayBuffer[Int]()
      //scores of exceeded rules that are also enabled
      var array2 = new ArrayBuffer[Int]()

      for (rule <- rules) {
        val ruleName: String = rule.ruleName
        val num: Int = map.getOrElse(ruleName, 0)
        //the database rule name matches a computed metric key; compare the values
        if (num > rule.ruleValue0) {
          //the computed value exceeds the configured threshold, so the rule is hit
          //record the score
          array1 += rule.ruleScore
          if (rule.ruleStatus == 0){
            //the rule is currently enabled
            array2 += rule.ruleScore
          }
        }
      }

//      val result = xxx(array1, array2)


    }
    null
  }

}
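
computeFlowScore is left unfinished above (the xxx(array1, array2) placeholder and the null return). One plausible way to finish the arithmetic, under the assumption that a flow's score is the sum of the scores of its enabled hit rules and that the flow is flagged once that sum reaches flowLimitScore; the real FlowScoreResult fields live in the ruleComputeBean package and may differ:

//hypothetical scoring step, not the course's official implementation
def sumFlowScore(hitScores: Seq[Int], enabledHitScores: Seq[Int], flowLimitScore: Double): (Double, Boolean) = {
  //array1 (all hit rules) is typically kept for reporting; only enabled rules contribute to the score
  val total = enabledHitScores.sum.toDouble
  //flag the flow once the threshold is reached
  (total, total >= flowLimitScore)
}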