sqoop2 reads data from a source and writes it to a destination. Because sqoop2's data transfer runs on the MapReduce framework, sqoop2 implements its own OutputFormat class, which supports loading results into Hive, Kafka, relational databases, and other destinations.
public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {

  @Override
  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
    SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context);
    return executor.getRecordWriter();
  }
}
SqoopNullOutputFormat extends OutputFormat. Its getRecordWriter method delegates to SqoopOutputFormatLoadExecutor.getRecordWriter, which returns the RecordWriter.
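For context, a custom OutputFormat is wired into a MapReduce job through the standard Hadoop Job API. The sketch below is illustrative only, not Sqoop2's actual job-submission code; imports for the Sqoop2 classes (SqoopNullOutputFormat, SqoopWritable) are assumed to be on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;

public class JobSetupSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "sqoop2-to-job-sketch");
    // Route task output through Sqoop2's custom OutputFormat instead of HDFS files
    job.setOutputFormatClass(SqoopNullOutputFormat.class);
    job.setOutputKeyClass(SqoopWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.waitForCompletion(true);
  }
}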
toDataFormat holds the record currently being handed off. Because a writer thread and a reader thread compete for it, access is coordinated with two semaphores, filled and free.
public class SqoopOutputFormatLoadExecutor {

  private volatile boolean readerFinished = false;
  private volatile boolean writerFinished = false;
  private volatile IntermediateDataFormat<? extends Object> toDataFormat;
  // filled starts at 0: the writer must release it before the first acquire can succeed
  private Semaphore filled = new Semaphore(0, true);
  // free starts at 1: the writer can acquire it immediately
  private Semaphore free = new Semaphore(1, true);

  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
    // Start the ConsumerThread, which processes the records placed into toDataFormat
    consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
        .setNameFormat("OutputFormatLoader-consumer").build())
        .submit(new ConsumerThread(context));
    return writer;
  }
SqoopRecordWriter writes into toDataFormat; SqoopOutputFormatDataReader reads from it.
private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {

  @Override
  public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
    // Acquire free: wait until the previous record has been consumed
    free.acquire();
    checkIfConsumerThrew();
    toDataFormat.setCSVTextData(key.toString());
    // Release filled: signal that a new record is available
    filled.release();
  }
}

private class SqoopOutputFormatDataReader extends DataReader {

  @Override
  public Object[] readArrayRecord() throws InterruptedException {
    // Acquire filled: wait until the writer has produced a record
    acquireSema();
    // If the writer has already finished, there is no more data
    if (writerFinished) {
      return null;
    }
    try {
      return toDataFormat.getObjectData();
    } finally {
      // Release free: signal that the record has been consumed
      releaseSema();
    }
  }

  private void acquireSema() throws InterruptedException {
    filled.acquire();
  }

  private void releaseSema() {
    free.release();
  }
}
The filled and free semaphores make writes and reads strictly alternate: each write must be consumed by a read before the next write can proceed. Because filled starts at 0 and free starts at 1, the writer necessarily goes first.
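The following is a minimal, self-contained sketch of this same two-semaphore, single-slot handoff pattern, detached from the Sqoop2 classes; the class and variable names here are invented for illustration.

import java.util.concurrent.Semaphore;

public class HandoffDemo {
  // One shared slot, guarded by two semaphores: free means "slot empty",
  // filled means "slot has data". filled starts at 0, so the writer goes first.
  private static final Semaphore free = new Semaphore(1, true);
  private static final Semaphore filled = new Semaphore(0, true);
  private static String slot;

  public static void main(String[] args) throws InterruptedException {
    Thread reader = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) {
          filled.acquire();                 // wait until the writer fills the slot
          System.out.println("read: " + slot);
          free.release();                   // hand the slot back to the writer
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    reader.start();

    for (int i = 0; i < 3; i++) {
      free.acquire();                       // wait until the slot is empty
      slot = "record-" + i;                 // write into the shared slot
      filled.release();                     // tell the reader the slot is full
    }
    reader.join();
  }
}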
ConsumerThread reads records out of toDataFormat and sends them on to the destination.
private class ConsumerThread implements Runnable {

  private final JobContext jobctx;

  public ConsumerThread(final JobContext context) {
    jobctx = context;
  }

  @SuppressWarnings({ "rawtypes", "unchecked" })
  @Override
  public void run() {
    LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
    try {
      // Instantiate the DataReader used to read records out of toDataFormat
      DataReader reader = new SqoopOutputFormatDataReader();
      Configuration conf = context.getConfiguration();
      // Instantiate the Loader, which handles the records
      Loader loader = (Loader) ClassUtils.instantiate(loaderName);
      PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
      Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.TO, conf);
      Object connectorToJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.TO, conf);
      LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema());
      LOG.info("Running loader class " + loaderName);
      // Store the records at the destination
      loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
      LOG.info("Loader has finished");
      ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN)
          .increment(loader.getRowsWritten());
    } catch (Throwable t) {
      // If loading fails, mark the reader as finished and release free,
      // so the writer thread is not left blocked forever
      readerFinished = true;
      LOG.error("Error while loading data out of MR job.", t);
      free.release();
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
    }
    if (!writerFinished) {
      // The reader stopped before the writer: some records were never processed
      readerFinished = true;
      LOG.error("Reader terminated, but writer is still running!");
      free.release();
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019);
    }
    readerFinished = true;
  }
}
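The Loader that ConsumerThread instantiates follows a simple contract: drain the DataReader until it returns null, then report how many rows were written. A minimal illustrative Loader is sketched below; LinkConfig and JobConfig are hypothetical placeholder config classes, not real Sqoop2 connector types, and Loader, LoaderContext, and DataReader come from the Sqoop2 connector SDK.

// A minimal Loader that only counts rows, to show the contract
public class CountingLoader extends Loader<LinkConfig, JobConfig> {

  private long rowsWritten = 0;

  @Override
  public void load(LoaderContext context, LinkConfig linkConfig, JobConfig jobConfig)
      throws Exception {
    Object[] record;
    // Drain the DataReader; readArrayRecord() returns null once the writer finishes
    while ((record = context.getDataReader().readArrayRecord()) != null) {
      // A real loader would write the record to its destination here
      rowsWritten++;
    }
  }

  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }
}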
Loader has several subclasses, one per destination type: for example, GenericJdbcLoader loads into SQL databases, and KafkaLoader loads into Kafka. Take GenericJdbcLoader as an example:
public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfiguration> {

  public static final int DEFAULT_ROWS_PER_BATCH = 100;
  public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
  private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
  private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
  private long rowsWritten = 0;

  @Override
  public void load(LoaderContext context, LinkConfiguration linkConfig,
      ToJobConfiguration toJobConfig) throws Exception {
    String driver = linkConfig.linkConfig.jdbcDriver;
    String url = linkConfig.linkConfig.connectionString;
    String username = linkConfig.linkConfig.username;
    String password = linkConfig.linkConfig.password;
    // Instantiate the JdbcExecutor used to run SQL statements
    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
    executor.setAutoCommit(false);
    String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
    // Begin batching the insert statement
    executor.beginBatch(sql);
    try {
      int numberOfRowsPerBatch = 0;
      int numberOfBatchesPerTransaction = 0;
      Object[] array;
      // Keep pulling records from the DataReader until the writer side is done (null)
      while ((array = context.getDataReader().readArrayRecord()) != null) {
        numberOfRowsPerBatch++;
        executor.addBatch(array, context.getSchema());
        // Each batch holds 100 rows; every 100 batches are committed as one transaction
        if (numberOfRowsPerBatch == rowsPerBatch) {
          numberOfBatchesPerTransaction++;
          if (numberOfBatchesPerTransaction == batchesPerTransaction) {
            executor.executeBatch(true);
            numberOfBatchesPerTransaction = 0;
          } else {
            executor.executeBatch(false);
          }
          numberOfRowsPerBatch = 0;
        }
        rowsWritten++;
      }
      if (numberOfRowsPerBatch != 0 || numberOfBatchesPerTransaction != 0) {
        // Flush and commit whatever is left
        executor.executeBatch(true);
      }
      executor.endBatch();
    } finally {
      executor.close();
    }
  }

  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }
}
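With the defaults above, a batch is flushed every 100 rows and a transaction commits every 100 × 100 = 10,000 rows. The following is a minimal plain-JDBC sketch of the same batching pattern; the H2 in-memory database, table name, and row count are hypothetical, for illustration only (the H2 driver is assumed to be on the classpath).

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class BatchInsertSketch {
  public static void main(String[] args) throws Exception {
    int rowsPerBatch = 100;
    int batchesPerTransaction = 100;
    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
      try (Statement st = conn.createStatement()) {
        st.execute("CREATE TABLE t (id INT)");
      }
      conn.setAutoCommit(false);
      try (PreparedStatement ps = conn.prepareStatement("INSERT INTO t (id) VALUES (?)")) {
        int rowsInBatch = 0;
        int batchesInTransaction = 0;
        for (int id = 0; id < 25_000; id++) {
          ps.setInt(1, id);
          ps.addBatch();
          if (++rowsInBatch == rowsPerBatch) {
            ps.executeBatch();                    // flush one 100-row batch
            rowsInBatch = 0;
            if (++batchesInTransaction == batchesPerTransaction) {
              conn.commit();                      // commit every 100 batches
              batchesInTransaction = 0;
            }
          }
        }
        ps.executeBatch();                        // flush the remainder
        conn.commit();
      }
    }
  }
}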
When all records have been written, the MapReduce framework calls close on the RecordWriter, which shuts down the handoff:

private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {

  @Override
  public void close(TaskAttemptContext context) throws InterruptedException, IOException {
    // Acquire free: wait until the last record has been consumed
    free.acquire();
    // Mark the writer as finished
    writerFinished = true;
    // Release filled: wake the reader, which sees writerFinished and returns null
    filled.release();
    // Block until the consumer thread (the Loader) completes
    waitForConsumer();
    LOG.info("SqoopOutputFormatLoadExecutor::SqoopRecordWriter is closed");
  }
}

private void waitForConsumer() {
  try {
    // Wait on the consumer future; rethrow anything the Loader threw
    consumerFuture.get();
  } catch (ExecutionException ex) {
    Throwable t = ex.getCause();
    if (t instanceof SqoopException) {
      throw (SqoopException) t;
    }
    Throwables.propagate(t);
  } catch (Exception ex) {
    throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
  }
}
In summary, sqoop2 implements its own OutputFormat to customize the output side of the MapReduce job.
The OutputFormat uses a two-thread design: one thread receives the MapReduce output and writes it into the shared slot, while the other thread reads the data back out, processes it, and stores it at the destination.