在不一樣版本hdfs集羣之間轉移數據

在不一樣版本hdfs集羣之間轉移數據java

    最簡單的辦法就是把src集羣的數據導到本地,而後起另外一個進程將本地數據傳到des集羣上去。 
    不過這有幾個問題:
web


  • 效率下降 
    微信

  • 佔用本地磁盤空間 
    ide

  • 不能應付實時導數據需求     
    函數

  • 兩個進程須要協調,複雜度增長 
    oop


    
更好的辦法是在同一個進程內一邊讀src數據,一邊寫des集羣。不過這至關於在同一個進程空間內加載兩個版本的hadoop jar包,這就須要在程序中使用兩個classloader來實現。 
    如下代碼能夠實現classloader加載自定義的jar包,並生成須要的Configuration對象:
學習

Java代碼spa


  • URL[] jarUrls = new URL[1];   
    code

  • jarUrls[0]=new File(des_jar_path).toURI().toURL();       
    orm

  • ClassLoader jarloader = new URLClassLoader(jarUrls, null);       

  • Class Proxy = Class.forName("yourclass", true, jarloader);       

  • Configuration conf = (Configuration)Proxy.newInstance();  

URL[] jarUrls = new URL[1];

jarUrls[0]=newFile(des_jar_path).toURI().toURL();

ClassLoader jarloader = newURLClassLoader(jarUrls, null);

Class Proxy =Class.forName("yourclass", true, jarloader);

Configuration conf =(Configuration)Proxy.newInstance();



    
可是因爲在生成HTable對象時,須要使用這個conf對象,而加載這個conf對象的代碼自己是由默認的classloader加載的,也就是0.19.2的jar包。因此在以上代碼最後一行所強制轉換的Configuration對象仍然是0.19.2版本的。那怎麼辦呢? 
    琢磨了一會,發現若是要實現以上功能,必須將生成HTable對象,以及之後的全部hbase操做都使用這個新的classloader,所以這個新的classloader必須加載除了0.19.2的jar包外全部須要用到的jar包,而後把全部操做都封裝進去。在外面用反射來調用。
    這樣的話,一般構造函數都不爲空了,所以須要用到Constructor來構造一個自定義的構造函數 
    代碼段以下:

Java代碼


  • main.java       

  • void init(){       

  •     ClassLoader jarloader = generateJarLoader();       

  •     Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);       

  •     Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});       

  •     Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);       

  •     proxy = con.newInstance(new Object[]{path, tablename, autoflush});       

  • }       

  • void put(){       

  • ...       

  •     while((line = getLine()) != null) {       

  •         proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));       

  •         Method addPut = proxy.getClass().getMethod("addPut",       

  •                 new Class[]{String.class, String.class, String.class});       

  •         addPut.invoke(proxy, new Object[]{field, column, encode});       

  •         proxy.getClass().getMethod("putLine").invoke(proxy);       

  •     }       

  • }       

  •   

  • ClassLoader generateJarLoader() throws IOException {       

  •       String libPath = System.getProperty("java.ext.dirs");       

  •       FileFilter filter = new FileFilter() {       

  •       @Override  

  •       public boolean accept(File pathname) {       

  •         if(pathname.getName().startsWith("hadoop-0.19.2"))       

  •           return false;       

  •         else  

  •             return pathname.getName().endsWith(".jar");       

  •       }       

  •       };       

  •       File[] jars = new File(libPath).listFiles(filter);       

  •       URL[] jarUrls = new URL[jars.length+1];   

  •                

  •       int k = 0;   

  •       for (int i = 0; i < jars.length; i++) {       

  •         jarUrls[k++] = jars.toURI().toURL();       

  •       }       

  •       jarUrls[k] = new File("hadoop-0.20.205.jar")       

  •       ClassLoader jarloader = new URLClassLoader(jarUrls, null);       

  •       return jarloader;       

  • }  

main.java

void init(){

  ClassLoader jarloader = generateJarLoader();

  Class Proxy =Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);

  Constructor con = Proxy.getConstructor(new Class[]{String.class,String.class, boolean.class});

  Boolean autoflush =param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);

  proxy = con.newInstance(new Object[]{path, tablename, autoflush});

}

void put(){

...

  while((line = getLine()) != null) {

   proxy.getClass().getMethod("generatePut",String.class).invoke(proxy,line.getField(rowkey));

   Method addPut = proxy.getClass().getMethod("addPut",

      new Class[]{String.class, String.class, String.class});

   addPut.invoke(proxy, new Object[]{field, column, encode});

   proxy.getClass().getMethod("putLine").invoke(proxy);

  }

}

ClassLoader generateJarLoader()throws IOException {

      String libPath =System.getProperty("java.ext.dirs");

      FileFilter filter = new FileFilter() {

      @Override

      public boolean accept(File pathname) {

        if(pathname.getName().startsWith("hadoop-0.19.2"))

          return false;

        else

         returnpathname.getName().endsWith(".jar");

      }

      };

      File[] jars = newFile(libPath).listFiles(filter);

      URL[] jarUrls = new URL[jars.length+1];

   

      int k = 0;

      for (int i = 0; i < jars.length; i++){

        jarUrls[k++] = jars.toURI().toURL();

      }

      jarUrls[k] = newFile("hadoop-0.20.205.jar")

      ClassLoader jarloader = newURLClassLoader(jarUrls, null);

     return jarloader;

}

Java代碼


  • HBaseProxy.java       

  • public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)       

  •      throws IOException{       

  •         Configuration conf = new Configuration();       

  •         conf.addResource(new Path(hbase_conf));       

  •         config = new Configuration(conf);       

  •         htable = new HTable(config, tableName);       

  •         admin = new HBaseAdmin(config);       

  •         htable.setAutoFlush(autoflush);       

  •     }       

  • public void addPut(String field, String column, String encode) throws IOException {       

  •     try {       

  •             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   

  •                     field.getBytes(encode));       

  •         } catch (UnsupportedEncodingException e) {       

  •             p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),   

  •                     field.getBytes());       

  •         }       

  •                

  •     }       

  •     public void generatePut(String rowkey){       

  •         p = new Put(rowkey.getBytes());       

  •     }       

  •            

  •     public void putLine() throws IOException{       

  •         htable.put(p);       

  •     }  

HBaseProxy.java

public HBaseProxy(Stringhbase_conf, String tableName, boolean autoflush)

     throws IOException{

   Configuration conf = new Configuration();

   conf.addResource(new Path(hbase_conf));

   config = new Configuration(conf);

   htable = new HTable(config, tableName);

   admin = new HBaseAdmin(config);

   htable.setAutoFlush(autoflush);

  }

public void addPut(Stringfield, String column, String encode) throws IOException {

    try {

     p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

        field.getBytes(encode));

   } catch (UnsupportedEncodingException e) {

     p.add(column.split(":")[0].getBytes(),column.split(":")[1].getBytes(),

        field.getBytes());

   }

   

  }

    public void generatePut(String rowkey){

   p = new Put(rowkey.getBytes());

  }

  

    public void putLine() throws IOException{

   htable.put(p);

  }


    總之,在同一個進程中加載多個classloader時必定要注意,classloader A所加載的對象是不能轉換成classloader B的對象的,固然也不能使用。兩個空間的相互調用只能用java的基本類型或是反射。


更多精彩內容請關注:http://bbs.superwu.cn

關注超人學院微信二維碼:

關注超人學院java免費學習交流羣:

相關文章
相關標籤/搜索