混合使用ForkJoin+Actor+Future實現一千萬個不重複整數的排序(Scala示例)

 

目標

      實現一千萬個不重複整數的排序,能夠一次性加載到 2G 的內存裏。html

      本文適合於想要了解新語言 Scala 併發異步編程框架 Akka, Future 的筒鞋。 讀完本文後,將瞭解如何綜合使用 ForkJoin 框架、 Akka 模型、以及 Future 進行併發異步編程,還有一系列小的編程點。 java

任務拆分

      首先要進行任務拆分。要實現一千萬個不重複整數的排序, 能夠拆分爲三個子任務: 算法

      (1)  生成一千萬的不重複整數並寫入文件 NumberGeneratorTask; 編程

      (2) 從文件讀取並檢測確實生成的是一千萬個不重複的整數 CheckUnduplicatedNumbersActor; 數組

      (3)  從文件讀取整數進行排序和排序檢測 BigfileSortActor。接下來逐一實現這些子任務。數據結構

      入口以下。這裏使用了 Akka 的框架及 ForkJoin 實例。其中啓動 NumberGeneratorTask 抽離到一個工具類 ForkJoinPoolStartup 來實現,更好地維護和複用, 好比啓動不一樣參數的 NumberGeneratorTask 。多線程

      從 system.actorOf(Props(new CheckUnduplicatedNumbersActor(numbers, bigfileSortActor)), name="checkNumberActor") 能夠看出如何建立帶參數或不帶參數的 Actor 實例。注意到,若是任務流程是從 NumberGeneratorTask -> checkNumberTask -> bigfileSortTask , 那麼,對應的Actor 順序正好是反過來:先建立任務流程最靠後的Actor,再建立流程中靠前的Actor ,由於靠前的Actor 須要持有流程中下一個Actor的引用以便向其發送消息。BigFileSortActor 持有 ActorSystem 實例引用 system , 便於在排序及檢測完成後終止整個 Actor 系統。架構

package scalastudy.concurrent.billionsort

import akka.actor.{ActorSystem, Props}

import scalastudy.concurrent.ForkJoinPoolStartup
import scalastudy.concurrent.config.ActorSystemFactory
import scalastudy.concurrent.billionsort.Constants._

/**
  * Created by shuqin on 16/5/18.
  */
object BillionNumberSort extends App {

    launch()

    def launch(): Unit = {
        ForkJoinPoolStartup.start(createActors(), poolWaitSecs)
    }

    def createActors():NumberGeneratorTask = {
        val system:ActorSystem = ActorSystemFactory.newInstance()
        val bigfileSortActor = system.actorOf(Props(new BigFileSortActor(numbers, system)))
        val checkNumberActor = system.actorOf(Props(new CheckUnduplicatedNumbersActor(numbers, bigfileSortActor)), name="checkNumberActor")
        val numGenTask = new NumberGeneratorTask(numbers, 0, rangeMaxNumber, checkNumberActor)
        return numGenTask
    }

}
package scalastudy.concurrent

import java.util.concurrent.{ForkJoinPool, TimeUnit}

import scalastudy.concurrent.billionsort.NumberGeneratorTask

/**
  * Created by shuqin on 17/4/27.
  */
object ForkJoinPoolStartup {

  def start(entranceTask:NumberGeneratorTask, waitSecs:Int):Unit = {
    val pool = new ForkJoinPool()
    pool.execute(entranceTask)
    pool.shutdown
    pool.awaitTermination(waitSecs, TimeUnit.SECONDS)
    pool.shutdownNow
    assert( pool.isTerminated == true )
  }

}

生成一千萬個不重複整數

ForkJoin的使用

  顯然,這個子任務是能夠採用 ForkJoin 來完成的。 ForkJoin 是分治思想的框架性實現, 將原問題分解爲一樣性質的多個子問題,而後將子問題的解組合起來獲得原問題的解。一般採用二分法。實現上,一般會採用遞歸結構, 注意遞歸不要太深。 actorName ! message 表示向名稱爲 actorName 的Actor 實例發送 message 消息,message 能夠是任意數據結構,字符串、列表、元組、對象等。這裏發送了兩種類型: 整數列表 randInts.map(i=>i+start).toList 或 整數元組 (start, end) 。生成隨機無序整數使用了已有的Java類 RandomSelector 的方法,這代表了,Scala 能夠輕易無縫地使用 Java 現有的代碼和庫。併發

    NumberGeneratorTask 的實現以下:app

package scalastudy.concurrent.billionsort

import java.util.concurrent.RecursiveAction

import akka.actor.ActorRef
import zzz.study.algorithm.select.RandomSelector

import scalastudy.concurrent.billionsort.Constants.threshold
import scalastudy.concurrent.billionsort.Constants.debug

/**
  *  Created by shuqin on 16/5/19.
  *
  * 在 [start, end] 選出 num 個不重複的整數
  *
  */
