Spark Streaming reading from Kafka

PySpark Streaming 1.6.1 official documentation:

http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.checkpoint

Introduction to Spark Streaming sliding windows:

http://ju.outofmemory.cn/entry/96018
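
As a quick refresher, a windowed operation is controlled by two durations: the window length (how many seconds of recent data are aggregated) and the slide interval (how often the aggregation is recomputed); both must be multiples of the batch interval. A minimal sketch, using a placeholder socket text source on localhost:9999 instead of Kafka:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="window-demo")
ssc = StreamingContext(sc, 2)  # batch interval: 2 seconds

# placeholder source for illustration: one word per line from a local socket
words = ssc.socketTextStream("localhost", 9999)
pairs = words.map(lambda w: (w, 1))

# window length 30 s, slide interval 2 s:
# every 2 seconds, emit the per-word counts over the last 30 seconds
counts = pairs.reduceByKeyAndWindow(lambda a, b: a + b, None, 30, 2)
counts.pprint()

ssc.start()
ssc.awaitTermination()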

Goal: Flume collects the nginx access logs --> Kafka -> Spark Streaming counts the IPs with too many requests -> the IPs are written to Redis -> the IPs are handled downstream for anti-scraping.
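
The job assumes that every Kafka message value is a JSON string built from an nginx access-log line and containing at least an "ip" field; the other fields in this sample are made up purely for illustration:

import json

# hypothetical message value on the 'statis-detailinfo-collect' topic; only "ip" is used below
sample = '{"ip": "1.2.3.4", "url": "/product/123", "time": "2016-08-01 12:00:01"}'
print(json.loads(sample)["ip"])  # -> 1.2.3.4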

#! /usr/bin/env python
# encoding: utf-8
'''Goal: read the nginx log data that Flume pushed into Kafka -> parse it -> count the IPs with too many requests -> collect the result on the driver and write it to Redis -> handle those IPs downstream for anti-scraping
'''
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import socket
import sys
import redis
class RedisClient:
    '''Thin wrapper around a Redis connection pool; counters are written into Redis hashes.'''
    pool = None

    def __init__(self, redisIp, redisPort, redisDB):
        self.redisIp = redisIp
        self.redisPort = redisPort
        self.redisDB = redisDB
        self.getRedisPool(redisIp, redisPort, redisDB)

    def getRedisPool(self, redisIp, redisPort, redisDB):
        self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
        return self.pool

    def insertRedis(self, key, field, value):
        if self.pool is None:
            self.pool = self.getRedisPool(self.redisIp, self.redisPort, self.redisDB)
        r = redis.Redis(connection_pool=self.pool)
        r.hset(key, field, value)

    def expire(self, key, times):
        if self.pool is None:
            self.pool = self.getRedisPool(self.redisIp, self.redisPort, self.redisDB)
        r = redis.Redis(connection_pool=self.pool)
        r.expire(key, times)
def check_spider(ip):
    '''Check whether the IP belongs to a search-engine spider.
    Heuristic: if the reverse DNS lookup succeeds, treat the IP as a spider.'''
    try:
        result = socket.gethostbyaddr(ip)
        print(result)
        if result:
            return True
        else:
            return False
    except Exception:
        return False

if __name__ == '__main__':
    sc = SparkContext(appName="pyspark kafka-streaming-redis")
    lengthtime = int(sys.argv[1])  # window length: aggregate the most recent `lengthtime` seconds of data
    slicetime = int(sys.argv[2])   # slide interval: recompute the counts every `slicetime` seconds
    # Create the Spark Streaming context; one batch of data is processed every `slicetime` seconds
    ssc = StreamingContext(sc, slicetime)
    paths = '/tmp/checkpoint'  # the checkpoint directory is created automatically
    ssc.checkpoint(paths)      # enable checkpointing (fault tolerance for the streaming job)
    kstream = KafkaUtils.createDirectStream(
        ssc=ssc,
        topics=['statis-detailinfo-collect'],
        kafkaParams={"group.id": "gyf_test",
                     "auto.offset.reset": "largest",
                     "metadata.broker.list": "172.17.13.100:9092,172.17.13.101:9092"})
    # info[1] is the message value (the actual log record); info[0] is the message key added by Kafka
    ipcount = (kstream.map(lambda info: json.loads(info[1])["ip"])
                      .filter(lambda ip: ip != '')
                      .map(lambda ip: (ip, 1))
                      # PySpark's reduceByKeyAndWindow takes an inverse function as its second
                      # argument; None means the whole window is re-reduced on every slide
                      .reduceByKeyAndWindow(lambda x, y: x + y, None, lengthtime, slicetime)
                      .filter(lambda x: x[1] > 100))
    ipcount.pprint()
    def rdd_handle(rdd):
        r = RedisClient("192.168.8.177", 6379, 0)
        ip_list = rdd.collect()
        # IPs whose reverse DNS lookup fails are not spiders, so they are treated as scraper IPs
        ip_list = [i for i in ip_list if not check_spider(i[0])]
        for ip, count in ip_list:
            if 100 <= count < 500:
                r.insertRedis("ip_100_count", ip, count)
            elif 500 <= count < 1000:
                r.insertRedis("ip_500_count", ip, count)
            elif count >= 1000:
                r.insertRedis("ip_1000_count", ip, count)
    ipcount.foreachRDD(rdd_handle)
    ssc.start()              # start the computation
    ssc.awaitTermination()   # wait for the computation to terminate


######### How to run ######## the version of spark-streaming-kafka-assembly_2.10-1.6.1.jar must match the Spark version
# every 2 seconds, count each IP's accesses over the last 180 seconds
# spark-submit   --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar test.py 180 2
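
The last step from the goal above, handling the flagged IPs for anti-scraping, is not part of this script. A minimal sketch of what a downstream consumer could look like, assuming the same Redis instance (192.168.8.177:6379, db 0) and that "handling" simply means exporting the IPs to whatever does the blocking:

import redis

r = redis.Redis(host="192.168.8.177", port=6379, db=0)

# read the flagged IPs back out of the hashes written by the streaming job, worst offenders first
for key in ("ip_1000_count", "ip_500_count", "ip_100_count"):
    for ip, count in r.hgetall(key).items():
        print(key, ip, count)  # e.g. feed the IP into an nginx deny list or a firewall rule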