轉: 並行計算模型和框架

from:  http://www.ibm.com/developerworks/cn/analytics/library/ba-1507-mapreducefiveframes/index.htmlhtml

 

並行計算模型和框架

目前開源社區有許多並行計算模型和框架可供選擇,按照實現方式、運行機制、依附的產品生態圈等能夠被劃分爲幾個類型,每一個類型各有優缺點,若是可以對各種型的並行計算框架都進行深刻研究及適當的缺點修復,就能夠爲不一樣硬件環境下的海量數據分析需求提供不一樣的軟件層面的解決方案。java

  • 並行計算框架

並行計算或稱平行計算是相對於串行計算來講的。它是一種一次可執行多個指令的算法,目的是提升計算速度,以及經過擴大問題求解規模,解決大型而複雜的計算問題。所謂並行計算可分爲時間上的並行和空間上的並行。時間上的並行就是指流水線技術,而空間上的並行則是指用多個處理器併發的執行計算。並行計算(Parallel Computing)是指同時使用多種計算資源解決計算問題的過程,是提升計算機系統計算速度和處理能力的一種有效手段。它的基本思想是用多個處理器來協同求解同一問題,即將被求解的問題分解成若干個部分,各部分均由一個獨立的處理機來並行計算。並行計算系統既能夠是專門設計的、含有多個處理器的超級計算機,也能夠是以某種方式互連的若干臺的獨立計算機構成的集羣。經過並行計算集羣完成數據的處理,再將處理的結果返回給用戶。算法

  • 國內外研究

歐美髮達國家對於並行計算技術的研究要遠遠早於我國,從最初的並行計算逐漸過渡到網格計算,隨着 Internet 網絡資源的迅速膨脹,因特網容納了海量的各類類型的數據和信息。海量數據的處理對服務器 CPU、IO 的吞吐都是嚴峻的考驗,不管是處理速度、存儲空間、容錯性,仍是在訪問速度等方面,傳統的技術架構和僅靠單臺計算機基於串行的方式愈來愈不適應當前海量數據處理的要求。國內外學者提出不少海量數據處理方法,以改善海量數據處理存在的諸多問題。apache

目前已有的海量數據處理方法在概念上較容易理解,然而因爲數據量巨大,要在可接受的時間內完成相應的處理,只有將這些計算進行並行化處理,經過提取出處理過程當中存在的可並行工做的份量,用分佈式模型來實現這些並行份量的並行執行過程。隨着技術的發展,單機的性能有了日新月異的發展變化,尤爲是內存和處理器等硬件技術,可是硬件技術的發展在理論上老是有限度的,若是說硬件的發展在縱向上提升了系統的性能,那麼並行技術的發展就是從橫向上拓展了處理的方式。編程

2003 年美國 Google 公司對外發布了 MapReduce、GFS、BigData 三篇論文,至此正式將並行計算框架落地爲 MapReduce 框架。數組

我國的並行和分佈式計算技術研究起源於 60 年代末,按照國防科技大學周興銘院士提出的觀點,到目前爲止已經三個階段了。第一階段,自 60 年代末至 70 年代末,主要從事大型機內的並行處理技術研究;第二階段,自 70 年代末至 90 年代初,主要從事向量機和並行多處理器系統研究;第三階段,自 80 年代末至今,主要從事 MPP(Massively Parallel Processor) 系統研究。緩存

儘管我國在並行計算方面開展的研究和應用較早,目前也擁有不少的並行計算資源,但研究和應用的成效相對美國還存在較大的差距,有待進一步的提升和發展。服務器

 

MapReduce

MapReduce 是由谷歌推出的一個編程模型,是一個能處理和生成超大數據集的算法模型,該架構可以在大量普通配置的計算機上實現並行化處理。MapReduce 編程模型結合用戶實現的 Map 和 Reduce 函數。用戶自定義的 Map 函數處理一個輸入的基於 key/value pair 的集合,輸出中間基於 key/value pair 的集合,MapReduce 庫把中間全部具備相同 key 值的 value 值集合在一塊兒後傳遞給 Reduce 函數,用戶自定義的 Reduce 函數合併全部具備相同 key 值的 value 值,造成一個較小 value 值的集合。通常地,一個典型的 MapReduce 程序的執行流程如圖 1 所示。多線程

圖 1 .MapReduce 程序執行流程圖

圖 1 .MapReduce 程序執行流程圖