class NumberGeneratorTask(num:Int, start:Int, end:Int, checkNumberActor: ActorRef)  extends RecursiveAction {

    override def compute(): Unit = {

        if (debug) {
            println("Select: " + num  + " unduplicated numbers from [" + start + " " + end + ")");
        }

        if (num <= threshold) {

            if (num > end - start+1) {
                checkNumberActor ! start.to(end).toList
            }
            else {
                val randInts = RandomSelector.selectMDisorderedRandInts2(num, end-start+1)
                checkNumberActor ! randInts.map(i=>i+start).toList
            }
        }
        else {
            val middle = start/2 + end/2
            val leftTask = new NumberGeneratorTask(num/2, start, middle, checkNumberActor)
            val rightTask = new NumberGeneratorTask((num+1)/2, middle+1, end, checkNumberActor)

            if (debug) {
                println("Left: [" + start + "-" + middle + "," + num/2 + "]")
                println("Right: [" + (middle+1) + "-" + end + "," + (num+1)/2 + "]")
            }

            leftTask.fork
            rightTask.fork
            leftTask.join
            rightTask.join
            checkNumberActor ! (start, end)
        }
    }

}

檢測生成的一千萬個整數不重複

Actor通訊

     Akka Actor 併發模型一個重要優點在於爲表明單任務的Actor提供了健壯可擴展的消息傳遞通訊機制。繼承 Actor 以後,須要覆寫指定方法 override def receive: Receive ,使用靈活而強大的 case 語句(偏函數)來匹配消息的類型及消息的值,從而作不一樣的判斷和操做。

     怎樣判斷整數生成任務完成從而能夠開始檢測了呢?在 NumberGeneratorTask 生成最後一組整數時並回退到最開始的調用層時,就會發送 (0, Constants.rangeMaxNumber) 做爲信號, 而 CheckUnduplicatedNumbersActor 則經過 case (0,Constants.rangeMaxNumber) 能夠匹配到這一點。

Trait與Actor的分工

     最開始,接收NumberGeneratorTask 傳來的消息進行處理以及檢測生成的整數不重複都寫在了 CheckUnduplicatedNumbersActor 這一個類裏。後來想了想,以爲這個類混雜了不一樣的功能和職責,所以拆分紅了兩個類:CheckUnduplicatedNumbersActor 和 CheckUnduplicatedNumbers 。 其中 CheckUnduplicatedNumbersActor 負責任務協做和計算調度, 而 CheckUnduplicatedNumbers 負責檢測生成的整數不重複的實際工做。職責明晰,各司其責,分開獨立發展。使用 with CheckUnduplicatedNumbers 語法,可使得具體類混入 trait 的功能,實現多重能力繼承,既能利用多重繼承的優點,又能避免多重字段繼承帶來的問題。

策略模式的使用

      CheckUnduplicatedNumbers 使用了策略模式。對於一千萬個整數來講,內存佔用 40M 左右, 2G 內存是裝滴下的, 如果十億個整數,那麼就須要 4G,就不能一次性加載了。所以這裏定義了個接口,並實現了一次性加載策略和位圖策略。可使用位圖來檢測不重複的整數,甚至能夠直接進行排序。可參考 《位圖排序(位圖技術應用)》。 BitMapStrategy 實現了使用位圖技術來對一千萬個不重複整數進行排序的策略。讀者感興趣能夠實現屢次加載策略,以應對內存不夠的情形。

      此外,Source.fromFile(filename).getLines 這裏返回的是迭代器, 若是內存不夠用的話,就必須使用這個方法,而不是 Source.fromFile(filename).getLines.toList , 後者會將全部行所有加載到內存中而致使 OutOfMemoryError . 

package scalastudy.concurrent.billionsort

import java.io.{File, PrintWriter}

import akka.actor.{Actor, ActorRef}

import scala.collection.immutable.List
import scalastudy.concurrent.billionsort.Constants.filename

/**
  * Created by shuqin on 16/5/19.
  */
class CheckUnduplicatedNumbersActor(val numbers:Int, bigfileSortActor: ActorRef) extends Actor
    with CheckUnduplicatedNumbers {


    val fwResult = new PrintWriter(new File(filename))

    var count = 0
    val useBigFileSort = true

    override def receive: Receive = {

        case numberList: List[Int] =>
            fwResult.write(numberList.mkString(" ") + "\n");
            count += numberList.length

        case (0, Constants.rangeMaxNumber) =>
            println("Reach End.")
            println("Expected: " + numbers + " , Actual Received: " + count)
            assert(count == numbers)
            fwResult.flush
            fwResult.close

            checkUnduplicatedNumbers(filename, numbers)
            if (useBigFileSort) {
                bigfileSortActor ! filename
            }

        case _ => println("未知消息,請檢查緣由 !")
    }

}
package scalastudy.concurrent.billionsort

