sqoop2 - Data Export

Introduction

sqoop2 reads data from a source and writes it to a destination. Because sqoop2's data transfer runs on the MapReduce framework, sqoop2 implements its own OutputFormat class, which supports writing results to Hive, Kafka, relational databases, and other destination types.

OutputFormat

public class SqoopNullOutputFormat extends OutputFormat<SqoopWritable, NullWritable> {

  @Override
  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter(TaskAttemptContext context) {
    SqoopOutputFormatLoadExecutor executor = new SqoopOutputFormatLoadExecutor(context);
    return executor.getRecordWriter();
  }
}

SqoopNullOutputFormat extends OutputFormat. Its getRecordWriter method delegates to SqoopOutputFormatLoadExecutor, whose getRecordWriter returns the actual RecordWriter.

SqoopOutputFormatLoadExecutor

toDataFormat holds the record currently being handed off. Since a writer thread and a reader thread contend for it, two semaphores, filled and free, coordinate access.

public class SqoopOutputFormatLoadExecutor {

  private volatile boolean readerFinished = false;
  private volatile boolean writerFinished = false;
  private volatile IntermediateDataFormat<? extends Object> toDataFormat;
  private Semaphore filled = new Semaphore(0, true);    // starts at 0: must be released before the first acquire succeeds
  private Semaphore free = new Semaphore(1, true);    // starts at 1: the first acquire succeeds immediately


  public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
    // start a single-threaded executor running ConsumerThread, which processes the data placed in toDataFormat
    consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
        ("OutputFormatLoader-consumer").build()).submit(
            new ConsumerThread(context));
    return writer;
  }

Reading and writing toDataFormat

SqoopRecordWriter writes into toDataFormat and SqoopOutputFormatDataReader reads from it: the writer stores each record as CSV text (setCSVTextData), and the reader retrieves it as an object array (getObjectData).

private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {

    @Override
    public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
      // acquire free: wait until the reader has consumed the previous record
      free.acquire();
      checkIfConsumerThrew();
      toDataFormat.setCSVTextData(key.toString());
      // release filled: signal that a new record is ready
      filled.release();
    }
  }

  private class SqoopOutputFormatDataReader extends DataReader {

    @Override
    public Object[] readArrayRecord() throws InterruptedException {
      // acquire filled: wait until the writer has produced a record
      acquireSema();
      // if the writer has finished, there is no more data: return null
      if (writerFinished) {
        return null;
      }
      try {
        return toDataFormat.getObjectData();
      } finally {
        // release free: signal that the record has been consumed
        releaseSema();
      }
    }

    private void acquireSema() throws InterruptedException {
      try {
        filled.acquire();
      } catch (InterruptedException ex) {
        throw ex;
      }
    }

    private void releaseSema(){
      free.release();
    }
  }

Together, the filled and free semaphores make the write and read operations strictly alternate. Because filled starts at 0, a write always comes first.
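
To see the handoff in isolation, here is a minimal, self-contained sketch of the same two-semaphore pattern. This is not sqoop2 code; HandoffDemo and the slot field are illustrative names standing in for the executor and toDataFormat:

import java.util.concurrent.Semaphore;

public class HandoffDemo {
  // same initial values as SqoopOutputFormatLoadExecutor:
  // the producer must release filled before the consumer's first acquire succeeds
  private static final Semaphore filled = new Semaphore(0, true);
  private static final Semaphore free = new Semaphore(1, true);
  private static String slot;    // single-record buffer, playing the role of toDataFormat

  public static void main(String[] args) throws InterruptedException {
    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) {
          filled.acquire();                   // wait for a record
          System.out.println("read " + slot);
          free.release();                     // hand the slot back
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    for (int i = 0; i < 3; i++) {
      free.acquire();                         // wait for the slot to be empty
      slot = "record-" + i;                   // write
      filled.release();                       // signal the consumer
    }
    consumer.join();
  }
}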

ConsumerThread

ConsumerThread reads records from toDataFormat and ships them to the destination.

private class ConsumerThread implements Runnable {

    private final JobContext jobctx;

    public ConsumerThread(final JobContext context) {
      jobctx = context;
    }

    @SuppressWarnings({ "rawtypes", "unchecked" })
    @Override
    public void run() {
      LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
      try {
        // create the DataReader used to read records out of toDataFormat
        DataReader reader = new SqoopOutputFormatDataReader();
        Configuration conf = context.getConfiguration();
        // instantiate the Loader responsible for handling the records
        Loader loader = (Loader) ClassUtils.instantiate(loaderName);

        PrefixContext subContext = new PrefixContext(conf,
            MRJobConstants.PREFIX_CONNECTOR_TO_CONTEXT);
        Object connectorLinkConfig = MRConfigurationUtils
            .getConnectorLinkConfig(Direction.TO, conf);
        Object connectorToJobConfig = MRConfigurationUtils
            .getConnectorJobConfig(Direction.TO, conf);

        LoaderContext loaderContext = new LoaderContext(subContext, reader, matcher.getToSchema());

        LOG.info("Running loader class " + loaderName);
        // stores the records to the destination
        loader.load(loaderContext, connectorLinkConfig, connectorToJobConfig);
        LOG.info("Loader has finished");
        ((TaskAttemptContext) jobctx).getCounter(SqoopCounters.ROWS_WRITTEN).increment(
            loader.getRowsWritten());

      } catch (Throwable t) {
        // if loading fails, mark the reader as finished and release free,
        // so the writer thread is not left waiting forever
        readerFinished = true;
        LOG.error("Error while loading data out of MR job.", t);
        free.release();
        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0018, t);
      }

      if (!writerFinished) {
        // the reader stopped before the writer finished: some records were never processed
        readerFinished = true;
        LOG.error("Reader terminated, but writer is still running!");
        free.release();
        throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019);
      }

      readerFinished = true;
    }
  }

GenericJdbcLoader

Loader has several subclasses, one per destination type: for example, GenericJdbcLoader writes to SQL databases and KafkaLoader writes to Kafka. GenericJdbcLoader is used as the example below.

public class GenericJdbcLoader extends Loader<LinkConfiguration, ToJobConfiguration> {

  public static final int DEFAULT_ROWS_PER_BATCH = 100;
  public static final int DEFAULT_BATCHES_PER_TRANSACTION = 100;
  private int rowsPerBatch = DEFAULT_ROWS_PER_BATCH;
  private int batchesPerTransaction = DEFAULT_BATCHES_PER_TRANSACTION;
  private long rowsWritten = 0;

  @Override
  public void load(LoaderContext context, LinkConfiguration linkConfig, ToJobConfiguration toJobConfig) throws Exception{
    String driver = linkConfig.linkConfig.jdbcDriver;
    String url = linkConfig.linkConfig.connectionString;
    String username = linkConfig.linkConfig.username;
    String password = linkConfig.linkConfig.password;
    // create the GenericJdbcExecutor used to run SQL statements
    GenericJdbcExecutor executor = new GenericJdbcExecutor(driver, url, username, password);
    executor.setAutoCommit(false);
    String sql = context.getString(GenericJdbcConnectorConstants.CONNECTOR_JDBC_TO_DATA_SQL);
    // start batching inserts
    executor.beginBatch(sql);
    try {
      int numberOfRowsPerBatch = 0;
      int numberOfBatchesPerTransaction = 0;
      Object[] array;
      
      // keep pulling records from the DataReader until the writer side finishes (returns null)
      while ((array = context.getDataReader().readArrayRecord()) != null) {
        numberOfRowsPerBatch++;
        executor.addBatch(array, context.getSchema());

        // execute a batch every 100 rows and commit every 100 batches, i.e. every 10,000 rows with the defaults
        if (numberOfRowsPerBatch == rowsPerBatch) {
          numberOfBatchesPerTransaction++;
          if (numberOfBatchesPerTransaction == batchesPerTransaction) {
            executor.executeBatch(true);
            numberOfBatchesPerTransaction = 0;
          } else {
            executor.executeBatch(false);
          }
          numberOfRowsPerBatch = 0;
        }
        rowsWritten ++;
      }

      if (numberOfRowsPerBatch != 0 || numberOfBatchesPerTransaction != 0) {
        // flush and commit the remaining rows
        executor.executeBatch(true);
      }

      executor.endBatch();

    } finally {
      executor.close();
    }
  }

  @Override
  public long getRowsWritten() {
    return rowsWritten;
  }

}
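
GenericJdbcExecutor's source is not shown here; presumably executeBatch(true) executes the pending batch and commits the transaction, while executeBatch(false) only executes it. Under that assumption, the sketch below reproduces the same cadence against the plain JDBC API. It is illustrative only; the in-memory H2 URL is a placeholder for any JDBC driver on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class BatchCadenceDemo {
  public static void main(String[] args) throws Exception {
    int rowsPerBatch = 100;
    int batchesPerTransaction = 100;
    try (Connection conn = DriverManager.getConnection("jdbc:h2:mem:demo")) {
      conn.setAutoCommit(false);
      try (Statement ddl = conn.createStatement()) {
        ddl.execute("CREATE TABLE t(id INT)");
      }
      try (PreparedStatement ps = conn.prepareStatement("INSERT INTO t VALUES (?)")) {
        int rows = 0;
        int batches = 0;
        for (int id = 0; id < 25_050; id++) {   // 25_050 leaves a partial batch at the end
          ps.setInt(1, id);
          ps.addBatch();
          if (++rows == rowsPerBatch) {
            ps.executeBatch();                  // like executor.executeBatch(false)
            rows = 0;
            if (++batches == batchesPerTransaction) {
              conn.commit();                    // like executor.executeBatch(true)
              batches = 0;
            }
          }
        }
        if (rows != 0 || batches != 0) {        // flush and commit the remainder
          ps.executeBatch();
          conn.commit();
        }
      }
    }
  }
}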

Closing the reader and writer
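
When the map task has written its last record, the MapReduce framework calls close() on the RecordWriter. close() waits for the reader to consume the final record, marks the writer as finished, wakes the reader one last time, and then blocks until the consumer thread exits.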

private class SqoopRecordWriter extends RecordWriter<SqoopWritable, NullWritable> {

    @Override
    public void close(TaskAttemptContext context)
            throws InterruptedException, IOException {
      // acquire free: wait until the reader has consumed the last record
      free.acquire();
      // mark the writer as finished
      writerFinished = true;
      // release filled so the blocked reader wakes up and sees writerFinished
      filled.release();
      // wait for the consumer thread to complete
      waitForConsumer();
      LOG.info("SqoopOutputFormatLoadExecutor::SqoopRecordWriter is closed");
    }
  }

  private void waitForConsumer() {
    try {
      // block until the consumer thread's future completes
      consumerFuture.get();
    } catch (ExecutionException ex) {
      Throwable t = ex.getCause();
      if (t instanceof SqoopException) {
        throw (SqoopException) t;
      }
      Throwables.propagate(t);
    } catch (Exception ex) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0019, ex);
    }
  }
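
waitForConsumer() runs on the task's main thread. If the loader failed, the consumer thread's exception surfaces through consumerFuture.get() as an ExecutionException; it is unwrapped and rethrown, which fails the MapReduce task.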

Summary

sqoop2 implements its own OutputFormat to customize the output side of MapReduce.

The OutputFormat uses a two-thread design: one thread writes the MapReduce output into a shared one-record buffer, and the other reads from the buffer and stores the data at the destination.