MapReduce 執行過程主要包括:

  1. 將輸入的海量數據切片分給不一樣的機器處理;
  2. 執行 Map 任務的 Worker 將輸入數據解析成 key/value pair,用戶定義的 Map 函數把輸入的 key/value pair 轉成中間形式的 key/value pair;
  3. 按照 key 值對中間形式的 key/value 進行排序、聚合;
  4. 把不一樣的 key 值和相應的 value 集分配給不一樣的機器,完成 Reduce 運算;
  5. 輸出 Reduce 結果。

任務成功完成後,MapReduce 的輸出存放在 R 個輸出文件中,通常狀況下,這 R 個輸出文件不須要合併成一個文件,而是做爲另一個 MapReduce 的輸入,或者在另外一個可處理多個分割文件的分佈式應用中使用。

受 Google MapReduce 啓發,許多研究者在不一樣的實驗平臺上實現了 MapReduce 框架,本文將對 Apache Hadoop MapReduce、Apache、Spark、斯坦福大學的 Phoenix,Nokia 研發的 Disco,以及香港科技大學的 Mars 等 5 個 MapReduce 實現框架進行逐一介紹和各方面對比。

  • Hadoop MapReduce

Hadoop 的設計思路來源於 Google 的 GFS 和 MapReduce。它是一個開源軟件框架,經過在集羣計算機中使用簡單的編程模型,可編寫和運行分佈式應用程序處理大規模數據。Hadoop 包含三個子項目:Hadoop Common、Hadoop Distributed File System(HDFS) 和 Hadoop MapReduce。

第一代 Hadoop MapReduce 是一個在計算機集羣上分佈式處理海量數據集的軟件框架,包括一個 JobTracker 和必定數量的 TaskTracker。運行流程圖如圖 2 所示。

圖 2 .Hadoop MapReduce 系統架構圖

圖 2 .Hadoop MapReduce 系統架構圖

在最上層有 4 個獨立的實體,即客戶端、JobTracker、TaskTracker 和分佈式文件系統。客戶端提交 MapReduce 做業;JobTracker 協調做業的運行;JobTracker 是一個 Java 應用程序,它的主類是 JobTracker;TaskTracker 運行做業劃分後的任務,TaskTracker 也是一個 Java 應用程序,它的主類是 TaskTracker。Hadoop 運行 MapReduce 做業的步驟主要包括提交做業、初始化做業、分配任務、執行任務、更新進度和狀態、完成做業等 6 個步驟。

  • Spark MapReduce

Spark 是一個基於內存計算的開源的集羣計算系統,目的是讓數據分析更加快速。Spark 很是小巧玲瓏,由加州伯克利大學 AMP 實驗室的 Matei 爲主的小團隊所開發。使用的語言是 Scala,項目的核心部分的代碼只有 63 個 Scala 文件,很是短小精悍。Spark 啓用了內存分佈數據集,除了可以提供交互式查詢外,它還能夠優化迭代工做負載。Spark 提供了基於內存的計算集羣,在分析數據時將數據導入內存以實現快速查詢,「速度比」基於磁盤的系統,如比 Hadoop 快不少。Spark 最初是爲了處理迭代算法,如機器學習、圖挖掘算法等,以及交互式數據挖掘算法而開發的。在這兩種場景下,Spark 的運行速度能夠達到 Hadoop 的幾百倍。

  • Disco

Disco 是由 Nokia 研究中心開發的,基於 MapReduce 的分佈式數據處理框架,核心部分由 Erlang 語言開發,外部編程接口爲 Python 語言。Disco 是一個開放源代碼的大規模數據分析平臺,支持大數據集的並行計算,能運行在不可靠的集羣計算機上。Disco 可部署在集羣和多核計算機上,還可部署在 Amazon EC2 上。Disco 基於主/從架構 (Master/Slave),圖 3 整體設計架構圖展現了經過一臺主節點 (Master) 服務器控制多臺從節點 (Slave) 服務器的整體設計架構。

圖 3 .Disco 整體架構圖

圖 3 .Disco 整體架構圖

Disco 運行 MapReduce 步驟以下:

  1. Disco 用戶使用 Python 腳本開始 Disco 做業;
  2. 做業請求經過 HTTP 發送到主機;
  3. 主機是一個 Erlang 進程,經過 HTTP 接收做業請求;
  4. 主機經過 SSH 啓動每一個節點處的從機;
  5. 從機在 Worker 進程中運行 Disco 任務。
  • Phoenix

