【摘要】Sqoop是一種用於在Apache Hadoop和結構化數據存儲(如關係數據庫)之間高效傳輸批量數據的工具 。本文將簡單介紹Sqoop做業執行時相關的類及方法,並將該過程與MapReduce的執行結合,分析數據如何從源端遷移到目的端。sql
拋開MR的執行過程,Sqoop執行時用到的關鍵類總共有5個,Initializer、Partitioner、Extractor、Loader、Destroyer。執行流程以下圖所示數據庫
所以,每次新建一個鏈接器都要實現上述5個類。apache
Initializer是在sqoop任務提交到MR以前被調用,主要是作遷移前的準備,例如鏈接數據源,建立臨時表,添加依賴的jar包等。它是sqoop做業生命週期的第一步,主要API以下segmentfault
public abstract void initialize(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);併發
public List<String> getJars(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration){app
return new LinkedList<String>();
}工具
public abstract Schema getSchema(InitializerContext context, LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration) {oop
return new NullSchema();
}spa
其中getSchema()方法被From或者To端的connector在提取或者載入數據時用來匹配數據。例如,一個GenericJdbcConnector會調用它獲取源端Mysql的數據庫名,表名,表中的字段信息等。線程
Destroyer 是在做業執行結束後被實例化,這是Sqoop做業的最後一步。清理任務,刪除臨時表,關閉鏈接器等。
public abstract void destroy(DestroyerContext context,
LinkConfiguration linkConfiguration,JobConfiguration jobConfiguration);
Partitioner建立分區Partition,Sqoop默認建立10個分片,主要API以下
public abstract List<Partition> getPartitions(PartitionerContext context,
LinkConfiguration linkConfiguration, FromJobConfiguration jobConfiguration);
Partition類中實現了readFields()方法和write()方法,方便讀寫
public abstract class Partition {
public abstract void readFields(DataInput in) throws IOException;
public abstract void write(DataOutput out) throws IOException;
public abstract String toString();
}
Extractor類根據分片partition和配置信息從源端提取數據,寫入SqoopMapDataWriter中,SqoopMapDataWriter是SqoopMapper的內部類它繼承了DataWriter類。此外它打包了SqoopWritable類,以中間數據格式保存從源端讀取到的數據。
public abstract void extract(ExtractorContext context,
LinkConfiguration linkConfiguration, JobConfiguration jobConfiguration, SqoopPartition partition);
該方法內部核心代碼以下
while (resultSet.next()) {
...
context.getDataWriter().writeArrayRecord(array);
...
}
loader從源端接受數據,並將其載入目的端,它必須實現以下接口
public abstract void load(LoaderContext context,
ConnectionConfiguration connectionConfiguration, JobConfiguration jobConfiguration) throws Exception;
load方法從SqoopOutputFormatDataReader中讀取,它讀取「中間數據格式表示形式」 _中的數據並將其加載到數據源。此外Loader必須迭代的調用DataReader()直到它讀完。
while ((array = context.getDataReader().readArrayRecord()) != null) {
...
}
上一節避開MR執行過程,僅僅從Extractor和Loader過程描述遷移過程。下面將結合MR的執行過程詳細的介紹一個Sqoop遷移做業流程。
初始化
1)做業初始化階段,SqoopInputFormat讀取給源端數據分片的過程
這裏每一個Partition分片會交給一個Mapper執行。每一個Mapper分別啓動一個extractor線程和Loader線程遷移數據。
Mapper
2)做業執行階段的Mapper過程
private Class SqoopMapDataWriter extends DataWriter {
... private void writeContent() { ... context.wirte(writable, NullWritable.get()); // 這裏的writable 是SqoopWritable的一個對象 ... } ...
}
注意:這裏的Context中存的是KV對,K是SqoopWritable,而V僅是一個空的Writable對象。SqoopWritable中實現了write和readField,用於序列化和反序列化。
Reducer
3)做業執行階段的Reduce過程,
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
executorService = Executors.newSingleThreadExecutor(...); consumerFuture = executorService.submit(new ConsumerThread(context)); return writer;
}
private class ConsumerThread implements Runnable {
... public void run() { ... Loader.load(loaderContext, connectorLinkConfig, ConnectorToJobConfig); ... } ...
}
注意:
以上即爲Sqoop做業執行時相關的類及方法內容,但願對你們在進行數據遷移過程當中有所幫助。