hbase--在不一樣版本hdfs集羣之間轉移數據

不少人會有這樣一個需求:將一個hdfs集羣上的數據寫入另外一個hdfs集羣所在的hbase數據庫。一般狀況下兩個hdfs集羣的版本差距並不大,這樣的程序會很容易寫。但有時會跨大版本。好比做者所在的廠子,數據都在基於hadoop0.19.2版本修改的hdfs集羣上,要將這樣的數據導入版本爲0.20.2+hdfs集羣,就不能使用同一個hadoop jar包來完成了。如何實現呢?

    最簡單的辦法就是把src集羣的數據導到本地,而後起另外一個進程將本地數據傳到des集羣上去。
   
不過這有幾個問題: java

  • 效率下降
  • 佔用本地磁盤空間
  • 不能應付實時導數據需求
  • 兩個進程須要協調,複雜度增長


    更好的辦法是在同一個進程內一邊讀src數據,一邊寫des集羣。不過這至關於在同一個進程空間內加載兩個版本的hadoop jar包,這就須要在程序中使用兩個classloader來實現。
   
如下代碼能夠實現classloader加載自定義的jar包,並生成須要的Configuration對象: 數據庫

Java代碼 微信

  1. URL[] jarUrls = new URL[1];   
  2. jarUrls[0]=new File(des_jar_path).toURI().toURL();   
  3. ClassLoader jarloader = new URLClassLoader(jarUrls, null);   
  4. Class Proxy = Class.forName("yourclass", true, jarloader);   
  5. Configuration conf = (Configuration)Proxy.newInstance();  

URL[] jarUrls = new URL[1]; ide

jarUrls[0]=new File(des_jar_path).toURI().toURL(); 函數

ClassLoader jarloader = new URLClassLoader(jarUrls, null); oop

Class Proxy = Class.forName("yourclass", true, jarloader); spa

Configuration conf = (Configuration)Proxy.newInstance(); code



    可是因爲在生成HTable對象時,須要使用這個conf對象,而加載這個conf對象的代碼自己是由默認的classloader加載的,也就是0.19.2jar包。因此在以上代碼最後一行所強制轉換的Configuration對象仍然是0.19.2版本的。那怎麼辦呢?
   
琢磨了一會,發現若是要實現以上功能,必須將生成HTable對象,以及之後的全部hbase操做都使用這個新的classloader,所以這個新的classloader必須加載除了0.19.2jar包外全部須要用到的jar包,而後把全部操做都封裝進去。在外面用反射來調用。
   
這樣的話,一般構造函數都不爲空了,所以須要用到Constructor來構造一個自定義的構造函數
   
代碼段以下: orm

Java代碼   對象

  1. main.java   
  2. void init(){   
  3.     ClassLoader jarloader = generateJarLoader();   
  4.     Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);   
  5.     Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});   
  6.     Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);   
  7.     proxy = con.newInstance(new Object[]{path, tablename, autoflush});   
  8. }   
  9. void put(){   
  10. ...   
  11.     while((line = getLine()) != null) {   
  12.         proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));   
  13.         Method addPut = proxy.getClass().getMethod("addPut",   
  14.                 new Class[]{String.class, String.class, String.class});   
  15.         addPut.invoke(proxy, new Object[]{field, column, encode});   
  16.         proxy.getClass().getMethod("putLine").invoke(proxy);   
  17.     }   
  18. }   
  19.   
  20. ClassLoader generateJarLoader() throws IOException {   
  21.       String libPath = System.getProperty("java.ext.dirs");   
  22.       FileFilter filter = new FileFilter() {   
  23.       @Override  
  24.       public boolean accept(File pathname) {   
  25.         if(pathname.getName().startsWith("hadoop-0.19.2"))   
  26.           return false;   
  27.         else  
  28.             return pathname.getName().endsWith(".jar");   
  29.       }   
  30.       };   
  31.       File[] jars = new File(libPath).listFiles(filter);   
  32.       URL[] jarUrls = new URL[jars.length+1];   
  33.            
  34.       int k = 0;   
  35.       for (int i = 0; i < jars.length; i++) {   
  36.         jarUrls[k++] = jars[i].toURI().toURL();   
  37.       }   
  38.       jarUrls[k] = new File("hadoop-0.20.205.jar")   
  39.       ClassLoader jarloader = new URLClassLoader(jarUrls, null);   
  40.       return jarloader;   
  41. }  

main.java

void init(){

  ClassLoader jarloader = generateJarLoader();

  Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);

  Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});

  Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);

  proxy = con.newInstance(new Object[]{path, tablename, autoflush});

}

void put(){

...

  while((line = getLine()) != null) {

   proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));

   Method addPut = proxy.getClass().getMethod("addPut",

      new Class[]{String.class, String.class, String.class});

   addPut.invoke(proxy, new Object[]{field, column, encode});

   proxy.getClass().getMethod("putLine").invoke(proxy);

  }

}


ClassLoader generateJarLoader() throws IOException {

      String libPath = System.getProperty("java.ext.dirs");

      FileFilter filter = new FileFilter() {

      @Override

      public boolean accept(File pathname) {

        if(pathname.getName().startsWith("hadoop-0.19.2"))

          return false;

        else

         return pathname.getName().endsWith(".jar");

      }

      };

      File[] jars = new File(libPath).listFiles(filter);

      URL[] jarUrls = new URL[jars.length+1];

  

      int k = 0;

      for (int i = 0; i < jars.length; i++) {

        jarUrls[k++] = jars[i].toURI().toURL();

      }

      jarUrls[k] = new File("hadoop-0.20.205.jar")

      ClassLoader jarloader = new URLClassLoader(jarUrls, null);

      return jarloader;

}

 

Java代碼 

  1. HBaseProxy.java   
  2. public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)   
  3.      throws IOException{   
  4.         Configuration conf = new Configuration();   
  5.         conf.addResource(new Path(hbase_conf));   
  6.         config = new Configuration(conf);   
  7.         htable = new HTable(config, tableName);   
  8.         admin = new HBaseAdmin(config);   
  9.         htable.setAutoFlush(autoflush);   
  10.     }   
  11. public void addPut(String field, String column, String encode) throws IOException {   
  12.     try {   
  13.             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   
  14.                     field.getBytes(encode));   
  15.         } catch (UnsupportedEncodingException e) {   
  16.             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   
  17.                     field.getBytes());   
  18.         }   
  19.            
  20.     }   
  21.     public void generatePut(String rowkey){   
  22.         p = new Put(rowkey.getBytes());   
  23.     }   
  24.        
  25.     public void putLine() throws IOException{   
  26.         htable.put(p);   
  27.     }  

HBaseProxy.java

public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)

     throws IOException{

   Configuration conf = new Configuration();

   conf.addResource(new Path(hbase_conf));

   config = new Configuration(conf);

   htable = new HTable(config, tableName);

   admin = new HBaseAdmin(config);

   htable.setAutoFlush(autoflush);

  }

public void addPut(String field, String column, String encode) throws IOException {

    try {

     p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

        field.getBytes(encode));

   } catch (UnsupportedEncodingException e) {

     p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

        field.getBytes());

   }

  

  }

    public void generatePut(String rowkey){

   p = new Put(rowkey.getBytes());

  }

 

    public void putLine() throws IOException{

   htable.put(p);

  }


    總之,在同一個進程中加載多個classloader時必定要注意,classloader A所加載的對象是不能轉換成classloader B的對象的,固然也不能使用。兩個空間的相互調用只能用java的基本類型或是反射。

更多分享請關注:www.crxy.cn  

掃一掃關注超人學院微信二維碼:

相關文章
相關標籤/搜索