Elasticsearch(二)

1、Java API操做

Elasticsearch的Java客戶端很是強大;它能夠創建一個嵌入式實例並在必要時運行管理任務php

運行一個Java應用程序和Elasticsearch時,有兩種操做模式可供使用。該應用程序可在Elasticsearch集羣中扮演更加主動或更加被動的角色。在更加主動的狀況下(稱爲Node Client),應用程序實例將從集羣接收請求,肯定哪一個節點應處理該請求,就像正常節點所作的同樣。(應用程序甚至能夠託管索引和處理請求。)另外一種模式稱爲Transport Client,它將全部請求都轉發到另外一個Elasticsearch節點,由後者來肯定最終目標html

1. API基本操做

1.1 操做環境準備
1)建立maven工程
2)添加pom文件前端

 
<dependencies>
<dependency>
<groupId>junit </groupId>
<artifactId>junit </artifactId>
<version>4.10 </version>
<scope>test </scope>
</dependency>
<dependency>
<groupId>org.elasticsearch </groupId>
<artifactId>elasticsearch </artifactId>
<version>6.1.1 </version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client </groupId>
<artifactId>transport </artifactId>
<version>6.1.1 </version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j </groupId>
<artifactId>log4j-core </artifactId>
<version>2.9.0 </version>
</dependency>
</dependencies>

3)等待依賴的jar包下載完成
當直接在ElasticSearch 創建文檔對象時,若是索引不存在的,默認會自動建立,映射採用默認方式 java

1.2 獲取Transport Client
(1)ElasticSearch服務默認端口9300
(2)Web管理平臺端口9200node

 
private TransportClient client;
@SuppressWarnings( "unchecked")
@Before
public void getClient() throws Exception {
// 1 設置鏈接的集羣名稱
Settings settings = Settings.builder().put( "cluster.name", "my-application").build();
// 2 鏈接集羣
client = new PreBuiltTransportClient(settings);
client.addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName( "hsiehchou121"), 9300));
// 3 打印集羣名稱
System.out.println(client.toString());
}

(3)顯示log4j2報錯,在resource目錄下建立一個文件命名爲log4j2.xml並添加以下內容mysql

 
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>

1.3 建立索引
源代碼linux

 
@Test
public void createIndex_blog(){
// 1 建立索引
client .admin() .indices() .prepareCreate( "blog2") .get();
// 2 關閉鏈接
client .close();
}

1.4 刪除索引
源代碼ios

 
@Test
public void deleteIndex(){
// 1 刪除索引
client .admin() .indices() .prepareDelete( "blog2") .get();
// 2 關閉鏈接
client .close();
}

1.5 新建文檔(源數據json串)
當直接在ElasticSearch創建文檔對象時,若是索引不存在的,默認會自動建立,映射採用默認方式
源代碼git

 
@Test
public void createIndexByJson() throws UnknownHostException {
// 1 文檔數據準備
String json = "{" + "\"id\ ":\" 1\ "," + "\"title\ ":\"基於Lucene的搜索服務器\ ","
+ "\"content\ ":\"它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口\ "" + "}";
// 2 建立文檔
IndexResponse indexResponse = client.prepareIndex( "blog", "article", "1").setSource(json).execute().actionGet();
// 3 打印返回的結果
System. out.println( "index:" + indexResponse.getIndex());
System. out.println( "type:" + indexResponse.getType());
System. out.println( "id:" + indexResponse.getId());
System. out.println( "version:" + indexResponse.getVersion());
System. out.println( "result:" + indexResponse.getResult());
// 4 關閉鏈接
client.close();
}

1.6 新建文檔(源數據map方式添加json)
源代碼
Test
public void createIndexByMap() {github

 
// 1 文檔數據準備
Map< String, Object> json = new HashMap< String, Object>();
json. put( "id", "2");
json. put( "title", "基於Lucene的搜索服務器");
json. put( "content", "它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口");
// 2 建立文檔
IndexResponse indexResponse = client.prepareIndex( "blog", "article", "2").setSource(json).execute().actionGet();
// 3 打印返回的結果
System.out. println( "index:" + indexResponse.getIndex());
System.out. println( "type:" + indexResponse.getType());
System.out. println( "id:" + indexResponse.getId());
System.out. println( "version:" + indexResponse.getVersion());
System.out. println( "result:" + indexResponse. getResult());
// 4 關閉鏈接
client. close();
}

