PySpark Streaming 1.6.1 official documentation:
http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext.checkpoint
Introduction to Spark Streaming sliding windows:
http://ju.outofmemory.cn/entry/96018
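The window semantics boil down to two parameters: the window length (how far back to aggregate) and the slide interval (how often a result is produced). Below is a minimal sketch, assuming a hypothetical socket source on localhost:9999 and a checkpoint path /tmp/window-demo-ckpt (both made up for illustration); the 180 s / 2 s values mirror the job further down.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="window-demo")
ssc = StreamingContext(sc, 2)              # batch interval: 2 s
ssc.checkpoint("/tmp/window-demo-ckpt")    # required for the inverse-reduce form below

# Hypothetical source: lines of text arriving on localhost:9999
lines = ssc.socketTextStream("localhost", 9999)

# Count words over the last 180 s, recomputed every 2 s.
# The second lambda is the inverse function: it subtracts the batch that just
# slid out of the window instead of re-reducing the whole window each time.
counts = (lines.flatMap(lambda line: line.split())
               .map(lambda w: (w, 1))
               .reduceByKeyAndWindow(lambda x, y: x + y,   # add new values
                                     lambda x, y: x - y,   # remove expired values
                                     180,                  # window length (s)
                                     2))                   # slide interval (s)
counts.pprint()

ssc.start()
ssc.awaitTermination()

Both the window length and the slide interval must be multiples of the batch interval, which is why the job below reuses the slide interval as the batch interval.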
Goal: Flume collects nginx logs --> Kafka -> Spark Streaming counts IPs with excessive requests -> write those IPs to Redis -> downstream anti-scraping handling
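The job assumes each Kafka message is a JSON-encoded nginx access record with at least an "ip" field (that is the only field it reads). A hypothetical payload, with the other field names invented for illustration:

import json

# Hypothetical message body as delivered by Flume to the Kafka topic;
# only the "ip" field is actually used by the streaming job.
raw = '{"ip": "203.0.113.7", "url": "/item/123", "ts": "2016-06-01 12:00:00"}'

record = json.loads(raw)
print(record["ip"])   # -> 203.0.113.7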
#! /usr/bin/env python
# encoding: utf-8
'''
Goal: read the nginx data that Flume pushed into Kafka -> parse -> count IPs with
too many requests -> collect to the driver and write to Redis -> downstream
anti-scraping handling
'''
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
import redis, sys, socket


class RedisClient:
    '''Thin wrapper around a redis-py connection pool.'''
    pool = None

    def __init__(self, redisIp, redisPort, redisDB):
        self.redisIp = redisIp
        self.redisPort = redisPort
        self.redisDB = redisDB
        self.getRedisPool(redisIp, redisPort, redisDB)

    def getRedisPool(self, redisIp, redisPort, redisDB):
        self.pool = redis.ConnectionPool(host=redisIp, port=redisPort, db=redisDB)
        return self.pool

    def insertRedis(self, key, field, value):
        if self.pool is None:
            self.pool = self.getRedisPool(self.redisIp, self.redisPort, self.redisDB)
        r = redis.Redis(connection_pool=self.pool)
        r.hset(key, field, value)

    def expire(self, key, times):
        if self.pool is None:
            self.pool = self.getRedisPool(self.redisIp, self.redisPort, self.redisDB)
        r = redis.Redis(connection_pool=self.pool)
        r.expire(key, times)


def check_spider(ip):
    '''Treat any IP with a resolvable reverse-DNS name as a search-engine spider.'''
    try:
        result = socket.gethostbyaddr(ip)
        print(result)
        return bool(result)
    except Exception:
        return False


if __name__ == '__main__':
    sc = SparkContext(appName="pyspark kafka-streaming-redis")
    # print(sc.version)
    lengthtime = int(sys.argv[1])   # window length: aggregate the last N seconds of data
    slicetime = int(sys.argv[-1])   # slide interval: how often the window is evaluated

    # Create the Streaming Context; one batch is processed every `slicetime` seconds.
    ssc = StreamingContext(sc, slicetime)
    paths = '/tmp/checkpoint'       # the directory is created automatically
    ssc.checkpoint(paths)           # checkpointing is required by the windowed reduce below

    kstream = KafkaUtils.createDirectStream(
        ssc=ssc,
        topics=['statis-detailinfo-collect'],
        kafkaParams={"group.id": "gyf_test",
                     "auto.offset.reset": "largest",
                     "metadata.broker.list": "172.17.13.100:9092,172.17.13.101:9092"})

    # Each Kafka record arrives as (key, value); info[1] is the actual message payload.
    # reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration): the inverse
    # function subtracts the batches that slide out of the window instead of
    # recomputing the whole window every time.
    ipcount = (kstream.map(lambda info: json.loads(info[1])["ip"])
                      .filter(lambda ip: ip != '')
                      .map(lambda ip: (ip, 1))
                      .reduceByKeyAndWindow(lambda x, y: x + y,
                                            lambda x, y: x - y,
                                            lengthtime, slicetime)
                      .filter(lambda x: x[1] > 100))
    ipcount.pprint()

    def rdd_handle(rdd):
        r = RedisClient("192.168.8.177", 6379, 0)
        ip_list = rdd.collect()
        # IPs that are not spiders are treated as scrapers.
        ip_list = [i for i in ip_list if not check_spider(i[0])]
        for ip, count in ip_list:
            if count >= 100 and count < 500:
                r.insertRedis("ip_100_count", ip, count)
            elif count >= 500 and count < 1000:
                r.insertRedis("ip_500_count", ip, count)
            elif count >= 1000:
                r.insertRedis("ip_1000_count", ip, count)

    ipcount.foreachRDD(rdd_handle)
    ssc.start()             # start the computation
    ssc.awaitTermination()  # block until the streaming job is stopped

######### Run
##### The spark-streaming-kafka-assembly_2.10-1.6.1.jar version must match the Spark version.
# Every 2 seconds, count each IP's requests over the previous 180 seconds:
# spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar test.py 180 2
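The last step of the pipeline (handling the flagged IPs for anti-scraping) is not shown above. A minimal sketch of how a front-end service might consult the Redis hashes written by the job; the key names and Redis address match the ones used above, everything else is assumed:

import redis

r = redis.Redis(host="192.168.8.177", port=6379, db=0)

def is_blocked(ip):
    """Return True if the IP appears in any of the over-limit buckets."""
    for key in ("ip_100_count", "ip_500_count", "ip_1000_count"):
        if r.hexists(key, ip):
            return True
    return False

if __name__ == "__main__":
    print(is_blocked("203.0.113.7"))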