目前開源社區有許多並行計算模型和框架可供選擇,按照實現方式、運行機制、依附的產品生態圈等可以被劃分爲幾個類型,每個類型各有優缺點,如果能夠對各類型的並行計算框架都進行深入研究及適當的缺點修復,就可以爲不同硬件環境下的海量數據分析需求提供不同的軟件層面的解決方案。
並行計算或稱平行計算是相對於串行計算來說的。它是一種一次可執行多個指令的算法,目的是提高計算速度,以及通過擴大問題求解規模,解決大型而複雜的計算問題。所謂並行計算可分爲時間上的並行和空間上的並行。時間上的並行就是指流水線技術,而空間上的並行則是指用多個處理器併發的執行計算。並行計算(Parallel Computing)是指同時使用多種計算資源解決計算問題的過程,是提高計算機系統計算速度和處理能力的一種有效手段。它的基本思想是用多個處理器來協同求解同一問題,即將被求解的問題分解成若干個部分,各部分均由一個獨立的處理機來並行計算。並行計算系統既可以是專門設計的、含有多個處理器的超級計算機,也可以是以某種方式互連的若干臺的獨立計算機構成的集羣。通過並行計算集羣完成數據的處理,再將處理的結果返回給用戶。
歐美髮達國家對於並行計算技術的研究要遠遠早於我國,從最初的並行計算逐漸過渡到網格計算,隨着 Internet 網絡資源的迅速膨脹,因特網容納了海量的各種類型的數據和信息。海量數據的處理對服務器 CPU、IO 的吞吐都是嚴峻的考驗,不論是處理速度、存儲空間、容錯性,還是在訪問速度等方面,傳統的技術架構和僅靠單臺計算機基於串行的方式越來越不適應當前海量數據處理的要求。國內外學者提出很多海量數據處理方法,以改善海量數據處理存在的諸多問題。
目前已有的海量數據處理方法在概念上較容易理解,然而由於數據量巨大,要在可接受的時間內完成相應的處理,只有將這些計算進行並行化處理,通過提取出處理過程中存在的可並行工作的分量,用分佈式模型來實現這些並行分量的並行執行過程。隨着技術的發展,單機的性能有了突飛猛進的發展變化,尤其是內存和處理器等硬件技術,但是硬件技術的發展在理論上總是有限度的,如果說硬件的發展在縱向上提高了系統的性能,那麼並行技術的發展就是從橫向上拓展了處理的方式。
2003 年美國 Google 公司對外發布了 MapReduce、GFS、BigData 三篇論文,至此正式將並行計算框架落地爲 MapReduce 框架。
我國的並行和分佈式計算技術研究起源於 60 年代末,按照國防科技大學周興銘院士提出的觀點,到目前爲止已經三個階段了。第一階段,自 60 年代末至 70 年代末,主要從事大型機內的並行處理技術研究;第二階段,自 70 年代末至 90 年代初,主要從事向量機和並行多處理器系統研究;第三階段,自 80 年代末至今,主要從事 MPP(Massively Parallel Processor) 系統研究。
儘管我國在並行計算方面開展的研究和應用較早,目前也擁有很多的並行計算資源,但研究和應用的成效相對美國還存在較大的差距,有待進一步的提高和發展。
MapReduce 是由谷歌推出的一個編程模型,是一個能處理和生成超大數據集的算法模型,該架構能夠在大量普通配置的計算機上實現並行化處理。MapReduce 編程模型結合用戶實現的 Map 和 Reduce 函數。用戶自定義的 Map 函數處理一個輸入的基於 key/value pair 的集合,輸出中間基於 key/value pair 的集合,MapReduce 庫把中間所有具有相同 key 值的 value 值集合在一起後傳遞給 Reduce 函數,用戶自定義的 Reduce 函數合併所有具有相同 key 值的 value 值,形成一個較小 value 值的集合。一般地,一個典型的 MapReduce 程序的執行流程如圖 1 所示。
MapReduce 執行過程主要包括:
任務成功完成後,MapReduce 的輸出存放在 R 個輸出文件中,一般情況下,這 R 個輸出文件不需要合併成一個文件,而是作爲另外一個 MapReduce 的輸入,或者在另一個可處理多個分割文件的分佈式應用中使用。
受 Google MapReduce 啓發,許多研究者在不同的實驗平臺上實現了 MapReduce 框架,本文將對 Apache Hadoop MapReduce、Apache、Spark、斯坦福大學的 Phoenix,Nokia 研發的 Disco,以及香港科技大學的 Mars 等 5 個 MapReduce 實現框架進行逐一介紹和各方面對比。
Hadoop 的設計思路來源於 Google 的 GFS 和 MapReduce。它是一個開源軟件框架,通過在集羣計算機中使用簡單的編程模型,可編寫和運行分佈式應用程序處理大規模數據。Hadoop 包含三個子項目:Hadoop Common、Hadoop Distributed File System(HDFS) 和 Hadoop MapReduce。
第一代 Hadoop MapReduce 是一個在計算機集羣上分佈式處理海量數據集的軟件框架,包括一個 JobTracker 和一定數量的 TaskTracker。運行流程圖如圖 2 所示。
在最上層有 4 個獨立的實體,即客戶端、JobTracker、TaskTracker 和分佈式文件系統。客戶端提交 MapReduce 作業;JobTracker 協調作業的運行;JobTracker 是一個 Java 應用程序,它的主類是 JobTracker;TaskTracker 運行作業劃分後的任務,TaskTracker 也是一個 Java 應用程序,它的主類是 TaskTracker。Hadoop 運行 MapReduce 作業的步驟主要包括提交作業、初始化作業、分配任務、執行任務、更新進度和狀態、完成作業等 6 個步驟。
Spark 是一個基於內存計算的開源的集羣計算系統,目的是讓數據分析更加快速。Spark 非常小巧玲瓏,由加州伯克利大學 AMP 實驗室的 Matei 爲主的小團隊所開發。使用的語言是 Scala,項目的核心部分的代碼只有 63 個 Scala 文件,非常短小精悍。Spark 啓用了內存分佈數據集,除了能夠提供交互式查詢外,它還可以優化迭代工作負載。Spark 提供了基於內存的計算集羣,在分析數據時將數據導入內存以實現快速查詢,「速度比」基於磁盤的系統,如比 Hadoop 快很多。Spark 最初是爲了處理迭代算法,如機器學習、圖挖掘算法等,以及交互式數據挖掘算法而開發的。在這兩種場景下,Spark 的運行速度可以達到 Hadoop 的幾百倍。
Disco 是由 Nokia 研究中心開發的,基於 MapReduce 的分佈式數據處理框架,核心部分由 Erlang 語言開發,外部編程接口爲 Python 語言。Disco 是一個開放源代碼的大規模數據分析平臺,支持大數據集的並行計算,能運行在不可靠的集羣計算機上。Disco 可部署在集羣和多核計算機上,還可部署在 Amazon EC2 上。Disco 基於主/從架構 (Master/Slave),圖 3 總體設計架構圖展示了通過一臺主節點 (Master) 服務器控制多臺從節點 (Slave) 服務器的總體設計架構。
Disco 運行 MapReduce 步驟如下:
Phoenix 作爲斯坦福大學 EE382a 課程的一類項目,由斯坦福大學計算機系統實驗室開發。Phoenix 對 MapReduce 的實現原則和最初由 Google 實現的 MapReduce 基本相同。不同的是,它在集羣中以實現共享內存系統爲目的,共享內存能最小化由任務派生和數據間的通信所造成的間接成本。Phoenix 可編程多核芯片或共享內存多核處理器 (SMPs 和 ccNUMAs),用於數據密集型任務處理。
Mars 是香港科技大學與微軟、新浪合作開發的基於 GPU 的 MapReduce 框架。目前已經包含字符串匹配、矩陣乘法、倒排索引、字詞統計、網頁訪問排名、網頁訪問計數、相似性評估和 K 均值等 8 項應用,能夠在 32 位與 64 位的 Linux 平臺上運行。Mars 框架實現方式和基於 CPU 的 MapReduce 框架非常類似,也由 Map 和 Reduce 兩個階段組成,它的基本工作流程圖如圖 4 所示。
在開始每個階段之前,Mars 初始化線程配置,包括 GPU 上線程組的數量和每個線程組中線程的數量。Mars 在 GPU 內使用大量的線程,在運行時階段會均勻分配任務給線程,每個線程負責一個 Map 或 Reduce 任務,以小數量的 key/value 對作爲輸入,並通過一種無鎖的方案來管理 MapReduce 框架中的併發寫入。
Mars 的工作流程主要有 7 個操作步驟:
上述步驟的 1,2,3,7 這四個步驟的操作由調度器來完成,調度器負責準備數據輸入,在 GPU 上調用 Map 和 Reduce 階段,並將結果返回給用戶。
五種框架的優缺點比較
單詞計數 (WordCount) 是最簡單也是最能體現 MapReduce 思想的程序之一,可以稱爲 MapReduce 版"Hello World"。單詞計數主要完成功能是:統計一系列文本文件中每個單詞出現的次數。
本次實驗的硬件資源基於 x86 服務器 1 臺,配置爲內存 32GB DDR3、E5 CPU/12 核、GPU,實驗數據樣本爲 10M/50M/100M/500M/1000M 的文本文件五個,我們使用 Hadoop MapReduce、Spark、Phoenix、Disco、Mars 等 MapReduce 框架分別運行文本分析程序,基於結果一致的前提下統計出運行時間、運行時 CPU 佔有率、運行時內存佔有率等數據,並採用這些數據繪製成柱狀圖。
首先需要將文件拆分成 splits,由於測試用的文件較小,所以每個文件爲一個 split,並將文件按行分割形成<key,value>對,圖 12 分割過程圖所示。這一步由 MapReduce 框架自動完成,其中偏移量(即 key 值)包括了回車所佔的字符數(Windows 和 Linux 環境會不同)。
將分割好的<key,value>對交給用戶定義的 map 方法進行處理,生成新的<key,value>對,圖 6 執行 map 方法所示。
得到 map 方法輸出的<key,value>對後,Mapper 會將它們按照 key 值進行排序,並執行 Combine 過程,將 key 相同的 value 值累加,得到 Mapper 的最終輸出結果。圖 7Map 端排序及 Combine 過程所示。
Reducer 先對從 Mapper 接收的數據進行排序,再交由用戶自定義的 reduce 方法進行處理,得到新的<key,value>對,並作爲 WordCount 的輸出結果,圖 15Reduce 端排序及輸出結果所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<
Object
, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// 開始 Map 過程
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
//遍歷 Map 裏面的字符串
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<
Text
,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
//開始 Reduce 過程
public void reduce(Text key, Iterable<
IntWritable
> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <
in
> <
out
>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
|
Spark 與 Hadoop MapReduce 的最大區別是它把所有數據保存在內存中,Hadoop MapReduce 需要從外部存儲介質裏把數據讀入到內存,Spark 不需要這一步驟。它的實現原理與 Hadoop MapReduce 沒有太大區別,這裏不再重複原理,完整的運行代碼如清單 2 所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<
String
> lines = ctx.textFile(args[0], Integer.parseInt(args[1]));
JavaRDD<
String
> words = lines.flatMap(new FlatMapFunction<
String
, String>() {
@Override
public Iterable<
String
> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});
//定義 RDD ones
JavaPairRDD<
String
, Integer> ones = words.mapToPair(new PairFunction<
String
, String, Integer>() {
@Override
public Tuple2<
String
, Integer> call(String s) {
return new Tuple2<
String
, Integer>(s, 1);
}
});
//ones.reduceByKey(func, numPartitions)
JavaPairRDD<
String
, Integer> counts = ones.reduceByKey(new Function2<
Integer
, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
},10);
//輸出 List
List<
Tuple2
<String, Integer>> output = counts.collect();
Collections.sort(output, new Comparator<
Tuple2
<String, Integer>>() {
@Override
public int compare(Tuple2<
String
, Integer> t1,
Tuple2<
String
, Integer> t2) {
if(t1._2 > t2._2) {
return -1;
} else if(t1._2 < t2._2) {
return 1;
}
return 0;
}
});
|
MapReduce 框架由於 Disco 有分佈式文件系統存在,所以一般情況下都不會單獨使用,都是從分佈式文件系統內取數據後讀入內存,然後再切分數據、進入 MapReduce 階段。首先需要調用 ddfs 的 chunk 命令把文件上傳到 DDFS,然後開始編寫 MapReduce 程序,Disco 外層應用程序採用 Python 編寫。Map 程序實例如清單 3 所示,Reduce 程序示例如清單 4 所示。
1
2
3
|
def fun_map(line, params):
for word in line.split():
yield word, 1
|
1
2
3
4
|
def fun_reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from disco.core import Job, result_iterator
def map(line, params):
for word in line.split():
yield word, 1
def reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)
if __name__ == '__main__':
job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
map=map,
reduce=reduce)
for word, count in result_iterator(job.wait(show=True)):
print(word, count)
Note
|
Phoenix 是基於 CPU 的 MapReduce 框架,所以它也是採用將數據分割後讀入內存,然後開始 MapReduce 處理階段這樣的傳統方式。Phoenix 並不由用戶決定切分每個 Map 分配到的數據塊的大小,它是根據集羣系統的實際 Cache 大小來切分的,這樣可以避免出現分配到 Map 的數據塊過大或者過小的情況出現。過大的數據快會導致 Map 執行較慢,過小的數據快會導致 Map 資源浪費,因爲每次啓動 Map 線程都需要消耗一定的系統資源。Map 階段切分好的文本被多個 Map 並行執行,Phoenix 支持 100 個左右的 Map 並行執行,一個工作節點下可以有若干個 Map 並行執行。只有當一個工作節點上所有的 Map 任務都結束後纔開始 Reduce 階段。Reduce 階段繼續沿用了動態任務調度機制,同時允許用戶自定義數據分區規則。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
65
66
67
|