1.7 新建文檔(源數據es構建器添加json)
源代碼

 
@Test
public void createIndex() throws Exception {
// 1 經過es自帶的幫助類,構建json數據
XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field( "id", 3).field( "title", "基於Lucene的搜索服務器").field( "content", "它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。")
.endObject();
// 2 建立文檔
IndexResponse indexResponse = client.prepareIndex( "blog", "article", "3").setSource(builder). get();
// 3 打印返回的結果
System.out. println( "index:" + indexResponse.getIndex());
System.out. println( "type:" + indexResponse.getType());
System.out. println( "id:" + indexResponse.getId());
System.out. println( "version:" + indexResponse.getVersion());
System.out. println( "result:" + indexResponse. getResult());
// 4 關閉鏈接
client. close();
}

1.8 搜索文檔數據(單個索引)
源代碼
@Test
public void getData() throws Exception {

 
// 1 查詢文檔
GetResponse response = client.prepareGet( "blog", "article", "1"). get();
// 2 打印搜索的結果
System.out. println(response.getSourceAsString());
// 3 關閉鏈接
client. close();
}

1.9 搜索文檔數據(多個索引)
源代碼

 
@ Test
public void getMultiData() {
// 1 查詢多個文檔
MultiGetResponse response = client.prepareMultiGet(). add( "blog", "article", "1"). add( "blog", "article", "2", "3"). add( "blog", "article", "2"). get();
// 2 遍歷返回的結果
for(MultiGetItemResponse itemResponse:response){
GetResponse getResponse = itemResponse.getResponse();
// 若是獲取到查詢結果
if (getResponse.isExists()) {
String sourceAsString = getResponse.getSourceAsString();
System. out.println(sourceAsString);
}
}
// 3 關閉資源
client.close();
}

1.10 更新文檔數據(update)
源代碼

 
@Test
public void updateData() throws Throwable {
// 1 建立更新數據的請求對象
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index( "blog");
updateRequest.type( "article");
updateRequest.id( "3");
updateRequest.doc(XContentFactory.jsonBuilder().startObject()
// 對沒有的字段添加, 對已有的字段替換
.field( "title", "基於Lucene的搜索服務器")
.field( "content", "它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。大數據前景無限")
.field( "createDate", "2017-8-22").endObject());
// 2 獲取更新後的值
UpdateResponse indexResponse = client.update(updateRequest). get();
// 3 打印返回的結果
System.out. println( "index:" + indexResponse.getIndex());
System.out. println( "type:" + indexResponse.getType());
System.out. println( "id:" + indexResponse.getId());
System.out. println( "version:" + indexResponse.getVersion());
System.out. println( "create:" + indexResponse. getResult());
// 4 關閉鏈接
client. close();
}

1.11 更新文檔數據(upsert)
設置查詢條件, 查找不到則添加IndexRequest內容,查找到則按照UpdateRequest更新

 
@Test
public void testUpsert() throws Exception {
// 設置查詢條件, 查找不到則添加
IndexRequest indexRequest = new IndexRequest( "blog", "article", "5")
.source(XContentFactory.jsonBuilder().startObject().field( "title", "搜索服務器").field( "content", "它提供了一個分佈式多用戶能力的全文搜索引擎,基於RESTful web接口。Elasticsearch是用Java開發的,並做爲Apache許可條款下的開放源碼發佈,是當前流行的企業級搜索引擎。設計用於雲計算中,可以達到實時搜索,穩定,可靠,快速,安裝使用方便。").endObject());
// 設置更新, 查找到更新下面的設置
UpdateRequest upsert = new UpdateRequest( "blog", "article", "5")
.doc(XContentFactory.jsonBuilder().startObject().field( "user", "李四").endObject()).upsert(indexRequest);
client.update(upsert).get();
client.close();
}

1.12 刪除文檔數據(prepareDelete)
源代碼

 
@Test
public void deleteData() {
// 1 刪除文檔數據
DeleteResponse indexResponse = client.prepareDelete( "blog", "article", "5"). get();
// 2 打印返回的結果
System.out. println( "index:" + indexResponse.getIndex());
System.out. println( "type:" + indexResponse.getType());
System.out. println( "id:" + indexResponse.getId());
System.out. println( "version:" + indexResponse.getVersion());
System.out. println( "found:" + indexResponse. getResult());
// 3 關閉鏈接
client. close();
}
2. 條件查詢QueryBuilder

2.1 查詢全部(matchAllQuery)
源代碼

 
@Test
public void matchAllQuery() {
// 1 執行查詢
SearchResponse searchResponse = client.prepareSearch( "blog").setTypes( "article")
.setQuery(QueryBuilders.matchAllQuery()). get();
// 2 打印查詢結果
SearchHits hits = searchResponse.getHits(); // 獲取命中次數,查詢結果有多少對象
System.out. println( "查詢結果有:" + hits.getTotalHits() + "條");
for (SearchHit hit : hits) {
System.out. println(hit.getSourceAsString()); //打印出每條結果
}
// 3 關閉鏈接
client. close();
}