Phoenix 做爲斯坦福大學 EE382a 課程的一類項目,由斯坦福大學計算機系統實驗室開發。Phoenix 對 MapReduce 的實現原則和最初由 Google 實現的 MapReduce 基本相同。不一樣的是,它在集羣中以實現共享內存系統爲目的,共享內存能最小化由任務派生和數據間的通訊所形成的間接成本。Phoenix 可編程多核芯片或共享內存多核處理器 (SMPs 和 ccNUMAs),用於數據密集型任務處理。

  • Mars

Mars 是香港科技大學與微軟、新浪合做開發的基於 GPU 的 MapReduce 框架。目前已經包含字符串匹配、矩陣乘法、倒排索引、字詞統計、網頁訪問排名、網頁訪問計數、類似性評估和 K 均值等 8 項應用,可以在 32 位與 64 位的 Linux 平臺上運行。Mars 框架實現方式和基於 CPU 的 MapReduce 框架很是相似,也由 Map 和 Reduce 兩個階段組成,它的基本工做流程圖如圖 4 所示。

圖 4 .Mars 基本工做流程圖

圖 4 .Mars 基本工做流程圖

在開始每一個階段以前,Mars 初始化線程配置,包括 GPU 上線程組的數量和每一個線程組中線程的數量。Mars 在 GPU 內使用大量的線程,在運行時階段會均勻分配任務給線程,每一個線程負責一個 Map 或 Reduce 任務,以小數量的 key/value 對做爲輸入,並經過一種無鎖的方案來管理 MapReduce 框架中的併發寫入。

Mars 的工做流程主要有 7 個操做步驟:

  1. 在主存儲器中輸入 key/value 對,並將它們存儲到數組;
  2. 初始化運行時的配置參數;
  3. 複製主存儲器中的輸入數組到 GPU 設備內存;
  4. 啓動 GPU 上的 Map 階段,並將中間的 key/value 對存儲到數組;
  5. 若是 mSort 選擇 F,即須要排序階段,則對中間結果進行排序;
  6. 若是 noReduce 是 F,即須要 Reduce 階段,則啓動 GPU 上的 Reduce 階段,並輸出最終結果,不然中間結果就是最終結果;
  7. 複製 GPU 設備存儲器中的結果到主存儲器。

上述步驟的 1,2,3,7 這四個步驟的操做由調度器來完成,調度器負責準備數據輸入,在 GPU 上調用 Map 和 Reduce 階段,並將結果返回給用戶。

五種框架的優缺點比較

表 1. 五種框架優缺點比較
  Hadoop MapReduce Spark Phoenix Disco Mars
編程語言 Java 爲主 Scala C Erlang C++
構建平臺 須要首先架構基於 Hadoop 的集羣系統,經過 HDFS 完成運算的數據存儲工做 能夠的單獨運行,也能夠與 Hadoop 框架完整結合 獨立運行,不須要提早部署集羣,運行時系統的實現是創建在 PThread 之上的,也可方便地移植到其餘共享內存線程庫上 整個 Disco 平臺由分佈式存儲系統 DDFS 和 MapReduce 框架組成,DDFS 與計算框架高度耦合,經過監控各個節點上的磁盤使用狀況進行負載均衡 運行時爲 Map 或 Reduce 任務初始化大量的 GPU 線程,併爲每一個線程自動分配少許的 key/value 對來運行任務
功能特色 計算能力很是強,適合超大數據集的應用程序,可是因爲系統開銷等緣由,處理小規模數據的速度不必定比串行程序快,而且自己集羣的穩定性不高 在保證容錯的前提下,用內存來承載工做集,內存的存取速度快於磁盤多個數量級,從而能夠極大提高性能 利用共享內存緩衝區實現通訊,從而避免了因數據複製產生的開銷,但 Phoenix 也存在不能自動執行迭代計算、沒有高效的錯誤發現機制等不足 由一個 Master 服務器和一系列 Worker 節點組成,Master 和 Worker 之間採用基於輪詢的通訊機制,經過 HTTP 的方式傳輸數據。輪詢的時間間隔很差肯定,若時間間隔設置不當,會顯著下降程序的執行性能 因爲 GPU 線程不支持運行時動態調度,因此給每一個 GPU 線程分配的任務是固定的,若輸入數據劃分布均勻,將致使 Map 或 Reduce 階段的負載不均衡,使得整個系統性能急劇下降。同時因爲 GPU 不支持運行時在設備內存中分配空間,須要預先在設備內存中分配好輸入數據和輸出數據的存放空間,可是 Map 和 Reduce 階段輸出數據大小是未知的,而且當多個 GPU 線程同時向共享輸出區域中寫數據時,易形成寫衝突
 

WordCount 實驗

  • 基本原理

