package com.dcx.scala.actor import akka.actor.{Actor, ActorRef, ActorSystem, Props} import scala.collection.mutable.HashMap import scala.collection.mutable.ListBuffer import scala.io.Source /** * 思路: * 要有個Server * 要有個Client去通訊,client統計文本後把(qy,3)輸出給Server;Server再把全部的qy聚合,放到ListBuffer中 */ object AkkaWordCount { // 可變長List val list = new ListBuffer[HashMap[String,Int]] def main(args: Array[String]): Unit = { // 輸入數據文本 val files: Array[String] = Array("D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt", "D:\\tmp\\input\\word3.txt") //存放接收到的每一個actor處理的結果數據 //存放有actor返回結果的Future數據 //拿ActorSystem是一個靜態工廠 val weChatApp = ActorSystem("WeChatApp") //拿到兩個Actor的通訊地址 val akkaServerRef: ActorRef = weChatApp.actorOf(Props[AkkaServer],"jianjian1") val clientRef: ActorRef = weChatApp.actorOf(Props(new Client(akkaServerRef)),"jianjian") for (file <- files) { clientRef ! file } // 讓該線程先睡一下,過早進入死循環會致使list沒有3個,一直循環不出來 Thread.sleep(1000) // 若是list把三個文件都放滿了,就退出循環 while(true){ if(list.size == 3){ // 輸出list println(list(list.size -1)) return } } } } //把每次聚合後的值都發送給AkkaServer class Client(val serverRef:ActorRef) extends Actor { override def receive: Receive = { { // 偏函數 經常使用做模式匹配 // case filePath: String => { //// map階段 // val list: List[String] = Source.fromFile(filePath).getLines().toList // val words: List[String] = list.flatMap(_.split(" ")) // val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) // //異步發送結果數據 res發送到Server,去模式匹配 // serverRef ! res // } case filePath:String => { val list: List[String] = Source.fromFile(filePath).getLines().toList val words: List[String] = list.flatMap(_.split(" ")) // 得出: (qy,3) 格式 val res: Map[String, Int] = words.map((_, 1)).groupBy(_._1).mapValues(_.size) serverRef ! res } } } } import scala.collection.mutable.HashMap class AkkaServer extends Actor { private var hashMap: HashMap[String, Int] = new HashMap[String, Int] override def receive: Receive = { case context: Map[String, Int] =>{ // (qy,3) context.map( (map:(String,Int)) => { // 聚合 val value: Any = hashMap.getOrElse(map._1,None) if(value != None){ hashMap(map._1) = value.asInstanceOf[Int] + map._2 }else{ hashMap(map._1) = map._2 } } ) AkkaWordCount.list += hashMap } } }