從.net parallel角度解讀spark

對於我這樣一個一直工做在.net平臺上的developer來說,Hadoop,Spark,HBase等這些大數據名詞比較陌生,對於分佈式計算,.net上也有相似的Parallel(我說的不是HDInsight), 這篇文章是我嘗試從.net上的Parallel類庫的角度去講述什麼是spark。算法

 

咱們先從C#的一個爛大街的例子(不是Helloworld),統計一篇文章單詞出現的頻率。sql

下面C#代碼是利用.net Parallel來寫的統計單詞出現頻率。apache

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 
 7 namespace WordCountDemo
 8 {
 9     using System.IO;
10     using System.Threading;
11     class Program
12     {
13         /// <summary>
14         /// 咱們以計算一篇文章中單詞的個數爲例子
15         /// (計算文章單詞個數的demo簡直就是各類大數據計算的HelloWorld)。
16         /// 
17         /// WordCountFlow是數單詞程序
18         /// WordCountDetail對WordCountFlow函數每一行進行拆解並作了詳細解釋。
19         /// </summary>
20         /// <param name="args"></param>
21         static void Main(string[] args)
22         {
23             string filePath = @"D:\BigDataSoftware\spark-2.1.0-bin-hadoop2.7\README.md";
24 
25             WordCountFlow(filePath);
26             Console.WriteLine("----------------------");
27             WordCountDetail(filePath);            
28         }
29 
30         /// <summary>
31         /// 數單詞的程序流程
32         /// </summary>
33         /// <param name="filePath"></param>
34         static void WordCountFlow(string filePath)
35         {
36             File.ReadAllLines(filePath).AsParallel()
37                 .SelectMany(t => t.Split(' '))
38                 .Select(t => new { word = t, tag = 1 })
39                 .GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) })
40                 // 若是對Aggregate函數不熟悉,上面代碼等同於下行
41                 //.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Sum(p => p.tag) });
42                 .ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId}   ({t.word}-{t.count})"));
43         }
44 
45         /// <summary>
46         /// 數單詞程序流程的詳細解釋
47         /// </summary>
48         /// <param name="filePath"></param>
49         static void WordCountDetail(string filePath)
50         {
51             // 讀取整篇文章,文章每一行將做爲一個string存儲到數組lines
52             string[] lines = File.ReadAllLines(filePath);
53             // AsParallel()是Parallel類庫的核心方法,具體的意思是將string[] lines這個數組分割成幾個分區(Partition)。
54             // 假設這篇文章有500行,那麼這個方法會會把string[500]-lines分解成 (string[120] partitionA), 
55             // (string[180] partitionB), (string[150] partitionC),(...) 等幾個Partition
56             // .net runtime將當前程序的負載(主要是cpu使用狀況)狀況爲依據的分區算法來肯定到底要分紅幾個Partition,
57             // 咱們能夠大概認爲cpu有幾個邏輯核(不許確),就會被分解成幾個Partition。
58             // 後續的計算中.net runtime將會針對每個partition申請一個單獨的線程來處理.
59             // 好比:partitionA由001號線程處理,partitionB由002號線程處理。。。
60             ParallelQuery<string> parallelLines = lines.AsParallel();
61             // linesA,linesB,linesC...數組中存儲的每一行根據空格分割成單詞,結果仍然是存放在ParallelQuery<string>這種分塊的結構中
62             // 下面帶有****的註釋,若是對函數式編程沒有了解,能夠直接忽略。
63             // ****若是對函數式編程有所瞭解,會知道lambda天生lazy的,若是下面這行代碼打個斷點,當debug到這行代碼的時候,
64             // ****鼠標移動到parallelWords上時,咱們不會看到每個單詞,
65             // ****runtime並無真正將每一行分解成單詞,這行代碼僅僅是一種計算邏輯。
66             ParallelQuery<string> parallelWords = parallelLines.SelectMany(t => t.Split(' '));
67             // 將每個單子加上標記1,這行代碼返回的類型爲ParallelQuery<var>,var爲runtime自動判斷,此處var的類型的實際應該爲 
68             // class 匿名類型
69             // { 
70             //        public word {get;set;}
71             //        public tag {get;set}
72             //}    
73             var wordparis = parallelWords.Select(t => new { word = t, tag = 1 });
74             // 根據單詞進行分組,同一個分組中的單詞個數求和,相似於以下sql  select word,count(tag) from wordparis group by word
75             // 注意,此處一樣的單詞可能分佈在不一樣的分區中,好比英語中常見的"the",可能partitionA中有3個"the",partitionB中有2個「the",
76             // 可是partitionA和partitionB分別被不一樣的線程處理,若是runtime足夠聰明的話,他應該先計算partitionA的the的個數(the,3),
77             // 而後計算partitionB的the的個數(the,2),最後將整個partition合併而且從新分割(shuffle),在作後續的計算
78             // shuffle後partition的分區和以前partition裏面的數據會不一樣。
79             // 此處wordcountParis的類型爲
80             // class 匿名類型
81             // { 
82             //        public word {get;set;}
83             //        public count {get;set}
84             //}
85             var wordcountParis = wordparis.GroupBy(t => t.word).Select(t => new { word = t.Key, count = t.Select(p => p.tag).Aggregate((a, b) => a + b) });
86             // 打印結果。因爲線程執行的亂序,能夠看到輸出的partitionId也是亂序。
87             wordcountParis.ForAll(t => Console.WriteLine($"ParationId:{Thread.CurrentThread.ManagedThreadId}   ({t.word}-{t.count})"));
88         }
89     }
90 }

   程序運行結果編程

  

 

  經過上面的c#的例子,咱們看到parallel如何將一篇文章分解成多個Partition來而且在不一樣Partition上進行並行計算的,在計算過程當中,可能須要"shuffle",須要對原來的Partition進行從新洗牌。c#

  咱們假設,若是這個程序運行在集羣上,這些Partition分佈在不一樣的機器上,這樣就能夠利用多臺機器的力量而非一臺機器多個線程的力量去作計算了,yeah!,你猜對了,這就是spark,下面的scala的wordCountFlow函數是在spark上統計單詞出現頻率的函數,與c#的WordCountFlow同樣,也是五行代碼,而且這五行代碼的邏輯也徹底相同。只不過spark將數據分佈在不一樣的機器上,而且讓機器進行計算,固然,如你所想,某些狀況下須要shuffle,不一樣機器上的數據將會被匯聚並從新分割成新的分區。雖然Spark中的partition和net parallel中的partition並不徹底對應(spark中的一臺機器上可能有多個paratition) ,shuffle也是spark的專用詞彙,但基本的原理是相似的。數組