import java.io.{File, PrintWriter}

import zzz.study.datastructure.vector.EnhancedBigNBitsVector

import scala.collection.mutable.Set
import scala.io.Source

import scalastudy.concurrent.billionsort.Constants.rangeMaxNumber

/**
  * Created by shuqin on 17/4/26.
  */
trait CheckUnduplicatedNumbers {

  def checkUnduplicatedNumbers(filename:String, numbers:Int): Unit = {

    assert(new OnceLoadStrategy().checkUnduplicatedNumbersInFile(filename, numbers) == true)
    assert(new BitMapStrategy().checkUnduplicatedNumbersInFile(filename,numbers) == true)
    println("checkUnduplicatedNumbers passed.")
  }

  /**
    * 一次性加載全部數到內存, 適用於內存能夠裝下全部數的狀況
    * 好比 10000000 個整數佔用 40M 空間, 2G 內存是綽綽有餘的, 但十億佔用 4G 空間失效
    */
  class OnceLoadStrategy extends CheckUnduplicatedStrategy {

    def checkUnduplicatedNumbersInFile(filename:String, numbers:Int):Boolean = {
      var numbersInFile = 0
      val unDupNumberSet = Set[Int]()
      Source.fromFile(filename).getLines.
        foreach { line =>
          val numbersInLine = line.split("\\s+").map(Integer.parseInt(_)).toSet
          numbersInFile += numbersInLine.size;
          unDupNumberSet ++= numbersInLine
                }
      println(s"Expected: ${numbers} , Actual In File: ${numbersInFile} ")
      println("Unduplicated numbers in File: " + unDupNumberSet.size)
      unDupNumberSet.size == numbers
    }
  }

  /**
    * 使用位圖技術來檢測不重複的數, 實際上還能用於排序
    * N個數只要 4(N/32+1) = N/8 + 4 個字節
    * 十億個數只要 125000004B = 125MB
    * 反過來, 內存 1G 的機器能夠對 80億 的不重複數進行排序
    */
  class BitMapStrategy extends CheckUnduplicatedStrategy {

    val nbitsVector = new EnhancedBigNBitsVector(rangeMaxNumber)

    override def checkUnduplicatedNumbersInFile(filename: String, numbers:Int): Boolean = {
      Source.fromFile(filename).getLines.
        foreach { line =>
          val numbersInLine = line.split("\\s+").map(Integer.parseInt(_)).toSet
          numbersInLine.foreach { num =>
            nbitsVector.setBit(num)
          }
        }

      val undupTotal = checkAndSort(filename)
      println(s"undupTotal: ${undupTotal}")
      assert(undupTotal == numbers)
      return true
    }

    def checkAndSort(filename: String): Integer = {
      val fwFinalResult = new PrintWriter(new File(s"${filename}.sorted.txt"))
      val sorted = nbitsVector.expr()
      var undupTotal = sorted.size()
      fwFinalResult.flush()
      fwFinalResult.close()
      return undupTotal
    }

  }

  trait CheckUnduplicatedStrategy {
    def checkUnduplicatedNumbersInFile(filename:String, numbers:Int):Boolean
  }

} 

大文件排序

      Oh, 終於進入正題了。大文件排序固然採用歸併排序了。 在這個實現裏,值得注意的是採用了 Future 全異步框架。

Future全異步框架

      能夠看到:

  (1) def produceFuture(line:String): Future[List[List[Int]]] 將文件的每一行(包含 threshold 個整數)轉化爲一個對行內整數排序的 Future, 能夠在後續獲取結果; 對於一個文件,就是得到了 futureTasks =  List[Future[List[List[Int]]]] ;  List[List[Int]] 是爲了讓後面的 Reduce 語法上走得通。 

   (2)   val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_)) 將 List[Future[List[List[Int]]]] 整合成一個 TotalFuture, 這個 TotalFuture 的結果是 futureTasks 裏面的全部 Future 結果的鏈接; 每個 Future 的結果是一個已排序的列表; 那麼 TotalFuture 的結果是一個已排序列表的列表。List[Future[List[List[Int]]]] 看着是否是有點頭暈目眩?這生動地說明,要想玩轉編程,數據結構功底要紮實!

   (3)   注意到下面這行代碼: 是將一個 Future A 轉化爲另外一個 Future B. 其中 B 的結果是基於 A. 在本例中,便是將已排序列表的列表合併爲最終列表,但仍然返回的是 Future 而不是最終列表。爲何要這麼寫, 而不是將 sortedListsFuture 的結果取出來再合併呢? 這是因爲以前的全部動做都是異步的。 若是應用只是取排序的結果,那麼也沒什麼; 但若是應用要將 sortedListsFuture 的結果寫入文件呢? 進而還要作一下排序檢測? 那麼, 就不得不在後面加入 TimeUnit.SECONDS.sleep(n) 的代碼, 讓主線程休息一會了(由於前面整個是異步的, 在 sortedListsFuture 還沒完成時,後面的代碼就會被執行了)! 並且你得不斷估計前面的排序/合併操做究竟大約須要多少時間從而不斷調整休眠的時間! 以前就是這樣實現的! 但這樣並不符合 Future 異步框架的初衷! 所以後面,我忽然以爲要寫成全異步的, 也體驗到了寫成全異步應用的滋味~~ :) 要求確實是有點高,須要不斷從 Future 轉換成新的 Future ~~ 同時你也發現, Scala Future 也提供了一個幫助編寫全異步框架的 API ~~