2.2 對全部字段分詞查詢(queryStringQuery)
源代碼

 
@Test
public void query() {
// 1 條件查詢
SearchResponse searchResponse = client.prepareSearch( "blog").setTypes( "article")
.setQuery(QueryBuilders.queryStringQuery( "全文")). get();
// 2 打印查詢結果
SearchHits hits = searchResponse.getHits(); // 獲取命中次數,查詢結果有多少對象
System.out. println( "查詢結果有:" + hits.getTotalHits() + "條");
for (SearchHit hit : hits) {
System.out. println(hit.getSourceAsString()); //打印出每條結果
}
// 3 關閉鏈接
client. close();
}

2.3 通配符查詢(wildcardQuery)

  • :表示多個字符(0個或多個字符)
    ?:表示單個字符
    源代碼

 
@Test
public void wildcardQuery() {
// 1 通配符查詢
SearchResponse searchResponse = client.prepareSearch( "blog").setTypes( "article")
.setQuery(QueryBuilders.wildcardQuery( "content", "*全*")). get();
// 2 打印查詢結果
SearchHits hits = searchResponse.getHits(); // 獲取命中次數,查詢結果有多少對象
System.out. println( "查詢結果有:" + hits.getTotalHits() + "條");
for (SearchHit hit : hits) {
System.out. println(hit.getSourceAsString()); //打印出每條結果
}
// 3 關閉鏈接
client. close();
}

2.4 詞條查詢(TermQuery)
源代碼

 
@Test
public void termQuery() {
// 1 第一field查詢
SearchResponse searchResponse = client.prepareSearch( "blog").setTypes( "article")
.setQuery(QueryBuilders.termQuery( "content", "全文")). get();
// 2 打印查詢結果
SearchHits hits = searchResponse.getHits(); // 獲取命中次數,查詢結果有多少對象
System.out. println( "查詢結果有:" + hits.getTotalHits() + "條");
for (SearchHit hit : hits) {
System.out. println(hit.getSourceAsString()); //打印出每條結果
}
// 3 關閉鏈接
client. close();
}

2.5 模糊查詢(fuzzy)
源代碼

 
@Test
public void fuzzy() {
// 1 模糊查詢
SearchResponse searchResponse = client.prepareSearch( "blog").setTypes( "article")
.setQuery(QueryBuilders.fuzzyQuery( "title", "lucene")). get();
// 2 打印查詢結果
SearchHits hits = searchResponse.getHits(); // 獲取命中次數,查詢結果有多少對象
System.out. println( "查詢結果有:" + hits.getTotalHits() + "條");
Iterator<SearchHit> iterator = hits.iterator();
while (iterator.hasNext()) {
SearchHit searchHit = iterator.next(); // 每一個查詢對象
System.out. println(searchHit.getSourceAsString()); // 獲取字符串格式打印
}
// 3 關閉鏈接
client. close();
}
3. 映射相關操做

源代碼

 
@ Test
public void createMapping() throws Exception {
// 1設置mapping
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.startObject( "article")
.startObject( "properties")
.startObject( "id1")
.field( "type", "string")
.field( "store", "yes")
.endObject()
.startObject( "title2")
.field( "type", "string")
.field( "store", "no")
.endObject()
.startObject( "content")
.field( "type", "string")
.field( "store", "yes")
.endObject()
.endObject()
.endObject()
.endObject() ;
// 2 添加mapping
PutMappingRequest mapping = Requests.putMappingRequest( "blog4").type( "article").source(builder) ;
client.admin().indices().putMapping(mapping).get() ;
// 3 關閉資源
client.close() ;
}

2、IK分詞器

針對詞條查詢(TermQuery),查看默認中文分詞器的效果:
curl -XGET ‘http://hsiehchou:9200/_analyze?pretty&analyzer=standard’ -d ‘中華人民共和國’
{
「tokens」 : [
{
「token」 : 「中」,
「start_offset」 : 0,
「end_offset」 : 1,
「type」 : 「」,
「position」 : 0
},
{
「token」 : 「華」,
「start_offset」 : 1,
「end_offset」 : 2,
「type」 : 「」,
「position」 : 1
},
{
「token」 : 「人」,
「start_offset」 : 2,
「end_offset」 : 3,
「type」 : 「」,
「position」 : 2
},
{
「token」 : 「民」,
「start_offset」 : 3,
「end_offset」 : 4,
「type」 : 「」,
「position」 : 3
},
{
「token」 : 「共」,
「start_offset」 : 4,
「end_offset」 : 5,
「type」 : 「」,
「position」 : 4
},
{
「token」 : 「和」,
「start_offset」 : 5,
「end_offset」 : 6,
「type」 : 「」,
「position」 : 5
},
{
「token」 : 「國」,
「start_offset」 : 6,
「end_offset」 : 7,
「type」 : 「」,
「position」 : 6
}
]
}

