Elastic Stack 筆記(十)Elasticsearch5.6 For Hadoop

博客地址:http://www.moonxy.comjava

1、前言node

ES-Hadoop 是鏈接快速查詢和大數據分析的橋樑,它可以無間隙的在 Hadoop 和 ElasticSearch 上移動數據。ES Hadoop索引 Hadoop 數據到 Elasticsearch,充分利用其查詢速度,大量聚合能力來使它比以往更快,同時可使用 HDFS 做爲 Elasticsearch 長期存檔。ES-Hadoop能夠本地集成 Hadoop 生態系統上的不少流行組件,好比 Spark、Storm、Hive、Pig、Storm、MapReduce等。git

ES-Hadoop 與大數據的關係圖apache

首先須要在機器上配置 SSH 免密登陸,此處再也不講解。json

2、安裝 Hadoopvim

2.1 Hadoop 的三種模式app

Hadoop 主要分爲三種安裝模式,分別爲:單機模式、僞分佈式模式和徹底分佈式模式。下面以僞分佈式模式爲例。框架

1)單機(非分佈式)模式elasticsearch

這種模式在一臺單機上運行,沒有分佈式文件系統,而是直接讀寫本地操做系統的文件系統。分佈式

2)僞分佈式運行模式

這種模式也是在一臺單機上運行,但用不一樣的Java進程模仿分佈式運行中的各種結點: (NameNode,DataNode,JobTracker,TaskTracker,SecondaryNameNode)

請注意分佈式運行中的這幾個結點的區別:

從分佈式存儲的角度來講,集羣中的結點由一個NameNode和若干個DataNode組成,另有一個SecondaryNameNode做爲NameNode的備份。

從分佈式應用的角度來講,集羣中的結點由一個JobTracker和若干個TaskTracker組成,JobTracker負責任務的調度,TaskTracker負責並行執行任務。TaskTracker必須運行在DataNode上,這樣便於數據的本地計算。JobTracker和NameNode則無須在同一臺機器上。一個機器上,既當 namenode,又當 datanode,或者說既是 jobtracker,又是tasktracker。沒有所謂的在多臺機器上進行真正的分佈式計算,故稱爲 "僞分佈式"。

3)徹底分佈式模式

真正的分佈式,由3個及以上的實體機或者虛擬機組件的機羣。

2.2 下載 Hadoop

Apache Hadoop 官方下載地址爲:http://apache.org/dist/hadoop/common/,或者訪問全部歷史版本地址:http://archive.apache.org/dist/hadoop/common/

此處選擇 2.9.1 版本,下載並解壓,以下:

[root@masternode software]# tar zxvf /usr/software/hadoop-2.9.1.tar.gz -C /opt/hadoop
[root@masternode software]# chown -R esuser:esuser /opt/hadoop

2.3 配置 Hadoop

hadoop 包括的配置文件主要有:hadoop-env.sh、core-site.xml、yarn-site.xml、mapred-site.xml、hdfs-site.xml 等均位於 /opt/hadoop/hadoop-2.9.1/etc/hadoop 目錄下。

修改 hadoop-env.sh,添加 JAVA_HOME,以下:

[esuser@masternode hadoop]$ vim /opt/hadoop/hadoop-2.9.1/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/opt/jdk/jdk1.8.0_16

在集羣環境下,即便各結點在 /etc/profile 中都正確地配置了JAVA_HOME,也會報以下錯誤:

localhost: Error: JAVA_HOME is not set and could not be found.

在hadoop-env.sh中,再顯示地從新聲明一遍JAVA_HOME

修改 /etc/profile 系統環境變量,添加 Hadoop 變量,以下:

[root@masternode hadoop-2.9.1]# vim /etc/profile

添加 Hadoop_HOME,以下:

#Hadoop variables
export HADOOP_HOME=/opt/hadoop/hadoop-2.9.1
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

是配置文件生效(只對當前 Bash 生效),以下:

[root@masternode hadoop-2.9.1]# source /etc/profile