package wordCountExample

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

/**
  * Created by StevenChennet on 2017/3/10.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    // 文件路徑
    val filePath="D:\\BigDataSoftware\\spark-2.1.0-bin-hadoop2.7\\README.md"

    wordCountFlow(filePath)
  }
  def wordCountFlow(filePath:String ):Unit={
    // sparkContext對象使用一個SparkConf對象來構造
    // SparkConf主要進行一些設置,好比說local【*】表示儘可能開啓更多線程並行處理
    // SparkContext是spark執行任務的核心對象
    // 下面五行代碼與C#的WordCountFlow五行代碼一一對應
    new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]")).textFile(filePath)
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .foreach(t=>println( s"Partition: ${ TaskContext.getPartitionId() }  (${t._1}}-${t._2}})"))
  }
}

  據友情提醒,上面的Scala代碼的lambda太難看了,我轉換一下方式多線程

  

new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[*]")).textFile(filePath)
      .flatMap(line=>line.split(" "))
      .map(word=>(word,1))
      .reduceByKey((a,b)=>a+b)
      .foreach(t=>println( s"Partition: ${ TaskContext.getPartitionId() }  (${t._1}}-${t._2}})"))
  }

  

  程序運行結果分佈式

  

  在net parallel中,若是某個線程在計算過程當中崩潰了,那可能致使整個程序都crash掉,若是是集羣運算,由於一臺宕機而讓整個集羣崩潰可不是一個好決策,spark能夠在計算以前先對要計算的內容持久化,若是一臺機器crash,能夠將這臺機器的計算任務拉到另一臺機器上進行從新計算。函數式編程

相關文章
相關標籤/搜索