Hadoop、Spark 集羣環境搭建

1.基礎環境搭建

1.1運行環境說明

1.1.1硬軟件環境

主機操做系統:Windows 64位,四核8線程,主頻3.2G,8G內存html

虛擬軟件:VMware Workstation Projava

虛擬機操做系統:CentOS7 64位,單核,2G內存node

 

1.1.2集羣網絡環境

集羣包含三個節點,節點之間能夠免密碼SSH訪問,節點IP地址和主機名分佈以下:linux

序號ios

IP地址web

機器名shell

核數/內存apache

用戶名vim

1bash

192.168.1.61

hadoop1

1核/2G

hadoop

2

192.168.1.62

hadoop2

1核/1G

hadoop

3

192.168.1.63

hadoop3

1核/1G

hadoop

 

1.1.3安裝使用工具

1.1.3.1Linux文件傳輸工具

使用的WinScp,該工具頂部爲工具的菜單和快捷方式,中間部分左面爲本地文件目錄,右邊爲遠程文件目錄,能夠經過拖拽等方式實現文件的下載與上傳,以下圖所示:

 

1.1.3.2Linux命令行執行工具

使用的XShell提供了遠程命令執行,以下圖所示:

 

 

 

1.2搭建主節點機器環境

1.2.1下載安裝虛擬機 VMware Workstation 12 Pro、安裝CentOS7

 

 

1.2.2 設置系統環境

1.2.2.1設置機器名

以 root 用戶登陸,使用#vi /etc/sysconfig/network 打開配置文件,設置機器名稱,新機器名在重啓後生效。

 

 

1.2.2.2設置靜態ip

cd /etc/sysconfig/network-scripts/

sudo vi ifcfg-eno16777736

 

BOOTPROTO=static #dhcp改成static(修改)

IPADDR=192.168.1.61 #靜態IP(增長)

GATEWAY=192.168.1.2 #默認網關,虛擬機安裝的話,一般是2,也就是VMnet8的網關設置(增長)

NETMASK=255.255.255.0 #子網掩碼(增長)

DNS1=192.168.1.2 #DNS 配置

 

重啓網絡服務:

service network restart

查看網絡

ifconfig

 

 

1.2.2.3設置host映射文件

sudo vi /etc/hosts

 

 

1.2.2.4關閉防火牆

sudo firewall-cmd --state #查詢防火牆狀態

sudo systemctl stop firewalld.service #關閉防火牆

 

 

1.2.2.5關閉SElinux

使用getenforce命令查看是否關閉

 

若是不是disabled,修改/etc/selinux/config 文件,將SELINUX=enforcing改成SELINUX=disabled,執行該命令後重啓機器生效。

 

1.2.3 配置運行環境

1.2.3.1修改SSH配置文件

sudo vi /etc/ssh/sshd_config

開放以下配置:

PubkeyAuthentication yes

AuthorizedKeysFile .ssh/authorized_keys

 

配置後重啓服務

service sshd restart

 

1.2.3.2增長hadoop組和用戶

使用以下命令增長hadoop 組和hadoop 用戶(密碼)

#groupadd -g 1000 hadoop

#useradd -u 2000 -g hadoop hadoop

#passwd hadoop

 

1.2.3.3JDK安裝及配置

yum install java-1.8.0-openjdk

使用root用戶配置/etc/profile文件

 

 

生效該配置

source /etc/profile

驗證

java -version

 

 

1.2.3.4Scala安裝及配置

下載Scala安裝包

http://www.scala-lang.org/download/2.10.4.html

用WinScp上傳到/home/hadoop/

解壓縮 scala-2.10.4.tgz

tar -zxf scala-2.10.4.tgz

 

 

使用root用戶配置/etc/profile文件

 

生效該配置

source /etc/profile

驗證

scala -version

 

 

1.3搭建從節點機器環境

1.3.1 克隆主節點機器

 

 

 

1.3.2 配置從節點機器名和靜態ip

以 root 用戶登陸,使用#vi /etc/sysconfig/network 打開配置文件,設置機器名稱,新機器名在重啓後生效。

 

 

cd /etc/sysconfig/network-scripts/

sudo vi ifcfg-eno16777736

 

 

1.3.3 配置SSH免密登陸

1.使用hadoop用戶登陸在三個節點中使用以下命令生成私鑰和公鑰:

cd ~/.ssh/

ssh-keygen -t rsa

2.進入/home/hadoop/.ssh目錄

在三個節點中分別把公鑰命名爲authorized_keys_hadoop1,authorized_keys_hadoop2,authorized_keys_hadoop3,使用命令以下:

cd /home/hadoop/.ssh

cp id_rsa.pub authorized_keys_hadoop1

 

3.把兩個從節點(hadoop二、hadoop3)的公鑰使用scp命令傳送到hadoop1節點的/home/hadoop/.ssh文件夾中

scp authorized_keys_hadoop2 hadoop@hadoop1:/home/hadoop/.ssh

scp authorized_keys_hadoop3 hadoop@hadoop1:/home/hadoop/.ssh

 

4.把三個節點的公鑰信息保存到authorized_key文件中

使用$cat authorized_keys_hadoop1 >> authorized_keys 命令

 

5.把該文件分發到其餘兩個從節點上

使用scp authorized_keys hadoop@hadoop2:/home/hadoop/.ssh把密碼文件分發出去。

 

 

6. 在三臺機器中使用以下設置authorized_keys讀寫權限

chmod 400 authorized_keys

 

7. 測試ssh免密碼登陸是否生效

 

 

2. Hadoop環境搭建

2.1下載hadoop安裝包

下載hadoop安裝包

http://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-2.7.5/hadoop-2.7.5.tar.gz

 

2.2配置hadoop

1.用WinScp上傳到/home/hadoop/

解壓縮 hadoop-2.7.5.tar.gz

tar -zxf hadoop-2.7.5.tar.gz

 

 

使用root用戶配置/etc/profile文件

 

  1. 修改配置文件(hadoop2.7.1/etc/hadoop/)目錄下,core-site.xml, hdfs-site.xml,mapred-site.xml,yarn-site.xml, hadoop-env.sh。

(1)core-site.xml 配置:

<configuration>
    <!-- 指定hadoop運行時產生文件的存儲路徑 -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/home/hadoop/hdfs/tmp</value>
    </property>

    <!-- 指定HDFS(namenode)的通訊地址 -->
    <property>
        <name>fs.default.name</name>
        <value>hdfs://hadoop1:9000</value>
    </property>

    <!-- 容許經過httpfs方式訪問hdfs的主機名或者域名 -->
    <property>
        <name>hadoop.proxyuser.root.hosts</name>
        <value>*</value>
    </property>

    <!-- 容許訪問的客戶端的用戶組 -->
    <property>
        <name>hadoop.proxyuser.root.groups</name>
        <value>*</value>
    </property>

</configuration>

 

(2)hdfs-site.xml配置: 

<configuration>
    <!-- 設置namenode的http通信地址 -->
    <property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop1:50070</value>
    </property>

    <!-- 設置secondarynamenode的http通信地址 -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop1:50090</value>
    </property>

    <!-- 設置namenode存放的路徑 -->
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/home/hadoop/hdfs/name</value>
    </property>

    <!-- 設置datanode存放的路徑 -->
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/home/hadoop/hdfs/data</value>
    </property>

    <!-- 設置hdfs副本數量 -->
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>

    <!-- 設置webhdfs -->
    <property>
        <name>dfs.webhdfs.enabled</name>
        <value>true</value>
    </property>

    <!-- 設置permissions -->
    <property>
        <name>dfs.permissions</name>
        <value>false</value>
    </property>
</configuration>

 

(3)mapred-site.xml配置: 

cp mapred-site.xml.template mapred-site.xml

vi mapred-site.xml

<configuration>
    <!-- 框架MR使用YARN -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.address</name>
        <value>hadoop1:10020</value>
    </property>

    <property>
        <name>mapreduce.jobhistory.webapp.address</name>
        <value>hadoop1:19888</value>
    </property>
</configuration>

 

(4)yarn-site.xml

<configuration>

<!-- Site specific YARN configuration properties -->

    <!-- 設置 resourcemanager 在哪一個節點 -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop1</value>
    </property>

    <!-- 設置 resourcemanager.scheduler 在哪一個節點 -->
    <property>
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>hadoop1:18030</value>
    </property>

    <!-- 設置 resourcemanager.webapp 在哪一個節點 -->
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>hadoop1:8088</value>
    </property>

    <!-- 設置 resourcemanager.resource-tracker 在哪一個節點 -->
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>hadoop1:18025</value>
    </property>

    <!-- 設置 resourcemanager.admin 在哪一個節點 -->
    <property>
        <name>yarn.resourcemanager.admin.address</name>
        <value>hadoop1:18141</value>
    </property>

    <!-- 設置 nodemanagerr 內存大小
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>1024</value>
    </property>
    -->

    <!-- 設置 nodemanagerr CPU核數
    <property>
        <name>yarn.nodemanager.resource.cpu-vcores</name>
        <value>1</value>
    </property>
    -->

    <!-- reducer取數據的方式是mapreduce_shuffle -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>

    <!--  -->
    <property>
        <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
        <value>org.apache.hadoop.mapred.ShuffleHandler</value>
    </property>

