akka案例:統計單詞個數

一、文本文件test.txtjava


二、建立Scala項目AkkaScalaWordCount項目實現詞頻統計數組


(1)建立AKKAUtils類,提供獲取akka配置的函數app

package net.hw.akka.wc

import java.util.HashMap
import java.util.ArrayList

/**
* Created by howard on 2017/8/27.
*/
object AKKAUtils {
def getConf(ip: String, port: String): HashMap[String, Object] = {
val conf = new HashMap[String, Object]()
val list = new ArrayList[String]()
list.add("akka.remote.netty.tcp")
conf.put("akka.remote.enabled-transports", list)
conf.put("akka.actor.provider", "akka.remote.RemoteActorRefProvider")
conf.put("akka.remote.netty.tcp.hostname", ip)
conf.put("akka.remote.netty.tcp.port", port)
return conf
}
}

(2)建立WcInfo1,封裝從WcDriver發往WcMapper的數據
package net.hw.akka.wc

/**
* Created by howard on 2017/8/27.
*/
case class WcInfo1(data: String, mapFunc: String => Array[(String, Int)],
reduceFunc: Array[(String, Int)] => Map[String, Int]) {
val datax = data
val mapFuncx = mapFunc
val reduceFuncx = reduceFunc
}

WcMapper接收字符串data,調用mapFunc進行處理,返回的是tuple數組arr: Array[(String, Int),因而,WcReducer接收的參數就是arr: Array[(String, Int)。

(3)建立WcInfo2,封裝從WcDriver發往WcMapper的數據
package net.hw.akka.wc

/**
* Created by howard on 2017/8/27.
*/
case class WcInfo2(arr: Array[(String, Int)],
reduceFunc: Array[(String, Int)] => Map[String, Int]) {
val arrx = arr
val reduceFuncx = reduceFunc
}

(4)建立WcDriver
package net.hw.akka.wc

import java.util.Scanner

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

/**
* Created by howard on 2017/8/27.
*/
object WcDriver {
def main(args: Array[String]): Unit = {
val sys = ActorSystem("myAkkaClientSys", ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44444")))

val scan = new Scanner(System.in)
while (true) {
val data = scan.nextLine();
val mapFunc = (line: String) => {
val arr = line.split(" ")
arr.map((_, 1))
}
val reduceFunc = (arr: Array[(String, Int)]) => {
arr.groupBy(_._1).mapValues(_.map(_._2)).mapValues(_.reduce(_ + _))
}

sys.actorSelection("akka.tcp://myAkkaServerSys@127.0.0.1:44443/user/mapperActor") ! new WcInfo1(data, mapFunc, reduceFunc);
}
}
}

WcDriver獲取了行信息以後,定義了兩個函數。而後定義本身的ActorSystem對象sys,監聽本機的44444端口,而後往本機44443端口的mapperActor發送信息WcInfo1對象,封裝了讀取的行數據,以及對數據進行處理的兩個函數。

(5)建立WcMapper
package net.hw.akka.wc

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
* Created by howard on 2017/8/27.
*/
object WcMapper {

class MapperActor extends Actor {
def receive = {
case wif1: WcInfo1 => {
val line = wif1.data
val mapFunc = wif1.mapFunc
val reduceFunc = wif1.reduceFunc
val arr = mapFunc(line)
context.actorSelection("akka.tcp://myAkkaServerSys@127.0.0.1:44442/user/reducerActor") ! new WcInfo2(arr, reduceFunc)
}
}
}

def main(args: Array[String]): Unit = {
val sys = ActorSystem("myAkkaServerSys",
ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44443")));
sys.actorOf(Props[MapperActor], "mapperActor")
}
}

(6)建立WcReducer
package net.hw.akka.wc

import akka.actor.{Actor, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
* Created by howard on 2017/8/27.
*/
object WcReducer {
class ReducerActor extends Actor {
def receive = {
case wif2: WcInfo2 => {
val arr = wif2.arr
val reduceFunc = wif2.reduceFunc
val map = reduceFunc(arr)
println("統計結果:")
map.foreach(println)
}
}
}

def main(args: Array[String]): Unit = {
val sys = ActorSystem("myAkkaServerSys",
ConfigFactory.parseMap(AKKAUtils.getConf("127.0.0.1", "44442")))
sys.actorOf(Props[ReducerActor], "reducerActor")
}
}

按順序啓動WcReducer、WcMapper、WcDriver,看WcReducer控制檯的統計結果:


本文分享 CSDN - howard2005。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。tcp

相關文章
相關標籤/搜索