Elasticsearch (ES)是一個基於Lucene構建的開源、分佈式、RESTful 接口全文搜索引擎。Elasticsearch 仍是一個分佈式文檔數據庫,其中每一個字段均是被索引的數據且可被搜索,它可以擴展至數以百計的服務器存儲以及處理PB級的數據。它能夠在很短的時間內在儲、搜索和分析大量的數據。它一般做爲具備複雜搜索場景狀況下的核心發動機。 Elasticsearch就是爲高可用和可擴展而生的。能夠經過購置性能更強的服務器來完成
1.橫向可擴展性:只須要增長臺服務器,作一點兒配置,啓動一下Elasticsearch就能夠併入集羣。 2.分片機制提供更好的分佈性:同一個索引分紅多個分片(sharding), 這點相似於HDFS的塊機制;分而治之的方式可提高處理效率。 3.高可用:提供複製( replica) 機制,一個分片能夠設置多個複製,使得某臺服務器在宕機的狀況下,集羣仍舊能夠照常運行,並會把服務器宕機丟失的數據信息複製恢復到其餘可用節點上。 4.使用簡單:共需一條命令就能夠下載文件,而後很快就能搭建一一個站內搜索引擎。
大型分佈式日誌分析系統ELK elasticsearch(存儲日誌)+logstash(收集日誌)+kibana(展現數據) 大型電商商品搜索系統、網盤搜索引擎等。
倒排表以字或詞爲關鍵字進行索引,表中關鍵字所對應的記錄表項記錄了出現這個字或詞的全部文檔,一個表項就是一個字表段,它記錄該文檔的ID和字符在該文檔中出現的位置狀況。 因爲每一個字或詞對應的文檔數量在動態變化,因此倒排表的創建和維護都較爲複雜,可是在查詢的時候因爲能夠一次獲得查詢關鍵字所對應的全部文檔,因此效率高於正排表。在全文檢索中,檢索的快速響應是一個最爲關鍵的性能,而索引創建因爲在後臺進行,儘管效率相對低一些,但不會影響整個搜索引擎的效率。
文檔內容:java
序號c++
文檔內容數據庫
1安全
小俊是一家科技公司創始人,開的汽車是奧迪a8l,加速爽。服務器
2多線程
小薇是一家科技公司的前臺,開的汽車是保時捷911併發
3app
小紅買了小薇的保時捷911,加速爽。elasticsearch
4分佈式
小明是一家科技公司開發主管,開的汽車是奧迪a6l,加速爽。
5
小軍是一家科技公司開發,開的汽車是比亞迪速銳,加速有點慢
倒排索引會對以上文檔內容進行關鍵詞分詞,可使用關鍵次直接定位到文檔內容。
單詞ID
單詞
倒排列表docId
1
小
1,2,3,4,5
2
一家
1,2,4,5
3
科技公司
1,2,4,5
4
開發
4,5
5
汽車
1,2,4,5
6
奧迪
1,4
7
加速爽
1,3,4
8
保時捷
2,3
9
保時捷911
2
10
比亞迪
5
需求:有大量的文本文檔,以下所示: a.txt hello tom hello jim hello kitty hello rose b.txt hello jerry hello jim hello kitty hello jack c.txt hello jerry hello java hello c++ hello c++ 須要獲得如下結果: hello a.txt-->4 b.txt-->4 c.txt-->4 java c.txt-->1 jerry b.txt-->1 c.txt-->1 ....
思路:
一、先寫一個mr程序:統計出每一個單詞在每一個文件中的總次數
hello-a.txt 4
hello-b.txt 4
hello-c.txt 4
java-c.txt 1
jerry-b.txt 1
jerry-c.txt 1
要點1:map方法中,如何獲取所處理的這一行數據所在的文件名?
worker在調用map方法時,會傳入一個context,而context中包含了這個worker所讀取的數據切片信息。而切片信息又包含這個切片所在的文件信息,那麼就能夠在map中:
FileSplit split=context.getInputSplit();
String fileName=split.getPath().getName();
要點二:setup方法
worker在正式處理數據以前,會先調用一次setup方法,因此,常利用這個機制來作一些初始化操做
二、而後在寫一個mr程序,讀取上述結果數據:
map: 根據-切,以單詞作key,後面一段做爲value
reduce: 拼接values裏面的每一段,以單詞作key,拼接結果作value,輸出便可
代碼實現
public class IndexStepOne { public static class IndexStepOneMapper extends Mapper<LongWritable,Text,Text,IntWritable>{ /** * 產生: <單詞-文件名,1><單詞-文件名,1> */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { /** * 若是map task讀的是文件:劃分範圍是:《文件路徑,偏移量範圍》 * 若是map task讀的是數據庫的數據,劃分的任務範圍是:《庫名.表名,行範圍》 * 因此給抽象的getInputSplit */ //每一個map task所處理的數據任務範圍 FileSplit inputSplit = (FileSplit) context.getInputSplit(); String fileName = inputSplit.getPath().getName(); String[] words = value.toString().split(" "); for(String w:words){ //單詞-文件名 1 context.write(new Text(w+"-"+fileName),new IntWritable(1)); } } } public static class IndexStepOneReduce extends Reducer<Text,IntWritable,Text,IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count=0; for(IntWritable value:values){ count+=value.get(); } context.write(key,new IntWritable(count)); } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //動態獲取jar包在哪裏 job.setJarByClass(IndexStepOne.class); //2.封裝參數:本次job所要調用的mapper實現類 job.setMapperClass(IndexStepOneMapper.class); job.setReducerClass(IndexStepOneReduce.class); //3.封裝參數:本次job的Mapper實現類產生的數據key,value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //4.封裝參數:本次Reduce返回的key,value數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6.封裝參數:想要啓動的reduce task的數量 job.setNumReduceTasks(3); FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\index\\input")); FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\index\\out1")); boolean res = job.waitForCompletion(true); System.exit(res ? 0:-1); } }
運行輸出
part-r-000000 part-r-000001 part-r-0000002
hello-c.txt 4 jack-b.txt 1 java-c.txt 1 jerry-b.txt 1 kitty-a.txt 1 rose-a.txt 1
c++-c.txt 2 hello-a.txt 4 jerry-c.txt 1 jim-a.txt 1 kitty-b.txt 1 tom-a.txt 1
hello-b.txt 4 jim-b.txt 1
public class IndexStepOne2 { public static class IndexStepOneMapper extends Mapper<LongWritable,Text,Text,Text>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split("-"); context.write(new Text(split[0]), new Text(split[1]. replaceAll("\t","-->"))); } } public static class IndexStepOneReduce extends Reducer<Text,Text,Text,Text>{ //reduce階段對相同的key進行處理,相同key發給同一個reduce task處理 @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //StringBuffer是線程安全的,StringBuild是線程不安全的 //這裏沒有多線程併發,用StringBuild更快 StringBuilder sb = new StringBuilder(); /** * <hello a.txt-->4> <hello b.txt-->4> <hello c.txt-->4> * <java c.txt-->1> * <jetty b.txt-->1><jetty c.tex-->1> */ /** * hello a.txt-->4 b.txt-->4 c.txt-->4 * java c.txt-->1 * jerry b.txt-->1 c.txt-->1 */ for(Text value:values){ sb.append(value.toString()).append("\t"); } context.write(key,new Text(sb.toString())); } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf); //動態獲取jar包在哪裏 job.setJarByClass(IndexStepOne2.class); //2.封裝參數:本次job所要調用的mapper實現類 job.setMapperClass(IndexStepOneMapper.class); job.setReducerClass(IndexStepOneReduce.class); //3.封裝參數:本次job的Mapper實現類產生的數據key,value的類型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //4.封裝參數:本次Reduce返回的key,value數據類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); //6.封裝參數:想要啓動的reduce task的數量 job.setNumReduceTasks(1); FileInputFormat.setInputPaths(job,new Path("F:\\mrdata\\index\\out1")); FileOutputFormat.setOutputPath(job,new Path("F:\\mrdata\\index\\out2")); boolean res = job.waitForCompletion(true); System.exit(res ? 0:-1); } }
運行輸出
c++ c.txt-->2 hello a.txt-->4 b.txt-->4 c.txt-->4 jack b.txt-->1 java c.txt-->1 jerry b.txt-->1 c.txt-->1 jim a.txt-->1 b.txt-->1 kitty b.txt-->1 a.txt-->1 rose a.txt-->1 tom a.txt-->1