OSS文件上傳及OSS與ODPS之間數據連通

場景描述

        有這樣一種場景,用戶在自建服務器上存有必定數量級的CSV格式業務數據,某一天用戶瞭解到阿里雲的OSS服務存儲性價比高(嘿嘿,顏值高),因而想將CSV數據遷移到雲上OSS中,而且將來還想對這些數據作一些離線分析,挖掘其中存在價值,所以須要將OSS中文件再經過一種方式同步到ODPS數加平臺上,面對這樣需求,小編我通過參考文檔,實踐,調試並修復Bug,實現出如下一種解決方案。

實現目標

    經過OSS的Java SDK以及批量數據通道tunnel SDK實現如下兩個功能:
     (1)將本地CSV文件上傳到OSS;
     (2)將OSS中文件同步到ODPS;

準備工做

     在具體實操以前,有必要對OSS有個瞭解,OSS是個什麼東東,爲何要選用OSS呢,OSS控制檯限制條件,須要注意事項?

OSS是個什麼東東?

      阿里雲對象存儲(Object Storage Service,簡稱OSS),是阿里雲對外提供的海量,安全,低成本,高可靠的雲存儲服務。經過網絡隨時存儲和調用包括文本、圖片、音頻、和視頻在內的各類結構化或非結構化數據文件。 

爲何選用雲產品OSS服務呢?

是什麼緣由導致用戶放棄使用自建服務器存儲數據,而轉向雲產品OSS呢?
     這方面我深有感觸,我之前在上海一家公司工做,原公司全部數據都是存放在自建的五六臺服務器上,從規劃,採購到部署,這其間過程複雜,人力部署也不簡單,並且服務器價格昂貴,開發維護成本高,數據可靠性還低,總之耗時、耗力最重要是影響業務進展。接觸瞭解到OSS後才發現,以前的自建服務器存儲真是太out啦,呵呵,OSS顏值高額,這裏顏值具體有如下幾個方面:
        可靠性高:數據自動多重冗餘備份,規模自動擴展,不影響對外服務;
      安全:提供企業級、用戶級多層次安全保護,受權機制及白名單、防盜鏈、主子帳號功能;
       成本:省去人工擴容硬盤以及運維成本;
       數據處理能力:提供豐富的數據處理服務,好比圖片處理、視頻轉碼、CDN內容加速分發。

OSS控制檯限制條件?

   經過 OSS 控制檯能夠上傳 小於 500 MB  文件。如要上傳的文件大於 500 MB,控制檯會給出超過大小限制警告,而且在任務管理列表, 失敗並嘗試上傳請求三次。異常警告以下圖所示:
04d146e313c2c4b6be5c3e1234be347f6998581d
d0933da8c6cf0a3b5846c64c488b06071f4b103a
解決方法:能夠經過 OSS的SDK 進行上傳。

須要注意幾點

(1) 在OSS中,用戶操做基本數據單元是object,單個對象大小限制爲48.8TB,一個存儲空間中能夠有無 
    限量對象。
(2) 新建Bucket,輸入存儲空間名稱,建立後不支持更改存儲空間名稱,上傳到OSS後不能移動文件存儲位
    置;
(3) 在所屬地域框中,下拉選擇該存儲空間的數據中心。訂購後不支持更換地域。
(4) 刪除存儲空間以前請確保還沒有完成的分片上傳文件產生的碎片文件所有清空,不然沒法刪除存儲空間。
(5) 經過web控制檯上傳文件,一刷新頁面,任務管理中顯示的上傳任務就會消失不見,因此在上傳過程當中
    不要刷新頁面。

本地大文件分片上傳到OSS

       由於使用單次HTTP請求,Object過大會致使上傳時間長。在這段時間出現網絡緣由形成超時或者連接斷開錯誤的時候,上傳容易失敗,能夠考慮斷點續傳上傳(分片上傳)。當Object大於5GB,這種狀況下只能使用斷點續傳上傳(分片上傳),具體參考斷點續傳上傳 ,下面代碼實現上傳本地路徑下ratings.csv文件到OSS object管理中:
見附件中 源代碼.rar 壓縮文件中的 MultipartUploadDemo 類實現 

單線程實現將OSS文件上傳至ODPS(OSS java-SDK與tunnel SDK結合)

      下面代碼實現目標:將OSS中bucket名爲qf-test,object對象爲ratings.csv文件數據導入到ODPS平臺中項目名爲dtstack_dev,表名爲ratings,分區字段爲ds=20160612中。