1. IK分詞器的安裝

1.1 前期準備工做
1)CentOS聯網
配置CentOS能鏈接外網。Linux虛擬機ping www.baidu.com 是暢通的

2)jar包準備
(1)elasticsearch-analysis-ik-master.zip
(下載地址:https://github.com/medcl/elasticsearch-analysis-ik)
(2)apache-maven-3.6.0-bin.tar.gz

1.2 jar包安裝
1)Maven解壓、配置 MAVEN_HOME和PATH。
tar -zxvf apache-maven-3.6.0-bin.tar.gz -C /opt/module/
sudo vi /etc/profile

 
#MAVEN_HOME
export MAVEN_HOME=/opt/module/apache-maven-3.6.0
export PATH= $PATH: $MAVEN_HOME/bin

source /etc/profile
驗證命令:mvn -version

2)Ik分詞器解壓、打包與配置
ik分詞器解壓
unzip elasticsearch-analysis-ik-master.zip -d ./
進入ik分詞器所在目錄

cd elasticsearch-analysis-ik-master
使用maven進行打包

mvn package -Pdist,native -DskipTests -Dtar
打包完成以後,會出現 target/releases/elasticsearch-analysis-ik-{version}.zip

pwd /opt/software/elasticsearch-analysis-ik-master/target/releases
對zip文件進行解壓,並將解壓完成以後的文件拷貝到es所在目錄下的/plugins/

unzip elasticsearch-analysis-ik-6.0.0.zip
cp -r elasticsearch /opt/module/elasticsearch-5.6.1/plugins/

須要修改plugin-descriptor.properties文件,將其中的es版本號改成你所使用的版本號,即完成ik分詞器的安裝
vi plugin-descriptor.properties
修改成
elasticsearch.version=6.1.1
至此,安裝完成,重啓ES!

注意:需選擇與es相同版本的ik分詞器。
安裝方法(2種):
1.
./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.1.1/elasticsearch-analysis-ik-6.1.1.zip

2.
cp elasticsearch-analysis-ik-6.1.1.zip ./elasticsearch-6.1.1/plugins/
unzip elasticsearch-analysis-ik-6.1.1.zip -d ik-analyzer

三、elasticsearch-plugin install -f file:///usr/local/elasticsearch-analysis-ik-6.1.1.zip

2. IK分詞器的使用

2.1 命令行查看結果
ik_smart模式
curl -XGET ‘http://hsiehchou121:9200/_analyze?pretty&analyzer=ik_smart’ -d ‘中華人民共和國’

curl -H 「Content-Type:application/json」 -XGET ‘http://192.168.116.121:9200/_analyze?pretty’ -d ‘{「analyzer」:」ik_smasysctl -prt」,」text」:」中華人民共和國」}’
{
「tokens」 : [
{
「token」 : 「中華人民共和國」,
「start_offset」 : 0,
「end_offset」 : 7,
「type」 : 「CN_WORD」,
「position」 : 0
}
]
}

ik_max_word模式
curl -XGET ‘http://hadoop121:9200/_analyze?pretty&analyzer=ik_max_word’ -d ‘中華人民共和國’

curl -H 「Content-Type:application/json」 -XGET ‘http://192.168.116.124:9200/_analyze?pretty’ -d ‘{「analyzer」:」ik_max_word」,」text」:」中華人民共和國」}’

{
「tokens」 : [
{
「token」 : 「中華人民共和國」,
「start_offset」 : 0,
「end_offset」 : 7,
「type」 : 「CN_WORD」,
「position」 : 0
},
{
「token」 : 「中華人民」,
「start_offset」 : 0,
「end_offset」 : 4,
「type」 : 「CN_WORD」,
「position」 : 1
},
{
「token」 : 「中華」,
「start_offset」 : 0,
「end_offset」 : 2,
「type」 : 「CN_WORD」,
「position」 : 2
},
{
「token」 : 「華人」,
「start_offset」 : 1,
「end_offset」 : 3,
「type」 : 「CN_WORD」,
「position」 : 3
},
{
「token」 : 「人民共和國」,
「start_offset」 : 2,
「end_offset」 : 7,
「type」 : 「CN_WORD」,
「position」 : 4
},
{
「token」 : 「人民」,
「start_offset」 : 2,
「end_offset」 : 4,
「type」 : 「CN_WORD」,
「position」 : 5
},
{
「token」 : 「共和國」,
「start_offset」 : 4,
「end_offset」 : 7,
「type」 : 「CN_WORD」,
「position」 : 6
},
{
「token」 : 「共和」,
「start_offset」 : 4,
「end_offset」 : 6,
「type」 : 「CN_WORD」,
「position」 : 7
},
{
「token」 : 「國」,
「start_offset」 : 6,
「end_offset」 : 7,
「type」 : 「CN_CHAR」,
「position」 : 8
}
]
}

