MapReduce processes data in two main stages: a Map stage and a Reduce stage. The Map stage always runs first, followed by the Reduce stage. The processing logic of Map and Reduce is user-defined, but it must follow the contracts of the MapReduce framework.
The input and output of ODPS MapReduce are ODPS tables or partitions (custom input/output formats are not supported, and no file-system-like interface is provided).
Below we use WordCount as an example to walk through the execution of an ODPS MapReduce job. The input table has a single column, and each record is a word; the task is to count how many times each word occurs and write the results to another table (two columns: one for the word, one for the count).
Code implementation:
1. Mapper implementation
```java
package mr.test;

import java.io.IOException;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;

public class WCMapper extends MapperBase {

    private Record word;
    private Record one;

    @Override
    public void setup(TaskContext context) throws IOException {
        word = context.createMapOutputKeyRecord();
        one = context.createMapOutputValueRecord();
        one.set(new Object[] { 1L });
    }

    @Override
    public void map(long recordNum, Record record, TaskContext context) throws IOException {
        System.out.println("recordNum:" + recordNum);
        for (int i = 0; i < record.getColumnCount(); i++) {
            String[] words = record.get(i).toString().split("\\s+");
            for (String w : words) {
                word.set(new Object[] { w });
                context.write(word, one);
            }
        }
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
```
1) setup() initializes the Record instances; it is called only once.
2) map() is called once for every input Record. Inside it, we loop over the Record's columns, split each column value with a regular expression on whitespace, and emit a <word, one> key-value pair for every token.
3) Both setup() and map() receive a TaskContext instance, which holds the runtime context of the MapReduce task.
4) cleanup() performs any teardown work.
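The WCMapper above depends on the ODPS SDK, so it cannot run locally. As a plain-Java sketch of what the map phase emits (the MapSketch class and its map() helper are hypothetical names, with java.util pairs standing in for ODPS Records):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class MapSketch {
    // Simulates the map phase: each input line is split on whitespace
    // and every token is emitted as a (word, 1) pair, mirroring
    // WCMapper.map()'s call to context.write(word, one).
    static List<Map.Entry<String, Long>> map(List<String> lines) {
        List<Map.Entry<String, Long>> pairs = new ArrayList<>();
        for (String line : lines) {
            for (String w : line.split("\\s+")) {
                pairs.add(new AbstractMap.SimpleEntry<>(w, 1L));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Duplicates are NOT merged here; that happens in the reduce phase.
        System.out.println(map(Arrays.asList("hello world", "hello")));
        // prints [hello=1, world=1, hello=1]
    }
}
```

Note that the map phase emits one pair per occurrence; merging the duplicates is the framework's (shuffle) and the reducer's job.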
2. Reducer implementation
```java
package mr.test;

import java.io.IOException;
import java.util.Iterator;

import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;

public class WCReducer extends ReducerBase {

    private Record result;

    @Override
    public void setup(TaskContext context) throws IOException {
        result = context.createOutputRecord();
    }

    @Override
    public void reduce(Record key, Iterator<Record> values, TaskContext context) throws IOException {
        long count = 0;
        while (values.hasNext()) {
            Record val = values.next();
            count += (Long) val.get(0);
        }
        result.set(0, key.get(0));
        result.set(1, count);
        context.write(result);
    }

    @Override
    public void cleanup(TaskContext context) throws IOException {
    }
}
```
1) The reducer likewise provides setup() and cleanup() methods.
2) reduce() is called once per key. Inside it, we iterate over all values belonging to that key, accumulate the count, then fill in and emit the result Record.
3) Note the difference in how map and reduce write output: reduce calls context.write(result) to emit a result Record, while map calls context.write(word, one) to emit a key-value pair.
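Again, the ODPS classes above cannot run outside the platform. A plain-Java sketch of the shuffle-plus-reduce behavior, using a hypothetical ReduceSketch class with java.util pairs standing in for ODPS Records:

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReduceSketch {
    // Simulates shuffle + reduce: pairs are grouped by key and the
    // values for each key are summed, mirroring the accumulation loop
    // in WCReducer.reduce().
    static Map<String, Long> reduce(List<Map.Entry<String, Long>> pairs) {
        Map<String, Long> counts = new TreeMap<>();
        for (Map.Entry<String, Long> p : pairs) {
            // merge() adds the value to the running sum for this key.
            counts.merge(p.getKey(), p.getValue(), Long::sum);
        }
        return counts;
    }
}
```

Feeding it the pairs emitted by the map sketch above, [hello=1, world=1, hello=1], yields {hello=2, world=1}: one output record per key, just as reduce() emits one result Record per key.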
3. Driver implementation
```java
package mr.test;

import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;

public class WCDriver {

    public static void main(String[] args) throws OdpsException {
        if (args.length != 2) {
            System.out.println("Usage: WCDriver <input table> <output table>");
            System.exit(2);
        }
        JobConf job = new JobConf();
        // Schemas of the map output key and value
        job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
        job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
        // Input and output tables
        InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(), job);
        OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(), job);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        RunningJob rj = JobClient.runJob(job);
        rj.waitForCompletion();
    }
}
```
1) First, create a JobConf instance.
2) Specify the Mapper and Reducer classes to run:
```java
job.setMapperClass(WCMapper.class);
job.setReducerClass(WCReducer.class);
```
3) Set the schemas of the map output key and value:
```java
job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
```
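Putting the whole pipeline together, the job the Driver wires up is equivalent to this local end-to-end sketch (WordCountLocal and its count() helper are hypothetical names; no ODPS dependency):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WordCountLocal {
    // Fuses map (split on whitespace, emit word), shuffle (group by
    // word), and reduce (sum per word) into one local loop.
    static Map<String, Long> count(List<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String w : line.split("\\s+")) {
                counts.merge(w, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(Arrays.asList("a b", "b b c")));
        // prints {a=1, b=3, c=1}
    }
}
```

On ODPS the same result is produced distributed across many workers, with the input lines read from the table named by args[0] and the counts written to the table named by args[1].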