sortedListsFuture map {
      value:List[List[Int]] =>
            CollectionUtil.mergeKOrderedList(value)
}

   (4) 因爲後面將排序結果寫入文件以及從文件檢測排序是否 OK 都是同步的,所以,能夠在排序 Future 完成後執行。 注意到 Future 的非阻塞寫法: f.onComplete { case Success(result) => doWith(result) ;  case Failure(ex) => doWith(ex) }   

     (5)  爲了將列表連接起來,也試錯了好幾回:  (x :: y :: Nil).flatten ; 若是寫成 reduce(_ :: _ :: Nil) 是會報錯的; 寫成 reduce(_.flatten :: _.flatten :: Nil) 最終會合併成兩個列表不符合預期。 

package scalastudy.concurrent.billionsort

import java.io.{File, PrintWriter}
import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorSystem, Props}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.util.{Failure, Success}
import scalastudy.utils.{CollectionUtil, DefaultFileUtil, PathConstants}


/**
  * Created by shuqin on 16/5/20.
  */
class BigFileSortActor(numbers: Int, actorSystem: ActorSystem) extends Actor with SortChecker {

    override def receive: Receive = {

        case filename:String =>
            println("Received File: " + filename)
            sortFile(filename)
    }

    def produceFuture(line:String): Future[List[List[Int]]] = {
        val origin = line.split("\\s+").map( s => Integer.parseInt(s)).toList
        Future {
            List(origin.sorted)
        }
    }

    def cat(x: List[List[Int]],y:List[List[Int]]): List[List[Int]] = {
        return (x :: y :: Nil).flatten
    }

    def obtainSortedFuture(futureTasks:List[Future[List[List[Int]]]]):Future[List[Int]] = {
        val sortedListsFuture = Future.reduce(futureTasks)(cat(_,_))

        sortedListsFuture map {
            value:List[List[Int]] =>
                CollectionUtil.mergeKOrderedList(value)
        }
    }

    def sortFile(filename:String):Unit = {

        val futureTasks = DefaultFileUtil.readFileLines(filename).map(produceFuture(_))
        println("task numbers: " + futureTasks.size)

        val allNumberSortedFuture = obtainSortedFuture(futureTasks)

        allNumberSortedFuture.onComplete {
            case Success(value:List[Int]) =>
                println("sort finished.")
                writeSorted(value, filename)
                checkSorted(filename, numbers)

                println("sleep 3s and then begin to stop all.")
                TimeUnit.SECONDS.sleep(3)
                actorSystem.shutdown
            case Failure(ex) =>
                println("Sort failed: " + ex.getMessage)
        }
    }

    def writeSorted(allNumberSorted: List[Int], filename: String): Unit = {
        val fwResult = new PrintWriter(new File(filename + ".sorted.txt"))
        fwResult.write(allNumberSorted.mkString("\n"))
        fwResult.flush
        fwResult.close
    }
}

object BigFileSortActorTest {

    def main(args:Array[String]):Unit = {

        val numbers = 10000000
        val system = ActorSystem("BigFileSortActorTest")
        val bigFileSortActor = system.actorOf(Props(new BigFileSortActor(numbers, system)),name="bigFileSortActor")
        bigFileSortActor ! PathConstants.projPath + "/data/" + numbers +".txt"

        TimeUnit.SECONDS.sleep(640)
        system.shutdown

    }

}
合併有序鏈表

     合併有序鏈表是歸併排序的核心環節,也是歸併排序的性能關鍵之所在。

     CollectionUtil 實現了一個二路有序列表合併和K路有序列表合併。 其中二路有序列表合併和K路有序列表均分別使用了三種方法來實現:一種是過程式的插入合併,一種是結合foldLeft 或 reduce 的函數式的實現,一種是更高效的實現。讀者能夠體會三種的差別。過程式的合併時間效率尚可,但空間開銷比較大,大數據量時容易致使OOM, 函數式的作法時間效率不夠優,而更高效的實現儘量結合二者的優點。 

      注意到,K路有序列表合併使用到了 klists.par.reduce(merge) , 將普通列表轉化爲並行列表,以必定空間開銷換取能夠並行地合併大量列表的時間效率。實際調試查看大量列表合併進度時,能夠在 merge 函數的返回結果行之上加一行 println(result.size),查看合併後的列表大小。 

      Scala 的 List 是一個鏈表 (head::(tail::Nil)), 空列表能夠用 List(), Nil 來表示; 將元素添加在列表頭部可以使用 elem :: list , 在列表尾部添加元素使用 list :+ elem ; 列表連接使用 list1 ::: list2。默認的 List 是不可變的,意味着每次操做都會建立一個新的 List , 對於大有序列表合併的空間效率是不能接受的。所以,大有序列表的合併必須採用可變列表 ListBuffer . 至於如何作到 O(n+m), 還須要探索。

     NOTE: 經過性能測試發現,merge 時間效率是最高的,可是當列表很大時會拋GC exceed limit 異常;  mergeInplace 的性能次之,在列表長度到達 10000 時,性能開始急速降低幾十倍; mergeFunctional 的時間性能最差。看上去狀況並不如預料。估計應該是使用 List 姿式不對。後續打算專門開一篇文章討論合併有序列表的優化。

     Scala 的集合性能參考: http://docs.scala-lang.org/overviews/collections/performance-characteristics.html