2.2 JavaAPI操做
1)建立索引
//建立索引(數據庫)

 
@Test
public void createIndex() {
//建立索引
client .admin() .indices() .prepareCreate( "blog4") .get();
//關閉資源
client .close();
}

2)建立mapping
//建立使用ik分詞器的mapping

 
@ Test
public void createMapping() throws Exception {
// 1設置mapping
XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.startObject( "article")
.startObject( "properties")
.startObject( "id1")
.field( "type", "string")
.field( "store", "yes")
.field( "analyzer", "ik_smart")
.endObject()
.startObject( "title2")
.field( "type", "string")
.field( "store", "no")
.field( "analyzer", "ik_smart")
.endObject()
.startObject( "content")
.field( "type", "string")
.field( "store", "yes")
.field( "analyzer", "ik_smart")
.endObject()
.endObject()
.endObject()
.endObject() ;
// 2 添加mapping
PutMappingRequest mapping = Requests.putMappingRequest( "blog4").type( "article").source(builder) ;
client.admin().indices().putMapping(mapping).get() ;
// 3 關閉資源
client.close() ;
}

3)插入數據
//建立文檔,以map形式

 
@Test
public void createDocumentByMap() {
HashMap< String, String> map = new HashMap<>();
map. put( "id1", "2");
map. put( "title2", "Lucene");
map. put( "content", "它提供了一個分佈式的web接口");
IndexResponse response = client.prepareIndex( "blog4", "article", "3").setSource( map).execute().actionGet();
//打印返回的結果
System.out. println( "結果:" + response. getResult());
System.out. println( "id:" + response.getId());
System.out. println( "index:" + response.getIndex());
System.out. println( "type:" + response.getType());
System.out. println( "版本:" + response.getVersion());
//關閉資源
client. close();
}

4) 詞條查詢
//詞條查詢

 
@ Test
public void queryTerm() {
SearchResponse response = client.prepareSearch( "blog4").setTypes( "article").setQuery(QueryBuilders.termQuery( "content", "提供")). get();
//獲取查詢命中結果
SearchHits hits = response.getHits();
System. out.println( "結果條數:" + hits.getTotalHits());
for (SearchHit hit : hits) {
System. out.println(hit.getSourceAsString());
}
}

Store 的解釋:
官方文檔說 store 默認是 no ,想固然的理解爲也就是說這個 field 是不會 store 的,可是查詢的時候也能查詢出來。

通過查找資料瞭解到原來 store 的意思是,是否在 _source 以外在獨立存儲一份。這裏要說一下 _source 這是源文檔,當索引數據的時候, elasticsearch 會保存一份源文檔到 _source 。若是文檔的某一字段設置了 store 爲 yes (默認爲 no),這時候會在 _source 存儲以外再爲這個字段獨立進行存儲,這麼作的目的主要是針對內容比較多的字段。

若是放到 _source 返回的話,由於_source 是把全部字段保存爲一份文檔,命中後讀取只須要一次 IO,包含內容特別多的字段會很佔帶寬影響性能。一般咱們也不須要完整的內容返回(可能只關心摘要),這時候就不必放到 _source 裏一塊兒返回了(固然也能夠在查詢時指定返回字段)。

3、Logstash

1. Logstash簡介

Logstash is a tool for managing events and logs. You can use it to collect logs, parse them, and store them for later use (like, for searching).

logstash是一個數據分析軟件,主要目的是分析log日誌。整一套軟件能夠看成一個MVC模型,logstash是controller層,Elasticsearch是一個model層,kibana是view層。

首先將數據傳給logstash,它將數據進行過濾和格式化(轉成JSON格式),而後傳給Elasticsearch進行存儲、建搜索的索引,kibana提供前端的頁面再進行搜索和圖表可視化,它是調用Elasticsearch的接口返回的數據進行可視化。logstash和Elasticsearch是用Java寫的,kibana使用node.js框架。

這個軟件官網有很詳細的使用說明,https://www.elastic.co/,除了docs以外,還有視頻教程。這篇博客集合了docs和視頻裏面一些比較重要的設置和使用。

2. Logstash 安裝

直接下載官方發佈的二進制包的,能夠訪問 https://www.elastic.co/downloads/logstash 頁面找對應操做系統和版本,點擊下載便可。

在終端中,像下面這樣運行命令來啓動 Logstash 進程:
輸入(讀取數據):file、es。 輸出:file、es、kafka

 
bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

-f文件 -e命令 標準輸入、輸出(命令行)