</configuration>

 

(5)hadoop-env.sh配置:

JAVA_HOME 顯式配置一下

 

 

(6)slaves配置:

 

 

2.3發送至集羣其餘節點

scp -r hadoop-2.7.5 hadoop@hadoop2:/home/hadoop/

scp -r hadoop-2.7.5 hadoop@hadoop3:/home/hadoop/

 

2.4運行hadoop

2.4.1 初始化HDFS系統

在hadop2.7.5目錄下執行命令:

bin/hdfs namenode -format

 

 

2.4.2 開啓NameNode和DateNode守護進程

sbin/start-dfs.sh

 

2.4.3 使用jps命令查看進程信息

Hadoop1:

 

Hadoop二、Hadoop3:

 

web頁面查看:192.168.1.61:50070

192.168.1.61:8088

 

3. Spark環境搭建

3.1下載spark安裝包

下載spark安裝包

http://spark.apache.org/downloads.html

 

 

3.2配置spark

1.用WinScp上傳到/home/hadoop/

解壓縮 spark-2.2.1-bin-hadoop2.7.tgz

tar -zxf spark-2.2.1-bin-hadoop2.7.tgz

 

使用root用戶配置/etc/profile文件

 

 

2.修改配置文件(spark-2.2.1/conf)目錄下,slaves,spark-env.sh.

(1)打開配置文件conf/slaves

cd /home/hadoop/spark-2.2.1/conf

sudo vi slaves

加入slave配置節點

hadoop1

hadoop2

hadoop3

 

 

(2) 打開配置文件conf/spark-env.sh

cd /home/hadoop/spark-2.2.1/conf

cp spark-env.sh.template spark-env.sh

sudo vi spark-env.sh

加入Spark環境配置內容,設置hadoop1爲Master節點

export SPARK_MASTER_HOST=hadoop1

export SPARK_MASTER_PORT=7077

export SPARK_WORKER_CORES=1

export SPARK_WORKER_INSTANCES=1

export SPARK_WORKER_MEMORY=512M

 

 

3.向從節點分發Spark程序

cd /home/hadoop/

scp -r spark-2.2.1 hadoop@hadoop2:/home/hadoop/

scp -r spark-2.2.1 hadoop@hadoop3:/home/hadoop/

 

4.將從節點 /home/hadoop/spark-2.2.1/conf/ spark-env.sh 中的 SPARK_LOCAL_IP 改成從節點IP

3.3運行spark

3.3.1啓動spark

cd /home/hadoop/spark-2.2.1/sbin

./start-all.sh

 

3.3.2驗證啓動

此時在hadoop1上面運行的進程有:Worker和Master

 

此時在hadoop2和hadoop3上面運行的進程有隻有Worker

 

web頁面:192.168.1.61:8080

3.3.3驗證客戶端鏈接

進入hadoop1節點,進入spark的bin目錄,使用spark-shell鏈接集羣

cd /home/hadoop/spark-2.2.1/bin

spark-shell --master spark://hadoop1:7077 --executor-memory 1024m

 

 

4. 分別使用Hadoop 和Spark實現詞頻統計任務

4.1文件內容級輸出指望

用Java編寫程序,進行wordcount,輸入文件用的是一篇介紹雲計算的英文文章,命名爲cloudComputing.txt

 

指望輸出:文中詞語使用頻率。意義爲:單詞的key值 單詞的出現次數,而且按照頻次由高到低進行顯示。

 

4.2 Hadoop使用java編寫程序實現詞頻統計

步驟:

1.打開eclipse,建立java工程。導入hadoop包和相關配置文件。

2.進入/hadoop安裝目錄下/share/hadoop/

  (1) 、把hdfs文件夾下的jar包和hdfs/lib目錄下的jar包導入工程)。

  (2) 、把mapreduce文件夾下的jar包和mapreduce/lib目錄下的jar包導入工程。

  (3) 、把yarn文件夾下的jar包和yarn/lib目錄下的jar包導入工程。

  (4) 、把common文件夾下的jar包和common/lib下的jar包導入工程。

  (5) 、把hadoop安裝目錄下/etc/hadoop中core-site.xml和hdfs-site.xml文件配置到java工程的src目錄下。

 