見附件中 源代碼.rar 壓縮文件中的 OSSToODPS_Upload 類實現 

多線程實現將OSS文件上傳至ODPS(OSS java-SDK與tunnel SDK結合)

      下面代碼實現目標:將OSS中bucket名爲qf-test,object對象爲data_test/movies.csv 文件數據導入到ODPS平臺中項目名爲dtstack_dev,表名爲movies_odps2中。
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.oss.OSSClient;
import com.aliyun.oss.model.GetObjectRequest;
import com.aliyun.oss.model.OSSObject;

 class UploadThread implements Callable<Boolean> {
	     private long id;
	     private TableSchema schema = null;
	     private RecordWriter recordWriter = null;
	     private Record record = null;
	     private BufferedReader reader = null;
	     
         public UploadThread(long id, RecordWriter recordWriter, Record record,
                         TableSchema schema,BufferedReader reader) {
        	     this.id = id;
                 this.recordWriter = recordWriter;
                 this.record = record;
                 this.schema = schema;
                 this.reader = reader;
         }

		public Boolean call() throws Exception {
		       while (true) {
		           String line = reader.readLine();
		           if (line == null) break;
		           if(id == 0){  //第一行是字段名,忽略掉
			    	      id++;
			    	      continue;
			       }
		           System.out.println(line);
		           String[] s = line.split(",");
		           for (int i = 0; i < schema.getColumns().size(); i++) {
		               Column column = schema.getColumn(i);
		               switch (column.getType()) {
		                 case BIGINT:
		                       record.setBigint(i, Long.valueOf(s[i]));
		                       break;
		//               case BOOLEAN:
		//                       record.setBoolean(i, str);
		//                       break;
		//               case DATETIME:
		//                       record.setDatetime(i, str);
		//                       break;
		                 case DOUBLE:
		                       record.setDouble(i, Double.valueOf(s[i]));
		                       break;
		               case STRING:
		                       record.setString(i,s[i]);
		                       break;
		                 default:
		                       throw new RuntimeException("Unknown column type: "
		                                       + column.getType());
		               }
		           }
				   recordWriter.write(record);
		       }
			   recordWriter.close();
			   return true;
		}
}

 public class OSSToODPS_UploadThread {
	    private static String accessKeyId = "UQV2yoSSWNgquhhe";
		private static String accessKeySecret = "bG8xSLwhmKYRmtBoE3HbhOBYXvknG6";
		
		private static String endpoint = "http://oss-cn-hangzhou.aliyuncs.com";
		private static String bucketName = "qf-test";
	    private static String key = "data_test/movies.csv";
	    
	    private static String tunnelUrl = "http://dt.odps.aliyun.com";
	    private static String odpsUrl = "http://service.odps.aliyun.com/api";
	    private static String project = "dtstack_dev";
	    private static String table = "movies_odps2";
	    //private static String partition = "ds=20160612";

         private static int threadNum = 10;

         public static void main(String args[]) {
	        	 /*
	              * Constructs a client instance with your account for accessing OSS
	              */
	             OSSClient client = new OSSClient(endpoint, accessKeyId, accessKeySecret);
	             System.out.println("Downloading an object");
	             OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));
	             BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));
	             
                 Account account = new AliyunAccount(accessKeyId, accessKeySecret);
                 Odps odps = new Odps(account);
                 odps.setEndpoint(odpsUrl);
                 odps.setDefaultProject(project);
                 try {
                         TableTunnel tunnel = new TableTunnel(odps);
                         tunnel.setEndpoint(tunnelUrl);
                         //PartitionSpec partitionSpec = new PartitionSpec(partition);
                         UploadSession uploadSession = tunnel.createUploadSession(project,table);  
//                       UploadSession uploadSession = tunnel.createUploadSession(project,
//                                 table, partitionSpec);  //分區

                         System.out.println("Session Status is : "
                                         + uploadSession.getStatus().toString());

                         ExecutorService pool = Executors.newFixedThreadPool(threadNum);
                         ArrayList<Callable<Boolean>> callers = new ArrayList<Callable<Boolean>>();
                         for (int i = 0; i < threadNum; i++) {
                                 RecordWriter recordWriter = uploadSession.openRecordWriter(i);
                                 Record record = uploadSession.newRecord();
                                 callers.add(new UploadThread(i, recordWriter, record,
                                                 uploadSession.getSchema(),reader));
                         }
                         pool.invokeAll(callers);
                         pool.shutdown();

                         Long[] blockList = new Long[threadNum];
                         for (int i = 0; i < threadNum; i++)
                                 blockList[i] = Long.valueOf(i);
                         uploadSession.commit(blockList);
                         reader.close();
                         System.out.println("upload success!");
                 } catch (TunnelException e) {
                         e.printStackTrace();
                 } catch (IOException e) {
                         e.printStackTrace();
                 } catch (InterruptedException e) {
                         e.printStackTrace();
                 }
         }
}

  