注意:若是出現以下報錯,請調高虛擬機內存容量
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error=’Cannot allocate memory’ (errno=12)

而後你會發現終端在等待你的輸入。沒問題,敲入 Hello World,回車,

{
「@version」 => 「1」,
「host」 => 「*「,
「message」 => 「hello world」,
「@timestamp」 => 2019-03-18T02:51:18.578Z
}

每位系統管理員都確定寫過不少相似這樣的命令
cat randdata | awk ‘{print $2}’ | sort | uniq -c | tee sortdata

Logstash 就像管道符同樣!
你輸入(就像命令行的 cat )數據,而後處理過濾(就像 awk 或者 uniq 之類)數據,最後輸出(就像 tee )到其餘地方

3. Logstash 配置

3.1 input配置
讀取文件(File)

 
input {
file {
path => [ "/var/log/*.log", "/var/log/message"]
type => "system"
start_position => "beginning"
}
}
output{stdout{codec=>rubydebug}}

有一些比較有用的配置項,能夠用來指定 FileWatch 庫的行爲

discover_interval
logstash 每隔多久去檢查一次被監聽的 path 下是否有新文件。默認值是 15 秒

exclude
不想被監聽的文件能夠排除出去,這裏跟 path 同樣支持 glob 展開

close_older
一個已經監聽中的文件,若是超過這個值的時間內沒有更新內容,就關閉監聽它的文件句柄。默認是 3600 秒,即一小時

ignore_older
在每次檢查文件列表的時候,若是一個文件的最後修改時間超過這個值,就忽略這個文件。默認是 86400 秒,即一天

sincedb_path
若是你不想用默認的 $HOME/.sincedb(Windows 平臺上在 C:\Windows\System32\config\systemprofile.sincedb),能夠經過這個配置定義 sincedb 文件到其餘位置

sincedb_write_interval
logstash 每隔多久寫一次 sincedb 文件,默認是 15 秒

stat_interval
logstash 每隔多久檢查一次被監聽文件狀態(是否有更新),默認是 1 秒

start_position
logstash 從什麼位置開始讀取文件數據,默認是結束位置,也就是說 logstash 進程會以相似 tail -F 的形式運行。若是你是要導入原有數據,把這個設定改爲 「beginning」,logstash 進程就從頭開始讀取,相似 less +F 的形式運行

啓動命令:../bin/logstash -f ./input_file.conf
測試命令:echo ‘hehe’ >> test.log
echo ‘hehe2’ >> message

標準輸入(Stdin)
咱們已經見過好幾個示例使用 stdin 了。這也應該是 logstash 裏最簡單和基礎的插件了

 
input {
stdin {
add_field => { "key" => "value"}
codec => "plain"
tags => [ "add"]
type => "std"
}
}
output{stdout{codec=>rubydebug}}

用上面的新 stdin 設置從新運行一次最開始的 hello world 示例。我建議你們把整段配置都寫入一個文本文件,而後運行命令:../bin/logstash -f ./input_stdin.conf。輸入 「hello world」 並回車後,你會在終端看到以下輸出

 
{
"message" => "hello world",
"@version" => "1",
"@timestamp" => "2014-08-08T06:48:47.789Z",
"type" => "std",
"tags" => [
[ 0] "add"
],
"key" => "value",
"host" => "raochenlindeMacBook-Air.local"
}

解釋
type 和 tags 是 logstash 事件中兩個特殊的字段。一般來講咱們會在輸入區段中經過 type 來標記事件類型。而 tags 則是在數據處理過程當中,由具體的插件來添加或者刪除的

最多見的用法是像下面這樣

 
input {
stdin {
type => "web"
}
}
filter {
if [ type] == "web" {
grok {
match => [ "message", %{COMBINEDAPACHELOG}]
}
}
}
output {
if "_grokparsefailure" in [tags] {
nagios_nsca {
nagios_status => "1"
}
} else {
elasticsearch {
}
}
}

3.2 codec配置
Codec 是 logstash 從 1.3.0 版開始新引入的概念(Codec 來自 Coder/decoder 兩個單詞的首字母縮寫)

在此以前,logstash 只支持純文本形式輸入,而後以過濾器處理它。但如今,咱們能夠在輸入期處理不一樣類型的數據,這全是由於有了 codec 設置

因此,這裏須要糾正以前的一個概念。Logstash 不僅是一個input | filter | output 的數據流,而是一個 input | decode | filter | encode | output 的數據流!codec 就是用來 decode、encode 事件的

codec 的引入,使得 logstash 能夠更好更方便的與其餘有自定義數據格式的運維產品共存,好比 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用數據格式的其餘產品等