單詞計數 (WordCount) 是最簡單也是最能體現 MapReduce 思想的程序之一,能夠稱爲 MapReduce 版"Hello World"。單詞計數主要完成功能是:統計一系列文本文件中每一個單詞出現的次數。

  • 本次實驗步驟

本次實驗的硬件資源基於 x86 服務器 1 臺,配置爲內存 32GB DDR三、E5 CPU/12 核、GPU,實驗數據樣本爲 10M/50M/100M/500M/1000M 的文本文件五個,咱們使用 Hadoop MapReduce、Spark、Phoenix、Disco、Mars 等 MapReduce 框架分別運行文本分析程序,基於結果一致的前提下統計出運行時間、運行時 CPU 佔有率、運行時內存佔有率等數據,並採用這些數據繪製成柱狀圖。

Hadoop MapReduce

首先須要將文件拆分紅 splits,因爲測試用的文件較小,因此每一個文件爲一個 split,並將文件按行分割造成<key,value>對,圖 12 分割過程圖所示。這一步由 MapReduce 框架自動完成,其中偏移量(即 key 值)包括了回車所佔的字符數(Windows 和 Linux 環境會不一樣)。

圖 5 . 分割過程圖

圖 5 . 分割過程圖

將分割好的<key,value>對交給用戶定義的 map 方法進行處理,生成新的<key,value>對,圖 6 執行 map 方法所示。

圖 6 . 執行 Map 方法過程圖

圖 6 . 執行 Map 方法過程圖

獲得 map 方法輸出的<key,value>對後,Mapper 會將它們按照 key 值進行排序,並執行 Combine 過程,將 key 相同的 value 值累加,獲得 Mapper 的最終輸出結果。圖 7Map 端排序及 Combine 過程所示。

圖 7 . Map 端排序及 Combine 過程

圖 7 . Map 端排序及 Combine 過程

Reducer 先對從 Mapper 接收的數據進行排序,再交由用戶自定義的 reduce 方法進行處理,獲得新的<key,value>對,並做爲 WordCount 的輸出結果,圖 15Reduce 端排序及輸出結果所示。

圖 8 . Reduce 端排序及輸出結果流程圖

圖 8 . Reduce 端排序及輸出結果流程圖

清單 1 . 第一代 Hadoop MapReduce WordCount 示例代碼
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// 開始 Map 過程
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
//遍歷 Map 裏面的字符串
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
//開始 Reduce 過程
public void reduce(Text key, Iterable<IntWritable> values,Context context)
 throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
} 
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

Spark WordCount 實驗

Spark 與 Hadoop MapReduce 的最大區別是它把全部數據保存在內存中,Hadoop MapReduce 須要從外部存儲介質裏把數據讀入到內存,Spark 不須要這一步驟。它的實現原理與 Hadoop MapReduce 沒有太大區別,這裏再也不重複原理,完整的運行代碼如清單 2 所示。

清單 2 . Spark WordCount 示例代碼
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
 JavaSparkContext ctx = new JavaSparkContext(sparkConf);
 JavaRDD<String> lines = ctx.textFile(args[0], Integer.parseInt(args[1]));
 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
 @Override
 public Iterable<String> call(String s) {
 return Arrays.asList(SPACE.split(s));
 }
 });
 //定義 RDD ones
 JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
 @Override
 public Tuple2<String, Integer> call(String s) {
 return new Tuple2<String, Integer>(s, 1);
 }
 });
 //ones.reduceByKey(func, numPartitions)
 JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
 @Override
 public Integer call(Integer i1, Integer i2) {
 return i1 + i2;
 }
 },10);
//輸出 List
List<Tuple2<String, Integer>> output = counts.collect();
 Collections.sort(output, new Comparator<Tuple2<String, Integer>>() {
 @Override
 public int compare(Tuple2<String, Integer> t1,
Tuple2<String, Integer> t2) {
if(t1._2 > t2._2) {
 return -1;
 } else if(t1._2 < t2._2) {
 return 1;
 }
 return 0;
}
 });

Disco WordCount 實驗

MapReduce 框架因爲 Disco 有分佈式文件系統存在,因此通常狀況下都不會單獨使用,都是從分佈式文件系統內取數據後讀入內存,而後再切分數據、進入 MapReduce 階段。首先須要調用 ddfs 的 chunk 命令把文件上傳到 DDFS,而後開始編寫 MapReduce 程序,Disco 外層應用程序採用 Python 編寫。Map 程序實例如清單 3 所示,Reduce 程序示例如清單 4 所示。

清單 3 . Map 程序段
def fun_map(line, params):
 for word in line.split():
 yield word, 1
