使用Hive UDF和GeoIP庫爲Hive加入IP識別功能

Hive是基於Hadoop的數據管理系統,做爲分析人員的即時分析工具和ETL等工做的執行引擎,對於現在的大數據管理與分析、處理有着很是大的 意義。GeoIP是一套IP映射數據庫,它定時更新,而且提供了各類語言的API,很是適合在作地域相關數據分析時的一個數據源。javascript

Precondition:經過 IP 地址得到用戶的地理位置信息
也就是根據用戶的IP,經過IP數據庫查詢得到信息。 通常IP數據庫中,

每條記錄的基本結構java

IP地址段(起始、結束),以及對應的信息數據
通常包含的信息:國家、區域(省/州)、城市、街道、經緯度、ISP提供商等信息

由於IP數據庫隨着時間常常變化(不過一段時間內變化很小),因此須要有人常常維護和更新。這個數據也不可能徹底準確、也不可能覆蓋全。這是maxmind的城市準確度 http://www.maxmind.com/app/city_accuracy
由於沒有權威的數據組織機構,且常常有變化。各家數據供應商,基本上作着作着就造成本身的一套數據了。

目前,國內用的比較有名的是「純真IP數據庫」,國外經常使用的是 maxmind、ip2location。

IP數據庫是否收費:收費、免費都有。通常有人維護的數據每每都是收費的,準確率和覆蓋率會稍微高一些。

質量方面:
  1. 主要概念是準確率和覆蓋率。
  2. 記錄數據總條數。純真如今是38萬條(2010年07月30日更新)
  3. 是否有人維護。
  4. 數據庫更新頻率:每個月、每週。數據庫會按期更新的,maxmind開源版是每個月更新一次。

查詢形式:
  • 本地,將IP數據庫下載到本地使用,查詢效率高、性能好。經常使用在統計分析方面。具體形式又分爲:
    • 內存查詢:將所有數據直接加載到內存中,便於高性能查詢。或者二進制的數據文件自己就是通過優化的索引文件,能夠直接對文件作查詢。
    • 數據庫查詢:將數據導入到數據庫,再用數據庫查詢。效率沒有內存查詢快。
  • 遠程(web service或ajax),調用遠程第三方服務。查詢效率天然比較低,通常用在網頁應用中。
查詢的本質:輸入一個IP,找到其所在的IP段,通常都是採用二分搜索實現的。