事實上,咱們在第一個 「hello world」 用例中就已經用過 codec 了 —— rubydebug 就是一種 codec!雖然它通常只會用在 stdout 插件中,做爲配置測試或者調試的工具

採用 JSON 編碼
在早期的版本中,有一種下降 logstash 過濾器的 CPU 負載消耗的作法盛行於社區(在當時的 cookbook 上有專門的一節介紹):直接輸入預約義好的 JSON 數據,這樣就能夠省略掉 filter/grok 配置!

這個建議依然有效,不過在當前版本中須要稍微作一點配置變更 —— 由於如今有專門的 codec 設置

配置示例

 
input {
stdin {
add_field => { "key" => "value"}
codec => "json"
type => "std"
}
}
output{stdout{codec=>rubydebug}}

輸入:
{「simCar」:18074045598,」validityPeriod」:」1996-12-06」,」unitPrice」:9,」quantity」:19,」amount」:35,」imei」:887540376467915,」user」:」test」}

運行結果:
{
「imei」 => 887540376467915,
「unitPrice」 => 9,
「user」 => 「test」,
「@timestamp」 => 2019-03-19T05:01:53.451Z,
「simCar」 => 18074045598,
「host」 => 「zzc-203」,
「amount」 => 35,
「@version」 => 「1」,
「key」 => 「value」,
「type」 => 「std」,
「validityPeriod」 => 「1996-12-06」,
「quantity」 => 19
}

3.3 filter配置
Grok插件

logstash擁有豐富的filter插件,它們擴展了進入過濾器的原始數據,進行復雜的邏輯處理,甚至能夠無中生有的添加新的 logstash 事件到後續的流程中去!Grok 是 Logstash 最重要的插件之一。也是迄今爲止使蹩腳的、無結構的日誌結構化和可查詢的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表現完美

這個工具很是適用於系統日誌,Apache和其餘網絡服務器日誌,MySQL日誌等。

