WordCount Examples

A WordCount example in Scala, using the scala.actors library to count words in several files concurrently and then merge the per-file results. (scala.actors was deprecated in Scala 2.11 and removed in 2.12; see the Future-based sketch after the output below.)

package com.wondersgroup.myscala

import scala.actors.{Actor, Future}
import scala.collection.mutable.ListBuffer
import scala.io.Source

// Count the word frequencies in each file, then aggregate across files
case class SubmitTask(f:String)
case object StopTask

// Counts the occurrences of each word in a single file


class ActorTest3 extends Actor{

  override def act(): Unit = {
    while (true) {
      receive{
        case SubmitTask(f) => {
          // Store each line of the file as an element of a list
          val lines = Source.fromFile(f).getLines().toList
          // Split the lines so that each word is an element of a list
          val words = lines.flatMap(_.split(" "))
          print("----------"+words)
          println("================"+words.map((_,1)))
          // Build a map from each word in this file to its occurrence count
          println("++++++"+words.map((_,1)).groupBy(_._1))
          val result = words.map((_,1)).groupBy(_._1).mapValues(_.size)
          println("&&&&&&&&&&&&&&&&"+result)

          // Reply to the sender with this file's word counts
          sender ! result

        }

        case StopTask => exit()
      }
    }
  }

}

object ActorTest3{
  def main(args: Array[String]): Unit = {
    // Submit a text-analysis task to one actor per file
    val replys = new ListBuffer[Future[Any]]
    val results = new ListBuffer[Map[String,Int]]
    val files = Array("src/wordcount.txt","src/wordcount1.txt")
    for(f <- files) {
      val actor = new ActorTest3
      actor.start()
      val reply = actor !! SubmitTask(f)
      // Collect the Future for each reply in replys
      replys += reply
    }

    // Aggregate the results from all the files
    while (replys.size > 0) {
      // Check which replies are ready to read
      val done = replys.filter(_.isSet)
      print("@@@@@@@@@@@"+done)
      for(res <- done) {
        results += res.apply().asInstanceOf[Map[String,Int]]
        replys -= res
      }
      Thread.sleep(5000)
    }

    // Merge the per-file counts into an overall word count
    val res2 = results.flatten.groupBy(_._1).mapValues(_.foldLeft(0)(_+_._2))
    println("******************"+res2)

  }
}  

Output:

@@@@@@@@@@@ListBuffer()----------List(python, is, a, very, brief, language, It, is, also, a, shell, language, we, like, python)================List((python,1), (is,1), (a,1), (very,1), (brief,1), (language,1), (It,1), (is,1), (also,1), (a,1), (shell,1), (language,1), (we,1), (like,1), (python,1))
----------List(python, java, go, python, c++, c++, java, ruby, c, javascript, c++)================List((python,1), (java,1), (go,1), (python,1), (c++,1), (c++,1), (java,1), (ruby,1), (c,1), (javascript,1), (c++,1))
++++++Map(java -> List((java,1), (java,1)), c++ -> List((c++,1), (c++,1), (c++,1)), go -> List((go,1)), python -> List((python,1), (python,1)), c -> List((c,1)), ruby -> List((ruby,1)), javascript -> List((javascript,1)))
++++++Map(is -> List((is,1), (is,1)), shell -> List((shell,1)), a -> List((a,1), (a,1)), also -> List((also,1)), language -> List((language,1), (language,1)), brief -> List((brief,1)), python -> List((python,1), (python,1)), It -> List((It,1)), very -> List((very,1)), we -> List((we,1)), like -> List((like,1)))
&&&&&&&&&&&&&&&&Map(is -> 2, shell -> 1, a -> 2, also -> 1, language -> 2, brief -> 1, python -> 2, It -> 1, very -> 1, we -> 1, like -> 1)
&&&&&&&&&&&&&&&&Map(java -> 2, c++ -> 3, go -> 1, python -> 2, c -> 1, ruby -> 1, javascript -> 1)
@@@@@@@@@@@ListBuffer(<function0>, <function0>)******************Map(is -> 2, shell -> 1, a -> 2, java -> 2, c++ -> 3, go -> 1, also -> 1, language -> 2, brief -> 1, python -> 4, It -> 1, c -> 1, ruby -> 1, very -> 1, we -> 1, like -> 1, javascript -> 1)
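
Note: scala.actors was deprecated in Scala 2.11 and removed from the distribution in 2.12. As a minimal sketch of the same fan-out-and-merge pattern on current Scala, scala.concurrent.Future can replace the actors (the object name FutureWordCount and the 30-second timeout are illustrative assumptions; the input files are the same two as above):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

object FutureWordCount {
  def main(args: Array[String]): Unit = {
    val files = Seq("src/wordcount.txt", "src/wordcount1.txt")

    // One Future per file: build that file's word -> count map
    val tasks = files.map { f =>
      Future {
        val words = Source.fromFile(f).getLines().toList.flatMap(_.split(" "))
        words.groupBy(identity).map { case (w, ws) => w -> ws.size }
      }
    }

    // Wait until every per-file map is ready (assumed 30s timeout)
    val perFile = Await.result(Future.sequence(tasks), 30.seconds)

    // Merge the per-file maps into one overall count
    val total = perFile.flatten.groupBy(_._1).map { case (w, ps) => w -> ps.map(_._2).sum }
    println(total)
  }
}

Here Future.sequence plays the role of the polling loop over replys in the actor version: it completes only once every per-file map is available.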

WordCount in MapReduce

Mapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * Input key:    LongWritable - byte offset of the line within the file
 * Input value:  Text         - one line of the file
 * Output key:   Text         - a word
 * Output value: IntWritable  - the count for that word (always 1 here)
 * @author lenovo
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	Text k = new Text();
	IntWritable v = new IntWritable(1);

	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		// 1. Convert the line to a String
		String line = value.toString();
		
		// 2. Split the line into words
		String[] words = line.split(" ");
		
		// 3. Emit each word to the next stage
		for (String word : words) {

			k.set(word);
			context.write(k, v); // emit (word, 1)
		}
	}
}  

Reducer

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCountReducer extends Reducer<Text, IntWritable, Text,IntWritable>{
	
	// Incoming pairs for one key, e.g.:
	// hello 1
	// hello 1
	
	@Override
	// All values with the same key arrive in a single call
	protected void reduce(Text key, Iterable<IntWritable> values,Context context) 
			throws IOException, InterruptedException {
		// 1. Sum the total number of occurrences of this word
		int sum = 0;
		for (IntWritable count : values) {
			sum += count.get();
		}
		
		// 2. Emit the word with its total count
		
		context.write(key, new IntWritable(sum));
	}
}  
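
Between the map and reduce phases, the shuffle groups all (word, 1) pairs emitted by the mappers by key, so each reduce call receives one word together with all of its 1s. Conceptually this is the same groupBy-then-sum that the Scala program above performs; a tiny illustration with made-up values:

// Conceptual equivalent of shuffle + reduce for word count
val mapOutput = List(("hello", 1), ("hello", 1), ("world", 1))
val reduced = mapOutput.groupBy(_._1).map { case (w, ps) => w -> ps.map(_._2).sum }
// reduced: Map(hello -> 2, world -> 1)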

Driver

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

		// 1. Create the job configuration
		Configuration configuration = new Configuration();
		
		// Enable map-side output compression
		configuration.setBoolean("mapreduce.map.output.compress", true);
		// Set the map-side output compression codec
//		configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
		configuration.setClass("mapreduce.map.output.compress.codec", DefaultCodec.class, CompressionCodec.class);

		Job job = Job.getInstance(configuration);

		// 2. Set the jar location via the driver class

		job.setJarByClass(WordCountDriver.class);

		// 3. Wire up the mapper and reducer
		job.setMapperClass(WordCountMapper.class);
		job.setReducerClass(WordCountReducer.class);

		// 4. Set the map output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);

		// 5. Set the final output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		
		// 9. Add a combiner: merges map output before the reduce phase; not every job can use one, the reduce logic must be commutative and associative (as summing is)
//		job.setCombinerClass(WordcountCombiner.class);
		
		
		// 8. Set the InputFormat used to split the input: CombineTextInputFormat handles many small files; the default is TextInputFormat
		
//		job.setInputFormatClass(CombineTextInputFormat.class);
//		CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
//		CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
		// 6. Set the input and output paths
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		// Enable compression of the final (reduce-side) output
		FileOutputFormat.setCompressOutput(job, true);
		// Set the output compression codec
		FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
		// FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
		// FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
		
		// 7. Submit the job and wait for it to complete
		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}  

Combiner

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Runs map-side before the shuffle; the logic is identical to the reducer,
// which is valid here because summation is associative and commutative
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		// 1. Sum the counts for this key
		int sum = 0;
		for (IntWritable value : values) {
			sum += value.get();
		}
		
		// 2. Emit the partial sum
		context.write(key, new IntWritable(sum));
	}
} 