清單 4 . Reduce 程序段
def fun_reduce(iter, params):
 from disco.util import kvgroup
 for word, counts in kvgroup(sorted(iter)):
 yield word, sum(counts)
清單 5 . Map/Reduce 任務
from disco.core import Job, result_iterator
def map(line, params):
 for word in line.split():
 yield word, 1
def reduce(iter, params):
 from disco.util import kvgroup
 for word, counts in kvgroup(sorted(iter)):
 yield word, sum(counts)
if __name__ == '__main__':
 job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
 map=map,
 reduce=reduce)
 for word, count in result_iterator(job.wait(show=True)):
 print(word, count)
Note

Phoenix WordCount 實驗

Phoenix 是基於 CPU 的 MapReduce 框架,因此它也是採用將數據分割後讀入內存,而後開始 MapReduce 處理階段這樣的傳統方式。Phoenix 並不禁用戶決定切分每一個 Map 分配到的數據塊的大小,它是根據集羣系統的實際 Cache 大小來切分的,這樣能夠避免出現分配到 Map 的數據塊過大或者太小的狀況出現。過大的數據快會致使 Map 執行較慢,太小的數據快會致使 Map 資源浪費,由於每次啓動 Map 線程都須要消耗必定的系統資源。Map 階段切分好的文本被多個 Map 並行執行,Phoenix 支持 100 個左右的 Map 並行執行,一個工做節點下能夠有若干個 Map 並行執行。只有當一個工做節點上全部的 Map 任務都結束後纔開始 Reduce 階段。Reduce 階段繼續沿用了動態任務調度機制,同時容許用戶自定義數據分區規則。

清單 6 . Phoenix 的 wordCount 程序段
#include <stdio.h>
#include <strings.h>
#include <string.h>
#include <stddef.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <ctype.h>
#include <inttypes.h>
#include "map_reduce.h"
#include "stddefines.h"
#include "sort.h"
#define DEFAULT_DISP_NUM 10
typedef struct {
 int fpos;
 off_t flen;
 char *fdata;
 int unit_size;
} wc_data_t;
enum {
 IN_WORD,
 NOT_IN_WORD
};
 struct timeval begin, end;
#ifdef TIMING
 unsigned int library_time = 0;
#endif
/** mystrcmp()
 * Comparison function to compare 2 words
 */
int mystrcmp(const void *s1, const void *s2)
{
 return strcmp((const char *)s1, (const char *) s2);
}
/** mykeyvalcmp()
 * Comparison function to compare 2 ints
 */
int mykeyvalcmp(const void *v1, const void *v2)
{
 keyval_t* kv1 = (keyval_t*)v1;
 keyval_t* kv2 = (keyval_t*)v2;
 intptr_t *i1 = kv1->val;
 intptr_t *i2 = kv2->val;
 if (i1 < i2) return 1;
 else if (i1 > i2) return -1;
 else {
 return strcmp((char *)kv1->key, (char *)kv2->key);
 //return 0;
 }
}
/** wordcount_分割器 ()
 * 內存裏面進行 Map 計算
 */
int wordcount_splitter(void *data_in, int req_units, map_args_t *out)
{
 wc_data_t * data = (wc_data_t *)data_in; 
 assert(data_in);
 assert(out);
 assert(data->flen >= 0);
 assert(data->fdata);
 assert(req_units);
 assert(data->fpos >= 0);
 // End of file reached, return FALSE for no more data
 if (data->fpos >= data->flen) return 0;
 // Set the start of the next data
 out->data = (void *)&data->fdata[data->fpos];
 // Determine the nominal length
 out->length = req_units * data->unit_size;
 if (data->fpos + out->length > data->flen)
 out->length = data->flen - data->fpos;
 // Set the length to end at a space
 for (data->fpos += (long)out->length;
 data->fpos < data->flen && 
 data->fdata[data->fpos] != ' ' && data->fdata[data->fpos] != '\t' &&
 data->fdata[data->fpos] != '\r' && data->fdata[data->fpos] != '\n';
 data->fpos++, out->length++);
 return 1;
}
/** wordcount_locator()
 * Return the memory address where this map task would heavily access.
 */
void *wordcount_locator (map_args_t *task)
{
 assert (task);
 return task->data;
}
/** wordcount_map()
 * 對文本進行計數
 */
