主機操做系統:Windows 64位,四核8線程,主頻3.2G,8G內存html
虛擬軟件:VMware Workstation Projava
虛擬機操做系統:CentOS7 64位,單核,2G內存node
集羣包含三個節點,節點之間能夠免密碼SSH訪問,節點IP地址和主機名分佈以下:linux
序號ios |
IP地址web |
機器名shell |
核數/內存apache |
用戶名vim |
1bash |
192.168.1.61 |
hadoop1 |
1核/2G |
hadoop |
2 |
192.168.1.62 |
hadoop2 |
1核/1G |
hadoop |
3 |
192.168.1.63 |
hadoop3 |
1核/1G |
hadoop |
1.1.3.1Linux文件傳輸工具
使用的WinScp,該工具頂部爲工具的菜單和快捷方式,中間部分左面爲本地文件目錄,右邊爲遠程文件目錄,能夠經過拖拽等方式實現文件的下載與上傳,以下圖所示:
1.1.3.2Linux命令行執行工具
使用的XShell提供了遠程命令執行,以下圖所示:
1.2.2.1設置機器名
以 root 用戶登陸,使用#vi /etc/sysconfig/network 打開配置文件,設置機器名稱,新機器名在重啓後生效。
1.2.2.2設置靜態ip
cd /etc/sysconfig/network-scripts/
sudo vi ifcfg-eno16777736
BOOTPROTO=static #dhcp改成static(修改)
IPADDR=192.168.1.61 #靜態IP(增長)
GATEWAY=192.168.1.2 #默認網關,虛擬機安裝的話,一般是2,也就是VMnet8的網關設置(增長)
NETMASK=255.255.255.0 #子網掩碼(增長)
DNS1=192.168.1.2 #DNS 配置
重啓網絡服務:
service network restart
查看網絡
ifconfig
1.2.2.3設置host映射文件
sudo vi /etc/hosts
1.2.2.4關閉防火牆
sudo firewall-cmd --state #查詢防火牆狀態
sudo systemctl stop firewalld.service #關閉防火牆
1.2.2.5關閉SElinux
使用getenforce命令查看是否關閉
若是不是disabled,修改/etc/selinux/config 文件,將SELINUX=enforcing改成SELINUX=disabled,執行該命令後重啓機器生效。
1.2.3.1修改SSH配置文件
sudo vi /etc/ssh/sshd_config
開放以下配置:
PubkeyAuthentication yes
AuthorizedKeysFile .ssh/authorized_keys
配置後重啓服務
service sshd restart
1.2.3.2增長hadoop組和用戶
使用以下命令增長hadoop 組和hadoop 用戶(密碼)
#groupadd -g 1000 hadoop
#useradd -u 2000 -g hadoop hadoop
#passwd hadoop
1.2.3.3JDK安裝及配置
yum install java-1.8.0-openjdk
使用root用戶配置/etc/profile文件
生效該配置
source /etc/profile
驗證
java -version
1.2.3.4Scala安裝及配置
下載Scala安裝包
http://www.scala-lang.org/download/2.10.4.html
用WinScp上傳到/home/hadoop/
解壓縮 scala-2.10.4.tgz
tar -zxf scala-2.10.4.tgz
使用root用戶配置/etc/profile文件
生效該配置
source /etc/profile
驗證
scala -version
以 root 用戶登陸,使用#vi /etc/sysconfig/network 打開配置文件,設置機器名稱,新機器名在重啓後生效。
cd /etc/sysconfig/network-scripts/
sudo vi ifcfg-eno16777736
1.使用hadoop用戶登陸在三個節點中使用以下命令生成私鑰和公鑰:
cd ~/.ssh/
ssh-keygen -t rsa
2.進入/home/hadoop/.ssh目錄
在三個節點中分別把公鑰命名爲authorized_keys_hadoop1,authorized_keys_hadoop2,authorized_keys_hadoop3,使用命令以下:
cd /home/hadoop/.ssh
cp id_rsa.pub authorized_keys_hadoop1
3.把兩個從節點(hadoop二、hadoop3)的公鑰使用scp命令傳送到hadoop1節點的/home/hadoop/.ssh文件夾中
scp authorized_keys_hadoop2 hadoop@hadoop1:/home/hadoop/.ssh
scp authorized_keys_hadoop3 hadoop@hadoop1:/home/hadoop/.ssh
4.把三個節點的公鑰信息保存到authorized_key文件中
使用$cat authorized_keys_hadoop1 >> authorized_keys 命令
5.把該文件分發到其餘兩個從節點上
使用scp authorized_keys hadoop@hadoop2:/home/hadoop/.ssh把密碼文件分發出去。
6. 在三臺機器中使用以下設置authorized_keys讀寫權限
chmod 400 authorized_keys
7. 測試ssh免密碼登陸是否生效
下載hadoop安裝包
http://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-2.7.5/hadoop-2.7.5.tar.gz
1.用WinScp上傳到/home/hadoop/
解壓縮 hadoop-2.7.5.tar.gz
tar -zxf hadoop-2.7.5.tar.gz
使用root用戶配置/etc/profile文件
(1)core-site.xml 配置:
<configuration> <!-- 指定hadoop運行時產生文件的存儲路徑 --> <property> <name>hadoop.tmp.dir</name> <value>file:/home/hadoop/hdfs/tmp</value> </property> <!-- 指定HDFS(namenode)的通訊地址 --> <property> <name>fs.default.name</name> <value>hdfs://hadoop1:9000</value> </property> <!-- 容許經過httpfs方式訪問hdfs的主機名或者域名 --> <property> <name>hadoop.proxyuser.root.hosts</name> <value>*</value> </property> <!-- 容許訪問的客戶端的用戶組 --> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value> </property> </configuration>
(2)hdfs-site.xml配置:
<configuration> <!-- 設置namenode的http通信地址 --> <property> <name>dfs.namenode.http-address</name> <value>hadoop1:50070</value> </property> <!-- 設置secondarynamenode的http通信地址 --> <property> <name>dfs.namenode.secondary.http-address</name> <value>hadoop1:50090</value> </property> <!-- 設置namenode存放的路徑 --> <property> <name>dfs.namenode.name.dir</name> <value>file:/home/hadoop/hdfs/name</value> </property> <!-- 設置datanode存放的路徑 --> <property> <name>dfs.datanode.data.dir</name> <value>file:/home/hadoop/hdfs/data</value> </property> <!-- 設置hdfs副本數量 --> <property> <name>dfs.replication</name> <value>2</value> </property> <!-- 設置webhdfs --> <property> <name>dfs.webhdfs.enabled</name> <value>true</value> </property> <!-- 設置permissions --> <property> <name>dfs.permissions</name> <value>false</value> </property> </configuration>
(3)mapred-site.xml配置:
cp mapred-site.xml.template mapred-site.xml
vi mapred-site.xml
<configuration> <!-- 框架MR使用YARN --> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapreduce.jobhistory.address</name> <value>hadoop1:10020</value> </property> <property> <name>mapreduce.jobhistory.webapp.address</name> <value>hadoop1:19888</value> </property> </configuration>
(4)yarn-site.xml
<configuration> <!-- Site specific YARN configuration properties --> <!-- 設置 resourcemanager 在哪一個節點 --> <property> <name>yarn.resourcemanager.hostname</name> <value>hadoop1</value> </property> <!-- 設置 resourcemanager.scheduler 在哪一個節點 --> <property> <name>yarn.resourcemanager.scheduler.address</name> <value>hadoop1:18030</value> </property> <!-- 設置 resourcemanager.webapp 在哪一個節點 --> <property> <name>yarn.resourcemanager.webapp.address</name> <value>hadoop1:8088</value> </property> <!-- 設置 resourcemanager.resource-tracker 在哪一個節點 --> <property> <name>yarn.resourcemanager.resource-tracker.address</name> <value>hadoop1:18025</value> </property> <!-- 設置 resourcemanager.admin 在哪一個節點 --> <property> <name>yarn.resourcemanager.admin.address</name> <value>hadoop1:18141</value> </property> <!-- 設置 nodemanagerr 內存大小 <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>1024</value> </property> --> <!-- 設置 nodemanagerr CPU核數 <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>1</value> </property> --> <!-- reducer取數據的方式是mapreduce_shuffle --> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <!-- --> <property> <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name> <value>org.apache.hadoop.mapred.ShuffleHandler</value> </property> </configuration>
(5)hadoop-env.sh配置:
JAVA_HOME 顯式配置一下
(6)slaves配置:
scp -r hadoop-2.7.5 hadoop@hadoop2:/home/hadoop/
scp -r hadoop-2.7.5 hadoop@hadoop3:/home/hadoop/
在hadop2.7.5目錄下執行命令:
bin/hdfs namenode -format
sbin/start-dfs.sh
Hadoop1:
Hadoop二、Hadoop3:
web頁面查看:192.168.1.61:50070
192.168.1.61:8088
下載spark安裝包
http://spark.apache.org/downloads.html
1.用WinScp上傳到/home/hadoop/
解壓縮 spark-2.2.1-bin-hadoop2.7.tgz
tar -zxf spark-2.2.1-bin-hadoop2.7.tgz
使用root用戶配置/etc/profile文件
2.修改配置文件(spark-2.2.1/conf)目錄下,slaves,spark-env.sh.
(1)打開配置文件conf/slaves
cd /home/hadoop/spark-2.2.1/conf
sudo vi slaves
加入slave配置節點
hadoop1
hadoop2
hadoop3
(2) 打開配置文件conf/spark-env.sh
cd /home/hadoop/spark-2.2.1/conf
cp spark-env.sh.template spark-env.sh
sudo vi spark-env.sh
加入Spark環境配置內容,設置hadoop1爲Master節點
export SPARK_MASTER_HOST=hadoop1
export SPARK_MASTER_PORT=7077
export SPARK_WORKER_CORES=1
export SPARK_WORKER_INSTANCES=1
export SPARK_WORKER_MEMORY=512M
3.向從節點分發Spark程序
cd /home/hadoop/
scp -r spark-2.2.1 hadoop@hadoop2:/home/hadoop/
scp -r spark-2.2.1 hadoop@hadoop3:/home/hadoop/
4.將從節點 /home/hadoop/spark-2.2.1/conf/ spark-env.sh 中的 SPARK_LOCAL_IP 改成從節點IP
cd /home/hadoop/spark-2.2.1/sbin
./start-all.sh
此時在hadoop1上面運行的進程有:Worker和Master
此時在hadoop2和hadoop3上面運行的進程有隻有Worker
web頁面:192.168.1.61:8080
進入hadoop1節點,進入spark的bin目錄,使用spark-shell鏈接集羣
cd /home/hadoop/spark-2.2.1/bin
spark-shell --master spark://hadoop1:7077 --executor-memory 1024m
用Java編寫程序,進行wordcount,輸入文件用的是一篇介紹雲計算的英文文章,命名爲cloudComputing.txt
指望輸出:文中詞語使用頻率。意義爲:單詞的key值 單詞的出現次數,而且按照頻次由高到低進行顯示。
步驟:
1.打開eclipse,建立java工程。導入hadoop包和相關配置文件。
2.進入/hadoop安裝目錄下/share/hadoop/
(1) 、把hdfs文件夾下的jar包和hdfs/lib目錄下的jar包導入工程)。
(2) 、把mapreduce文件夾下的jar包和mapreduce/lib目錄下的jar包導入工程。
(3) 、把yarn文件夾下的jar包和yarn/lib目錄下的jar包導入工程。
(4) 、把common文件夾下的jar包和common/lib下的jar包導入工程。
(5) 、把hadoop安裝目錄下/etc/hadoop中core-site.xml和hdfs-site.xml文件配置到java工程的src目錄下。
3.編寫代碼程序
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; publicclass WordCount { /** * 創建Mapper類TokenizerMapper繼承自泛型類Mapper * Mapper類:實現了Map功能基類 * Mapper接口: * WritableComparable接口:實現WritableComparable的類能夠相互比較。全部被用做key的類應該實現此接口。 * Reporter 則可用於報告整個應用的運行進度,本例中未使用。 * */ publicstaticclass TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ /** * IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 數據類型的類,這些類實現了WritableComparable接口, * 都可以被串行化從而便於在分佈式環境中進行數據交換,你能夠將它們分別視爲int,String 的替代品。 * 聲明one常量和word用於存放單詞的變量 */ privatefinalstatic IntWritable one =new IntWritable(1); private Text word =new Text(); /** * Mapper中的map方法: * void map(K1 key, V1 value, Context context) * 映射一個單個的輸入k/v對到一箇中間的k/v對 * 輸出對不須要和輸入對是相同的類型,輸入對能夠映射到0個或多個輸出對。 * Context:收集Mapper輸出的<k,v>對。 * Context的write(k, v)方法:增長一個(k,v)對到context * 編寫Map和Reduce函數.這個Map函數使用StringTokenizer函數對字符串進行分隔,經過write方法把單詞存入word中 * write方法存入(單詞,1)這樣的二元組到context中 */ publicvoid map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr =new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } publicstaticclass IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result =new IntWritable(); /** * Reducer類中的reduce方法: * void reduce(Text key, Iterable<IntWritable> values, Context context) * 中k/v來自於map函數中的context,可能通過了進一步處理(combiner),一樣經過context輸出 */ publicvoid reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum =0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } publicstaticvoid main(String[] args) throws Exception { /** * Configuration:map/reduce的j配置類,向hadoop框架描述map-reduce執行的工做 */ Configuration conf =new Configuration(); String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length !=2) { System.err.println("Usage: wordcount <in><out>"); System.exit(2); } Job job =new Job(conf, "word count"); //設置一個用戶定義的job名稱 job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); //爲job設置Mapper類 job.setCombinerClass(IntSumReducer.class); //爲job設置Combiner類 job.setReducerClass(IntSumReducer.class); //爲job設置Reducer類 job.setOutputKeyClass(Text.class); //爲job的輸出數據設置Key類 job.setOutputValueClass(IntWritable.class); //爲job輸出設置value類 FileInputFormat.addInputPath(job, new Path(otherArgs[0])); //爲job設置輸入路徑 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//爲job設置輸出路徑 System.exit(job.waitForCompletion(true) ?0 : 1); //運行job } }
4.將java文件打包成jar文件DemoWordCount.jar。
5.將DemoWordCount.jar上傳到
/home/hadoop/hadoop-2.7.5/share/hadoop/mapreduce
6.Hadoop執行DemoWordCount.jar 統計cloudComputing.txt詞頻
(1)
# 在hdfs的根目錄下創建input、output目錄 bin/hdfs dfs -mkdir /input bin /hdfs dfs -mkdir /output # 查看HDFS根目錄下的目錄結構 bin/hdfs dfs -ls /
結果以下:
(2)
# 上傳 bin/hdfs dfs -put /home/hadoop/cloudComputing.txt /input # 查看 bin/hdfs dfs -ls /input
結果以下:
(3)
# 將運行結果保存在/output/cloudComputing目錄下 bin/hadoop jar ~/hadoop-2.7.5/share/hadoop/mapreduce/DemoWordCount.jar wordcount /input/cloudComputing.txt /output/cloudComputing # 查看/output/cloudComputing目錄下的文件 bin/hdfs dfs -ls /output/cloudComputing
(4)
# 查看運行結果 bin/hdfs dfs -cat /output/cloudComputing/part-r-00000 | sort -k 2 -n -r|head -20 #sort中 -k 2 表示用以tab爲分隔符的第二個字段來排序 -n表示用數字形式排序 -r表示從大到小排序 顯示結果前20行。
結果以下:
C++ 實現:
wordcount_map.cpp
#include <iostream> #include <string> using namespace std; int main(int argc, char** argv) { string key; string value = "1"; while(cin >> key) { cout<< key << "\t" << value <<endl; } return 0; }
wordcount_reduce.cpp
#include <iostream> #include <string> #include <map> using namespace std; int main(int argc, char** argv) { string key; string value; map<string, int> word2count; map<string, int>::iterator it; while(cin >> key) { cin >> value; it = word2count.find(key); if(it != word2count.end()) { it->second++; } else { word2count.insert(make_pair(key, 1)); } } for(it = word2count.begin(); it != word2count.end(); it++) { cout<< it->first << "\t" << it->second << endl; } return 0; }
編譯成可執行文件:
g++ -o mapperC wordcount_map.cpp
g++ -o reduceC wordcount_reduce.cpp
而後,調用 hadoop-streaming-2.7.5.jar
hadoop jar ~/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar -input /input/cloudComputing.txt -output /output/cloudComputing -mapper ~/mapperC -reducer ~/reduceC -file ~/mapperC -file ~/reduceC
使用 Scala 編寫的程序須要使用 sbt 進行編譯打包。
(1)安裝sbt
下載sbt-launch.jar。並拷貝到/usr/local/sbt
(2)在/usr/local/sbt 中建立 sbt 腳本,添加以下內容:
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"
(3)保存後,爲 ./sbt 腳本增長可執行權限。
chmod u+x ./sbt
(4)最後運行以下命令
./sbt sbt-version
(5)編寫Scala應用程序
cd ~ # 進入用戶主文件夾
mkdir ./sparkapp # 建立應用程序根目錄
mkdir -p ./sparkapp/src/main/scala # 建立所需的文件夾結構
vim WordCount.scala
/* wordcount.scala */ import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Word count") val sc = new SparkContext(conf) //初始化 SparkContext,SparkContext 的參數 SparkConf 包含了應用程序的信息 val dataFile = sc.textFile("hdfs://hadoop1:9000/input/cloudComputing.txt") val outputFile = "/home/hadoop/spark.wordcount.out" val words = dataFile.flatMap(_.split(" ")) val pairs = words.map(word => (word,1)) val wordcount = pairs.reduceByKey(_ + _).map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1)) wordcount.collect.foreach(pair =>println("key:" + pair._1, " value:" + pair._2)) wordcount.saveAsTextFile(outputFile) sc.stop() } }
該程序依賴SparkAPI,所以咱們須要經過sbt進行編譯打包。在./sparkapp 中新建文件 WordCount.sbt,添加內容以下,聲明該獨立應用程序的信息以及與 Spark 的依賴關係:
文件WordCount.sbt 須要指明 Spark 和 Scala 的版本。
(6)使用sbt打包Scala程序
爲保證 sbt 能正常運行,先執行以下命令檢查整個應用程序的文件結構:
cd ~/sparkapp
find .
接着,/usr/local/sbt/sbt package 打包成JAR
生成jar包的位置:
/home/hadoop/sparkapp/target/scala-2.10/ wordcount_2.10-1.0.jar
(7)經過 spark-submit 運行程序
最後將生成的 jar 包經過 spark-submit 提交到 Spark 中運行了,命令以下:
sbin/spark-submit --master spark://hadoop1:7077 --class "WordCount" --executor-memory 512m ~/sparkapp/target/scala-2.10/wordcount_2.10-1.0.jar 200