一、文本文件test.txt
二、建立Scala項目AkkaScalaWordCount,實現詞頻統計
(1)建立AKKAUtils類,提供獲取akka配置的函數
package net.hw.akka.wc
import java.util.HashMap
import java.util.ArrayList
/**
 * Created by howard on 2017/8/27.
 *
 * Helper that assembles the Akka remoting settings used by the word-count programs.
 */
object AKKAUtils {

  /**
   * Builds a configuration map that enables the netty TCP remote transport.
   *
   * @param ip   value stored under `akka.remote.netty.tcp.hostname`
   * @param port value stored under `akka.remote.netty.tcp.port`
   * @return a new `java.util.HashMap` holding the provider, the enabled transports,
   *         the hostname and the port
   */
  def getConf(ip: String, port: String): HashMap[String, Object] = {
    // The transport list has a single entry: the netty TCP transport.
    val transports = new ArrayList[String]()
    transports.add("akka.remote.netty.tcp")

    val settings = new HashMap[String, Object]()
    settings.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider")
    settings.put("akka.remote.enabled-transports", transports)
    settings.put("akka.remote.netty.tcp.hostname", ip)
    settings.put("akka.remote.netty.tcp.port", port)
    settings
  }
}
(2)建立WcInfo1,封裝從WcDriver發往WcMapper的數據
package net.hw.akka.wc
/**
 * Created by howard on 2017/8/27.
 *
 * Message that bundles a piece of text with the two functions used to process it.
 *
 * @param data       the text to process
 * @param mapFunc    turns a string into an array of (String, Int) pairs
 * @param reduceFunc turns an array of (String, Int) pairs into a Map[String, Int]
 */
case class WcInfo1(
    data: String,
    mapFunc: String => Array[(String, Int)],
    reduceFunc: Array[(String, Int)] => Map[String, Int]
) {
  // Aliases that expose the constructor parameters under a second name.
  val datax: String = data
  val mapFuncx: String => Array[(String, Int)] = mapFunc
  val reduceFuncx: Array[(String, Int)] => Map[String, Int] = reduceFunc
}
WcMapper接收字符串data,調用mapFunc進行處理,返回的是tuple數組arr: Array[(String, Int)],因而,WcReducer接收的參數就是arr: Array[(String, Int)]。
(3)建立WcInfo2,封裝從WcMapper發往WcReducer的數據
package net.hw.akka.wc
/**
 * Created by howard on 2017/8/27.
 *
 * Message that bundles an array of (String, Int) pairs with the function that reduces it.
 *
 * @param arr        the (String, Int) pairs to reduce
 * @param reduceFunc turns an array of (String, Int) pairs into a Map[String, Int]
 */
case class WcInfo2(
    arr: Array[(String, Int)],
    reduceFunc: Array[(String, Int)] => Map[String, Int]
) {
  // Aliases that expose the constructor parameters under a second name.
  val arrx: Array[(String, Int)] = arr
  val reduceFuncx: Array[(String, Int)] => Map[String, Int] = reduceFunc
}
(4)建立WcDriver
package net.hw.akka.wc
import java.util.Scanner
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
/**
 * Created by howard on 2017/8/27.
 *
 * Command-line driver: reads lines from standard input and sends each one, together with
 * the map and reduce functions, to the remote mapper actor.
 */
object WcDriver {

  /**
   * Starts the client actor system on 127.0.0.1:44444 and sends every line read from
   * standard input to `mapperActor` at 127.0.0.1:44443 as a [[WcInfo1]] message.
   * When standard input is exhausted the actor system is terminated.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("myAkkaClientSys", ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44444")))
    val mapper = system.actorSelection("akka.tcp://myAkkaServerSys@127.0.0.1:44443/user/mapperActor")

    // Split on runs of whitespace and drop empty tokens, so that blank lines and repeated
    // spaces do not produce an empty-string "word".
    val mapFunc: String => Array[(String, Int)] =
      line => line.split("\\s+").filter(_.nonEmpty).map((_, 1))

    // Sum the counts per word with a strict `map`, so the result is a plain immutable Map
    // rather than the lazy view that chained `mapValues` calls produce.
    val reduceFunc: Array[(String, Int)] => Map[String, Int] =
      pairs => pairs.groupBy(_._1).map { case (word, hits) => word -> hits.map(_._2).sum }

    val scan = new Scanner(System.in)
    try {
      // `hasNextLine` ends the loop at end of input instead of letting `nextLine`
      // throw NoSuchElementException.
      while (scan.hasNextLine()) {
        mapper ! WcInfo1(scan.nextLine(), mapFunc, reduceFunc)
      }
    } finally {
      // Shut the actor system down so its threads do not keep the JVM running.
      system.terminate()
    }
  }
}
WcDriver首先建立自己的ActorSystem對象sys,監聽本機的44444端口;讀取行信息後,連同兩個數據處理函數(mapFunc和reduceFunc)一起封裝成WcInfo1對象,發往本機44443端口的mapperActor。
(5)建立WcMapper
package net.hw.akka.wc
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
 * Created by howard on 2017/8/27.
 *
 * Entry point and actor for the map stage.
 */
object WcMapper {

  /**
   * Applies the map function carried by each incoming [[WcInfo1]] to its text and
   * sends the resulting pairs, with the reduce function, to the reducer actor.
   */
  class MapperActor extends Actor {
    def receive: Receive = {
      case WcInfo1(line, mapFunc, reduceFunc) =>
        val pairs = mapFunc(line)
        val reducer =
          context.actorSelection("akka.tcp://myAkkaServerSys@127.0.0.1:44442/user/reducerActor")
        reducer ! WcInfo2(pairs, reduceFunc)
    }
  }

  /**
   * Starts the actor system on 127.0.0.1:44443 and registers `mapperActor`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44443"))
    val system = ActorSystem("myAkkaServerSys", conf)
    system.actorOf(Props[MapperActor], "mapperActor")
  }
}
(6)建立WcReducer
package net.hw.akka.wc
import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
/**
 * Created by howard on 2017/8/27.
 *
 * Entry point and actor for the reduce stage.
 */
object WcReducer {

  /**
   * Applies the reduce function carried by each incoming [[WcInfo2]] to its pairs
   * and prints the resulting entries to standard output.
   */
  class ReducerActor extends Actor {
    def receive: Receive = {
      case WcInfo2(pairs, reduceFunc) =>
        val counts = reduceFunc(pairs)
        println("統計結果:")
        counts.foreach(println)
    }
  }

  /**
   * Starts the actor system on 127.0.0.1:44442 and registers `reducerActor`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44442"))
    val system = ActorSystem("myAkkaServerSys", conf)
    system.actorOf(Props[ReducerActor], "reducerActor")
  }
}
按順序啓動WcReducer、WcMapper、WcDriver,看WcReducer控制檯的統計結果:
本文分享 CSDN - howard2005。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。