3.編寫代碼程序

import java.io.IOException;

import java.util.StringTokenizer;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

 

publicclass WordCount {

    /**

     * 創建Mapper類TokenizerMapper繼承自泛型類Mapper

     * Mapper類:實現了Map功能基類

     * Mapper接口:

     * WritableComparable接口:實現WritableComparable的類能夠相互比較。全部被用做key的類應該實現此接口。

     * Reporter 則可用於報告整個應用的運行進度,本例中未使用。

     *

     */

  publicstaticclass TokenizerMapper

       extends Mapper<Object, Text, Text, IntWritable>{

        /**

         * IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 數據類型的類,這些類實現了WritableComparable接口,

         * 都可以被串行化從而便於在分佈式環境中進行數據交換,你能夠將它們分別視爲int,String 的替代品。

     * 聲明one常量和word用於存放單詞的變量

         */

    privatefinalstatic IntWritable one =new IntWritable(1);

    private Text word =new Text();

    /**

         * Mapper中的map方法:

         * void map(K1 key, V1 value, Context context)

         * 映射一個單個的輸入k/v對到一箇中間的k/v對

         * 輸出對不須要和輸入對是相同的類型,輸入對能夠映射到0個或多個輸出對。

         * Context:收集Mapper輸出的<k,v>對。

         * Context的write(k, v)方法:增長一個(k,v)對到context

         * 編寫Map和Reduce函數.這個Map函數使用StringTokenizer函數對字符串進行分隔,經過write方法把單詞存入word中

     * write方法存入(單詞,1)這樣的二元組到context中

     */ 

    publicvoid map(Object key, Text value, Context context

                    ) throws IOException, InterruptedException {

      StringTokenizer itr =new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

  }

 

  publicstaticclass IntSumReducer

       extends Reducer<Text,IntWritable,Text,IntWritable> {

    private IntWritable result =new IntWritable();

    /**

         * Reducer類中的reduce方法:

      * void reduce(Text key, Iterable<IntWritable> values, Context context)

         * 中k/v來自於map函數中的context,可能通過了進一步處理(combiner),一樣經過context輸出          

         */

    publicvoid reduce(Text key, Iterable<IntWritable> values,

                       Context context

                       ) throws IOException, InterruptedException {

      int sum =0;

      for (IntWritable val : values) {

        sum += val.get();

      }

      result.set(sum);

      context.write(key, result);

    }

  }

 

  publicstaticvoid main(String[] args) throws Exception {

        /**

         * Configuration:map/reduce的j配置類,向hadoop框架描述map-reduce執行的工做

         */

    Configuration conf =new Configuration();

    String[] otherArgs =new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length !=2) {

      System.err.println("Usage: wordcount <in><out>");

      System.exit(2);

    }

    Job job =new Job(conf, "word count");    //設置一個用戶定義的job名稱

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);    //爲job設置Mapper類

    job.setCombinerClass(IntSumReducer.class);    //爲job設置Combiner類

    job.setReducerClass(IntSumReducer.class);    //爲job設置Reducer類

    job.setOutputKeyClass(Text.class);        //爲job的輸出數據設置Key類

    job.setOutputValueClass(IntWritable.class);    //爲job輸出設置value類

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));    //爲job設置輸入路徑

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//爲job設置輸出路徑

    System.exit(job.waitForCompletion(true) ?0 : 1);        //運行job

  }

}

 

4.將java文件打包成jar文件DemoWordCount.jar。

5.將DemoWordCount.jar上傳到

/home/hadoop/hadoop-2.7.5/share/hadoop/mapreduce

 

6.Hadoop執行DemoWordCount.jar 統計cloudComputing.txt詞頻

(1)

# 在hdfs的根目錄下創建input、output目錄

bin/hdfs dfs -mkdir /input

 bin /hdfs dfs -mkdir /output


# 查看HDFS根目錄下的目錄結構

bin/hdfs dfs -ls /

結果以下:

 

(2)

# 上傳

bin/hdfs dfs -put /home/hadoop/cloudComputing.txt /input

# 查看

bin/hdfs dfs -ls /input

結果以下:

 

(3)

# 將運行結果保存在/output/cloudComputing目錄下

bin/hadoop jar ~/hadoop-2.7.5/share/hadoop/mapreduce/DemoWordCount.jar wordcount /input/cloudComputing.txt /output/cloudComputing


# 查看/output/cloudComputing目錄下的文件

bin/hdfs dfs -ls /output/cloudComputing

 

 

(4)

# 查看運行結果