void wordcount_map(map_args_t *args) 
{
 char *curr_start, curr_ltr;
 int state = NOT_IN_WORD;
 int i;
 assert(args);
 char *data = (char *)args->data;
 assert(data);
 curr_start = data;
 for (i = 0; i < args->length; i++)
 {
 curr_ltr = toupper(data[i]);
 switch (state)
 {
 case IN_WORD:
 data[i] = curr_ltr;
 if ((curr_ltr < 'A' || curr_ltr > 'Z') && curr_ltr != '\'')
 {
 data[i] = 0;
 emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
 state = NOT_IN_WORD;
 }
 break;
 default:
 case NOT_IN_WORD:
 if (curr_ltr >= 'A' && curr_ltr <= 'Z')
 {
 curr_start = &data[i];
 data[i] = curr_ltr;
 state = IN_WORD;
 }
 break;
 }
 }
 // Add the last word
 if (state == IN_WORD)
 {
 data[args->length] = 0;
 emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1);
 }
}
/** wordcount_reduce()
 * 計算字符
 */
void wordcount_reduce(void *key_in, iterator_t *itr)
{
 char *key = (char *)key_in;
 void *val;
 intptr_t sum = 0;
 assert(key);
 assert(itr);
 while (iter_next (itr, &val))
 {
 sum += (intptr_t)val;
 }
 emit(key, (void *)sum);
}
void *wordcount_combiner (iterator_t *itr)
{
 void *val;
 intptr_t sum = 0;
 assert(itr);
 while (iter_next (itr, &val))
 {
 sum += (intptr_t)val;
 }
 return (void *)sum;
}
int main(int argc, char *argv[]) 
{
 final_data_t wc_vals;
 int i;
 int fd;
 char * fdata;
 int disp_num;
 struct stat finfo;
 char * fname, * disp_num_str;
 struct timeval starttime,endtime;
 get_time (&begin);
 // 確保文件名
 if (argv[1] == NULL)
 {
 printf("USAGE: %s <filename> [Top # of results to display]\n", argv[0]);
 exit(1);
 }
 fname = argv[1];
 disp_num_str = argv[2];
 printf("Wordcount: Running...\n");
 // 讀取文件
 CHECK_ERROR((fd = open(fname, O_RDONLY)) < 0);
 // Get the file info (for file length)
 CHECK_ERROR(fstat(fd, &finfo) < 0);
#ifndef NO_MMAP
 // 內存裏面開始調用 map
 CHECK_ERROR((fdata = mmap(0, finfo.st_size + 1, 
 PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0)) == NULL);
#else
 int ret;
 fdata = (char *)malloc (finfo.st_size);
 CHECK_ERROR (fdata == NULL);
 ret = read (fd, fdata, finfo.st_size);
 CHECK_ERROR (ret != finfo.st_size);
#endif
 CHECK_ERROR((disp_num = (disp_num_str == NULL) ? 
 DEFAULT_DISP_NUM : atoi(disp_num_str)) <= 0);
 wc_data_t wc_data;
 wc_data.unit_size = 5; // approx 5 bytes per word
 wc_data.fpos = 0;
 wc_data.flen = finfo.st_size;
 wc_data.fdata = fdata;
 CHECK_ERROR (map_reduce_init ());
 map_reduce_args_t map_reduce_args;
 memset(&map_reduce_args, 0, sizeof(map_reduce_args_t));
 map_reduce_args.task_data = &wc_data;
 map_reduce_args.map = wordcount_map;
 map_reduce_args.reduce = wordcount_reduce;
 map_reduce_args.combiner = wordcount_combiner;
 map_reduce_args.splitter = wordcount_splitter;
 map_reduce_args.locator = wordcount_locator;
 map_reduce_args.key_cmp = mystrcmp;
 map_reduce_args.unit_size = wc_data.unit_size;
 map_reduce_args.partition = NULL; // use default
 map_reduce_args.result = &wc_vals;
 map_reduce_args.data_size = finfo.st_size;
 map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 1024 * 2;
 map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
 map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16;
 map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8;
 map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16;
 map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2;
 printf("Wordcount: Calling MapReduce Scheduler Wordcount\n");
 gettimeofday(&starttime,0);
 get_time (&end);
#ifdef TIMING
 fprintf (stderr, "initialize: %u\n", time_diff (&end, &begin));
#endif
 get_time (&begin);
 CHECK_ERROR(map_reduce (&map_reduce_args) < 0);
 get_time (&end);
#ifdef TIMING
 library_time += time_diff (&end, &begin);
#endif
 get_time (&begin);
 gettimeofday(&endtime,0);
 printf("Wordcount: Completed %ld\n",(endtime.tv_sec - starttime.tv_sec));
 printf("Wordcount: MapReduce Completed\n");
 printf("Wordcount: Calling MapReduce Scheduler Sort\n");
 mapreduce_sort(wc_vals.data, wc_vals.length, sizeof(keyval_t), mykeyvalcmp);
 CHECK_ERROR (map_reduce_finalize ());
 printf("Wordcount: MapReduce Completed\n");
 dprintf("\nWordcount: Results (TOP %d):\n", disp_num);
 for (i = 0; i < disp_num && i < wc_vals.length; i++)
 {
 keyval_t * curr = &((keyval_t *)wc_vals.data)[i];
 dprintf("%15s - %" PRIdPTR "\n", (char *)curr->key, (intptr_t)curr->val);
 }
 free(wc_vals.data);
#ifndef NO_MMAP
 CHECK_ERROR(munmap(fdata, finfo.st_size + 1) < 0);
#else
 free (fdata);
#endif
 CHECK_ERROR(close(fd) < 0);
 get_time (&end);
#ifdef TIMING
 fprintf (stderr, "finalize: %u\n", time_diff (&end, &begin));
#endif
 return 0;
}