package scalastudy.utils

import scala.collection.mutable
import scala.collection.mutable.{ListBuffer, Map}

import scala.math.pow

/**
  * Created by lovesqcc on 16-4-2.
  */
object CollectionUtil {

  def main(args: Array[String]): Unit = {

    testSortByValue
    testAllMergeIsRight

    testPerf(merge)
    testPerf(mergeInplace)
    // testPerf(mergeFunctional)
  }

  def testSortByValue():Unit = {
    val map = Map("shuqin" -> 31, "yanni" -> 28)
    sortByValue(map).foreach { println }
  }

  def testAllMergeIsRight(): Unit = {
    testMerge(merge)
    testMerge(mergeFunctional)
    testMerge(mergeInplace)

    testMergeKOrderedList(mergeKOrderedList)
    testMergeKOrderedList(mergeKOrderedListFunctional)
    testMergeKOrderedList(mergeKOrderedListIneffective)
  }

  def testMerge(merge: (List[Int], List[Int]) => List[Int]):Unit = {
    assert(merge(Nil, Nil) == List())
    assert(merge(List(), Nil) == List())
    assert(merge(List(), List()) == List())
    assert(merge(List(), List(1,3)) == List(1,3))
    assert(merge(List(4,2), List()) == List(4,2))
    assert(merge(List(4,2), Nil) == List(4,2))
    assert(merge(List(2,4), List(1,3)) == List(1,2,3,4))
    assert(merge(List(2,4), List(1,3,5)) == List(1,2,3,4,5))
    assert(merge(List(2,4,6), List(1,3)) == List(1,2,3,4,6))
    assert(merge(List(2,4,6), List(8,10)) == List(2,4,6,8,10))
    println("test merge list passed.")
  }

  def testMergeKOrderedList(mergeKOrderedList: List[List[Int]] => List[Int]):Unit = {
    assert(mergeKOrderedList(Nil) == List())
    assert(mergeKOrderedList(List()) == List())
    assert(mergeKOrderedList(List(List())) == List())
    assert(mergeKOrderedList(List(List(1,2))) == List(1,2))
    assert(mergeKOrderedList(List(List(), List())) == List())
    assert(mergeKOrderedList(List(List(), List(1,3))) == List(1,3))
    assert(mergeKOrderedList(List(List(2,4), List())) == List(2,4))
    assert(mergeKOrderedList(List(List(2,4), List(1,3))) == List(1,2,3,4))
    assert(mergeKOrderedList(List(List(2,4), List(1,3,5))) == List(1,2,3,4,5))
    assert(mergeKOrderedList(List(List(2,4,6), List(1,3))) == List(1,2,3,4,6))
    assert(mergeKOrderedList(List(List(2,4,7), List(1,6), List(3,5))) == List(1,2,3,4,5,6,7))
    assert(mergeKOrderedList(List(List(2,4,9), List(1,7), List(3,6), List(5,8))) == List(1,2,3,4,5,6,7,8,9))
    println("test mergeKOrderedList passed.")
  }

  def testPerf(merge: (List[Int], List[Int]) => List[Int]):Unit = {
    val n = 10
    val numbers = (1 to 7).map(pow(n,_).intValue)
    println(numbers)
    numbers.foreach {
      num =>
        val methodName = merge.toString()
        val start = System.currentTimeMillis
        val xList = (1 to num).filter(_ % 2 == 0).toList
        val yList = (1 to num).filter(_ % 2 == 1).toList
        val merged = merge(xList, yList)
        val mergedSize = merged.size
        val end = System.currentTimeMillis
        val cost = end - start
        println(s"method=${methodName}, numbers=${num}, merged size: ${mergedSize}, merge cost: ${cost} ms")
    }
  }

