sqoop2 - Data Sources

Introduction

sqoop2 imports data on top of the mapreduce framework: it first computes the data-split information, then hands those splits to mapreduce, where each data split corresponds to one map task.

Submitting the Job

The MapreduceSubmissionEngine class implements job submission.

Its initialize method reads the job configuration from the specified configuration directory; these are all global settings.

It then configures the job-related Driver, the Connectors for the source and destination data, and the notification settings.

Finally, it specifies the Mapper and the Input- and Output-related classes.
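That class wiring looks roughly like the sketch below (condensed: configuration copying, credentials, and error handling are omitted, the SubmitSketch scaffold is illustrative only, and the package paths are those of sqoop2 1.99.x):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;

public class SubmitSketch {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    job.setInputFormatClass(SqoopInputFormat.class);       // custom split logic, see below
    job.setMapperClass(SqoopMapper.class);                 // drives the Extractor
    job.setMapOutputKeyClass(SqoopWritable.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setOutputFormatClass(SqoopNullOutputFormat.class); // drives the Loader on the TO side
    job.setOutputKeyClass(SqoopWritable.class);
    job.setOutputValueClass(NullWritable.class);
    job.submit();
  }
}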

InputFormat

sqoop2 supports data sources ranging from traditional databases to Kafka. Because mapreduce only supports text data sources by default, sqoop2 implements its own InputFormat.

First, some background on mapreduce's InputFormat. An InputFormat represents one kind of data source and is responsible for splitting the data; the metadata of each split is represented by an InputSplit. A RecordReader then uses the InputSplit to fetch the data and feed it to the Mapper.

SqoopInputFormat is sqoop2's custom InputFormat. Let's first see how it defines the data splits:

public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {

  public List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    // Get the Partitioner class
    String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER);
    Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);
    // Read the partition-related configuration
    PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
    Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
    Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);

    long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, fromSchema);
    // Ask the partitioner for the partitions
    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig);
    // Wrap each Partition in an InputSplit
    List<InputSplit> splits = new LinkedList<InputSplit>();
    for (Partition partition : partitions) {
      LOG.debug("Partition: " + partition);
      SqoopSplit split = new SqoopSplit();
      // Store the Partition in the split
      split.setPartition(partition);
      splits.add(split);
    }

    if(splits.size() > maxPartitions) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
        String.format("Got %d, max was %d", splits.size(), maxPartitions));
    }

    return splits;
  }
}
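Each SqoopSplit carries its Partition across the wire: it is essentially a Writable envelope that serializes the partition's class name followed by the partition itself. A simplified rendering follows (the real code resolves the class through sqoop's ClassUtils; Class.forName stands in here):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.sqoop.job.etl.Partition;

public class SqoopSplit extends InputSplit implements Writable {

  private Partition partition;

  public void setPartition(Partition partition) { this.partition = partition; }
  public Partition getPartition() { return partition; }

  @Override public long getLength() { return 0; }                     // size unknown up front
  @Override public String[] getLocations() { return new String[]{}; } // no locality hints

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(partition.getClass().getName()); // class name first...
    partition.write(out);                         // ...then the partition's own fields
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    try {
      partition = (Partition) Class.forName(in.readUTF()).newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IOException(e);
    }
    partition.readFields(in);
  }
}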

The actual split logic is delegated to a Partitioner. Here we take GenericJdbcPartitioner, which handles splitting for relational databases, as the example.

GenericJdbcPartitioner

A Partitioner implements different splitting methods depending on the type of the partition column:

public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {
  // Split on an integer-typed partition column
  protected List<Partition> partitionIntegerColumn() {
    List<Partition> partitions = new LinkedList<Partition>();
    // Get the partition column's minimum and maximum values
    long minValue = partitionMinValue == null ? Long.MIN_VALUE
      : Long.parseLong(partitionMinValue);
    long maxValue = Long.parseLong(partitionMaxValue);
    // Compute the partition interval and the leftover remainder
    long interval =  (maxValue - minValue) / numberPartitions;
    long remainder = (maxValue - minValue) % numberPartitions;
    // When the value range is smaller than numberPartitions:
    // for example, with maxValue = 5, minValue = 1 and numberPartitions = 10,
    // numberPartitions is recomputed here as 4.
    if (interval == 0) {
      numberPartitions = (int)remainder;
    }

    long lowerBound;
    long upperBound = minValue;
    // Note that the loop variable i starts at 1
    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;
      // Distribute the remainder, one extra value each, across the leading partitions
      upperBound += (i <= remainder) ? 1 : 0;

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(
          constructConditions(lowerBound, upperBound, false));
      partitions.add(partition);
    }
    // Add the last partition
    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(
        constructConditions(upperBound, maxValue, true));
    partitions.add(partition);

    return partitions;
  }

  // Build the WHERE condition: lowerBound <= column < upperBound, or lowerBound <= column <= upperBound for the last partition
  protected String constructConditions(
      Object lowerBound, Object upperBound, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    conditions.append(lowerBound);
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append(upperBound);
    return conditions.toString();
  }

}
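For example, with a partition column named id (a hypothetical name), constructConditions(1, 6, false) produces "1 <= id AND id < 6", while the final call with lastOne = true, say constructConditions(19, 23, true), produces "19 <= id AND id <= 23".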

First, interval and remainder are computed. Each partition spans interval values, and the extra remainder values are distributed round-robin across the leading partitions; the last partition includes maxValue. Take the following as an example:

maxValue = 23
minValue = 1
numberPartitions = 5

interval  = (23 - 1) / 5 = 4
remainder = (23 - 1) % 5 = 2

The final partitions after splitting are (upper bounds exclusive, except the last):

Start , End , Count

1 , 6 , 5
6 , 11 , 5
11 , 15 , 4
15 , 19 , 4
19 , 23 (inclusive) , 5
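To make the arithmetic concrete, here is a small standalone sketch (PartitionBounds is a hypothetical helper, not sqoop2 code) that replays the same bound computation and prints exactly the ranges above:

public class PartitionBounds {
  public static void main(String[] args) {
    long minValue = 1, maxValue = 23;
    int numberPartitions = 5;
    long interval  = (maxValue - minValue) / numberPartitions; // 4
    long remainder = (maxValue - minValue) % numberPartitions; // 2

    long upperBound = minValue;
    for (int i = 1; i < numberPartitions; i++) {
      long lowerBound = upperBound;
      // the leading partitions absorb the remainder, one extra value each
      upperBound = lowerBound + interval + ((i <= remainder) ? 1 : 0);
      System.out.println(lowerBound + " <= x < " + upperBound);
    }
    System.out.println(upperBound + " <= x <= " + maxValue); // last partition
  }
}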

SqoopRecordReader

SqoopRecordReader's initialize method is called before the Mapper runs, and it stores the split; its getCurrentKey method returns that split. As the code below shows, nextKeyValue returns true only once, so each map task processes exactly one key: the SqoopSplit itself.

public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {

  @Override
  public RecordReader<SqoopSplit, NullWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    return new SqoopRecordReader();
  }

  public static class SqoopRecordReader
      extends RecordReader<SqoopSplit, NullWritable> {

    private boolean delivered = false;
    private SqoopSplit split = null;

    @Override
    public boolean nextKeyValue() {
      if (delivered) {
        return false;
      } else {
        delivered = true;
        return true;
      }
    }

    @Override
    public SqoopSplit getCurrentKey() {
      return split;
    }

    @Override
    public NullWritable getCurrentValue() {
      return NullWritable.get();
    }

    @Override
    public void close() {
    }

    @Override
    public float getProgress() {
      return delivered ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
      this.split = (SqoopSplit)split;
    }
  }

}

SqoopMapper

SqoopMapper extends mapreduce's Mapper class: it obtains the partition information from the SqoopSplit, then extracts and transfers the data.

public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {

  // Scheduler for the progress-reporting thread
  private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
  private IntermediateDataFormat<Object> fromIDF = null;
  private IntermediateDataFormat<Object> toIDF = null;
  private Matcher matcher;

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
    // Instantiate the Extractor class
    Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
    
    // Get the FROM and TO schemas and build the matcher
    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
    Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
    matcher = MatcherFactory.getMatcher(fromSchema, toSchema);

    String fromIDFClass = conf.get(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT);
    fromIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(fromIDFClass);
    fromIDF.setSchema(matcher.getFromSchema());
    String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT);
    toIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(toIDFClass);
    toIDF.setSchema(matcher.getToSchema());

    PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
    Object fromConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
    Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
    // Get the SqoopSplit, which carries the Partition
    SqoopSplit split = context.getCurrentKey();
    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), fromSchema);

    try {
      LOG.info("Starting progress service");
      // Report progress every 2 minutes
      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);

      LOG.info("Running extractor class " + extractorName);
      // Extract and convert the data
      extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
      LOG.info("Extractor has finished");
      // Update the rows-read counter
      context.getCounter(SqoopCounters.ROWS_READ).increment(extractor.getRowsRead());
    } catch (Exception e) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
    } finally {
      LOG.info("Stopping progress service");
      progressService.shutdown();
      if (!progressService.awaitTermination(5, TimeUnit.SECONDS)) {
        LOG.info("Stopping progress service with shutdownNow");
        progressService.shutdownNow();
      }
    }
  }
}
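
SqoopProgressRunnable, which the mapper schedules above at a fixed rate, simply pokes the task context so that the framework sees the long-running extract as alive: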

public class SqoopProgressRunnable implements Runnable {

  public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class);
  private final TaskInputOutputContext<?,?,?,?> context;

  public SqoopProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctx) {
    this.context = ctx;
  }

  @Override
  public void run() {
    // Report progress to the MR framework
    LOG.debug("Auto-progress thread reporting progress");
    this.context.progress();
  }
}
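For the other side of this contract, below is a toy Extractor under stated assumptions: the Demo* names are hypothetical stand-ins, while Extractor, ExtractorContext, Partition, and DataWriter.writeArrayRecord are sqoop2's connector SPI. SqoopMapper instantiates such a class by name and calls extract(); every record written through the DataWriter flows into the SqoopMapDataWriter passed to the ExtractorContext above:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.job.etl.Partition;

public class DemoExtractor extends Extractor<DemoExtractor.DemoLinkConfig,
    DemoExtractor.DemoJobConfig, DemoExtractor.DemoPartition> {

  // Empty config stubs; real connectors define annotated configuration classes.
  public static class DemoLinkConfig {}
  public static class DemoJobConfig {}

  // A Partition is just a Writable description of one slice of the source data.
  public static class DemoPartition extends Partition {
    long start, end;
    @Override public void readFields(DataInput in) throws IOException {
      start = in.readLong();
      end = in.readLong();
    }
    @Override public void write(DataOutput out) throws IOException {
      out.writeLong(start);
      out.writeLong(end);
    }
    @Override public String toString() { return "[" + start + ", " + end + ")"; }
  }

  private long rowsRead = 0;

  @Override
  public void extract(ExtractorContext context, DemoLinkConfig link,
                      DemoJobConfig job, DemoPartition partition) {
    // A real connector would query its backing store for this range.
    for (long i = partition.start; i < partition.end; i++) {
      context.getDataWriter().writeArrayRecord(new Object[]{ i, "row-" + i });
      rowsRead++;
    }
  }

  @Override
  public long getRowsRead() {
    return rowsRead; // feeds the ROWS_READ counter updated in SqoopMapper
  }
}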

Summary

To extend the set of data sources mapreduce can read, sqoop2 implements its own InputFormat machinery, consisting of SqoopSplit, SqoopInputFormat, and the Partitioner. SqoopMapper then obtains the Partition from the SqoopSplit and performs the data transfer.