修改 core-site.xml,添加以下配置:

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/hadoop/hadoop-2.9.1/hdfs/tmp</value>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

屬性名 fs.default.name 已經廢棄,使用新的 fs.defaultFS 來代替。fs.defaultFS 保存了 NameNode 的位置,HDFS 和 MapReduce 組件都須要使用到。

修改 mapred-site.xml,以下:

先從模板複製一份配置文件

[esuser@masternode hadoop]$ cp /opt/hadoop/hadoop-2.9.1/etc/hadoop/mapred-site.xml.template /opt/hadoop/hadoop-2.9.1/etc/hadoop/mapred-site.xml

再添加以下配置

<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9010</value>
    </property>
</configuration>

變量 mapred.job.tracker 保存了 JobTracker 的位置,MapReduce 組件須要知道這個位置。

修改 hdfs-site.xml,添加以下配置,以下:

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

變量 dfs.replication 制定了每一個 HDFS 數據文件的副本次數,默認爲 3,此處修改成 1。

#並在hdfs-site.xml添加:
#name:
<property>
    <name>dfs.namenode.name.dir</name>
    <value>file://${hadoop.tmp.dir}/dfs/name</value>
    #專門針對name的路徑設置,不放在默認的路徑下,能夠指定咱們的默認物理磁盤
    <description>肯定本地文件系統上DFS名稱節點的位置應該存儲名稱表(fsimage)。 若是這是一個以逗號分隔的列表的目錄,而後名稱表被複制到全部的目錄中,以實現冗餘。</description>
</property>
#data:
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file://${hadoop.tmp.dir}/dfs/data</value>
        <description>肯定本地文件系統上DFS數據節點的位置應該存儲它的塊。若是這是逗號分隔的目錄列表,而後數據將被存儲在全部命名目錄,一般在不一樣的設備上。目錄應該被標記與HDFS對應的存儲類型([SSD] / [磁盤] / [存檔] / [RAM_DISK])存儲政策。 若是目錄不存在,則默認存儲類型爲DISK沒有明確標記的存儲類型。 不存在的目錄將若是本地文件系統權限容許,則建立它。</description>
    </property>

 啓動 Hadoop 以前,首先格式化 namenode,以下:

[esuser@masternode ~]$ hadoop namenode -format

顯示以下:

2.4 啓動 Hadoop

執行 start-all.sh 腳本和先執行 star-dfs.sh 再執行 start-yarn.sh 是同樣的。

格式化完成以後,啓動 Hadoop,命令以下:

[esuser@masternode hadoop]$ start-all.sh
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
Starting namenodes on [localhost]
localhost: starting namenode, logging to /opt/hadoop/hadoop-2.9.1/logs/hadoop-esuser-namenode-masternode.out
localhost: starting datanode, logging to /opt/hadoop/hadoop-2.9.1/logs/hadoop-esuser-datanode-masternode.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /opt/hadoop/hadoop-2.9.1/logs/hadoop-esuser-secondarynamenode-masternode.out
starting yarn daemons
resourcemanager running as process 2212. Stop it first.
localhost: starting nodemanager, logging to /opt/hadoop/hadoop-2.9.1/logs/yarn-esuser-nodemanager-masternode.out

使用 jps 命令查看 JVM 進程,以下:

[esuser@masternode hadoop]$ jps
3504 SecondaryNameNode
3299 DataNode
2212 ResourceManager
3204 NameNode
3718 NodeManager
3855 Jps

正常狀況下會看到 NameNode、Nodemanager、ResourceManager、DataNode 和 SecondaryNameNode,就說明已經啓動成功了。

3、安裝 ES-Hadoop

ES-Hadoop 全部版本下載地址:https://www.elastic.co/downloads/past-releases,找到 ES-Hadoop 5.6.0 版本下載,須要與 Elasticsearch 5.6.0 的版本相互對應一致,下載並解壓到 /opt 目錄下。

ES-Hadoop是一個 jar 包,工做在 hadoop 這邊,ES 這邊不須要安裝。

在 /etc/profile 中添加環境變量:

#ESHADOOP_HOME variables
export ESHADOOP_HOME=/opt/elasticsearch-hadoop-5.6.0
export CLASSPATH=$CLASSPATH:$ESHADOOP_HOME/dist

4、從 HDFS 到 Elasticsearch

首先將 blog.json 上傳到 HDFS,使用以下命令:

hadoop fs -put blog.json /work
#或者
hdfs dfs -put blog.json /work

blog.json 的內容爲:

{"id":"1","title":"git簡介","posttime":"2016-06-11","content":"svn與git的最主要區別..."}
{"id":"2","title":"ava中泛型的介紹與簡單使用","posttime":"2016-06-12","content":"基本操做:CRUD ..."}
{"id":"3","title":"SQL基本操做","posttime":"2016-06-13","content":"svn與git的最主要區別..."}
{"id":"4","title":"Hibernate框架基礎","posttime":"2016-06-14","content":"Hibernate框架基礎..."}
{"id":"5","title":"Shell基本知識","posttime":"2016-06-15","content":"Shell是什麼..."}

編寫程序:

package com.es.hd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.elasticsearch.hadoop.mr.EsOutputFormat;
import java.io.IOException;

public class HdfsToES {
    
    public static class MyMapper extends Mapper<Object, Text, NullWritable, BytesWritable> {
        public void map(Object key, Text value, Mapper<Object, Text, NullWritable, BytesWritable>.Context context)
                throws IOException, InterruptedException {
            byte[] line = value.toString().trim().getBytes();
            BytesWritable blog = new BytesWritable(line);
            context.write(NullWritable.get(), blog);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
        conf.set("es.nodes", "192.168.56.110:9200");
        conf.set("es.resource", "blog/cnblogs");
        conf.set("es.mapping.id", "id");
        conf.set("es.input.json", "yes");

        Job job = Job.getInstance(conf, "hadoop es write test");
        job.setMapperClass(HdfsToES.MyMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(EsOutputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000//work/blog.json"));
        job.waitForCompletion(true);
    }
}

5、從 Elasticsearch 到 HDFS

5.1 讀取索引到 HDFS

讀取 Elasticsearch 一個類型中的所有數據到 HDFS,這裏讀取索引爲 blog 類型爲 cnblogs 的全部文檔,以下:

package com.es.hd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import java.io.IOException;

public class EsToHDFS {
    public static class MyMapper extends Mapper<Writable, Writable, NullWritable, Text> {
        @Override
        protected void map(Writable key, Writable value, Context context) throws IOException, InterruptedException {
            Text text = new Text();
            text.set(value.toString());
            context.write(NullWritable.get(), text);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("es.nodes", "192.168.56.110:9200");
        configuration.set("es.resource", "blog/cnblogs");
        configuration.set("es.output.json", "true");
        Job job = Job.getInstance(configuration, "hadoop es write test");
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(EsInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/work/blog_cnblogs"));
        job.waitForCompletion(true);
    }
}

5.2 查詢 Elasticsearch 寫入 HDFS

能夠穿入查詢條件對 Elastticsearch 中的文檔進行搜索,再把文檔查詢結果寫入 HDFS。這裏查詢 title 中含有關鍵詞 git 的文檔,以下:

package com.es.hd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.elasticsearch.hadoop.mr.EsInputFormat;
import java.io.IOException;

public class EsQueryToHDFS {
    public static class MyMapper extends Mapper<Writable, Writable, Text, Text> {
        @Override
        protected void map(Writable key, Writable value, Context context) throws IOException, InterruptedException {
            context.write(new Text(key.toString()), new Text(value.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("es.nodes", "192.168.56.110:9200");
        configuration.set("es.resource", "blog/cnblogs");
        configuration.set("es.output.json", "true");
        configuration.set("es.query", "?q=title:git");
        Job job = Job.getInstance(configuration, "query es to HDFS");
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(1);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(EsInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/work/es_query_to_HDFS"));
        job.waitForCompletion(true);
    }
}
相關文章
相關標籤/搜索