  /**
    * 對指定 Map 按值排序
    */
  def sortByValue(m: Map[String,Int]): Map[String,Int] = {
    val sortedm = new mutable.LinkedHashMap[String,Int]
    m.toList.sortWith{case(kv1,kv2) => kv1._2 > kv2._2}.foreach { t =>
      sortedm(t._1) = t._2
                                                                }
    return sortedm
  }

  /**
    * 合併兩個有序列表
    * 將 yList 合併到 xList 上
    * 結合了 mergeFunctional 和 mergeIneffective 的優點
    * 沒有空間開銷,時間複雜度爲 O(n+m), n,m 分別是 xList, yList 的列表長度
* xList and yList should both be ListBuffer , and return ListBuffer
* * TODO not implemented
*/ def mergeInplace(xList: List[Int], yList: List[Int]): List[Int] = { (xList, yList) match { case (Nil, Nil) => List[Int]() case (Nil, _) => yList case (_, Nil) => xList case (hx :: xtail, hy :: ytail) => var result = List[Int]() var xListP = List[Int]() var yListP = List[Int]() if (hx > hy) { result = hy :: Nil xListP = xList yListP = ytail } else { result = hx:: Nil yListP = yList xListP = xtail } while (xListP != Nil && yListP != Nil) { if (xListP.head > yListP.head) { result = result :+ yListP.head yListP = yListP.tail } else { result = result :+ xListP.head xListP = xListP.tail } } if (xListP == Nil) { result = result ::: yListP } if (yListP == Nil) { result = result ::: xListP } // println("xsize=" + xList.size + ", ysize= " + yList.size + ", merged=" + result.size) result } } /** * 合併兩個有序列表 * * 因爲每次插入 yList 元素到 xList 都要從頭遍歷,所以算法時間複雜度是 O(n*m) */ def mergeFunctional(xList: List[Int], yList: List[Int]): List[Int] = { (xList, yList) match { case (Nil, Nil) => List[Int]() case (Nil, _) => yList case (_, Nil) => xList case (hx :: xtail, hy :: ytail) => yList.foldLeft(xList)(insert) } } def insert(xList:List[Int], y:Int): List[Int] = { (xList, y) match { case (Nil, _) => y :: Nil case (hx :: xtail, _) => if (hx > y) { y :: xList } else { var result = hx :: Nil var pCurr = xtail while (pCurr != Nil && pCurr.head < y) { result = result :+ pCurr.head pCurr = pCurr.tail } (result :+ y) ::: pCurr } } } /** * 合併兩個有序列表 * 將 yList 與 xList 合併到一個全新的鏈表上 * 因爲使用指針是漸進地合併,所以算法時間複雜度是 O(n+m) n,m 分別是 xList, yList 的列表長度 * 因爲有列表複製操做,且是漸進地合併,所以算法空間複雜度也是 O(n+m) */ def merge(xList: List[Int], yList: List[Int]): List[Int] = { if (xList.isEmpty) { return yList } if (yList.isEmpty) { return xList } val result = ListBuffer[Int]() var xListC = xList var yListC = yList while (!xListC.isEmpty && !yListC.isEmpty ) { if (xListC.head < yListC.head) { result.append(xListC.head) xListC = xListC.tail } else { result.append(yListC.head) yListC = yListC.tail } } if (xListC.isEmpty) { result.appendAll(yListC) } if (yListC.isEmpty) { result.appendAll(xListC) } result.toList } /** * 合併k個有序列表 * 轉化爲並行容器進行並行地合併,有空間開銷 */ def mergeKOrderedList(klists: List[List[Int]]): List[Int] = { if (klists.isEmpty) { return List[Int]() } if (klists.size == 1) { return klists.head } klists.par.reduce(merge) } /** * 合併k個有序列表 * 使用函數式逐個地合併 */ def mergeKOrderedListFunctional(klists: List[List[Int]]): List[Int] = { if (klists.isEmpty) { return List[Int]() } if (klists.size == 1) { return klists.head } klists.reduce(merge) } /** * 合併k個有序列表 * 使用插入逐個地合併 */ def mergeKOrderedListIneffective(klists: List[List[Int]]): List[Int] = { if (klists.isEmpty) { return List[Int]() } var nlist = klists.size if (nlist == 1) { return klists.head } var klistp = klists; val kbuf = ListBuffer[List[Int]]() while (nlist > 1) { for (i <- 0 to nlist/2-1) { kbuf.insert(i, merge(klistp(2*i), klistp(2*i+1))) if (nlist%2 == 1) { kbuf.append(klistp(nlist-1)) } } nlist = nlist - nlist/2 klistp = kbuf.toList } kbuf.toList.head } }
排序後檢測

     排序後檢測,既能夠作成一個 Actor ,也能夠作成一個 trait. 若是排序檢測自己在整個任務協做中佔有一席之地,那麼作成Actor比較合適;若是隻是一個配合性的動做,那麼作成 trait 會更直接。這裏選擇做爲一個trait, 而 BigFileSortActor 經過 with SortChecker 來借用它的排序檢測能力。 

     這裏提供了 checkSort 的過程式實現和函數式實現,讀者可體會其中的差別。因爲迭代器迭代一次後就變成空,所以迭代過程當中要記錄迭代次數,來與指定的整數數目進行比較斷言。

package scalastudy.concurrent.billionsort

import scala.io.Source

/**
  * Created by shuqin on 17/4/25.
  */
trait SortChecker {

  /**
    * 每次比較列表的兩個數, 後一個不小於前一個
    * NOTE: 使用迭代器模式
    */
  def checkSorted(filename:String, numbers:Int): Unit = {
    val numIterator = Source.fromFile(filename + ".sorted.txt").getLines().map(line => Integer.parseInt(line.trim))
    checkSort(numIterator, numbers)
    println("test sorted passed.")
  }

  /**
    * 函數式實現
    */
  def checkSort(numIterator: Iterator[Int], numbers:Int):Unit = {
    var count = 1
    numIterator.reduceLeft((prev,next) => {
      assert(prev <= next); count += 1 ; next;
    } )
    assert(count == numbers)
  }

  /**
    * 過程式實現
    */
  def checkSortProcedural(numIterator: Iterator[Int], numbers:Int): Unit = {
    var last = 0
    var count = 0
    numIterator.foreach {
      num =>
        assert(num >= last)
        last = num
        count += 1
    }
    assert(count == numbers)
  }

  def checkSort(numList: List[Int], numbers:Int): Unit = {
    checkSort(numList.iterator, numbers)
  }

}

輔助類

      (1)  Constants 包含了本示例所須要的常量,便於性能調優。 

      (2)  從 N 個數中選出不重複的 M 個數參見 RandomSelector 的實現。 算法出處:《編程珠璣》第十二章 取樣問題。

package scalastudy.concurrent.billionsort

import scalastudy.utils.PathConstants

/**
  * Created by shuqin on 17/4/27.
  */
object Constants {

  // 生成的整數中不超過的最大數
  val rangeMaxNumber = 1000000000

  // 在 [0, rangeMaxNumber] 生成 numbers 個不重複的整數
  val numbers = 10000000

  // 每次生成不超過 threshold 個不重複的整數數組; 
  // 該值不能太小, 不然會因遞歸層次過深致使內存不足.
  val threshold = numbers / 10

  // 存儲生成的不重複整數
  val filename = PathConstants.projPath + s"/data/${numbers}.txt"

  // ForkJoin 池終止前的等待時間
  val poolWaitSecs = 15

  // Debug 選項
  val debug = false

}
package zzz.study.algorithm.select;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

public class RandomSelector {
    
    private RandomSelector() {
        
    }
    
    private static Random rand = new Random(47);
    
    /**
     * bigRandInt: 返回一個很是大的隨機整數,該整數的二進制位數不小於 bits
     */
    public static int bigRandInt(int bits)
    { 
         if (bits >= 32 || bits <= 0) {
             throw new IllegalArgumentException("參數 " + bits + " 錯誤,必須爲小於 32 的正整數!");
         }
         int baseNum = 1 << (bits - 1);
         return rand.nextInt(Integer.MAX_VALUE - baseNum) + baseNum;
    }
    
    /**
     * randRange: 生成給定範圍的隨機整數
     * @param low  範圍下限
     * @param high 範圍上限(不包含)
     * @return 給定範圍的隨機整數
     */
    public static int randRange(int low, int high)
    {
        if (high <= low) {
            throw new IllegalArgumentException("參數 [" + low + "," + high + "] 錯誤,第一個參數必須小於第二個參數!");
        }
        return bigRandInt(30) % (high-low) + low;    
    }
    
    /**
     * selectMOrderedRandInts : 從指定集合中隨機選擇指定數目的整數,並以有序輸出
     * @param m 須要選取的整數數目
     * @param n 指定整數集合 [0:n-1]
     * @return 隨機選取的有序整數列表
     */
    public static int[] selectMOrderedRandInts(int m, int n)
    {
        checkParams(m, n);
        int[] result = new int[m];
        int remaining = n;
        int selector = m;    
        for (int k=0, i=0; i < n; i++) {
            if ((bigRandInt(30) % remaining) < selector) {
                result[k++] = i;
                selector--;
            }
            remaining--;    
        }
        return result;    
    }
    
    /**
     * selectMOrderedRandInts2 : 從指定集合中隨機選擇指定數目的整數,並以有序輸出
     * @param m 須要選取的整數數目
     * @param n 指定整數集合 [0:n-1]
     * @return 隨機選取的有序整數列表
     */
    public static int[] selectMOrderedRandInts2(int m, int n)
    {
        checkParams(m, n);
        Set<Integer> holder = new TreeSet<Integer>();
        while (holder.size() < m) {
            holder.add(bigRandInt(30) % n);
        }
        return collectionToArray(holder);
    }
    
    /**
     * selectMOrderedRandInts3 : 從指定集合中隨機選擇指定數目的整數,並以有序輸出
     * @param m 須要選取的整數數目
     * @param n 指定整數集合 [0:n-1]
     * @return 隨機選取的有序整數列表
     */
    public static int[] selectMOrderedRandInts3(int m, int n)
    {
        checkParams(m, n);
        int[] arr = selectMDisorderedRandInts3(m, n);
        Arrays.sort(arr);
        return arr;
    }
    
    /**
     * selectMDisorderedRandInts2: 從指定整數集合中隨機選擇指定數目的整數,並以無序輸出
     * @param m 須要選取的整數數目
     * @param n 指定整數集合 [0:n-1]
     * @return 隨機選取的無序整數列表
     */
    public static int[] selectMDisorderedRandInts2(int m, int n)
    {
        checkParams(m, n);
        Set<Integer> intSet = new HashSet<Integer>();
        while (intSet.size() < m) {
            intSet.add(bigRandInt(30) % n);
        }
        return collectionToArray(intSet);
    }
    
    /**
     * selectMDisorderedRandInts3: 從指定整數集合中隨機選擇指定數目的整數,並以無序輸出
     * @param m 須要選取的整數數目
     * @param n 指定整數集合 [0:n-1]
     * @return 隨機選取的無序整數列表
     */
    public static int[] selectMDisorderedRandInts3(int m, int n)
    {
        checkParams(m, n);
        int[] arr = new int[n];
        for (int i=0; i < n; i++) {
            arr[i] = i;
        }
        for (int k=0; k < m; k++) {
            int j = randRange(k, n);
            int tmp = arr[k];
            arr[k] = arr[j];
            arr[j] = tmp;
        }
        return Arrays.copyOf(arr, m);
    }
    
    public static void checkParams(int m, int n)
    {
        if (m > n || m <= 0 || n <= 0 ) {
            throw new IllegalArgumentException("參數 [" + m + "," + n + "] 錯誤,必須均爲正整數,且第一個參數必須小於或等於第二個參數!");
        }
    }
    
    /**
     * collectionToArray : 將指定整數集合轉化爲整型數組列表
     * @param collection 指定整數集合
     * @return 要返回的整型數組列表,若給定集合爲空,則返回 null
     */
    public static int[] collectionToArray(Collection<Integer> collection)
    {
        if (collection == null || collection.size() == 0) {
            return null;
        }
        int[] result = new int[collection.size()];
        int k = 0;
        for (Integer integer : collection) {
            result[k] = integer;
            k++;
        }
        return result;
    }
    
    /**
     * printArray: 打印數組的便利方法,每打印十個數換行 
     * @param arr 指定要打印的數組
     */
    public static void printArray(int[] arr)
    {
        for (int i=0; i < arr.length; i++) {
            System.out.printf("%d%c", arr[i], i%10==9 ? '\n' : ' ');
        }
    }
    
}     

     

小結

      原本只是想寫一個 ForkJoin 的示例,但寫着寫着就加入了 akka, future 的元素, 是在解決問題的過程當中逐漸引入的。我以爲這種學習的方式很好,就是在解決一個問題的過程當中,能夠綜合地探索和學習到不少不一樣的東西。傳統的學習講究"按部就班"的方式,可是"跳躍式+快速試錯"也許是學習新技術的更好的方法。 :)

      對於Scala併發異步編程,能夠總結以下:

      (1)  ForkJoin 很是適合於數據併發或數據並行的計算,在分佈式計算架構之上就演變成 Map-Reduce 計算模型了;

      (2)  Akka-Actor 併發模型很是適合於任務協做和通訊的併發任務。多線程與鎖同步機制的問題就在於,線程之間沒有通訊的通道,只好經過在內存區域開闢若干共享可變的狀態來協調線程之間的協做; 而 Actor 模型則爲表明任務的Actor之間的通訊和協做經過了消息傳遞機制。正應了那句話:經過通訊來共享內存,而不是經過內存來共享通訊。

      (3)  Scala Future API 提供了一個全異步的框架。不像 Java 那樣只能生成一個 Future 隨後取數據, Scala Future 能夠經過各類計算操做映射成各類各樣的 Future, 並且能夠級聯、組合這些 Future 獲得新的 Future,  而後才從轉換後的最終 Future 中獲取結果,而且提供了非阻塞的處理結果的方式, 是靈活、可擴展的異步編程框架。

      (4)  在執行某些容器的大量獨立操做時,能夠採用並行計算。Scala 提供了並行容器的實現以及簡便的串行容器轉並行容器的方法,充分利用多核的能力作並行計算。

相關文章
相關標籤/搜索