隨着人口紅利的慢慢削減,互聯網產品的廝殺越發激烈,你們開始看好下沉市場的潛力,拼多多,趣頭條等廠商經過拉新獎勵,購物優惠等政策率先搶佔用戶,壯大起來。其餘各廠商也緊隨其後,紛紛推出本身產品的極速版,現在日頭條極速版,騰訊新聞極速版等,也經過拉新獎勵,閱讀獎勵等政策來吸引用戶。redis
對於這類APP,實時風控是必不可少的,一個比較常見的實時風控場景就是防刷接口做弊。刷接口是黑產的一種做弊手段,APP上的各類操做,通常都會對應後臺的某個接口,用戶操做APP數據就會經過接口上報到後臺,但若是黑產經過破解獲取到了APP的新增用戶接口,那他們就能跳過登錄APP步驟直接調後臺接口構造虛假數據牟利了。對於這類業務,咱們能夠經過Flink + Redis來實現實時防刷接口的功能。數據流圖以下所示:
刷接口做弊通常是越過登錄APP操做,直接調Server端的接口發數據,這些用戶在APP的上報日誌裏面就不存在,那咱們能夠經過Flink將APP實時上報上來的新增用戶寫入Redis中,而後Server端將接口上報上來的用戶與Redis裏的用戶進行比對,若是不在Redis裏面則判爲刷接口用戶。shell
對於這個需求,得要求實時計算引擎能達到毫秒級延遲,不然會形成用戶的誤判和影響用戶體驗。爲此咱們選擇了Flink做爲實時計算引擎。apache
主要代碼邏輯以下:服務器
//配置flink運行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //val env = StreamExecutionEnvironment.createLocalEnvironment() env.enableCheckpointing(1000 * 60 * 5) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000 * 60 * 3) env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) env.setStateBackend(new FsStateBackend(checkPointPath)) env.getConfig.setLatencyTrackingInterval(1000) env.getConfig.registerTypeWithKryoSerializer(classOf[Log], classOf[ProtobufSerializer]) env.setStreamTimeCharacteristic(EventTime) env.setParallelism(parallel) env.getConfig.setLatencyTrackingInterval(1000) //kafka source,實時消費kafka中日誌解析出用戶id val stream = env.addSource(new FlinkKafkaConsumer010[Array[Log]](topic, new LogDeserializationSchema(), properties)) val data = stream.flatMap(x => x) .map(log =>{ val userid = log.getUid.getUuid val current_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) (userid,current_time) }).filter(record=>{ val userid = record._1 var flag = false if(userid != null && !"".equals(userid)){ flag = true } flag }) //redis sink,將APP上報日誌的用戶id寫入redis供server端匹配 data.addSink(new RedisSink[(String, String)](getJedisClusterConfig, new RedisSinkMapper)) env.execute("newsinfo_active_userid_to_redis")
其中比較重要的幾點:微信
1 構造kafka source數據結構
val stream = env.addSource(new FlinkKafkaConsumer010[Array[Log]](topic, new LogDeserializationSchema(), properties))
通常APP上報的都是序列化的數據,咱們須要定義反序列化方法,LogDeserializationSchema 是一個protobuf類型的反序列化方法。app
//將kafka中的數據解析爲google protobuf 的Log,一個message可能包含多條Log class LogDeserializationSchema extends AbstractDeserializationSchema[Array[Log]] { override def deserialize(message: Array[Byte]): Array[Log] = { val data = ArrayBuffer[Log]() val input = new ByteArrayInputStream(message) while (input.available() > 0) { try { data += Log.parseDelimitedFrom(input) } catch { case _: Throwable => } } input.close() data.toArray } }
2 redis sinkide
這裏用的是網上開源的flink-connector-redis依賴庫。
更多相關內容見:http://bahir.apache.org/docs/flink/current/flink-streaming-redis大數據
Maven依賴以下ui
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1-SNAPSHOT</version> </dependency>
Redis Sink 提供用於向Redis發送數據的接口的類。接收器可使用三種不一樣的方法與不一樣類型的Redis環境進行通訊:
Redis Sink 核心類是 RedisMappe 是一個接口,使用時咱們要編寫本身的redis操做類實現這個接口中的三個方法,以下所示:
class RedisExampleMapper extends RedisMapper[(String, String)]{ override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME") } override def getKeyFromData(data: (String, String)): String = data._1 override def getValueFromData(data: (String, String)): String = data._2 } val conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build() stream.addSink(new RedisSink[(String, String)](conf, new RedisExampleMapper))
使用RedisCommand設置數據結構類型時和redis結構對應關係。
以上咱們利用 Flink + Redis 實時了一個基本的實時防刷接口模型。關注微信公衆號《大數據技術進階》,觀看更多大數據實戰文章。