是否提供API:有的IP數據庫提供API,支持多語言(java、javascript、C#等),這樣你就不用本身直接分析數據格式、整理、寫查詢代碼了。

是否提供經緯度:純真IP數據庫不提供經緯度,Maxmind提供,若是作地圖應用,通常是須要經緯度的


而UDF是Hive提供的用戶自定義函數的接口,經過實現它能夠擴展Hive目前已有的內置函數。而爲Hive加入一個IP映射函數,咱們只須要簡單地在UDF中調用GeoIP的Java API便可。git

GeoIP的數據文件能夠從這裏下載:http://www.maxmind.com/download/geoip/database/,因爲需 要國家和城市的信息,我這裏下載的是http://www.maxmind.com/download/geoip/database /GeoLiteCity.dat.gzweb

 

GeoIP的各類語言的API能夠從這裏下載:http://www.maxmind.com/download/geoip/api/ajax

 

操做Steps以下:shell

Step 1:Hive所需添加的IP地址信息識別UDF函數以下:數據庫

package org.hadoop.hive.additionalUDF;

import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hive.ql.exec.UDF;

import com.maxmind.geoip.Location;
import com.maxmind.geoip.LookupService;
import com.maxmind.geoip.regionName;
import com.maxmind.geoip.timeZone;

import java.util.regex.*;

public class IPToCC  extends UDF {
    private static LookupService cl = null;
    private static String ipPattern = "\\d+\\.\\d+\\.\\d+\\.\\d+";
    private static String ipNumPattern = "\\d+";
    
    static LookupService getLS(String dbfile) throws IOException{
        
        //String sep = System.getProperty("file.separator");
        //String dir = "/home/landen/UntarFile/GeoIP";

        //String dbfile = dir + sep + "GeoLiteCity.dat";
        //String dbfile = "GeoLiteCity.dat";
        if(new File(dbfile).exists())
        {
            if(cl == null)
            {
                cl = new LookupService(dbfile,LookupService.GEOIP_MEMORY_CACHE);
            }    
        }
        
        return cl;

    }
    
    /**
     * @param str like "114.43.181.143"
     * */
    
    public String evaluate(String str,String ipDBInfo) {
        try
        {
            Location l1 = null;
            Matcher mIP = Pattern.compile(ipPattern).matcher(str);
            Matcher mIPNum = Pattern.compile(ipNumPattern).matcher(str);
            if(mIP.matches())
                l1 = getLS(ipDBInfo).getLocation(str);
            else if(mIPNum.matches())
                l1 = getLS(ipDBInfo).getLocation(Long.parseLong(str));    
            
            /*System.out.println("countryCode: " + l1.countryCode +
                    "\n countryName: " + l1.countryName +
                    "\n region: " + l1.region +
                    "\n regionName: " + regionName.regionNameByCode(l1.countryCode, l1.region) +
                    "\n city: " + l1.city +
                    "\n latitude: " + l1.latitude +
                    "\n longitude: " + l1.longitude +
                    "\n timezone: " + timeZone.timeZoneByCountryAndRegion(l1.countryCode, l1.region));*/
            
            return String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s",l1.countryCode,l1.countryName,l1.region,regionName.regionNameByCode(l1.countryCode, l1.region),l1.city,l1.latitude,l1.longitude,timeZone.timeZoneByCountryAndRegion(l1.countryCode, l1.region));
        }
        catch(Exception e)
        {
            e.printStackTrace();
            if(cl != null)
                cl.close();
            return null;
        }
    }
    
    public static void main(String[] args)
    {
        String dbfile = "GeoLiteCity.dat";
        IPToCC ipTocc = new IPToCC();
        String ipAdress = "221.12.10.218";
        
        System.out.println(ipTocc.evaluate(ipAdress,dbfile));
    }

}
Step 2.將以上程序和GeoIP的API程序,一塊兒打成JAR包IPToCC.jar,和數據文件(GeoLiteCity.dat)一塊兒放到Hive所在的服務器的一個位置。而後能夠按照如下兩種方式將以上資源添加到Hive中:
1> 打開Hive執行如下語句:
landen@Master:~/UntarFile/hive-0.10.0$ bin/hive
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
Logging initialized using configuration in jar:file:/home/landen/UntarFile/hive-0.10.0/lib/hive-common-0.10.0.jar!/hive-log4j.properties
Hive history file=/home/landen/UntarFile/hive-0.10.0/logs/hive_job_log_landen_201312081638_1930432077.txt
hive (default)> use stuchoosecourse;
OK
Time taken: 5.251 seconds
hive (stuchoosecourse)> add file /home/landen/UntarFile/GeoIP/GeoLiteCity.dat;
Added resource: /home/landen/UntarFile/GeoIP/GeoLiteCity.dat
hive (stuchoosecourse)> add jar /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar;
Added /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar to class path
Added resource: /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar
hive (stuchoosecourse)> create temporary function IP4Tocc as 'org.hadoop.hive.additionalUDF.IPToCC';
OK
Time taken: 0.107 seconds
2> 在啓動hive shell命令前,在$HIVE_HOME/conf目錄下添加.hiverc文件,而後添加以下內容:
add file /home/landen/UntarFile/GeoIP/GeoLiteCity.dat;
add jar /home/landen/UntarFile/hive-0.10.0/lib/IPTocc.jar;
create temporary function IP4Tocc as 'org.hadoop.hive.additionalUDF.IPToCC';
當啓動hive shell命令後,hive會將加載.hiverc文件內容並添加到全局內容中,便於client使用

Step 3:Hive測試內容以下:
hive (stuchoosecourse)> select * from ipidentifier;
OK
ipadress
221.12.10.218
60.180.248.201
125.111.251.118
Time taken: 0.099 seconds
hive (stuchoosecourse)> select IP4Tocc(ipadress,'./GeoLiteCity.dat') from ipidentifier;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201312042044_0020, Tracking URL = http://Master:50030/jobdetails.jsp?jobid=job_201312042044_0020
Kill Command = /home/landen/UntarFile/hadoop-1.0.4/libexec/../bin/hadoop job  -kill job_201312042044_0020
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2013-12-08 20:54:10,276 Stage-1 map = 0%,  reduce = 0%
2013-12-08 20:54:18,308 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
2013-12-08 20:54:19,313 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
2013-12-08 20:54:20,317 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
2013-12-08 20:54:21,322 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
2013-12-08 20:54:22,326 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
2013-12-08 20:54:23,331 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.55 sec
2013-12-08 20:54:24,402 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.55 sec
MapReduce Total cumulative CPU time: 2 seconds 550 msec
Ended Job = job_201312042044_0020
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 2.55 sec   HDFS Read: 306 HDFS Write: 188 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 550 msec
OK
_c0
CN    China    02    Zhejiang    Hangzhou    30.293594    120.16141    Asia/Shanghai
CN    China    02    Zhejiang    Wenzhou    27.999405    120.66681    Asia/Shanghai
CN    China    02    Zhejiang    Ningbo    29.878204    121.5495    Asia/Shanghai
hive (stuchoosecourse)> select split(IP4Tocc(ipadress,'./GeoLiteCity.dat'),'\t') from ipidentifier;
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201312042044_0021, Tracking URL = http://Master:50030/jobdetails.jsp?jobid=job_201312042044_0021
Kill Command = /home/landen/UntarFile/hadoop-1.0.4/libexec/../bin/hadoop job  -kill job_201312042044_0021
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2013-12-08 21:12:46,717 Stage-1 map = 0%,  reduce = 0%
2013-12-08 21:12:56,764 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
2013-12-08 21:12:57,768 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
2013-12-08 21:12:58,772 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
2013-12-08 21:12:59,775 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
2013-12-08 21:13:00,778 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
2013-12-08 21:13:01,782 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 4.28 sec
2013-12-08 21:13:02,786 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 4.28 sec
MapReduce Total cumulative CPU time: 4 seconds 280 msec
Ended Job = job_201312042044_0021
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 4.28 sec   HDFS Read: 306 HDFS Write: 188 SUCCESS
Total MapReduce CPU Time Spent: 4 seconds 280 msec
OK
_c0
["CN","China","02","Zhejiang","Hangzhou","30.293594","120.16141","Asia/Shanghai"]
["CN","China","02","Zhejiang","Wenzhou","27.999405","120.66681","Asia/Shanghai"]
["CN","China","02","Zhejiang","Ningbo","29.878204","121.5495","Asia/Shanghai"]
Time taken: 45.037 seconds
hive (stuchoosecourse)> create table HiddenIPInfo(
                      > IP string,countrycode string,countryname string,region string,regionname string,city string,      
                      > latitude string,longitude string,timezone string);
OK
Time taken: 1.828 seconds
hive (stuchoosecourse)> show tables;
OK
tab_name
hbase_stu_course
hiddenipinfo
ipidentifier
Time taken: 0.486 seconds
hive (stuchoosecourse)> describe hiddenipinfo;
OK
col_name    data_type    comment
ip    string    
countrycode    string    
countryname    string    
region    string    
regionname    string    
city    string    
latitude    string    
longitude    string    
timezone    string    
Time taken: 0.33 seconds
hive (stuchoosecourse)> from(select ipadress,split(IP4Tocc(ipadress,'./GeoLiteCity.dat'),'\t') as IPInfo from ipidentifier)e
                      > insert overwrite table hiddenipinfo
                      > select e.ipadress,e.IPInfo[0] as countrycode,e.IPInfo[1] as countryname,e.IPInfo[2] as region,
                      > e.IPInfo[3] as regionname,e.IPInfo[4] as city,e.IPInfo[5] as latitude,e.IPInfo[6] as longitude,
                      > e.IPInfo[7] as timezone;
Total MapReduce jobs = 3
Launching Job 1 out of 3
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_201312042044_0023, Tracking URL = http://Master:50030/jobdetails.jsp?jobid=job_201312042044_0023
Kill Command = /home/landen/UntarFile/hadoop-1.0.4/libexec/../bin/hadoop job  -kill job_201312042044_0023
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2013-12-08 21:58:12,406 Stage-1 map = 0%,  reduce = 0%
2013-12-08 21:58:18,449 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
2013-12-08 21:58:19,454 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
2013-12-08 21:58:20,458 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
2013-12-08 21:58:21,462 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
2013-12-08 21:58:22,466 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
2013-12-08 21:58:23,470 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.48 sec
2013-12-08 21:58:24,474 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 1.48 sec
MapReduce Total cumulative CPU time: 1 seconds 480 msec
Ended Job = job_201312042044_0023
Ended Job = 39195028, job is filtered out (removed at runtime).
Ended Job = 1695434910, job is filtered out (removed at runtime).
Moving data to: hdfs://Master:9000/home/landen/UntarFile/hive-0.10.0/warehouse/hive_2013-12-08_21-57-40_106_7083774091282915969/-ext-10000
Loading data to table stuchoosecourse.hiddenipinfo
Deleted hdfs://Master:9000/home/landen/UntarFile/hive-0.10.0/warehouse/stuchoosecourse.db/hiddenipinfo
Table stuchoosecourse.hiddenipinfo stats: [num_partitions: 0, num_files: 1, num_rows: 0, total_size: 233, raw_data_size: 0]
3 Rows loaded to hiddenipinfo
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 1.48 sec   HDFS Read: 306 HDFS Write: 233 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 480 msec
OK
ipadress    countrycode    countryname    region    regionname    city    latitude    longitude    timezone
Time taken: 45.692 seconds
hive (stuchoosecourse)> show tables;
OK
tab_name
hbase_stu_course
hiddenipinfo
ipidentifier
Time taken: 0.053 seconds
hive (stuchoosecourse)> select * from hiddenipinfo;OKip               countrycode    countryname    region    regionname    city       latitude    longitude    timezone221.12.10.218    CN             China          02        Zhejiang     Hangzhou   30.293594   120.16141    Asia/Shanghai60.180.248.201   CN             China          02        Zhejiang      Wenzhou    27.999405   120.66681    Asia/Shanghai125.111.251.118  CN             China          02        Zhejiang      Ningbo     29.878204   121.5495     Asia/ShanghaiTime taken: 0.083 seconds
相關文章
相關標籤/搜索