1)InputFormatClass
com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat
2)OutputFormatClass
1)TextFile
com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat
2)SequenceFile
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
3)AvroDataFile
com.cloudera.sqoop.mapreduce.AvroOutputFormat
3)Mapper
1)TextFile
com.cloudera.sqoop.mapreduce.TextImportMapper
2)SequenceFile
com.cloudera.sqoop.mapreduce.SequenceFileImportMapper
3)AvroDataFile
com.cloudera.sqoop.mapreduce.AvroImportMapper
4)taskNumbers
1)mapred.map.tasks (set from the --num-mappers option)
2)job.setNumReduceTasks(0);
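Taken together, these four settings define a map-only import job. The sketch below shows how they would be wired onto a Job for the TextFile case, assuming the com.cloudera.sqoop classes listed above are on the classpath; the wrapper class TextImportJobSkeleton is illustrative, not Sqoop code.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat;
import com.cloudera.sqoop.mapreduce.TextImportMapper;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

public class TextImportJobSkeleton {
    public static Job buildJob(Configuration conf) throws Exception {
        Job job = new Job(conf, "sqoop-import");
        // 1) InputFormat: reads the query result, split by the --split-by column.
        job.setInputFormatClass(DataDrivenDBInputFormat.class);
        // 2) OutputFormat: plain text here; SequenceFile/Avro imports use
        //    SequenceFileOutputFormat or AvroOutputFormat instead.
        job.setOutputFormatClass(RawKeyTextOutputFormat.class);
        // 3) Mapper: one per file format; TextImportMapper for text output.
        job.setMapperClass(TextImportMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 4) Map-only: imports never run a reduce phase.
        job.setNumReduceTasks(0);
        return job;
    }
}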
The walkthrough below uses this command line: import --connect jdbc:mysql://localhost/test --username root --password 123456 --query "select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE $CONDITIONS" --target-dir /user/sqoop/test --split-by sqoop_1.id --hadoop-home=/home/hdfs/hadoop-0.20.2-CDH3B3 --num-mappers 2
Note: each configuration property listed below is followed by the value derived from this command.
1)Configure Input
DataDrivenImportJob.configureInputFormat(Job job, String tableName, String tableClassName, String splitByCol)
a)DBConfiguration.configureDB(Configuration conf, String driverClass, String dbUrl, String userName, String passwd, Integer fetchSize)
1)mapreduce.jdbc.driver.class com.mysql.jdbc.Driver
2)mapreduce.jdbc.url jdbc:mysql://localhost/test
3)mapreduce.jdbc.username root
4)mapreduce.jdbc.password 123456
5)mapreduce.jdbc.fetchsize -2147483648
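In other words, configureDB simply records the connection settings in the job configuration. A minimal equivalent in code, using the values above (the wrapper class name is illustrative):

import org.apache.hadoop.conf.Configuration;

public class ConfigureDBExample {
    public static Configuration connectionConf() {
        Configuration conf = new Configuration();
        conf.set("mapreduce.jdbc.driver.class", "com.mysql.jdbc.Driver");
        conf.set("mapreduce.jdbc.url", "jdbc:mysql://localhost/test");
        conf.set("mapreduce.jdbc.username", "root");
        conf.set("mapreduce.jdbc.password", "123456");
        // -2147483648 is Integer.MIN_VALUE: the MySQL driver's cue to stream
        // the result set row by row instead of buffering it all in memory.
        conf.setInt("mapreduce.jdbc.fetchsize", Integer.MIN_VALUE);
        return conf;
    }
}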
b)DataDrivenDBInputFormat.setInput(Job job, Class<? extends DBWritable> inputClass, String inputQuery, String inputBoundingQuery)
1)job.setInputFormatClass(DBInputFormat.class);
2)mapred.jdbc.input.bounding.query SELECT MIN(sqoop_1.id), MAX(sqoop_1.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1
3)job.setInputFormatClass(com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.class);
4)mapreduce.jdbc.input.orderby sqoop_1.id
c)mapreduce.jdbc.input.class QueryResult
d)sqoop.inline.lob.length.max 16777216
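A sketch of this setInput step with the strings derived from the example command. Note how $CONDITIONS stays in the input query for each map task to replace with its own split predicate, while the bounding query substitutes (1 = 1) so it can compute the full MIN/MAX range of the --split-by column. Sqoop ships its own copy of this class under com.cloudera.sqoop.mapreduce.db; the stock org.apache.hadoop.mapreduce.lib.db version with the same signature is used here so the snippet stands alone, and QueryResult (the generated record class) is passed in as a parameter:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

public class SetInputExample {
    public static void setInput(Job job, Class<? extends DBWritable> queryResult) {
        // Each map task replaces $CONDITIONS with its own range predicate,
        // e.g. "sqoop_1.id >= 1 AND sqoop_1.id < 500".
        String inputQuery = "select sqoop_1.id as foo_id, sqoop_2.id as bar_id "
                + "from sqoop_1, sqoop_2 WHERE $CONDITIONS";
        // The bounding query finds the overall range of the split column.
        String boundingQuery = "SELECT MIN(sqoop_1.id), MAX(sqoop_1.id) FROM "
                + "(select sqoop_1.id as foo_id, sqoop_2.id as bar_id "
                + "from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1";
        DataDrivenDBInputFormat.setInput(job, queryResult, inputQuery, boundingQuery);
    }
}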
2)Configure Output
ImportJobBase.configureOutputFormat(Job job, String tableName, String tableClassName)
a)job.setOutputFormatClass(getOutputFormatClass());
b)FileOutputFormat.setOutputCompressorClass(job, codecClass);
c)SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
d)FileOutputFormat.setOutputPath(job,outputPath);
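Steps a) through d) as code, roughly as ImportJobBase applies them for a text import. GzipCodec stands in for whatever codec class Sqoop would resolve, b) and c) only run when compression is requested, and the wrapper class is illustrative:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat;

public class ConfigureOutputExample {
    public static void configureOutput(Job job) {
        // a) Chosen from the file-format option; text import shown here.
        job.setOutputFormatClass(RawKeyTextOutputFormat.class);
        // b)+c) Only applied when compression is enabled (-z / --compress).
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job,
                SequenceFile.CompressionType.BLOCK);
        // d) The --target-dir from the command line.
        FileOutputFormat.setOutputPath(job, new Path("/user/sqoop/test"));
    }
}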
3)Configure Map
DataDrivenImportJob.configureMapper(Job job, String tableName, String tableClassName)
a)job.setOutputKeyClass(Text.class);
b)job.setOutputValueClass(NullWritable.class);
c)job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper.class);
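TextImportMapper itself does very little. The sketch below captures the idea, assuming the generated record class (a SqoopRecord) renders itself as a delimited line via toString(); the real mapper extends Sqoop's AutoProgressMapper rather than Mapper directly, and the class name here is illustrative:

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.cloudera.sqoop.lib.SqoopRecord;

public class TextImportMapperSketch
        extends Mapper<LongWritable, SqoopRecord, Text, NullWritable> {
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, SqoopRecord record, Context context)
            throws IOException, InterruptedException {
        // The generated record class formats its fields with the configured
        // delimiters; the line becomes the key and the value stays empty.
        outKey.set(record.toString());
        context.write(outKey, NullWritable.get());
    }
}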
4)Configure task number
JobBase.configureNumTasks(Job job)
mapred.map.tasks 2
job.setNumReduceTasks(0);
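As code, the equivalent of JobBase.configureNumTasks (illustrative wrapper). In MapReduce, mapred.map.tasks is only a hint, but DataDrivenDBInputFormat reads it back when computing input splits, so it effectively controls the number of map tasks for an import:

import org.apache.hadoop.mapreduce.Job;

public class ConfigureNumTasksExample {
    public static void configureNumTasks(Job job, int numMapTasks) {
        // Seeded from --num-mappers; DataDrivenDBInputFormat consults this
        // value when it computes the input splits.
        job.getConfiguration().setInt("mapred.map.tasks", numMapTasks);
        // Imports are map-only: output files are written by the mappers.
        job.setNumReduceTasks(0);
    }
}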