package com.wondersgroup.myscala import scala.actors.{Actor, Future} import scala.collection.mutable.ListBuffer import scala.io.Source //首先統計每一個文本中出現的頻率=》彙總 case class SubmitTask(f:String) case object StopTask //統計一個文本中單詞出現的次數 class ActorTest3 extends Actor{ override def act() :Unit = { while (true) { receive{ case SubmitTask(f) => { //把文件的一行內容做爲一個元素存入list val lines = Source.fromFile(f).getLines().toList //文件中的每個單詞做爲一個元素存入list val words = lines.flatMap(_.split(" ")) print("----------"+words) println("================"+words.map((_,1))) //獲得一個map ,當前文本的單詞,以及相應單詞出現的次數 println("++++++"+words.map((_,1)).groupBy(_._1)) val result = words.map((_,1)).groupBy(_._1).mapValues(_.size) println("&&&&&&&&&&&&&&&&"+result) sender ! result } case StopTask => exit() } } } } object ActorTest3{ def main(args: Array[String]): Unit = { //把文本分析任務提交給actor val replys = new ListBuffer[Future[Any]] val results = new ListBuffer[Map[String,Int]] val files = Array("src/wordcount.txt","src/wordcount1.txt") for(f <- files) { val actor = new ActorTest3 actor.start() val reply = actor !! SubmitTask(f) //把處理結果放到replys replys += reply } //對多個文件的處理結果彙總 while (replys.size > 0) { //判斷結果是否可取 val done = replys.filter(_.isSet) print("@@@@@@@@@@@"+done) for(res <- done) { results += res.apply().asInstanceOf[Map[String,Int]] replys -= res } Thread.sleep(5000) } //對各個分析結果進行彙總 val res2 = results.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2)) println("******************"+res2) } }
輸出javascript
@@@@@@@@@@@ListBuffer()----------List(python, is, a, very, brief, language, It, is, also, a, shell, language, we, like, python)================List((python,1), (is,1), (a,1), (very,1), (brief,1), (language,1), (It,1), (is,1), (also,1), (a,1), (shell,1), (language,1), (we,1), (like,1), (python,1)) ----------List(python, java, go, python, c++, c++, java, ruby, c, javascript, c++)================List((python,1), (java,1), (go,1), (python,1), (c++,1), (c++,1), (java,1), (ruby,1), (c,1), (javascript,1), (c++,1)) ++++++Map(java -> List((java,1), (java,1)), c++ -> List((c++,1), (c++,1), (c++,1)), go -> List((go,1)), python -> List((python,1), (python,1)), c -> List((c,1)), ruby -> List((ruby,1)), javascript -> List((javascript,1))) ++++++Map(is -> List((is,1), (is,1)), shell -> List((shell,1)), a -> List((a,1), (a,1)), also -> List((also,1)), language -> List((language,1), (language,1)), brief -> List((brief,1)), python -> List((python,1), (python,1)), It -> List((It,1)), very -> List((very,1)), we -> List((we,1)), like -> List((like,1))) &&&&&&&&&&&&&&&&Map(is -> 2, shell -> 1, a -> 2, also -> 1, language -> 2, brief -> 1, python -> 2, It -> 1, very -> 1, we -> 1, like -> 1) &&&&&&&&&&&&&&&&Map(java -> 2, c++ -> 3, go -> 1, python -> 2, c -> 1, ruby -> 1, javascript -> 1) @@@@@@@@@@@ListBuffer(<function0>, <function0>)******************Map(is -> 2, shell -> 1, a -> 2, java -> 2, c++ -> 3, go -> 1, also -> 1, language -> 2, brief -> 1, python -> 4, It -> 1, c -> 1, ruby -> 1, very -> 1, we -> 1, like -> 1, javascript -> 1)
mapperjava
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; //import org.apache.hadoop.io.*; //import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text; /** * 輸入key LongWritable 行號 * 輸入的value Text 一行內容 * 輸出的key Text 單詞 * 輸出的value IntWritable 單詞的個數 * @author lenovo * */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ Text k =new Text(); IntWritable v = new IntWritable(1); // @SuppressWarnings("unused") @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 將一行內容轉化爲String String line = value.toString(); // 2 切分 String[] words = line.split(" "); // 3 循環寫出到下一個階段 寫 for (String word : words) { k.set(word); context.write(k,v);//寫入 } } }
reducerpython
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable>{ // hello 1 // hello 1 @Override //相同的進來 protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { // 1 彙總 單詞總個數 int sum = 0; for (IntWritable count : values) { sum +=count.get(); } // 2 輸出單詞的總個數 context.write(key, new IntWritable(sum)); } }
driverc++
import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.compress.BZip2Codec; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.DefaultCodec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 1獲取job信息 Configuration configuration = new Configuration(); // 開啓 map 端輸出壓縮 configuration.setBoolean("mapreduce.map.output.compress", true); // 設置 map 端輸出壓縮方式 // configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class); configuration.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class); Job job = Job.getInstance(configuration); // 2 獲取jar包位置 job.setJarByClass(WordCountDriver.class); // 3 關聯mapper he reducer job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); // 4 設置map輸出數據類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); // 5 設置最終輸出類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); // 9 添加combiner 進入reduce以前先進行合併,不是全部的map都能合併,須要知足要求 // job.setCombinerClass(WordcountCombiner.class); // 8 設置讀取輸入文件切片的類 多個小文件的處理方式 使用CombineTextInputFormat 系統默認TextInputFormat // job.setInputFormatClass(CombineTextInputFormat.class); // CombineTextInputFormat.setMaxInputSplitSize(job, 4194304); // CombineTextInputFormat.setMinInputSplitSize(job, 2097152); // 6 設置數據輸入 輸出文件的 路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 設置 reduce 端輸出壓縮開啓 FileOutputFormat.setCompressOutput(job, true); // 設置壓縮的方式 FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); // FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); // FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class); // 7提交代碼 boolean result = job.waitForCompletion(true); System.exit(result?0:1); } }
combinershell
import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // 1 彙總 int sum = 0; for (IntWritable value : values) { sum += value.get(); } // 2 輸出 context.write(key, new IntWritable(sum)); } }