sqoop2 imports data on top of the mapreduce framework. sqoop2 splits the data and passes the split information to the mapreduce framework; each data split corresponds to one map task.
The MapreduceSubmissionEngine class implements job submission.
Its initialize method reads the job configuration from the specified configuration directory; these are all global settings.
It then sets the job-related Driver, the Connectors for the source and target data, and the notification configuration.
Finally, it specifies the Mapper and the Input/Output related classes.
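The exact wiring inside MapreduceSubmissionEngine is not reproduced here, but conceptually it builds a Hadoop Job and registers the sqoop2 classes on it. The sketch below is only an illustration of that idea: the helper class and the map-only setting are assumptions, while the Hadoop Job API calls and the Sqoop class names come from the classes discussed in this article.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
// sqoop2 class imports (SqoopInputFormat, SqoopMapper, SqoopWritable) omitted

// Hypothetical helper, not sqoop2's actual submitToCluster implementation.
public class SubmissionSketch {
  public static void submit(Configuration conf) throws Exception {
    // Driver, connector and notification settings are assumed to already be in conf.
    Job job = Job.getInstance(conf);
    job.setInputFormatClass(SqoopInputFormat.class); // custom splitting + record reading
    job.setMapperClass(SqoopMapper.class);           // extraction and conversion
    job.setMapOutputKeyClass(SqoopWritable.class);   // matches SqoopMapper's output types
    job.setMapOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(0);                        // assumption: a map-only transfer
    job.submit();                                    // asynchronous submission to the cluster
  }
}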
The data sources supported by sqoop2 can be traditional databases, kafka, and so on. Because mapreduce only supports text-based data sources by default, sqoop2 implements its own InputFormat.
First, some background on mapreduce's InputFormat. An InputFormat represents one type of data source; it is responsible for splitting the data, and the metadata of each data split is represented by an InputSplit. A RecordReader then reads the data described by an InputSplit and feeds it to the Mapper.
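For reference, the contract every InputFormat has to fulfil looks roughly like this (a simplified view of the org.apache.hadoop.mapreduce API, not the full class):

// Simplified view of org.apache.hadoop.mapreduce.InputFormat.
public abstract class InputFormat<K, V> {

  // Split the data source into logical pieces; one map task is launched per InputSplit.
  public abstract List<InputSplit> getSplits(JobContext context)
      throws IOException, InterruptedException;

  // Create a RecordReader that turns one InputSplit into (key, value) pairs for the Mapper.
  public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
      throws IOException, InterruptedException;
}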
SqoopInputFormat is sqoop2's custom InputFormat. Let's first look at how it defines data splits.
public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {

  public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();

    // Get the Partitioner class
    String partitionerName = conf.get(MRJobConstants.JOB_ETL_PARTITIONER);
    Partitioner partitioner = (Partitioner) ClassUtils.instantiate(partitionerName);

    // Read the partition-related configuration
    PrefixContext connectorContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
    Object connectorLinkConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
    Object connectorFromJobConfig = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);

    long maxPartitions = conf.getLong(MRJobConstants.JOB_ETL_EXTRACTOR_NUM, 10);
    PartitionerContext partitionerContext = new PartitionerContext(connectorContext, maxPartitions, fromSchema);

    // Get the partitions
    List<Partition> partitions = partitioner.getPartitions(partitionerContext, connectorLinkConfig, connectorFromJobConfig);

    // Instantiate the InputSplits
    List<InputSplit> splits = new LinkedList<InputSplit>();
    for (Partition partition : partitions) {
      LOG.debug("Partition: " + partition);
      SqoopSplit split = new SqoopSplit();
      // Store the Partition in the split
      split.setPartition(partition);
      splits.add(split);
    }

    if (splits.size() > maxPartitions) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0025,
          String.format("Got %d, max was %d", splits.size(), maxPartitions));
    }
    return splits;
  }
}
The data-splitting logic is handled by the Partitioner. Here we take GenericJdbcPartitioner as an example; it is responsible for splitting database tables.
The Partitioner implements a different splitting method depending on the type of the column being split on.
public class GenericJdbcPartitioner extends Partitioner<LinkConfiguration, FromJobConfiguration> {

  // Split on an integer column
  protected List<Partition> partitionIntegerColumn() {
    List<Partition> partitions = new LinkedList<Partition>();

    // Minimum and maximum value of the partition column
    long minValue = partitionMinValue == null ? Long.MIN_VALUE : Long.parseLong(partitionMinValue);
    long maxValue = Long.parseLong(partitionMaxValue);

    // Compute the size of each partition interval
    long interval = (maxValue - minValue) / numberPartitions;
    long remainder = (maxValue - minValue) % numberPartitions;

    // When the value range is smaller than numberPartitions:
    // for example, with maxValue = 5, minValue = 1 and numberPartitions = 10,
    // numberPartitions ends up being 4.
    if (interval == 0) {
      numberPartitions = (int) remainder;
    }

    long lowerBound;
    long upperBound = minValue;
    // Note that i starts at 1
    for (int i = 1; i < numberPartitions; i++) {
      lowerBound = upperBound;
      upperBound = lowerBound + interval;
      // Distribute the remainder, one extra value each, to the leading partitions
      upperBound += (i <= remainder) ? 1 : 0;

      GenericJdbcPartition partition = new GenericJdbcPartition();
      partition.setConditions(constructConditions(lowerBound, upperBound, false));
      partitions.add(partition);
    }

    // Add the last partition
    GenericJdbcPartition partition = new GenericJdbcPartition();
    partition.setConditions(constructConditions(upperBound, maxValue, true));
    partitions.add(partition);

    return partitions;
  }

  // Build the WHERE condition, in the form lowerBound <= column < upperBound,
  // or lowerBound <= column <= upperBound for the last partition
  protected String constructConditions(Object lowerBound, Object upperBound, boolean lastOne) {
    StringBuilder conditions = new StringBuilder();
    conditions.append(lowerBound);
    conditions.append(" <= ");
    conditions.append(partitionColumnName);
    conditions.append(" AND ");
    conditions.append(partitionColumnName);
    conditions.append(lastOne ? " <= " : " < ");
    conditions.append(upperBound);
    return conditions.toString();
  }
}
First interval and remainder are computed. Each partition covers interval values, and the extra remainder values are handed out one by one to the first remainder partitions. The last partition includes maxValue. Take the following as an example:

maxValue = 23
minValue = 1
numberPartitions = 5

interval  = (23 - 1) / 5 = 4
remainder = (23 - 1) % 5 = 2
The resulting partitions are:

lower bound | upper bound     | number of values
1           | 6  (exclusive)  | 5
6           | 11 (exclusive)  | 5
11          | 15 (exclusive)  | 4
15          | 19 (exclusive)  | 4
19          | 23 (inclusive)  | 5
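The same boundary arithmetic can be reproduced in isolation. The snippet below is just a standalone sketch of the calculation above ("col" stands for the partition column) and is not part of sqoop2; running it prints exactly the five ranges in the table.

// Standalone sketch of the boundary arithmetic in partitionIntegerColumn,
// using the example values minValue = 1, maxValue = 23, numberPartitions = 5.
public class PartitionBoundsDemo {
  public static void main(String[] args) {
    long minValue = 1, maxValue = 23;
    int numberPartitions = 5;
    long interval  = (maxValue - minValue) / numberPartitions; // 4
    long remainder = (maxValue - minValue) % numberPartitions; // 2

    long upperBound = minValue;
    for (int i = 1; i < numberPartitions; i++) {
      long lowerBound = upperBound;
      // the first `remainder` partitions absorb one extra value each
      upperBound = lowerBound + interval + ((i <= remainder) ? 1 : 0);
      System.out.printf("%d <= col < %d%n", lowerBound, upperBound);
    }
    // the last partition is closed on both ends and includes maxValue
    System.out.printf("%d <= col <= %d%n", upperBound, maxValue);
  }
}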
SqoopRecordReader's initialize method is called before the Mapper runs and stores the split. Its getCurrentKey method returns that split.
public class SqoopInputFormat extends InputFormat<SqoopSplit, NullWritable> {

  @Override
  public RecordReader<SqoopSplit, NullWritable> createRecordReader(InputSplit split, TaskAttemptContext context) {
    return new SqoopRecordReader();
  }

  public static class SqoopRecordReader extends RecordReader<SqoopSplit, NullWritable> {

    private boolean delivered = false;
    private SqoopSplit split = null;

    @Override
    public boolean nextKeyValue() {
      if (delivered) {
        return false;
      } else {
        delivered = true;
        return true;
      }
    }

    @Override
    public SqoopSplit getCurrentKey() {
      return split;
    }

    @Override
    public NullWritable getCurrentValue() {
      return NullWritable.get();
    }

    @Override
    public void close() {
    }

    @Override
    public float getProgress() {
      return delivered ? 1.0f : 0.0f;
    }

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
      this.split = (SqoopSplit) split;
    }
  }
}
SqoopMapper extends mapreduce's Mapper class. It gets the partition information from the SqoopSplit, then extracts and transfers the data.
public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable, NullWritable> {

  // Executor for the progress-reporting thread
  private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
  private IntermediateDataFormat<Object> fromIDF = null;
  private IntermediateDataFormat<Object> toIDF = null;
  private Matcher matcher;

  @SuppressWarnings({ "unchecked", "rawtypes" })
  @Override
  public void run(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    String extractorName = conf.get(MRJobConstants.JOB_ETL_EXTRACTOR);
    // Instantiate the Extractor class
    Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);

    // Read the connection information
    Schema fromSchema = MRConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
    Schema toSchema = MRConfigurationUtils.getConnectorSchema(Direction.TO, conf);
    matcher = MatcherFactory.getMatcher(fromSchema, toSchema);

    String fromIDFClass = conf.get(MRJobConstants.FROM_INTERMEDIATE_DATA_FORMAT);
    fromIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(fromIDFClass);
    fromIDF.setSchema(matcher.getFromSchema());
    String toIDFClass = conf.get(MRJobConstants.TO_INTERMEDIATE_DATA_FORMAT);
    toIDF = (IntermediateDataFormat<Object>) ClassUtils.instantiate(toIDFClass);
    toIDF.setSchema(matcher.getToSchema());

    PrefixContext subContext = new PrefixContext(conf, MRJobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
    Object fromConfig = MRConfigurationUtils.getConnectorLinkConfig(Direction.FROM, conf);
    Object fromJob = MRConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);

    // Get the SqoopSplit for this task; it carries the Partition
    SqoopSplit split = context.getCurrentKey();
    ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), fromSchema);

    try {
      LOG.info("Starting progress service");
      // Report progress every 2 minutes
      progressService.scheduleAtFixedRate(new SqoopProgressRunnable(context), 0, 2, TimeUnit.MINUTES);

      LOG.info("Running extractor class " + extractorName);
      // Extract and convert the data
      extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
      LOG.info("Extractor has finished");
      // Update the number of rows read
      context.getCounter(SqoopCounters.ROWS_READ).increment(extractor.getRowsRead());
    } catch (Exception e) {
      throw new SqoopException(MRExecutionError.MAPRED_EXEC_0017, e);
    } finally {
      LOG.info("Stopping progress service");
      progressService.shutdown();
      if (!progressService.awaitTermination(5, TimeUnit.SECONDS)) {
        LOG.info("Stopping progress service with shutdownNow");
        progressService.shutdownNow();
      }
    }
  }
}

public class SqoopProgressRunnable implements Runnable {

  public static final Logger LOG = Logger.getLogger(SqoopProgressRunnable.class);

  private final TaskInputOutputContext<?, ?, ?, ?> context;

  public SqoopProgressRunnable(final TaskInputOutputContext<?, ?, ?, ?> ctx) {
    this.context = ctx;
  }

  @Override
  public void run() {
    // Report progress to the framework
    LOG.debug("Auto-progress thread reporting progress");
    this.context.progress();
  }
}
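The Extractor instantiated here is supplied by the connector, not by the mapreduce code. As an illustration only, a minimal connector-side extractor could look roughly like the sketch below; the class is hypothetical, the getDataWriter()/writeArrayRecord() calls are an assumption about the connector SDK, and the real GenericJdbcExtractor instead runs a query restricted by the partition's conditions.

// Hypothetical extractor sketch; writeArrayRecord() on the context's DataWriter
// is an assumed API, not taken from the SqoopMapper code above.
public class DummyExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, GenericJdbcPartition> {

  private long rowsRead = 0;

  @Override
  public void extract(ExtractorContext context, LinkConfiguration linkConfig,
                      FromJobConfiguration jobConfig, GenericJdbcPartition partition) {
    // A real connector would query the source using this partition's WHERE conditions
    // and stream the result set; here we just emit two fake rows.
    for (int i = 0; i < 2; i++) {
      context.getDataWriter().writeArrayRecord(new Object[] { i, "row-" + i });
      rowsRead++;
    }
  }

  @Override
  public long getRowsRead() {
    return rowsRead;
  }
}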
To extend the data sources mapreduce can read, sqoop2 implements its own InputFormat-related classes, including SqoopSplit, SqoopInputFormat and Partitioner. SqoopMapper then takes the Partition information from the SqoopSplit and carries out the data transfer.