編程實現中遇到Bug

Apache httpclient包衝突
Exception in thread "main" java.lang.NoSuchFieldError: INSTANCE
    at org.apache.http.conn.ssl.SSLConnectionSocketFactory.<clinit>(SSLConnectionSocketFactory.java:144)
    at com.aliyun.oss.common.comm.DefaultServiceClient.createHttpClientConnectionManager(DefaultServiceClient.java:232)
    at com.aliyun.oss.common.comm.DefaultServiceClient.<init>(DefaultServiceClient.java:78)
    at com.aliyun.oss.OSSClient.<init>(OSSClient.java:273)
    at com.aliyun.oss.OSSClient.<init>(OSSClient.java:194)
    at UploadToODPS.main(UploadToODPS.java:53)
工程裏可能有包衝突。緣由是OSS Java SDK使用了Apache httpclient 4.4.1,而我的工程使用了與Apache httpclient 4.4.1衝突的Apache httpclient。如上述發生錯誤的工程裏,使用了Apache httpclient 4.1.2:
使用統一版本。若是我的工程裏使用與Apache httpclient 4.4.1衝突版本,請也使用4.4.1版本。去掉其它版本的Apache httpclient依賴。
e3ecc8dcfe8d4afa62f245631ac31ca5d7d3b65f
recordWriter.write(record) 寫入位置不正確
在單線程編碼實現從OSS傳數據到ODPS 代碼中 recordWriter.write(record) 寫入位置不正確,以下代碼顯示:
         for (int i = 0; i < schema.getColumns().size(); i++) {
                Column column = schema.getColumn(i);
                switch (column.getType()) {
                  case BIGINT:
                        record.setBigint(i, Long.valueOf(s[i]));
                        break;
                  case DOUBLE:
                        record.setDouble(i, Double.valueOf(s[i]));
                        break;
                  default:
                        throw new RuntimeException("Unknown column type: "
                                        + column.getType());
                  recordWriter.write(record);  //寫入位置不正確
                }
      }
      // recordWriter.write(record);  //放到for循環外,寫入位置正確
recordWriter.write(record)寫入位置不對,將recordWriter.write(record)放置到for循環內,會出現如下奇怪異常:
c584e5bc5fd26158a89eab887cc0260f3e63f70a
正確位置是:將recordWriter.write(record)放置到for循環外,結果以下表顯示:
85755924049248cf00d85b5ca4ceec2b9aa12715
上傳代碼中 partition="20160612" 字符串寫法不對
須要注意,指定分區字符串在程序中正確寫法:
private static String partition = "ds=20160612"; (必須加上分區字段名)
PartitionSpec partitionSpec = new PartitionSpec(partition);
不正確寫法以下:
private static String partition = "20160612";(缺乏分區字段名)
 
 
多線程上 傳任務無端中斷,以下是異常截圖
228ae8f8d824e68991f2512704c1936202dc3d08
經過多線程將OSS中文件同步到ODPS表中時,實現多任務的併發執行,在編碼實現時要注意 reader.close()位置要放正確:
UploadSession uploadSession = tunnel.createUploadSession(project,table, partitionSpec);
OSSObject object = client.getObject(new GetObjectRequest(bucketName, key));
BufferedReader reader = new BufferedReader(new InputStreamReader(object.getObjectContent()));
Long[] blockList = new Long[threadNum];
uploadSession.commit(blockList);
將reader.close()放到Callable接口中call()方法裏是不對滴,call方法是線程異步執行地方,開啓的全部線程不斷地異步從OSS的緩衝字符輸入流reader中讀取OSS中數據,若是在call()方法中就將reader關閉,也就是說將輸入數據源關閉,直接致使線程讀取失敗。所以reader.close()應該放在線程外部,即uploadSession.commit()位置後邊,以下。
uploadSession.commit(blockList);
reader.close();  //正確位置
System.out.println("upload success!");
相關文章
相關標籤/搜索