MongoDB集羣與LBS應用系列(二)--與Hadoop集成

長期以來,我每開個系列,只有興趣寫一篇,很難持之與恆。爲了克服這個長久以來的性格弱點,以及梳理工做半年的積累。最近一個月會寫兩篇關於Mongo在地理大數據方面的實踐和應用,一篇關於推薦系統的初期準備過程,一篇用戶行爲矩陣的可視化。但願可以立言爲證,自我監督。java

1.驅動準備

言歸正傳,前文MongoDB集羣部署完畢以後,CRUD就是主要需求。NoSQL與普通關係數據庫不一樣的是,避免採用ORM框架對數據庫作操做,這樣會帶來明顯的性能降低[1]。使用原生的Driver是一個較爲合理的選擇,Mongo支持的語言很是多,包括JS,Java,C,C++,Python,Scala等[2]。git

若是是單純的MongoDB項目,咱們會用NodeJS Driver,方便快捷,示例規範,值得推薦。在本文我使用Java Driver,主要是集成Hadoop工程方便。同時還會用到Mongo Hadoop Adapter 能夠選擇到Github 下載源碼編譯,或者直接根據本身Hadoop集羣版本選擇下載Jar包,添加到Hadoop安裝目錄的lib文件夾下[3]。可是在很多公有云平臺上,普通用戶是沒有修改Hadoop系統的權限,沒法添加Jar包,因此在本文的示例代碼中,採用分佈式緩存的方法添加這兩個Jar包。github

2.實現原理與過程

其實Hadoop和MongoDB的集成,很大程度上是將Mongo做爲Hadoop的輸入和輸出源,而Mongo Hadoop Adapter也是主要實現了BSONWritable,MongoInputformat等這些類,也就是說須要自定義Hadoop的序列化類以及輸入輸出格式。mongodb

2.1 Hadoop序列化與反序列化

序列化(serialization)將結構化對象轉化爲二進制字節流,以便網絡傳輸和寫入磁盤。反序列化(deserialization)則是它的逆過程,將字節流轉化爲結構化對象。分佈式系統一般在進程通信和持久化時候會使用序列化。Hadoop系統節點進程通訊使用RPC,該協議存活時間很是短,所以須要其序列化格式具有如下特色:緊湊、快速、可擴展等。Hadoop提供了Writable接口,它定義了對數據的IO流,即須要實現readFields 和 Write兩個方法[4]。數據庫

2.2 Mongo Adapter的源碼實現

Mongo Hadoop Adater所實現的BSONWritable等類,源碼實現體現了上述的規範:apache

//輸出
public void write( DataOutput out ) throws IOException{
        BSONEncoder enc = new BasicBSONEncoder();
        BasicOutputBuffer buf = new BasicOutputBuffer();
        enc.set( buf );
        …………
    }