Mars MapReduce

Mars 框架中,Map 和 Reduce 的處理階段都在 GPU 內進行,Map 和 Reduce 的分割數據階段都在 CPU 內進行,這是與其餘基於 CPU 的 MapReduce 框架的最大不一樣。Mars 更多的是利用 CPU、GPU 緩存來替代內存,執行數據分割、處理過程。

具體的 Word count 的流程以下所示:

  1. 準備 key/value 鍵值對,將這些鍵值對存儲在數組裏面;
  2. 初始化 MapReduce 上下文,設置參數 (根據不一樣的 GPU 須要根據 CUDA 核心數目設置併發線程數);
  3. 數據預處理,首先打開文件,將文件全部內容讀入內存,而後申請一塊同文件大小的顯存,將文件內容中小寫字符轉爲大寫 (這樣的影響 word,Word 算通一個單詞)。
  4. 開始 MapReduce 階段。根據併發線程數和文件大寫切換內存中的文件,每塊切分後的任務記錄下該任務在內存中的偏移位置和長度視爲 value, 顯存的指針地址視爲 key,將任務添加的任務池。將處理後的內存內容複製到剛剛申請的顯存中。接着開始 Map 流程,將內存中的任務池複製到顯存,申請顯存塊用於存放 Map 產生的數據,開啓多線程併發執行用戶定義的 map 流程 MAP_COUNT_FUNC,這個是 Mars 因爲 GPU 程序的特殊性而設計的,用於記錄 map 產生的 key 和 value 的長度 (sizeof)。調用 MAP_FUNC 方法,輸入任務記錄,輸出單詞以及單詞所在的位置;
  5. 若是 noSort 是 F,對結果排序;
  6. 若是 noReduce 是 F,GPU 開始 reduce 階段,生成最終的結果集。不然,當即輸出最後的結果集;
  7. 結果輸出,從 GPU 設備拷貝最終的結果集到內存,而後輸出到屏幕。

經過上述的 7 個步驟,WordCount 的計算過程所有完成而且輸出結果集。

清單 7 . Mars 的 Map 程序段
#ifndef __MAP_CU__
#define __MAP_CU__
#include "MarsInc.h"
#include "global.h"
__device__ int hash_func(char* str, int len)
{
int hash, i;
for (i = 0, hash=len; i < len; i++)
hash = (hash<<4)^(hash>>28)^str[i];
return hash;
}
__device__ void MAP_COUNT_FUNC//(void *key, void *val, size_t keySize, size_t valSize)
{
WC_KEY_T* pKey = (WC_KEY_T*)key;
WC_VAL_T* pVal = (WC_VAL_T*)val;
char* ptrBuf = pKey->file + pVal->line_offset;
int line_size = pVal->line_size;
char* p = ptrBuf;
int lsize = 0;
int wsize = 0;
char* start = ptrBuf;
while(1)
{
for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
*p = '\0';
++p;
++lsize;
wsize = (int)(p - start);
if (wsize > 6)
{
//printf("%s, wsize:%d\n", start, wsize);
EMIT_INTER_COUNT_FUNC(wsize, sizeof(int));
}
for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
if (lsize >= line_size) break;
start = p;
}
}
__device__ void MAP_FUNC//(void *key, void val, size_t keySize, size_t valSize)
{
WC_KEY_T* pKey = (WC_KEY_T*)key;
WC_VAL_T* pVal = (WC_VAL_T*)val;
char* filebuf = pKey->file;
char* ptrBuf = filebuf + pVal->line_offset;
int line_size = pVal->line_size;
char* p = ptrBuf;
char* start = ptrBuf;
int lsize = 0;
int wsize = 0;
while(1)
{
for (; *p >= 'A' && *p <= 'Z'; p++, lsize++);
*p = '\0';
++p;
++lsize;
wsize = (int)(p - start);
int* o_val = (int*)GET_OUTPUT_BUF(0);
*o_val = wsize;
if (wsize > 6) 
{
//printf("%s, %d\n", start, wsize);
EMIT_INTERMEDIATE_FUNC(start, o_val, wsize, sizeof(int));
}
for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++);
if (lsize >= line_size) break;
start = p;
}
}
#endif //__MAP_CU__
清單 8 . Mars 的 Reduce 程序段
#ifndef __REDUCE_CU__
#define __REDUCE_CU__
#include "MarsInc.h"
__device__ void REDUCE_COUNT_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
{
}
__device__ void REDUCE_FUNC//(void* key, void* vals, size_t keySize, size_t valCount)
{
}
#endif //__REDUCE_CU__
 