bin/hdfs dfs -cat /output/cloudComputing/part-r-00000 | sort -k 2 -n -r|head -20

#sort中 -k 2 表示用以tab爲分隔符的第二個字段來排序 -n表示用數字形式排序 -r表示從大到小排序 顯示結果前20行。

結果以下:

 

 

C++ 實現:

wordcount_map.cpp

#include <iostream>
#include <string>
using namespace std;

int main(int argc, char** argv)
{
        string key;
        string value = "1";
        while(cin >> key)
        {
                cout<< key << "\t" << value <<endl;
        }
        return 0;
}

 

wordcount_reduce.cpp

#include <iostream>
#include <string>
#include <map>
using namespace std;

int main(int argc, char** argv)
{
        string key;
        string value;
        map<string, int> word2count;
        map<string, int>::iterator it;
        while(cin >> key)
        {
                cin >> value;
                it = word2count.find(key);
                if(it != word2count.end())
                {
                        it->second++;
                }
                else
                {
                        word2count.insert(make_pair(key, 1));
                }
        }
        for(it = word2count.begin(); it != word2count.end(); it++)
        {
                cout<< it->first << "\t" << it->second << endl;
        }
        return 0;
}

  

編譯成可執行文件:

g++ -o mapperC wordcount_map.cpp

g++ -o reduceC wordcount_reduce.cpp

 

而後,調用 hadoop-streaming-2.7.5.jar

hadoop jar ~/hadoop-2.7.5/share/hadoop/tools/lib/hadoop-streaming-2.7.5.jar -input /input/cloudComputing.txt -output /output/cloudComputing -mapper ~/mapperC -reducer ~/reduceC -file ~/mapperC -file ~/reduceC

 

 

4.3 Spark使用scala編寫程序實現詞頻統計

使用 Scala 編寫的程序須要使用 sbt 進行編譯打包。

(1)安裝sbt

下載sbt-launch.jar。並拷貝到/usr/local/sbt

(2)在/usr/local/sbt 中建立 sbt 腳本,添加以下內容:

#!/bin/bash

SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"

java $SBT_OPTS -jar `dirname $0`/sbt-launch.jar "$@"

(3)保存後,爲 ./sbt 腳本增長可執行權限。

chmod u+x ./sbt

(4)最後運行以下命令

          ./sbt sbt-version

 

 

(5)編寫Scala應用程序

cd ~           # 進入用戶主文件夾

mkdir ./sparkapp        # 建立應用程序根目錄

mkdir -p ./sparkapp/src/main/scala # 建立所需的文件夾結構

vim WordCount.scala

/* wordcount.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object WordCount {
 def main(args: Array[String]) {
   val conf = new SparkConf().setAppName("Word count")
   val sc = new SparkContext(conf) //初始化 SparkContext,SparkContext 的參數 SparkConf 包含了應用程序的信息

   val dataFile = sc.textFile("hdfs://hadoop1:9000/input/cloudComputing.txt")
   val outputFile = "/home/hadoop/spark.wordcount.out"

   val words = dataFile.flatMap(_.split(" "))
   val pairs = words.map(word => (word,1))
   val wordcount = pairs.reduceByKey(_ + _).map(pair=>(pair._2,pair._1)).sortByKey(false).map(pair=>(pair._2,pair._1))
   wordcount.collect.foreach(pair =>println("key:" + pair._1, "    value:" + pair._2))
   wordcount.saveAsTextFile(outputFile)
   sc.stop()
 }
}

 

該程序依賴SparkAPI,所以咱們須要經過sbt進行編譯打包。在./sparkapp 中新建文件 WordCount.sbt,添加內容以下,聲明該獨立應用程序的信息以及與 Spark 的依賴關係:

 

文件WordCount.sbt 須要指明 Spark 和 Scala 的版本。

 

(6)使用sbt打包Scala程序

      爲保證 sbt 能正常運行,先執行以下命令檢查整個應用程序的文件結構:

cd ~/sparkapp

find .

 

接着,/usr/local/sbt/sbt package 打包成JAR

 

生成jar包的位置:

/home/hadoop/sparkapp/target/scala-2.10/ wordcount_2.10-1.0.jar

 

(7)經過 spark-submit 運行程序

最後將生成的 jar 包經過 spark-submit 提交到 Spark 中運行了,命令以下:

sbin/spark-submit --master spark://hadoop1:7077 --class "WordCount" --executor-memory 512m ~/sparkapp/target/scala-2.10/wordcount_2.10-1.0.jar 200        

相關文章
相關標籤/搜索