深刻淺出Sqoop之遷移過程源碼分析

【摘要】Sqoop是一種用於在Apache Hadoop和結構化數據存儲(如關係數據庫)之間高效傳輸批量數據的工具 。本文將簡單介紹Sqoop做業執行時相關的類及方法,並將該過程與MapReduce的執行結合,分析數據如何從源端遷移到目的端。sql

Sqoop做業執行過程

拋開MR的執行過程,Sqoop執行時用到的關鍵類總共有5個,Initializer、Partitioner、Extractor、Loader、Destroyer。執行流程以下圖所示數據庫

  • Initializer:初始化階段,源數據校驗,參數初始化等工做;
  • Partitioner:源數據分片,根據做業併發數來決定源數據要切分多少片;
  • Extractor:開啓extractor線程,根據用戶配置從內存中構造數據寫入隊列;
  • Loader:開啓loader線程,從隊列中讀取數據並拋出;
  • Destroyer:資源回收,斷開sqoop與數據源的鏈接,並釋放資源;

所以,每次新建一個鏈接器都要實現上述5個類。apache

Initializer

Initializer是在sqoop任務提交到MR以前被調用,主要是作遷移前的準備,例如鏈接數據源,建立臨時表,添加依賴的jar包等。它是sqoop做業生命週期的第一步,主要API以下segmentfault

public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);併發

public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration){app

return new LinkedList<String>();

}工具

public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration) {oop

return new NullSchema();

}spa

其中getSchema()方法被From或者To端的connector在提取或者載入數據時用來匹配數據。例如,一個GenericJdbcConnector會調用它獲取源端Mysql的數據庫名,表名,表中的字段信息等。線程

Destroyer

Destroyer 是在做業執行結束後被實例化,這是Sqoop做業的最後一步。清理任務,刪除臨時表,關閉鏈接器等。

public abstract void destroy(DestroyerContext context,

LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);

Partitioner

Partitioner建立分區Partition,Sqoop默認建立10個分片,主要API以下

public abstract List<Partition> getPartitions(PartitionerContext context,

LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration);

Partition類中實現了readFields()方法和write()方法,方便讀寫

public abstract class Partition {
public abstract void readFields(DataInput in) throws IOException;
public abstract void write(DataOutput out) throws IOException;
public abstract String toString();
}

Extractor

Extractor類根據分片partition和配置信息從源端提取數據,寫入SqoopMapDataWriter中,SqoopMapDataWriter是SqoopMapper的內部類它繼承了DataWriter類。此外它打包了SqoopWritable類,以中間數據格式保存從源端讀取到的數據。

public abstract void extract(ExtractorContext context,

LinkConfiguration linkConfiguration,
                          JobConfiguration jobConfiguration,
                          SqoopPartition partition);

該方法內部核心代碼以下

while (resultSet.next()) {
...
context.getDataWriter().writeArrayRecord(array);
...
}

Loader

loader從源端接受數據,並將其載入目的端,它必須實現以下接口

public abstract void load(LoaderContext context,

ConnectionConfiguration connectionConfiguration,
                       JobConfiguration jobConfiguration) throws Exception;

load方法從SqoopOutputFormatDataReader中讀取,它讀取「中間數據格式表示形式」 _中的數據並將其加載到數據源。此外Loader必須迭代的調用DataReader()直到它讀完。

while ((array = context.getDataReader().readArrayRecord()) != null) {
...
}

MapReduce執行過程

上一節避開MR執行過程,僅僅從Extractor和Loader過程描述遷移過程。下面將結合MR的執行過程詳細的介紹一個Sqoop遷移做業流程。

初始化

1)做業初始化階段,SqoopInputFormat讀取給源端數據分片的過程

  • SqoopInputFormat的getSplits方法會調用Partitioner類的getPartitions方法
  • 將返回的Partition列表包裝到SqoopSplit中;
  • 默認分片個數爲10

這裏每一個Partition分片會交給一個Mapper執行。每一個Mapper分別啓動一個extractor線程和Loader線程遷移數據。

Mapper

2)做業執行階段的Mapper過程

  • SqoopMapper包含了一個SqoopMapDataWriter類,
  • Mapper的run()調用Extractor.extract方法,該方法迭代的獲取源端數據再調用DataWriter寫入Context中

private Class SqoopMapDataWriter extends DataWriter {

...
    private void writeContent() {
        ...
        context.wirte(writable, NullWritable.get());  // 這裏的writable 是SqoopWritable的一個對象
        ...
    }
    ...

}

注意:這裏的Context中存的是KV對,K是SqoopWritable,而V僅是一個空的Writable對象。SqoopWritable中實現了write和readField,用於序列化和反序列化。

Reducer

3)做業執行階段的Reduce過程,

  • SqoopOutputFormatLoadExecutor包裝了SqoopOuputFormatDataReader,SqoopRecordWriter, ConsumerThread三個內部類;
  • SqoopNullOutputFormat調用getRecordWriter時建立一個線程:ConsumerThread,代碼以下

public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {

executorService = Executors.newSingleThreadExecutor(...);
 consumerFuture = executorService.submit(new ConsumerThread(context));
 return writer;

}

  • ConsumerThread集成了Runnable接口,線程內部調用Loader.load(...)方法,該方法用DataReader迭代的從Context中讀取出SqoopWritable,並將其寫入一箇中間數據格式再寫入目的端數據庫中。

private class ConsumerThread implements Runnable {

...
public void run() {
    ...
    Loader.load(loaderContext, connectorLinkConfig, ConnectorToJobConfig);
    ...
}
...

}

注意:

  • 再本地模式下,Sqoop提交任務時沒有設置SqoopReducer.class,MR會調用一個默認的reducer.class。
  • setContent就是SqoopRecordWriter.write(...),它將SqoopWritable反序列化後存入中間存儲格式中,即IntermediateDataFormat。與之對應,getContent就是從該中間存儲格式中讀取數據。
  • Sqoop定義了一個可插拔的中間數據格式抽象類,IntermediateDataFormat類,SqoopWritable打包了這個抽象類用來保存中間數據。

以上即爲Sqoop做業執行時相關的類及方法內容,但願對你們在進行數據遷移過程當中有所幫助。

點擊關注,第一時間瞭解華爲雲新鮮技術~

相關文章
相關標籤/搜索