//輸入
public void readFields( DataInput in ) throws IOException{
        BSONDecoder dec = new BasicBSONDecoder();
        BSONCallback cb = new BasicBSONCallback();
        // Read the BSON length from the start of the record
       //字節流長度
        byte[] l = new byte[4];
        try {
            in.readFully( l );
            …………
            byte[] data = new byte[dataLen + 4];
            System.arraycopy( l, 0, data, 0, 4 );
            in.readFully( data, 4, dataLen - 4 );
            dec.decode( data, cb );
            _doc = (BSONObject) cb.get();
           ………………
    }

 

所以咱們在編寫MapReduce程序的時候能夠傳遞BsonWritable的key,value鍵值對,而Mongo構建於Bson之上,也就是說能夠將MongoDB視爲HDFS同性質的存儲節點便可。json

3. 代碼實現

在Mongo-Hadoop網站有數個例子,可是講得不夠詳細,本文主要對它的金礦產量的例子作一個補充。完整的Hadoop項目通常包括Mapper,Reduceer,Job三個Java Class,以及一個一個配置文件(configuration.xml)來定義項目的輸入輸出等。Mongo-Hadoop項目會多一個mongo-defaults.xml,固然能夠將二者融合起來。api

3.1  數據準備

從github中下載源碼包,它會包含examples/treasury_yield/src/main/resources/yield_historical_in.json文件,將該json文件上傳到Mongo所在的服務器,使用如下命令將它導入Mongo的testmr數據庫中的example collection中。緩存

mongoimport --host 127.0.0.1 --port 27017 -d testmr -c example --file ./yield_historical_in.json

查看一下數據結構服務器

use testmr
db.example.find().limit(1).pretty()

以下:

{
  "_id": ISODate("1990-01-25T19:00:00-0500"),
  "dayOfWeek": "FRIDAY", "bc3Year": 8.38,
  "bc10Year": 8.49,
  …
}

3.2  Mapper和Reducer還有Job以及mongo-defaults.xml

Mapper是從Mongo中讀取BSONObject

public class MongoTestMapper extends Mapper<Object,BSONObject, IntWritable, DoubleWritable>

以及處理讀過來的<key,value>鍵值對,併發到Reducer中彙總計算。注意value的類型。

public void map(final Object pkey, final BSONObject pvalue,final Context context)
        {
            final int year = ((Date)pvalue.get("_id")).getYear()+1990;
            double bdyear  = ((Number)pvalue.get("bc10Year")).doubleValue();
            try {
                context.write( new IntWritable( year ), new DoubleWritable( bdyear ));
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

Reducer會接受Mapper傳過來的鍵值對

public class MongoTestReducer extends Reducer<IntWritable,DoubleWritable,IntWritable,BSONWritable>

進行計算並將結果寫入MongoDB.請注意輸出的Value的類型是BSONWritable.

public void reduce( final IntWritable pKey,
            final Iterable<DoubleWritable> pValues,
            final Context pContext ) throws IOException, InterruptedException{
      int count = 0;
      double sum = 0.0;
      for ( final DoubleWritable value : pValues ){
          sum += value.get();
          count++;
      }

      final double avg = sum / count;

        BasicBSONObject out = new BasicBSONObject();
        out.put("avg", avg);
        pContext.write(pKey, new BSONWritable(out));
    }

 

Job做爲MapReudce主類,主要使用DistributedCache分佈式緩存來添加驅動包,並定義了任務的輸入配置等。以下所示:

//Using Distribute Cache,call it before job define.
        DistributedCache.createSymlink(conf);
//………………
//Using DistributedCache to add Driver Jar File
        DistributedCache.addFileToClassPath(new Path("/user/amap/data/mongo/mongo-2.10.1.jar"), conf);
        DistributedCache.addFileToClassPath(new Path("/user/amap/data/mongo/mongo-hadoop-core_cdh4.3.0-1.1.0.jar"), conf);

// job conf
        Job job = new Job(conf,"VentLam:Mongo-Test-Job");

mongo-defaults.xml 配置文件中定義了很是多的參數,咱們只須要修改輸入輸出URI

   <!-- If you are reading from mongo, the URI -->
    <name>mongo.input.uri</name>
    <value>mongodb://127.0.0.1/testmr.example</value>
  </property>
  <property>
    <!-- If you are writing to mongo, the URI -->
    <name>mongo.output.uri</name>
    <value>mongodb://127.0.0.1/testmr.mongotest</value>
  </property>
  <property>

 

將整個java項目打包爲名爲mongotest的jar包,上傳到Hadoop集羣,執行命令:

hadoop jar mongotest.jar org.ventlam.MongoTestJob

之後會將個人博客涉及到源碼都發布在https://github.com/ventlam/BlogDemo 中,這篇文章對應的是mongohadoop文件夾。

4.參考文獻

[1] What the overhead of Java ORM for MongoDB

http://stackoverflow.com/questions/10600162/what-the-overhead-of-java-orm-for-mongodb

[2] MongoDB Drivers and Client Libraries

http://docs.mongodb.org/ecosystem/drivers/

[3]Getting Started with Hadoop

http://docs.mongodb.org/ecosystem/tutorial/getting-started-with-hadoop/

[4] Interface Writable    http://hadoop.apache.org/docs/stable/api/

知識共享許可協議

本做品由VentLam創做,採用知識共享署名-非商業性使用-相同方式共享 2.5 中國大陸許可協議進行許可。

相關文章
相關標籤/搜索