【ODPS】MapReduce基礎

MapReduce處理數據過程主要分紅2個階段:Map階段和Reduce階段。首先執行Map階段,再執行Reduce階段。Map和Reduce的處理邏輯由用戶自定義實現, 但要符合MapReduce框架的約定。java

  • 在正式執行Map前,須要將輸入數據進行」分片」。所謂分片,就是將輸入數據切分爲大小相等的數據塊,每一塊做爲單個Map Worker的輸入被處理, 以便於多個Map Worker同時工做。
  • 分片完畢後,多個Map Worker就能夠同時工做了。每一個Map Worker在讀入各自的數據後,進行計算處理,最終輸出給Reduce。Map Worker在輸出數據時, 須要爲每一條輸出數據指定一個Key。這個Key值決定了這條數據將會被髮送給哪個Reduce Worker。Key值和Reduce Worker是多對一的關係, 具備相同Key的數據會被髮送給同一個Reduce Worker,單個Reduce Worker有可能會接收到多個Key值的數據。
  • 在進入Reduce階段以前,MapReduce框架會對數據按照Key值排序,使得具備相同Key的數據彼此相鄰。若是用戶指定了」合併操做」(Combiner), 框架會調用Combiner,將具備相同Key的數據進行聚合。Combiner的邏輯能夠由用戶自定義實現。與經典的MapReduce框架協議不一樣,在ODPS中, Combiner的輸入、輸出的參數必須與Reduce保持一致。這部分的處理一般也叫作」洗牌」(Shuffle)。
  • 接下來進入Reduce階段。相同的Key的數據會到達同一個Reduce Worker。同一個Reduce Worker會接收來自多個Map Worker的數據。 每一個Reduce Worker會對Key相同的多個數據進行Reduce操做。最後,一個Key的多條數據通過Reduce的做用後,將變成了一個值。

ODPS MapReduce的輸入和輸出是ODPS表或者分區(不容許用戶自定輸入輸出格式,不提供相似文件系統的接口)app

下面以WordCount爲例,詳解ODPS的MapReduce的執行過程。輸入表只有一列,每條記錄是一個單詞,要求統計單詞出現次數,寫入另一張表中(兩列,一列單詞,一列次數)框架

  • Map處理輸入,每獲取一個單詞,將單詞的Count設置爲1,並將此<Word, Count>對輸出,此時以Word做爲輸出數據的Key;
  • 在Shuffle階段前期,首先對每一個Map Worker的輸出,按照Key值,即Word值排序。排序後進行Combine操做,即將Key值(Word值)相同的Count累加, 構成一個新的<Word, Count>對。此過程被稱爲合併排序;
  • 在Shuffle階段後期,數據被髮送到Reduce端。Reduce Worker收到數據後依賴Key值再次對數據排序;
  • 每一個Reduce Worker對數據進行處理時,採用與Combiner相同的邏輯,將Key值(Word值)相同的Count累加,獲得輸出結果;

代碼實現:ide

一、Mapper實現函數

package mr.test;
 
import java.io.IOException;
 
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.MapperBase;
 
public class WCMapper extends MapperBase {
	private Record word;
	private Record one;
 
	@Override
	public void setup(TaskContext context) throws IOException {
		word = context.createMapOutputKeyRecord();
		one = context.createMapOutputValueRecord();
		one.set(new Object[] { 1L });
	}
 
	@Override
	public void map(long recordNum, Record record, TaskContext context)
			throws IOException {
			
		System.out.println("recordNum:"+recordNum);
		for (int i = 0; i < record.getColumnCount(); i++) {
			String[] words = record.get(i).toString().split("\\s+");
			for (String w : words) {
				word.set(new Object[] { w });
				context.write(word, one);
			}
		}
	}
 
	@Override
	public void cleanup(TaskContext context) throws IOException {
	}
 
}

1)先經過setup()初始化Record實例,setup()只執行一次。ui

2)map()函數多每條Record都會調用一次,在這個函數內,循環遍歷Record字段,對每一個字段經過正則切分,而後輸出<word,one>這種鍵值對。code

3)setup()和map()都會傳入一個TaskContext實例,保存MR任務運行時的上下文信息。blog

4)cleanup()執行收尾工做排序

 

二、Reducer實現接口

package mr.test;
 
import java.io.IOException;
import java.util.Iterator;
 
import com.aliyun.odps.data.Record;
import com.aliyun.odps.mapred.ReducerBase;
 
public class WCReducer extends ReducerBase {
 
	private Record result;
 
	@Override
	public void setup(TaskContext context) throws IOException {
		result = context.createOutputRecord();
	}
 
	@Override
	public void reduce(Record key, Iterator<Record> values, TaskContext context)
			throws IOException {
 
		long count = 0;
		while (values.hasNext()) {
			Record val = values.next();
			count += (Long) val.get(0);
		}
		result.set(0, key.get(0));
		result.set(1, count);
		context.write(result);
	}
 
	@Override
	public void cleanup(TaskContext context) throws IOException {
	}
 
}

1)一樣reduce也提供setup和cleanup方法。

2)reduce函數是對每一個key調用一次,在函數內,它遍歷同一個key的不一樣值,對其進行累加操做,而後生成結果Record輸出。

2)注意:map和reduce在輸出上的區別爲:Reduce調用的context.write(result)輸出結果Record;Map是調用context.write(word,one)輸出鍵值對形式。

三、Driver實現

package mr.test;
 
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.data.TableInfo;
import com.aliyun.odps.mapred.JobClient;
import com.aliyun.odps.mapred.RunningJob;
import com.aliyun.odps.mapred.conf.JobConf;
import com.aliyun.odps.mapred.utils.InputUtils;
import com.aliyun.odps.mapred.utils.OutputUtils;
import com.aliyun.odps.mapred.utils.SchemaUtils;
 
public class WCDriver {
 
	public static void main(String[] args) throws OdpsException {
		if(args.length !=2){
			System.out.println("參數錯誤");
			System.exit(2);
		}
		
		JobConf job = new JobConf();
 
		job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
		job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
 
		InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(),
				job);
		OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(),
				job);
 
		job.setMapperClass(WCMapper.class);
		job.setReducerClass(WCReducer.class);
 
		RunningJob rj = JobClient.runJob(job);
		rj.waitForCompletion();
	}
 
}

1)先初始化一個JobConf實例

2)指定運行的Maper、Reduce類:

      job.setMapperClass(WCMapper.class);
      job.setReducerClass(WCReducer.class);

3)設置map的輸出key和value的類型:

     job.setMapOutputKeySchema(SchemaUtils.fromString("word:string"));
     job.setMapOutputValueSchema(SchemaUtils.fromString("count:bigint"));
相關文章
相關標籤/搜索