How the various parameters required by a MapReduce job are implemented in Sqoop

1) InputFormatClass

com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat

2) OutputFormatClass

1) TextFile: com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat

2) SequenceFile: org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

3) AvroDataFile: com.cloudera.sqoop.mapreduce.AvroOutputFormat

3) Mapper

1) TextFile: com.cloudera.sqoop.mapreduce.TextImportMapper

2) SequenceFile: com.cloudera.sqoop.mapreduce.SequenceFileImportMapper

3) AvroDataFile: com.cloudera.sqoop.mapreduce.AvroImportMapper

4) Task numbers

1) mapred.map.tasks (corresponds to the --num-mappers option)

2) job.setNumReduceTasks(0);
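To see how these pieces fit together, here is a minimal map-only job skeleton. It substitutes stock Hadoop classes for Sqoop's private forks (RawKeyTextOutputFormat, TextImportMapper, and the com.cloudera.sqoop copy of DataDrivenDBInputFormat all ship inside Sqoop itself), so read it as a sketch of the wiring, not Sqoop's actual code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class ImportJobSkeleton {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "sqoop-import-sketch");
            // Input: splits are built from ranges of the --split-by column.
            job.setInputFormatClass(DataDrivenDBInputFormat.class);
            // Output for the TextFile case; Sqoop's RawKeyTextOutputFormat is a
            // TextOutputFormat variant that writes only the key.
            job.setOutputFormatClass(TextOutputFormat.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            // job.setMapperClass(TextImportMapper.class); // per-format mapper from the list above
            job.setNumReduceTasks(0); // imports run map-only
        }
    }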

Take this command line as the example: import --connect jdbc:mysql://localhost/test --username root --password 123456 --query "select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE $CONDITIONS" --target-dir /user/sqoop/test --split-by sqoop_1.id --hadoop-home=/home/hdfs/hadoop-0.20.2-CDH3B3 --num-mappers 2

Note: each configuration key or method call below is followed by the value derived from this command.

1) Configuring the input

DataDrivenImportJob.configureInputFormat(Job job, String tableName, String tableClassName, String splitByCol)

a) DBConfiguration.configureDB(Configuration conf, String driverClass, String dbUrl, String userName, String passwd, Integer fetchSize)

1) mapreduce.jdbc.driver.class  com.mysql.jdbc.Driver

2) mapreduce.jdbc.url  jdbc:mysql://localhost/test

3) mapreduce.jdbc.username  root

4) mapreduce.jdbc.password  123456

5) mapreduce.jdbc.fetchsize  -2147483648 (Integer.MIN_VALUE, which tells the MySQL JDBC driver to stream results row by row)
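A rough equivalent using the stock org.apache.hadoop.mapreduce.lib.db.DBConfiguration is sketched below; its configureDB overload lacks the fetchSize argument of Sqoop's forked class, so the sketch sets mapreduce.jdbc.fetchsize by hand, and the class name ConfigureDbSketch is only illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

    public class ConfigureDbSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Sets the four connection properties listed above.
            DBConfiguration.configureDB(conf,
                    "com.mysql.jdbc.Driver",        // mapreduce.jdbc.driver.class
                    "jdbc:mysql://localhost/test",  // mapreduce.jdbc.url
                    "root",                         // mapreduce.jdbc.username
                    "123456");                      // mapreduce.jdbc.password
            // Sqoop's fork also records the fetch size: -2147483648 is
            // Integer.MIN_VALUE, the value MySQL Connector/J interprets as
            // "stream rows one at a time" instead of buffering the result set.
            conf.setInt("mapreduce.jdbc.fetchsize", Integer.MIN_VALUE);
            System.out.println(conf.get("mapreduce.jdbc.url"));
        }
    }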

b) DataDrivenDBInputFormat.setInput(Job job, Class<? extends DBWritable> inputClass, String inputQuery, String inputBoundingQuery)

1) job.setInputFormatClass(DBInputFormat.class);

2) mapred.jdbc.input.bounding.query  SELECT MIN(sqoop_1.id), MAX(sqoop_1.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1

3) job.setInputFormatClass(com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.class);

4) mapreduce.jdbc.input.orderby  sqoop_1.id

c) mapreduce.jdbc.input.class  QueryResult

d) sqoop.inline.lob.length.max  16777216
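The sketch below shows the same setInput call against the stock Hadoop DataDrivenDBInputFormat. The QueryResult class here is a hand-written stand-in for the one Sqoop code-generates (its field names are guessed from the query above), so treat it as an assumption, not Sqoop's generated code.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    import org.apache.hadoop.mapreduce.lib.db.DataDrivenDBInputFormat;

    // Stand-in for the QueryResult class Sqoop generates from the query schema.
    class QueryResult implements Writable, DBWritable {
        long fooId, barId;
        @Override public void readFields(ResultSet rs) throws SQLException {
            fooId = rs.getLong("foo_id");
            barId = rs.getLong("bar_id");
        }
        @Override public void write(PreparedStatement ps) throws SQLException {
            // import-only: records are never written back to the database
        }
        @Override public void readFields(DataInput in) throws IOException {
            fooId = in.readLong(); barId = in.readLong();
        }
        @Override public void write(DataOutput out) throws IOException {
            out.writeLong(fooId); out.writeLong(barId);
        }
    }

    public class SetInputSketch {
        public static void main(String[] args) throws IOException {
            Job job = Job.getInstance(new Configuration());
            // At split time the input format runs the bounding query, chops
            // [MIN, MAX] into --num-mappers ranges, and replaces $CONDITIONS in
            // each task's copy of the query with a range predicate such as
            // "sqoop_1.id >= lo AND sqoop_1.id < hi".
            DataDrivenDBInputFormat.setInput(job, QueryResult.class,
                "select sqoop_1.id as foo_id, sqoop_2.id as bar_id "
                    + "from sqoop_1, sqoop_2 WHERE $CONDITIONS",
                "SELECT MIN(sqoop_1.id), MAX(sqoop_1.id) FROM (select sqoop_1.id as foo_id, "
                    + "sqoop_2.id as bar_id from sqoop_1, sqoop_2 WHERE (1 = 1)) AS t1");
        }
    }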

2) Configuring the output

ImportJobBase.configureOutputFormat(Job job, String tableName, String tableClassName)

a) job.setOutputFormatClass(getOutputFormatClass());

b) FileOutputFormat.setOutputCompressorClass(job, codecClass);

c) SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

d) FileOutputFormat.setOutputPath(job, outputPath);
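The same four calls, sketched with the stock Hadoop output classes for the SequenceFile case; GzipCodec is only an illustrative choice here, since the codec actually used depends on the --compress/--compression-codec options.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.GzipCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    public class ConfigureOutputSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            // a) getOutputFormatClass() picks the class from the table at the
            // top; SequenceFileOutputFormat stands in for the SequenceFile case.
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            // b) and c) only take effect when compression is requested.
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
            // d) --target-dir from the example command
            FileOutputFormat.setOutputPath(job, new Path("/user/sqoop/test"));
        }
    }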

3) Configuring the mapper

DataDrivenImportJob.configureMapper(Job job, String tableName, String tableClassName)

a) job.setOutputKeyClass(Text.class);

b) job.setOutputValueClass(NullWritable.class);

c) job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper.class);
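For reference, a skeletal mapper in the spirit of TextImportMapper; the real one consumes Sqoop-generated record instances, so the value type is left generic here and this should be read as an approximation rather than Sqoop's implementation.

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // The record's text form becomes the output key; the NullWritable value
    // means only the record text lands in the output file, since
    // RawKeyTextOutputFormat writes the key alone.
    public class TextImportMapperSketch<V>
            extends Mapper<LongWritable, V, Text, NullWritable> {
        private final Text outKey = new Text();

        @Override
        protected void map(LongWritable key, V record, Context context)
                throws IOException, InterruptedException {
            // Sqoop's generated classes render themselves as delimited text.
            outKey.set(record.toString());
            context.write(outKey, NullWritable.get());
        }
    }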

4) Configuring the task numbers

JobBase.configureNumTasks(Job job)

mapred.map.tasks 4

job.setNumReduceTasks(0);
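For completeness, a minimal sketch of what configureNumTasks amounts to, using the value shown above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class ConfigureNumTasksSketch {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration());
            // mapred.map.tasks is only a hint: with DataDrivenDBInputFormat the
            // real parallelism is the number of splits built from --num-mappers.
            job.getConfiguration().setInt("mapred.map.tasks", 4);
            // No reduce phase: mappers write imported rows straight to HDFS.
            job.setNumReduceTasks(0);
        }
    }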
