Big Data: MapReduce Offline Computing Architecture

Author: Lijb

Big Data (bigData)

  • Volume: GB/TB/PB-scale data to store and analyze
  • Timeliness: results must be produced within a bounded time window (for example, within a few hours)
  • Variety (multiple dimensions): data exists in many forms, such as sensor readings, web server logs, and user behavior data
  • Veracity: the data must carry value, so collected data needs cleaning and noise reduction

What problems does big data solve?

  • Storage

Break through the single-machine storage bottleneck (limited capacity, data at risk) and its low read/write efficiency (sequential I/O). Big data takes distributed storage as its guiding principle: build a cluster and scale the hardware horizontally to increase storage capacity. Common solutions for storing massive data today include Hadoop HDFS, FastDFS/GlusterFS (traditional file systems), MongoDB GridFS, S3, and so on.

  • Computation

A single machine can only process a limited amount of data, and the time it takes cannot be controlled. Big data proposes a different model: split a computation into n small tasks, distribute them to the compute nodes of a cluster, and let those nodes cooperate to finish the job. This model is called distributed computing. The mainstream computing models today are offline (batch) computing, near-real-time computing, and online real-time computing; offline computing is represented by Hadoop MapReduce, near-real-time computing by Spark's in-memory computing, and online real-time computing by Storm, Kafka Streams, and Spark Streaming.

Summary: all of the above solve the problem with a distributed mindset. Scaling hardware vertically is expensive, so most companies prefer distributed, horizontal scaling.

The Birth of Hadoop

Hadoop was formally introduced by the Apache Software Foundation in the fall of 2005 as part of Nutch, a subproject of Lucene. It was inspired by MapReduce and the Google File System (GFS), first developed at Google Labs. The storage and computation modules of Nutch were then refactored. In 2006 a Yahoo team joined the Nutch project and worked on splitting the storage and computation modules out of Nutch, which produced the Hadoop 1.x line. In 2010 Yahoo refactored Hadoop's computation module and further improved the management model of the MapReduce framework; the result became Hadoop 2.x. The biggest difference between Hadoop 1.x and 2.x is this refactored computation framework: Hadoop 2.x introduced YARN as the resource scheduler, which makes the MapReduce framework more scalable and its runtime management more reliable.

  • HDFS: storage
  • MapReduce: computation (MR2 introduces the YARN resource manager)

The Big Data Ecosystem

Hadoop ecosystem, from 2006 onward (the "material civilization" phase):

  • hdfs: Hadoop's distributed file storage system
  • mapreduce: Hadoop's distributed computing framework
  • hbase: a NoSQL database built on top of HDFS
  • flume: a distributed log collection system
  • kafka: a distributed message queue
  • hive: a data-warehouse ETL (Extract-Transform-Load) tool that translates user-written SQL into MapReduce programs
  • Storm: a stream-computing framework for online real-time processing
  • Mahout: literally "mahout", an elephant rider; a library of algorithms built on MapReduce, typically used for machine learning and artificial intelligence

Computation layer, from 2010 onward (the "spiritual civilization" phase)

  • Spark Core: drawing on the design experience of the MapReduce framework, a lab at UC Berkeley built an in-memory distributed computing framework in Scala whose performance is roughly 10~100x that of Hadoop MapReduce.
  • Spark SQL: express Spark computations with SQL statements, improving interactivity.
  • Spark Streaming: Spark's stream-computing framework.
  • Spark MLlib: a machine-learning library built on Spark.
  • Spark GraphX: Spark's graph-processing component.

Big data application scenarios

  • Traffic management
  • E-commerce systems (log analysis)
  • Internet healthcare
  • Financial information
  • The State Grid (national power grid)

Hadoop Distributed File System

HDFS Architecture

The Hadoop Distributed File System (HDFS) is a distributed file system designed to run on commodity hardware. Hardware failure is the norm rather than the exception. An HDFS instance may consist of hundreds or thousands of server machines, each storing part of the file system’s data. The fact that there are a huge number of components and that each component has a non-trivial probability of failure means that some component of HDFS is always non-functional. Therefore, detection of faults and quick, automatic recovery from them is a core architectural goal of HDFS.

  • NameNode and DataNode

HDFS has a master/slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace and regulates access to files by clients. In addition, there are a number of DataNodes, usually one per node in the cluster, which manage storage attached to the nodes that they run on. HDFS exposes a file system namespace and allows user data to be stored in files. Internally, a file is split into one or more blocks and these blocks are stored in a set of DataNodes. The NameNode executes file system namespace operations like opening, closing, and renaming files and directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read and write requests from the file system’s clients. The DataNodes also perform block creation, deletion, and replication upon instruction from the NameNode.

namenode: the name node. It manages the cluster's metadata (the mapping of blocks to datanodes and the block replica information) as well as the DataNodes. The number of block replicas (the replication factor) is controlled by the dfs.replication parameter; in single-node mode it must be set to 1. The client entry point of the namenode is configured via fs.defaultFS. When HDFS is set up for the first time you must run hdfs namenode -format, which creates fsimage, the namenode's initial metadata file.

datanode: stores block data, serves client read/write requests for blocks, carries out namenode instructions, and reports its own status back to the namenode.

block: the unit into which files are split, 128 MB by default; it can be changed via dfs.blocksize.

Rack: identifies the physical location of the host a datanode runs on and is used to optimize storage and computation. The rack topology can be viewed with hdfs dfsadmin -printTopology.
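
These parameters can also be inspected or overridden from client code. A minimal hedged sketch (the host name CentOS matches the single-node setup below; the class name HdfsConfigProbe is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsConfigProbe {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://CentOS:9000");    // client entry point of the namenode
            conf.setInt("dfs.replication", 1);                 // replication factor (1 on a single node)
            conf.setLong("dfs.blocksize", 128L * 1024 * 1024); // block size, 128 MB by default

            FileSystem fs = FileSystem.get(conf);
            System.out.println("block size  : " + fs.getDefaultBlockSize(new Path("/")));
            System.out.println("replication : " + fs.getDefaultReplication(new Path("/")));
            fs.close();
        }
    }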

  • namenode & secondarynamenode

Reading: http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html

Setting Up Hadoop HDFS (Distributed Storage)

  • Configure the hostname/IP mapping

    [root@CentOS ~]# vi /etc/hosts
    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.29.128 CentOS

  • Disable the system firewall

    [root@CentOS ~]# clear
    [root@CentOS ~]# service iptables stop
    iptables: Setting chains to policy ACCEPT: filter          [  OK  ]
    iptables: Flushing firewall rules:                         [  OK  ]
    iptables: Unloading modules:                               [  OK  ]
    [root@CentOS ~]# chkconfig iptables off
    [root@CentOS ~]# chkconfig --list | grep iptables
    iptables        0:off   1:off   2:off   3:off   4:off   5:off   6:off
    [root@CentOS ~]#

  • Configure passwordless SSH login to the local host

    [root@CentOS ~]# ssh-keygen -t rsa
    Generating public/private rsa key pair.
    Enter file in which to save the key (/root/.ssh/id_rsa):
    Enter passphrase (empty for no passphrase):
    Enter same passphrase again:
    Your identification has been saved in /root/.ssh/id_rsa.
    Your public key has been saved in /root/.ssh/id_rsa.pub.
    The key fingerprint is:
    de:83:c8:81:77:d7:db:f2:79:da:97:8b:36:d5:78:01 root@CentOS
    [root@CentOS ~]# ssh-copy-id CentOS
    root@centos's password: ****
    Now try logging into the machine, with "ssh 'CentOS'", and check in:

      .ssh/authorized_keys

    to make sure we haven't added extra keys that you weren't expecting
    [root@CentOS ~]# ssh CentOS
    Last login: Mon Oct 15 19:47:46 2018 from 192.168.29.1
    [root@CentOS ~]# exit
    logout
    Connection to CentOS closed.

  • Configure the Java development environment

    [root@CentOS ~]# rpm -ivh jdk-8u171-linux-x64.rpm
    Preparing...                ########################################### [100%]
       1:jdk1.8                 ########################################### [100%]
    Unpacking JAR files...
            tools.jar...
            plugin.jar...
            javaws.jar...
            deploy.jar...
            rt.jar...
            jsse.jar...
            charsets.jar...
            localedata.jar...

    [root@CentOS ~]# vi .bashrc
    JAVA_HOME=/usr/java/latest
    PATH=$PATH:$JAVA_HOME/bin
    CLASSPATH=.
    export JAVA_HOME
    export PATH
    export CLASSPATH
    [root@CentOS ~]# source .bashrc
    [root@CentOS ~]# jps
    1674 Jps

  • Install and configure Hadoop HDFS

    [root@CentOS ~]# tar -zxf hadoop-2.6.0_x64.tar.gz -C /usr/
    [root@CentOS ~]# vi .bashrc
    HADOOP_HOME=/usr/hadoop-2.6.0
    JAVA_HOME=/usr/java/latest
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    CLASSPATH=.
    export JAVA_HOME
    export PATH
    export CLASSPATH
    export HADOOP_HOME
    [root@CentOS ~]# source .bashrc
    [root@CentOS ~]# hadoop classpath
    /usr/hadoop-2.6.0/etc/hadoop:/usr/hadoop-2.6.0/share/hadoop/common/lib/*:/usr/hadoop-2.6.0/share/hadoop/common/*:/usr/hadoop-2.6.0/share/hadoop/hdfs:/usr/hadoop-2.6.0/share/hadoop/hdfs/lib/*:/usr/hadoop-2.6.0/share/hadoop/hdfs/*:/usr/hadoop-2.6.0/share/hadoop/yarn/lib/*:/usr/hadoop-2.6.0/share/hadoop/yarn/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.6.0/share/hadoop/mapreduce/*:/usr/hadoop-2.6.0/contrib/capacity-scheduler/*.jar

  • Edit core-site.xml

[root@CentOS ~]# vi /usr/hadoop-2.6.0/etc/hadoop/core-site.xml

<property>
    <name>fs.defaultFS</name>
    <value>hdfs://CentOS:9000</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/hadoop-2.6.0/hadoop-${user.name}</value>
</property>
  • Edit hdfs-site.xml

[root@CentOS ~]# vi /usr/hadoop-2.6.0/etc/hadoop/hdfs-site.xml

<property>
    <name>dfs.replication</name>
    <value>1</value>
</property>
  • Edit the slaves file

[root@CentOS ~]# vi /usr/hadoop-2.6.0/etc/hadoop/slaves

CentOS
  • Initialize HDFS

    [root@CentOS ~]# hdfs namenode -format
    ...
    18/10/15 20:06:03 INFO namenode.NNConf: ACLs enabled? false
    18/10/15 20:06:03 INFO namenode.NNConf: XAttrs enabled? true
    18/10/15 20:06:03 INFO namenode.NNConf: Maximum size of an xattr: 16384
    18/10/15 20:06:03 INFO namenode.FSImage: Allocated new BlockPoolId: BP-665637298-192.168.29.128-1539605163213
    18/10/15 20:06:03 INFO common.Storage: Storage directory /usr/hadoop-2.6.0/hadoop-root/dfs/name has been successfully formatted.
    18/10/15 20:06:03 INFO namenode.NNStorageRetentionManager: Going to retain 1 images with txid >= 0
    18/10/15 20:06:03 INFO util.ExitUtil: Exiting with status 0
    18/10/15 20:06:03 INFO namenode.NameNode: SHUTDOWN_MSG:
    /************************************************************
    SHUTDOWN_MSG: Shutting down NameNode at CentOS/192.168.29.128
    ************************************************************/
    [root@CentOS ~]# ls /usr/hadoop-2.6.0/
    bin  hadoop-root  lib      LICENSE.txt  README.txt  share
    etc  include      libexec  NOTICE.txt   sbin

Run this only the first time HDFS is started; it is not needed on later restarts.

  • Start the HDFS services

    [root@CentOS ~]# start-dfs.sh
    Starting namenodes on [CentOS]
    CentOS: starting namenode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-namenode-CentOS.out
    CentOS: starting datanode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-datanode-CentOS.out
    Starting secondary namenodes [0.0.0.0]
    The authenticity of host '0.0.0.0 (0.0.0.0)' can't be established.
    RSA key fingerprint is 7c:95:67:06:a7:d0:fc:bc:fc:4d:f2:93:c2:bf:e9:31.
    Are you sure you want to continue connecting (yes/no)? yes
    0.0.0.0: Warning: Permanently added '0.0.0.0' (RSA) to the list of known hosts.
    0.0.0.0: starting secondarynamenode, logging to /usr/hadoop-2.6.0/logs/hadoop-root-secondarynamenode-CentOS.out
    [root@CentOS ~]# jps
    2132 SecondaryNameNode
    1892 NameNode
    2234 Jps
    1998 DataNode

You can then open the NameNode web UI in a browser at http://192.168.29.128:50070

HDFS Shell

[root@CentOS ~]# hadoop fs -help | hdfs dfs -help
Usage: hadoop fs [generic options]
	[-appendToFile <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-copyFromLocal [-f] [-p] [-l] <localsrc> ... <dst>]
	[-copyToLocal [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-cp [-f] [-p | -p[topax]] <src> ... <dst>]
	[-get [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-help [cmd ...]]
	[-ls [-d] [-h] [-R] [<path> ...]]
	[-mkdir [-p] <path> ...]
	[-moveFromLocal <localsrc> ... <dst>]
	[-moveToLocal <src> <localdst>]
	[-mv <src> ... <dst>]
	[-put [-f] [-p] [-l] <localsrc> ... <dst>]
	[-rm [-f] [-r|-R] [-skipTrash] <src> ...]
	[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
	[-tail [-f] <file>]
	[-text [-ignoreCrc] <src> ...]
	[-touchz <path> ...]
	[-usage [cmd ...]]
[root@CentOS ~]# hadoop fs -copyFromLocal /root/hadoop-2.6.0_x64.tar.gz /
[root@CentOS ~]# hdfs dfs -copyToLocal /hadoop-2.6.0_x64.tar.gz ~/

Reference: http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/FileSystemShell.html

Java API 操做HDFS

  • Windows development environment setup

    • Unpack the Hadoop distribution to C:/
    • Configure the HADOOP_HOME environment variable
    • Copy winutils.exe and hadoop.dll into the bin directory of the Hadoop installation
    • Add the CentOS hostname/IP mapping on the Windows side
    • Restart IDEA so that it picks up the HADOOP_HOME environment variable
  • Import the Maven dependencies

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
    </dependency>

  • Disable HDFS permission checking

    org.apache.hadoop.security.AccessControlException: Permission denied: user=HIAPAD, access=WRITE, inode="/":root:supergroup:drwxr-xr-x
        at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
        ...

Option 1: configure hdfs-site.xml

<property>
    <name>dfs.permissions.enabled</name>
    <value>false</value>
</property>

Option 2: add the JVM option -DHADOOP_USER_NAME=root

java xxx -DHADOOP_USER_NAME=root

Option 3: run chmod to open up the directory read/write permissions

[root@CentOS ~]# hdfs dfs -chmod -R 777 /
  • Java API examples

    private FileSystem fileSystem;
    private Configuration conf;

    @Before
    public void before() throws IOException {
        conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://CentOS:9000");
        conf.set("dfs.replication", "1");
        conf.set("fs.trash.interval", "1");
        fileSystem = FileSystem.get(conf);
        assertNotNull(fileSystem);
    }

    @Test
    public void testUpload01() throws IOException {
        InputStream is = new FileInputStream("C:\\Users\\HIAPAD\\Desktop\\買家須知.txt");
        Path path = new Path("/bb.txt");
        OutputStream os = fileSystem.create(path);
        IOUtils.copyBytes(is, os, 1024, true);
        /* Manual copy, equivalent to IOUtils.copyBytes:
        byte[] bytes = new byte[1024];
        while (true) {
            int n = is.read(bytes);
            if (n == -1) break;
            os.write(bytes, 0, n);
        }
        os.close();
        is.close();
        */
    }

    @Test
    public void testUpload02() throws IOException {
        Path src = new Path("file:///C:\\Users\\HIAPAD\\Desktop\\買家須知.txt");
        Path dist = new Path("/dd.txt");
        fileSystem.copyFromLocalFile(src, dist);
    }

    @Test
    public void testDownload01() throws IOException {
        Path dist = new Path("file:///C:\\Users\\HIAPAD\\Desktop\\11.txt");
        Path src = new Path("/dd.txt");
        fileSystem.copyToLocalFile(src, dist);
        //fileSystem.copyToLocalFile(false, src, dist, true);
    }

    @Test
    public void testDownload02() throws IOException {
        OutputStream os = new FileOutputStream("C:\\Users\\HIAPAD\\Desktop\\22.txt");
        InputStream is = fileSystem.open(new Path("/dd.txt"));
        IOUtils.copyBytes(is, os, 1024, true);
    }

    @Test
    public void testDelete() throws IOException {
        Path path = new Path("/dd.txt");
        fileSystem.delete(path, true);
    }

    @Test
    public void testListFiles() throws IOException {
        Path path = new Path("/");
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(path, true);
        while (files.hasNext()) {
            LocatedFileStatus file = files.next();
            System.out.println(file.getPath() + " " + file.getLen() + " " + file.isFile());
        }
    }

    @Test
    public void testDeleteWithTrash() throws IOException {
        Trash trash = new Trash(conf);
        trash.moveToTrash(new Path("/aa.txt"));
    }

    @After
    public void after() throws IOException {
        fileSystem.close();
    }

Hadoop MapReduce

MapReduce is a programming model for parallel computation over large data sets. Its central concepts, "Map" and "Reduce", are borrowed from functional programming languages (which lend themselves to shipping functions across a network), together with features taken from vector programming languages.

Hadoop's MapReduce framework makes full use of the memory, CPU, network, and a small amount of disk on the physical hosts that also act as storage nodes. The framework normally starts a NodeManager service on every host that runs a DataNode; the NodeManager manages the compute resources of that physical node. In addition, a single ResourceManager is started to coordinate resource scheduling across the whole computation.

The core idea of MapReduce is to split one large computation into many small tasks. Each small task runs independently and produces a result, usually stored locally. When the first batch of tasks has finished, the system starts a second batch whose job is to pull the intermediate results of the first batch over the network, gather them locally, and perform the final aggregation there.

Put differently, when MapReduce runs a statistical analysis over big data, the system first splits the input; each piece of split metadata is called an input split (essentially a logical range mapping over the data to be analyzed). At run time the number of splits determines the parallelism of the Map phase, which means the Map phase performs the local part of the computation, and one Map task represents one unit of compute resource. Once every Map task has finished the local computation over its range, it stores its result on local disk. The system then starts a number of Reduce tasks, according to the configured aggregation parallelism, to combine the Map outputs and write the final result to HDFS, MySQL, NoSQL stores, and so on.
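
The aggregation parallelism mentioned above is not derived from the input splits; it is requested explicitly on the job. A minimal hedged sketch, assuming a Job instance named job as in the example later in this section:

    // Map parallelism follows the number of input splits computed from the input data;
    // reduce (aggregation) parallelism must be set explicitly and defaults to 1.
    job.setNumReduceTasks(4);   // e.g. aggregate the map outputs with 4 reduce tasks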

MapReduce 2 Architecture

ResourceManager: coordinates and manages the cluster's compute resources.

NodeManager: launches compute resources (Containers), for example MRAppMaster and YarnChild; the NM also connects to the RM and reports its own resource usage.

MRAppMaster: the application master; it monitors the tasks of a job during the computation and handles failover. There is exactly one per job.

YarnChild: the collective name for MapTask and ReduceTask; each represents one compute process.

YARN architecture diagram

Setting Up the MR Runtime Environment

  • etc/hadoop/mapred-site.xml

[root@CentOS ~]# cp /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml.template /usr/hadoop-2.6.0/etc/hadoop/mapred-site.xml

<property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
</property>
  • etc/hadoop/yarn-site.xml

    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>CentOS</value>
    </property>

  • Start YARN

    [root@CentOS ~]# start-yarn.sh
    [root@CentOS ~]# jps
    5250 Jps
    4962 ResourceManager
    5043 NodeManager
    3075 DataNode
    3219 SecondaryNameNode
    2959 NameNode

MR: A First Hello World

INFO 192.168.0.1 wx com.baizhi.service.IUserService#login
INFO 192.168.0.4 wx com.baizhi.service.IUserService#login
INFO 192.168.0.2 wx com.baizhi.service.IUserService#login
INFO 192.168.0.3 wx com.baizhi.service.IUserService#login
INFO 192.168.0.1 wx com.baizhi.service.IUserService#login

SQL

create table t_access(
    level varchar(32),
    ip  varchar(64),
    app varchar(64),
    server varchar(128)
)
select ip,sum(1) from t_access group by ip;
    reduce(ip,[1,1,1,..])      map(ip,1)
  • Mapper logic

    public class AccessMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * INFO 192.168.0.1 wx com.baizhi.service.IUserService#login
         * @param key   : the byte offset of the line
         * @param value : one line of text
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] tokens = value.toString().split(" ");
            String ip = tokens[1];
            context.write(new Text(ip), new IntWritable(1));
        }
    }

  • Reducer logic

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    import java.io.IOException;

    public class AccessReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int total = 0;
            for (IntWritable value : values) {
                total += value.get();
            }
            context.write(key, new IntWritable(total));
        }
    }

  • Job submission

    public class CustomJobSubmitter extends Configured implements Tool {

    public int run(String[] args) throws Exception {
          //1. Build the job
          Configuration conf=getConf();
          Job job=Job.getInstance(conf);
    
          //2. Set the input and output format classes
          job.setInputFormatClass(TextInputFormat.class);
          job.setOutputFormatClass(TextOutputFormat.class);
    
          //3. Set the input and output paths
          Path src = new Path("/demo/access");
          TextInputFormat.addInputPath(job,src);
          Path dst = new Path("/demo/res"); // the output directory must not already exist
          TextOutputFormat.setOutputPath(job,dst);
    
          //4. Set the Mapper and Reducer classes
          job.setMapperClass(AccessMapper.class);
          job.setReducerClass(AccessReducer.class);
    
          //5. Set the Mapper and Reducer output key/value types
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(IntWritable.class);
    
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
    
          //6. Submit the job
          job.waitForCompletion(true);
          return 0;
      }
    
      public static void main(String[] args) throws Exception {
          ToolRunner.run(new CustomJobSubmitter(),args);
      }

    }

  • Submitting the job (three approaches)

    • Deploy as a jar: add job.setJarByClass(CustomJobSubmitter.class); then run
      [root@CentOS ~]# hadoop jar mr01-1.0-SNAPSHOT.jar com.baizhi.mr.CustomJobSubmitter
    • Local simulation: you may hit
      Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
          at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
          at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:557)
      Override the NativeIO class in your project and change line 557 as follows:
      public static boolean access(String path, AccessRight desiredAccess) throws IOException { return true; }
    • Cross-platform submission
      • Copy core|hdfs|yarn|mapred-site.xml into the project's resources directory
      • Enable cross-platform submission in mapred-site.xml: <property> <name>mapreduce.app-submission.cross-platform</name> <value>true</value> </property>
      • Add the following snippet to the job-submission code:
        conf.addResource("core-site.xml");
        conf.addResource("hdfs-site.xml");
        conf.addResource("mapred-site.xml");
        conf.addResource("yarn-site.xml");
        conf.set("mapreduce.job.jar","file:///xxxx.jar");
        When submitting locally, the job resources have to be copied to HDFS, so you may see the following exception:
        Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=HIAPAD, access=EXECUTE, inode="/tmp":root:supergroup:drwx------
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkFsPermission(FSPermissionChecker.java:271)
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:257)
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:208)
            at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:171)
        To resolve it, see the previous section on disabling HDFS permission checking.

InputFormat/OutputFormat

  • InputFormat
    • FileInputFormat
      • TextInputFormat: key = byte offset of the line, value = one line of text; split rule: per file, cut by split size
      • NLineInputFormat: key/value same as TextInputFormat; split rule: per file, every n lines form one split; mapreduce.input.lineinputformat.linespermap=1000 controls the number of lines per split
      • KeyValueTextInputFormat: key and value are produced by splitting each line on \t (the default, mapreduce.input.keyvaluelinerecordreader.key.value.separator=\t); split rule: per file, cut by split size
      • SequenceFileInputFormat
      • CombineTextInputFormat: key = byte offset of the line, value = one line of text; split rule: cut by split size across files, which handles many small files and optimizes MR jobs; all the small files must share the same format (see the sketch after this list)
    • MultipleInputs: handles inputs with different formats; every Mapper must emit the same key/value types
    • DBInputFormat
    • TableInputFormat (third-party)
  • OutputFormat
    • FileOutputFormat
      • TextOutputFormat
      • SequenceFileOutputFormat
      • LazyOutputFormat
    • MultipleOutputs
    • DBOutputFormat
    • TableOutputFormat (third-party)
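
A hedged sketch of wiring one of these input formats into the job, assuming the Job object named job from the Hello World example; the 1000-line value is illustrative, and NLineInputFormat/CombineTextInputFormat are the classes from org.apache.hadoop.mapreduce.lib.input:

    // Split each input file every 1000 lines instead of by split size.
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.addInputPath(job, new Path("/demo/access"));
    NLineInputFormat.setNumLinesPerSplit(job, 1000);   // mapreduce.input.lineinputformat.linespermap

    // Or pack many small files of the same format into fewer splits:
    // job.setInputFormatClass(CombineTextInputFormat.class);
    // CombineTextInputFormat.addInputPath(job, new Path("/demo/access"));
    // CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024);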

MapReduce Shuffle

  • Understand the source-level flow of MR job submission

  • Resolve jar dependencies during an MR computation

    JobSubmitter(DFS|YARNRunner)
      submitJobInternal(Job.this, cluster);
        checkSpecs(job);                            # check whether the output directory already exists
        JobID jobId = submitClient.getNewJobID();   # obtain the job id
        # build the staging directory
        Path submitJobDir = new Path(jobStagingArea, jobId.toString());
        # upload the job jar | third-party resource jars | configuration files
        copyAndConfigureFiles(job, submitJobDir);
        int maps = writeSplits(job, submitJobDir);  # compute the input splits
        writeConf(conf, submitJobFile);             # upload the job context, job.xml
        status = submitClient.submitJob(            # submit the job to the ResourceManager
            jobId, submitJobDir.toString(), job.getCredentials());

Resolving jar dependencies while the program runs

  • Dependencies needed at submission time: configure HADOOP_CLASSPATH

    HADOOP_CLASSPATH=/root/mysql-connector-java-xxxx.jar
    HADOOP_HOME=/usr/hadoop-2.6.0
    JAVA_HOME=/usr/java/latest
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    CLASSPATH=.
    export JAVA_HOME
    export PATH
    export CLASSPATH
    export HADOOP_HOME
    export HADOOP_CLASSPATH

This is usually needed in the early phase of job submission, for example to connect to a third-party database while computing the input splits.

  • Dependencies needed at computation time
    • Set them programmatically: conf.set("tmpjars","file://path-to-jar,...");
    • Or, when launching with hadoop jar: hadoop jar xxx.jar MainClass -libjars path-to-jar,...
  • Map-side shuffle and reduce-side shuffle
  • Mapper only, no Reducer (data cleaning / noise reduction); see the map-only sketch after the regex example below

Extracting substrings with a regular expression

112.116.25.18 - [11/Aug/2017:09:57:24 +0800] "POST /json/startheartbeatactivity.action HTTP/1.1" 200 234 "http://wiki.wang-inc.com/pages/resumedraft.action?draftId=12058756&draftShareId=014b0741-df00-4fdc-90ca-4bf934f914d1" Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36 - 0.023 0.023 12.129.120.121:8090 200

String regex = "^(\\d{3}\\.\\d{3}\\.\\d{1,3}\\.\\d{1,3})\\s-\\s\\[(.*)\\]\\s\".*\"\\s(\\d+)\\s(\\d+)\\s.*";
Pattern pattern = Pattern.compile(regex);
Matcher matcher = pattern.matcher(input); // input: one log line like the example above
if (matcher.matches()) {
    String value = matcher.group(1); // the content matched by the first capture group
}
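
For the Mapper-only case mentioned above, a minimal hedged sketch: setting the number of reduce tasks to 0 makes the framework write the map output straight to the output directory, skipping shuffle and reduce entirely. LogCleanMapper is a hypothetical mapper that applies the regular expression shown above and drops lines that do not match.

    // Data cleaning / noise reduction: run only the map phase.
    job.setMapperClass(LogCleanMapper.class);      // hypothetical mapper applying the regex above
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(NullWritable.class);
    job.setNumReduceTasks(0);                      // 0 reducers: map output goes directly to HDFS
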
  • Custom WritableComparable types

    • The map output key type must implement WritableComparable (keys take part in sorting).
    • The map output value type only needs to implement Writable. What should you watch out for in the reduce-side output key/value types? A minimal key sketch follows.
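
A hedged sketch of a custom map-output key (the class name and fields are illustrative, not from the original text):

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    // A composite key (ip + day) usable as a map output key: it must be
    // serializable (write/readFields) and sortable (compareTo).
    public class IpDayKey implements WritableComparable<IpDayKey> {
        private String ip = "";
        private String day = "";

        public IpDayKey() { }                      // no-arg constructor required for deserialization

        public IpDayKey(String ip, String day) {
            this.ip = ip;
            this.day = day;
        }

        public void write(DataOutput out) throws IOException {
            out.writeUTF(ip);
            out.writeUTF(day);
        }

        public void readFields(DataInput in) throws IOException {
            ip = in.readUTF();
            day = in.readUTF();
        }

        public int compareTo(IpDayKey o) {         // drives the sort during the shuffle
            int c = ip.compareTo(o.ip);
            return c != 0 ? c : day.compareTo(o.day);
        }

        @Override
        public String toString() {                 // what TextOutputFormat writes for this key
            return ip + "\t" + day;
        }
        // In real code also override hashCode()/equals() so the default
        // HashPartitioner groups equal keys into the same partition.
    }
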
  • How to influence the MR program's partitioning strategy

    public class CustomPartitioner<K, V> extends Partitioner<K, V> {

    /** Use {@link Object#hashCode()} to partition. */
    public int getPartition(K key, V value,
                            int numReduceTasks) {
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    }

    job.setPartitionerClass(CustomPartitioner.class);
    // or
    conf.setClass("mapreduce.job.partitioner.class", CustomPartitioner.class, Partitioner.class);

Why does data skew occur in MR computations?

Because a poorly chosen key makes the data distribution uneven. Choose a suitable key as the basis for the statistics so that the data is spread evenly across partitions; this generally requires the developer to have some prior understanding of the data being analyzed.

  • Enable map-output compression (can only be tested in a real cluster)

This effectively reduces the network bandwidth used by the reduce-side shuffle, at the cost of extra CPU spent compressing and decompressing data during the computation.

conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
GzipCodec.class, CompressionCodec.class);

Local simulation may throw a "not a gzip file" error, so it is recommended to test this in a cluster environment.

  • combiner

job.setCombinerClass(CombinerReducer.class);

CombinerReducer is simply a class that extends Reducer. The combiner generally runs during the spill phase and when spill files are merged.
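
For the access-count example earlier in this section, the reduce function is a plain sum, which is commutative and associative, so the existing reducer can double as the combiner; a minimal hedged sketch:

    // Partial sums computed at spill/merge time yield the same final result,
    // so the reducer class itself can serve as the combiner here.
    job.setCombinerClass(AccessReducer.class);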

HDFS | YARN HA

Environment preparation

  • CentOS-6.5 64 bit
  • jdk-8u171-linux-x64.rpm
  • hadoop-2.6.0.tar.gz
  • zookeeper-3.4.6.tar.gz

Install the CentOS hosts (physical nodes)

CentOSA CentOSB CentOSC
192.168.29.129 192.168.29.130 192.168.29.131

Basic configuration

  • Hostname/IP mappings

    [root@CentOSX ~]# clear
    [root@CentOSX ~]# vi /etc/hosts

    127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
    ::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
    192.168.128.133 CentOSA
    192.168.128.134 CentOSB
    192.168.128.135 CentOSC

  • Disable the firewall

    [root@CentOSX ~]# service iptables stop
    iptables: Setting chains to policy ACCEPT: filter          [  OK  ]
    iptables: Flushing firewall rules:                         [  OK  ]
    iptables: Unloading modules:                               [  OK  ]
    [root@CentOSX ~]# chkconfig iptables off

  • Passwordless SSH login

[root@CentOSX ~]# ssh-keygen -t rsa
[root@CentOSX ~]# ssh-copy-id CentOSA
[root@CentOSX ~]# ssh-copy-id CentOSB
[root@CentOSX ~]# ssh-copy-id CentOSC
  • Synchronize the clocks of all physical nodes

    [root@CentOSA ~]# date -s '2018-09-16 11:28:00'
    Sun Sep 16 11:28:00 CST 2018
    [root@CentOSA ~]# clock -w
    [root@CentOSA ~]# date
    Sun Sep 16 11:28:13 CST 2018

Install the JDK and configure the JAVA_HOME environment variable

[root@CentOSX ~]# rpm -ivh jdk-8u171-linux-x64.rpm 
[root@CentOSX ~]# vi .bashrc 

JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME
export CLASSPATH
export PATH
[root@CentOSX ~]# source .bashrc

Install and start ZooKeeper

[root@CentOSX ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[root@CentOSX ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000
dataDir=/root/zkdata
clientPort=2181
initLimit=5
syncLimit=2
server.1=CentOSA:2887:3887
server.2=CentOSB:2887:3887
server.3=CentOSC:2887:3887
[root@CentOSX ~]# mkdir /root/zkdata
[root@CentOSA ~]# echo 1 >> zkdata/myid
[root@CentOSB ~]# echo 2 >> zkdata/myid
[root@CentOSC ~]# echo 3 >> zkdata/myid
[root@CentOSX zookeeper-3.4.6]# ./bin/zkServer.sh start zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@CentOSX zookeeper-3.4.6]# ./bin/zkServer.sh status zoo.cfg

Install and configure Hadoop

[root@CentOSX ~]# tar -zxf hadoop-2.6.0_x64.tar.gz -C /usr/
[root@CentOSX ~]# vi .bashrc 

HADOOP_HOME=/usr/hadoop-2.6.0
JAVA_HOME=/usr/java/latest
CLASSPATH=.
PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export JAVA_HOME
export CLASSPATH
export PATH
export HADOOP_HOME
[root@CentOSX ~]# source .bashrc
  • core-site.xml

    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/usr/hadoop-2.6.0/hadoop-${user.name}</value>
    </property>
    <property>
        <name>fs.trash.interval</name>
        <value>30</value>
    </property>
    <property>
        <name>net.topology.script.file.name</name>
        <value>/usr/hadoop-2.6.0/etc/hadoop/rack.sh</value>
    </property>

Create the rack script; it determines the physical location of a machine from its IP address.

[root@CentOSX ~]# vi /usr/hadoop-2.6.0/etc/hadoop/rack.sh

while [ $# -gt 0 ] ; do
	  nodeArg=$1
	  exec</usr/hadoop-2.6.0/etc/hadoop/topology.data
	  result="" 
	  while read line ; do
		ar=( $line ) 
		if [ "${ar[0]}" = "$nodeArg" ] ; then
		  result="${ar[1]}"
		fi
	  done 
	  shift 
	  if [ -z "$result" ] ; then
		echo -n "/default-rack"
	  else
		echo -n "$result "
	  fi
done

[root@CentOSX ~]# chmod u+x /usr/hadoop-2.6.0/etc/hadoop/rack.sh
[root@CentOSX ~]# vi /usr/hadoop-2.6.0/etc/hadoop/topology.data

192.168.128.133 /rack1
192.168.128.134 /rack1
192.168.128.135 /rack2

[root@CentOSX ~]# /usr/hadoop-2.6.0/etc/hadoop/rack.sh 192.168.128.133
/rack1
  • hdfs-site.xml

    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>ha.zookeeper.quorum</name>
        <value>CentOSA:2181,CentOSB:2181,CentOSC:2181</value>
    </property>
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>CentOSA:9000</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>CentOSB:9000</value>
    </property>
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://CentOSA:8485;CentOSB:8485;CentOSC:8485/mycluster</value>
    </property>
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence</value>
    </property>
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/root/.ssh/id_rsa</value>
    </property>

  • slaves

    [root@CentOSX ~]# vi /usr/hadoop-2.6.0/etc/hadoop/slaves

    CentOSA
    CentOSB
    CentOSC

Start HDFS

[root@CentOSX ~]# hadoop-daemon.sh start journalnode   // wait about 10 seconds before the next step
[root@CentOSA ~]# hdfs namenode -format
[root@CentOSA ~]# hadoop-daemon.sh start namenode
[root@CentOSB ~]# hdfs namenode -bootstrapStandby (copies the metadata from the active namenode)
[root@CentOSB ~]# hadoop-daemon.sh start namenode
[root@CentOSA|B ~]# hdfs zkfc -formatZK (registers the namenode information in ZooKeeper; run on either CentOSA or CentOSB)
[root@CentOSA ~]# hadoop-daemon.sh start zkfc (failover controller)
[root@CentOSB ~]# hadoop-daemon.sh start zkfc (failover controller)
[root@CentOSX ~]# hadoop-daemon.sh start datanode

Check the rack topology

[root@CentOSA ~]# hdfs dfsadmin -printTopology
Rack: /rack1
   192.168.29.129:50010 (CentOSA)
   192.168.29.130:50010 (CentOSB)

Rack: /rack2
   192.168.29.131:50010 (CentOSC)

Starting and stopping the cluster

[root@CentOSA ~]# start|stop-dfs.sh   # can be run on any node

If a namenode fails to start during a restart because the journalnodes initialize too slowly, run hadoop-daemon.sh start namenode on the namenode node that failed.

Building the YARN cluster

  • Edit mapred-site.xml

    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>

  • Edit yarn-site.xml

    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.resourcemanager.cluster-id</name>
        <value>cluster1</value>
    </property>
    <property>
        <name>yarn.resourcemanager.ha.rm-ids</name>
        <value>rm1,rm2</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm1</name>
        <value>CentOSB</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname.rm2</name>
        <value>CentOSC</value>
    </property>
    <property>
        <name>yarn.resourcemanager.zk-address</name>
        <value>CentOSA:2181,CentOSB:2181,CentOSC:2181</value>
    </property>

  • Start YARN

    [root@CentOSB ~]# yarn-daemon.sh start resourcemanager
    [root@CentOSC ~]# yarn-daemon.sh start resourcemanager
    [root@CentOSX ~]# yarn-daemon.sh start nodemanager

Check the ResourceManager HA state

[root@CentOSA ~]# yarn rmadmin -getServiceState rm1
active
[root@CentOSA ~]# yarn rmadmin -getServiceState rm2
standby