// Scala開發Hadoop示例 (Example: developing a Hadoop job in Scala)

import org.apache.hadoop.conf.{Configuration, Configured};
import org.apache.hadoop.util.{ToolRunner, Tool};
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.{LongWritable, Text, IntWritable};
import org.apache.hadoop.mapreduce.{Reducer, Mapper, Job};
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;


/**
 * Hadoop MapReduce word count, written in Scala.
 *
 * Counts how often each whitespace-separated word occurs in the input path and
 * writes one `(word, total)` pair per distinct word to the output path.
 *
 * Usage: `WordCount <input path> <output path>`
 *
 * Originally created with IntelliJ IDEA by riley on 8/26/13, 1:58 PM.
 */
object WordCount extends Configured with Tool
{
    /** Exit code returned by `run` when the input/output paths are not supplied. */
    private val UsageError = 2

    /**
     * Mapper: emits `(word, 1)` for every non-empty token of an input line.
     */
    class Map extends Mapper[LongWritable, Text, Text, IntWritable]
    {
        // Both writables are allocated once and reused for every emitted pair.
        private val one = new IntWritable(1)
        private val word = new Text()

        /**
         * Splits one input line into words and writes `(word, 1)` for each of them.
         *
         * @param key     key of the input record (not used)
         * @param rowLine the text of the line
         * @param context sink for the emitted `(word, 1)` pairs
         */
        override def map(key: LongWritable, rowLine: Text, context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit =
        {
            // Split on any run of whitespace; the guard drops the empty token that
            // split() yields for an empty line or for leading whitespace, so the
            // empty string is never counted as a word.
            for (token <- rowLine.toString.split("\\s+") if token.nonEmpty) {
                word.set(token)
                context.write(word, one)
            }
        }
    }

    /**
     * Reducer (also registered as the combiner): sums the counts received for a word.
     */
    class Reduce extends Reducer[Text, IntWritable, Text, IntWritable]
    {
        // Reused output value, overwritten on every reduce call.
        private val count = new IntWritable()

        /**
         * Adds up all counts for `key` and writes `(key, sum)`.
         *
         * `values` must be declared as `java.lang.Iterable`: a bare `Iterable` means
         * `scala.Iterable` in Scala, which would not override `Reducer.reduce`.
         *
         * @param key     the word being reduced
         * @param values  the partial counts collected for that word
         * @param context sink for the resulting `(word, total)` pair
         */
        override def reduce(key: Text, values: java.lang.Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit =
        {
            var sum = 0
            val it = values.iterator()
            while (it.hasNext) sum += it.next().get()

            count.set(sum)
            context.write(key, count)
        }
    }

    /**
     * Configures and runs the word-count job, blocking until it finishes.
     *
     * @param args `args(0)` is the input path, `args(1)` the output path
     * @return 0 if the job succeeded, 1 if it failed, 2 if the paths are missing
     */
    override def run(args: Array[String]): Int =
    {
        if (args.length < 2) {
            System.err.println("Usage: WordCount <input path> <output path>")
            UsageError
        } else {
            // NOTE(review): newer Hadoop releases deprecate this constructor in favour of
            // Job.getInstance(conf, name); kept as-is because the targeted Hadoop version
            // is not visible here.
            val job = new Job(getConf(), "WordCount")

            job.setJarByClass(this.getClass)
            job.setOutputKeyClass(classOf[Text])
            job.setOutputValueClass(classOf[IntWritable])

            job.setMapperClass(classOf[Map])
            job.setReducerClass(classOf[Reduce])
            // Summation is associative, so the reducer doubles as the combiner.
            job.setCombinerClass(classOf[Reduce])

            FileInputFormat.addInputPath(job, new Path(args(0)))
            FileOutputFormat.setOutputPath(job, new Path(args(1)))

            if (job.waitForCompletion(true)) 0 else 1
        }
    }

    /**
     * Program entry point: hands the arguments to `ToolRunner` and exits the JVM
     * with the status code returned by `run`.
     */
    def main(args: Array[String]): Unit =
    {
        System.exit(ToolRunner.run(new Configuration(), this, args))
    }
}
// 相關文章
// 相關標籤/搜索