五種框架 WordCount 實驗性能對比

圖 9 . 實驗運行時間比較圖

圖 9 . 實驗運行時間比較圖

圖 9 實驗運行時間比較圖是分析不一樣大小的文本文件所消耗的時間對比圖。從上圖能夠看出,Hadoop MapReduce 的運行時間最長,緣由是 Hadoop 生態環境包含內容過多,因此每次任務啓動時首先須要加載所需資源包,而後緩慢地發起任務,而且因爲自己是用性能較差的 Java 語言編寫的,因此致使總體計算時間長、性能差。Phoenix 因爲採用彙編和 C 語言編寫,內核很小,運行時所用資源不多,因此整個測試過程耗時也較少。Spark 框架在 WordCount 實驗中消耗的時長較 Disco 稍少,可是比 Phoenix、Mars 耗時太多。耗時最短的兩個框架是 Mars 和 Phoenix。須要時長從高到低分別是 Hadoop MapReduce、Disco、Spark、Phoenix、Mars。

圖 10 .CPU 使用率對比圖

圖 10 .CPU 使用率對比圖

圖 10-CPU 使用率比較圖是分析任務執行過程中 CPU 使用率狀況圖。從上圖能夠看出,Hadoop MapReduce、Disco 這兩個框架須要佔用的 CPU 資源在 1000M 文本處理時基本到達最大飽和度 (大於 90%),Apache Spark 的 CPU 使用率沒有徹底伴隨着文本文件增大而大幅上漲,Phoenix 和 Mars 基本控制在對 CPU 使用率較低的範圍內。

圖 11 . 內存使用率對比圖

圖 11 . 內存使用率對比圖

圖 11 內存使用率比較圖是分析任務執行過程當中內存使用狀況對比。從圖中能夠看出,Mars 和 Phoenix 這兩款框架所使用的內存在文本數據較小時是最少的,隨着文本數據的增大,Apache Spark 隨着數據量增大而內存大幅增長,Mars 和 Phoenix 有必定幅度的內存使用增長趨勢。當數據量達到本次測試最大的 1000M 文本時,Spark 框架對內存的消耗是最小的,Hadoop MapReduce 和 Disco 須要佔用較多的內存。

從上面的測試結果咱們得出,若是用戶只須要處理海量的文本文件,不須要考慮存儲、二次數據挖掘等,採用 Phoenix 或者 Mars 是最大性價比的選擇,可是因爲 Mars 必須在 GPU 上運行,自己 GPU 因爲價格因素,致使不太可能在實際應用場景裏推廣,因此綜合來看 Phoenix 是性價比最高的框架。若是應用程序須要處理的數據量很是大,而且客戶但願計算出的數據能夠被存儲和二次計算或數據挖掘,那 Hadoop MapReduce 較好,由於整個 Hadoop 生態圈龐大,支持性很好。Apache Spark 因爲架構層面設計不一樣,因此對於 CPU、內存的使用率一直保持較低狀態,它將來能夠用於海量數據分析用途。

 

結束語

現實世界不少實例均可用 MapReduce 編程模型來表示,MapReduce 做爲一個通用可擴展的、高容錯性的並行處理模型,可有效地處理海量數據,不斷地從中分析挖掘出有價值的信息。MapReduce 封裝了並行處理、負載均衡、容錯、數據本地化等技術難點細節。經過本文測試用例能夠證實 MapReduce 適用於海量文本分析的應用場景,能夠爲處理大數據提供技術支撐。

相關文章
相關標籤/搜索