windows下經過idea鏈接hadoop和spark集羣


###windows下連接hadoop集羣java

一、假如在linux機器上已經搭建好hadoop集羣linux

二、在windows上把hadoop的壓縮包解壓到一個沒有空格的目錄下,好比是D盤根目錄git

三、配置環境變量
HADOOP_HOME=D:\hadoop-2.7.7
Path下添加 %HADOOP_HOME%\bingithub

四、下載類似版本的文件
hadoop.dll #存放在C:\Windows\System32 目錄下
winutils.exe #存放在%HADOOP_HOME%\bin 目錄下apache

#下載地址:
https://github.com/steveloughran/winutilswindows

五、wordcount
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;app

import java.io.IOException;ide


/**
* @author: LUGH1
* @date: 2019-4-8
* @description:
*/
public class WordCount {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://192.168.88.130:9000");
Job job = Job.getInstance(conf);
job.setJarByClass(WordCount.class);oop

job.setMapperClass(WdMapper.class);
job.setReducerClass(WdReducer.class);idea

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, new Path("/test/word.txt"));
FileOutputFormat.setOutputPath(job, new Path("/test/output"));

boolean result = job.waitForCompletion(true);
System.exit(result?0:1);


System.out.println("good job");
}
}

class WdMapper extends Mapper<Object, Text, Text, IntWritable> {
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] split = line.split(" ");
for(String word : split){
context.write(new Text(word), new IntWritable(1));
}
}
}

class WdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int count = 0;
for(IntWritable i : values){
count += i.get();
}
context.write(key,new IntWritable(count));
}
}


###windows下連接spark集羣運行
主要設置:
一、配置master的地址:conf.setMaster("spark://192.168.88.130:7077")
二、配置jar包的位置:conf.setJars(List("hdfs://192.168.88.130:9000/test/sparkT-1.0-SNAPSHOT.jar"))
如上的sparkT-1.0-SNAPSHOT.jar包是經過idea打包而後經過hadoop fs -put上傳在hdfs上的

#代碼
import org.apache.spark.{SparkConf, SparkContext}

object sparkTest {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("spark://192.168.88.130:7077")
// conf.set("spark.driver.host","192.168.88.1")
conf.setJars(List("hdfs://192.168.88.130:9000/test/sparkT-1.0-SNAPSHOT.jar"))
val sc = new SparkContext(conf)
// val path = "E:\\java_product\\test.txt"
val rdd = sc.textFile("hdfs://192.168.88.130:9000/test/word.txt")
// val rdd = sc.textFile("E:\\java_product\\test.txt")
val count = rdd.flatMap(line=>line.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
count.collect().foreach(println) //.saveAsTextFile("hdfs://192.168.88.130:9000/test/wordoupt1")
}

}

相關文章
相關標籤/搜索