Using GeoIP to resolve IP addresses in Spark Streaming

1. First, place the GeoIP database on the server, e.g. /opt/db/geo/GeoLite2-City.mmdb

2. Create a new Scala sbt project and verify that the database can be read and IPs resolved

import java.io.File
import java.net.InetAddress

import com.maxmind.db.CHMCache
import com.maxmind.geoip2.DatabaseReader

/**
  * Created by zxh on 2016/7/17.
  */
object test {

  def main(args: Array[String]): Unit = {
    val url = "F:\\Code\\OpenSource\\Data\\spark-sbt\\src\\main\\resources\\GeoLite2-City.mmdb"
    // val url2 = "/opt/db/geo/GeoLite2-City.mmdb"
    val geoDB = new File(url)
    require(geoDB.exists(), s"GeoIP database not found at $url")

    // Build a reader with an in-memory cache (CHMCache) for repeated lookups
    val geoIPResolver = new DatabaseReader.Builder(geoDB).withCache(new CHMCache()).build()

    val ip = "222.173.17.203"
    val inetAddress = InetAddress.getByName(ip)
    val geoResponse = geoIPResolver.city(inetAddress)
    val (country, province, city) = (
      geoResponse.getCountry.getNames.get("zh-CN"),
      geoResponse.getSubdivisions.get(0).getNames.get("zh-CN"),
      geoResponse.getCity.getNames.get("zh-CN"))

    println(s"country:$country,province:$province,city:$city")
  }
}
The contents of build.sbt are as follows:
import AssemblyKeys._

assemblySettings

mergeStrategy in assembly <<= (mergeStrategy in assembly) { mergeStrategy => {
  case entry => {
    val strategy = mergeStrategy(entry)
    if (strategy == MergeStrategy.deduplicate) MergeStrategy.first
    else strategy
  }
}}

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

name := "scala_sbt"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.5.0"

Package the program, copy the jar to the server, and run scala -cp ./scala_sbt-assembly-1.0.jar test. The resolution output is as follows:

country:中國,province:山東省,city:濟南

3. Write the streaming program

import java.io.File
import java.net.InetAddress

import com.maxmind.db.CHMCache
import com.maxmind.geoip2.DatabaseReader
import com.maxmind.geoip2.model.CityResponse
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by zxh on 2016/7/17.
  */
object geoip {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("geoip_test").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)

    lines.foreachRDD((rdd: RDD[String], t: Time) => {
      rdd.foreachPartition(p => {
        // Build the reader once per partition; DatabaseReader is not serializable,
        // so it must be constructed on the executor, not on the driver.
        val url2 = "/opt/db/geo/GeoLite2-City.mmdb"
        val geoDB = new File(url2)
        val geoIPResolver = new DatabaseReader.Builder(geoDB).withCache(new CHMCache()).build()

        def resolve_ip(resp: CityResponse): (String, String, String) = {
          (resp.getCountry.getNames.get("zh-CN"),
            resp.getSubdivisions.get(0).getNames.get("zh-CN"),
            resp.getCity.getNames.get("zh-CN"))
        }

        p.foreach(x => {
          // Skip null or empty records before attempting a lookup
          if (x != null && x.nonEmpty) {
            val inetAddress = InetAddress.getByName(x)
            val geoResponse = geoIPResolver.city(inetAddress)
            println(resolve_ip(geoResponse))
          }
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
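The null/empty guard inside p.foreach can be tightened into a small, self-contained helper that also rejects malformed addresses before InetAddress.getByName is called (otherwise a stray string triggers a DNS hostname lookup). This is only a sketch in plain Scala; the IpGuard name and the IPv4-only regex are assumptions, not part of the original program:

```scala
object IpGuard {
  // Hypothetical helper: accept only non-empty dotted-quad IPv4 strings,
  // so InetAddress.getByName never falls back to a DNS hostname lookup.
  private val Ipv4 = """(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})""".r

  def isResolvable(s: String): Boolean = s match {
    case null | ""        => false                                 // reject missing records
    case Ipv4(a, b, c, d) => Seq(a, b, c, d).forall(_.toInt <= 255) // each octet must be 0-255
    case _                => false                                 // reject anything else
  }
}
```

Inside foreachPartition the raw check would then become `p.foreach(x => if (IpGuard.isResolvable(x)) { ... })`.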
build.sbt

libraryDependencies += "com.maxmind.geoip2" % "geoip2" % "2.5.0"

Note: the database-loading code (highlighted in red in the original post, i.e. the DatabaseReader construction) must be placed inside foreachPartition, for the following reasons:

一、減小加載文件次數,一個Partition只加載一次函數

2. The resolve_ip function takes a CityResponse argument, which is not serializable, so the function must be defined inside the partition; that way it is never serialized and shipped between nodes.

3. com.maxmind.geoip2 must be version 2.5.0 to stay compatible with Spark itself; otherwise the following error is thrown:

val geoIPResolver = new DatabaseReader.Builder(geoDB).withCache(new CHMCache()).build();
java.lang.NoSuchMethodError: com.fasterxml.jackson.databind.node.ArrayNode.<init>(Lcom/fasterxml/jackson/databind/node/JsonNodeFactory;Ljava/util/List;)V
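Note 1 can be taken one step further: instead of one load per partition, a JVM-wide lazy singleton loads the database once per executor, and every partition scheduled on that executor reuses it. Below is a sketch of that pattern; the GeoReaderHolder object and the stub Reader class are assumptions (the stub stands in for DatabaseReader so the sketch runs without the MaxMind jar):

```scala
// Sketch of a per-executor singleton. On a real cluster the object is
// initialized lazily in each executor JVM the first time a task touches it.
object GeoReaderHolder {
  // Stub standing in for com.maxmind.geoip2.DatabaseReader (assumption).
  final class Reader(path: String) {
    def lookup(ip: String): String = s"resolved($ip) via $path"
  }

  // lazy val: the file is opened at most once per JVM, on first use.
  lazy val reader: Reader = new Reader("/opt/db/geo/GeoLite2-City.mmdb")
}
```

Inside foreachPartition the call then becomes GeoReaderHolder.reader.lookup(x). The trade-off is that the reader is never closed, which is acceptable for a long-lived streaming job.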