配置:

 
input {
stdin {
type => "std"
}
}
filter {
grok {
match=>{ "message"=> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
}
}
output{stdout{codec=>rubydebug}}

輸入:55.3.244.1 GET /index.html 15824 0.043
輸出:
{
「@version」 => 「1」,
「host」 => 「zzc-203」,
「request」 => 「/index.html」,
「bytes」 => 「15824」,
「duration」 => 「0.043」,
「method」 => 「GET」,
「@timestamp」 => 2019-03-19T05:09:55.777Z,
「message」 => 「55.3.244.1 GET /index.html 15824 0.043」,
「type」 => 「std」,
「client」 => 「55.3.244.1」
}

grok模式的語法以下:
%{SYNTAX:SEMANTIC}

SYNTAX:表明匹配值的類型,例如3.44能夠用NUMBER類型所匹配,127.0.0.1可使用IP類型匹配。
SEMANTIC:表明存儲該值的一個變量名稱,例如 3.44 多是一個事件的持續時間,127.0.0.1多是請求的client地址。因此這兩個值能夠用 %{NUMBER:duration} %{IP:client} 來匹配。

你也能夠選擇將數據類型轉換添加到Grok模式。默認狀況下,全部語義都保存爲字符串。若是您但願轉換語義的數據類型,例如將字符串更改成整數,則將其後綴爲目標數據類型。例如%{NUMBER:num:int}將num語義從一個字符串轉換爲一個整數。目前惟一支持的轉換是int和float。

Logstash附帶約120個模式。你能夠在這裏找到它們https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

自定義類型
更多時候logstash grok沒辦法提供你所須要的匹配類型,這個時候咱們可使用自定義。

建立自定義 patterns 文件。
①建立一個名爲patterns其中建立一個文件postfix (文件名可有可無,隨便起),在該文件中,將須要的模式寫爲模式名稱,空格,而後是該模式的正則表達式。例如:

POSTFIX_QUEUEID [0-9A-F]{10,11}

②而後使用這個插件中的patterns_dir設置告訴logstash目錄是你的自定義模式。

配置:

 
input {
stdin {
type => "std"
}
}
filter {
grok {
patterns_dir => [ "./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
}
output{stdout{codec=>rubydebug}}

輸入:
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver1

輸出:
{
「queue_id」 => 「BEF25A72965」,
「message」 => 「Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver1」,
「pid」 => 「21403」,
「program」 => 「postfix/cleanup」,
「@version」 => 「1」,
「type」 => 「std」,
「logsource」 => 「mailserver14」,
「host」 => 「zzc-203」,
「timestamp」 => 「Jan 1 06:25:43」,
「syslog_message」 => 「message-id=<20130101142543.5828399CCAF@mailserver1」,
「@timestamp」 => 2019-03-19T05:31:37.405Z
}

GeoIP 地址查詢歸類
GeoIP 是最多見的免費 IP 地址歸類查詢庫,同時也有收費版能夠採購。GeoIP 庫能夠根據 IP 地址提供對應的地域信息,包括國別,省市,經緯度等,對於可視化地圖和區域統計很是有用。

配置:

 
input {
stdin {
type => "std"
}
}
filter {
geoip {
source => "message"
}
}
output{stdout{codec=>rubydebug}}

輸入:183.60.92.253
輸出:
{
「type」 => 「std」,
「@version」 => 「1」,
「@timestamp」 => 2019-03-19T05:39:26.714Z,
「host」 => 「zzc-203」,
「message」 => 「183.60.92.253」,
「geoip」 => {
「country_code3」 => 「CN」,
「latitude」 => 23.1167,
「region_code」 => 「44」,
「region_name」 => 「Guangdong」,
「location」 => {
「lon」 => 113.25,
「lat」 => 23.1167
},
「city_name」 => 「Guangzhou」,
「country_name」 => 「China」,
「continent_code」 => 「AS」,
「country_code2」 => 「CN」,
「timezone」 => 「Asia/Shanghai」,
「ip」 => 「183.60.92.253」,
「longitude」 => 113.25
}
}

3.4 output配置
標準輸出(Stdout)

保存成文件(File)
經過日誌收集系統將分散在數百臺服務器上的數據集中存儲在某中心服務器上,這是運維最原始的需求。Logstash 固然也能作到這點。

和 LogStash::Inputs::File 不一樣, LogStash::Outputs::File 裏可使用 sprintf format 格式來自動定義輸出到帶日期命名的路徑。

配置:

 
input {
stdin {
type => "std"
}
}
output {
file {
path => "../data_test/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log"
codec => line { format => "custom format: %{message}"}
}
}

啓動後輸入,可看到文件

服務器間傳輸文件(File)

配置:
接收日誌服務器配置:

 
input {
tcp {
mode => "server"
port => 9600
ssl_enable => false
}
}
filter {
json {
source => "message"
}
}
output {
file {
path => "/usr/local/logstash-6.6.2/data_test/%{+YYYY-MM-dd}/%{servip}-%{filename}"
codec => line { format => "%{message}"}
}
}

發送日誌服務器配置:

 
input{
file {
path => [ "/usr/local/logstash-6.6.2/data_test/send.log"]
type => "ecolog"
start_position => "beginning"
}
}
filter {
if [ type] =~ /^ecolog/ {
ruby {
code => "file_name = event.get('path').split('/')[-1]
event.set('file_name',file_name)
event.set('servip','接收方ip')"
}
mutate {
rename => { "file_name" => "filename"}
}
}
}
output {
tcp {
host => "接收方ip"
port => 9600
codec => json_lines
}
}

從發送方發送message,接收方能夠看到寫出文件。

寫入到ES
配置:

 
input {
stdin {
type => "log2es"
}
}
output {
elasticsearch {
hosts => [ "192.168.109.133:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
sniffing => true
template_overwrite => true
}
}

在head插件中能夠看到數據。
sniffing : 尋找其餘es節點

實戰舉例:將錯誤日誌寫入es。
配置:

 
input {
file {
path => [ "/usr/local/logstash-6.6.2/data_test/run_error.log"]
type => "error"
start_position => "beginning"
}
}
output {
elasticsearch {
hosts => [ "192.168.109.133:9200"]
index => "logstash-%{type}-%{+YYYY.MM.dd}"
document_type => "%{type}"
sniffing => true
template_overwrite => true
}
}

4、Kibana

Kibana是一個開源的分析和可視化平臺,設計用於和Elasticsearch一塊兒工做。
你用Kibana來搜索,查看,並和存儲在Elasticsearch索引中的數據進行交互。
你能夠輕鬆地執行高級數據分析,而且以各類圖標、表格和地圖的形式可視化數據

Kibana使得理解大量數據變得很容易。它簡單的、基於瀏覽器的界面使你可以快速建立和共享動態儀表板,實時顯示Elasticsearch查詢的變化

安裝步驟:
解壓:tar -zxvf kibana-6.6.2-linux-x86_64.tar.gz
修改 kibana.yml 配置文件:
server.port: 5601
server.host: 「192.168.116.121」 ———-部署kinana服務器的ip
elasticsearch.hosts: [「http://192.168.116.121:9200「]
kibana.index: 「.kibana」

啓動kibana,報錯:
[error][status][plugin:remote_clusters@6.6.2] Status changed from red to red - X-Pack plugin is not installed on the [data] Elasticsearch cluster.

解決,卸載x-pack插件
elasticsearch-plugin remove x-pack
kibana-plugin remove x-pack

安裝好後啓動便可。頁面操做

相關文章
相關標籤/搜索