SpringBoot開發案例構建分佈式日誌處理系統

日誌處理

其實作這個Demo的目的是如何基於Elasticsearch構建網站日誌處理系統,經過數據同步工具等一些列開源組件來快速構建一個日誌處理系統,項目雛形初步成型中。css

日誌演示網址:http://es.52itstyle.comhtml

區域演示網址:http://es.52itstyle.com/area/indexvue

固然,項目功能會逐步增長,實現一個365°全方位的Demo案例。java

輸入圖片說明
23456.png

開發環境

JDK1.七、Maven、Eclipse、SpringBoot1.5.九、elasticsearch2.4.六、Dubbox2.8.四、zookeeper3.4.六、Redis、kafka、Vue、Iviewnode

版本介紹

spring-boot-starter-parent-1.5.9.RELEASE、spring-data-elasticsearch-2.1.9.RELEAS、elasticsearch-2.4.6(5.0+以上須要依賴JDK8)mysql

截止2018年1月22日,ElasticSearch目前最新的已到6.1.2,可是spring-boot的更新速度遠遠跟不上ElasticSearch更新的速度,目前spring-boot支持的最新版本是elasticsearch-2.4.6。jquery

參考:https://github.com/spring-projects/spring-data-elasticsearch/wiki/Spring-Data-Elasticsearch---Spring-Boot---version-matrixgit

輸入圖片說明
ESD.png

接入方式

使用spring-boot中的spring-data-elasticsearch,可使用兩種內置客戶端接入github

一、節點客戶端(node client): 配置文件中設置爲local:false,節點客戶端以無數據節點(node-master或node-client)身份加入集羣,換言之,它本身不存儲任何數據,可是它知道數據在集羣中的具體位置,而且可以直接轉發請求到對應的節點上。web

二、傳輸客戶端(Transport client): 配置文件中設置爲local:true,這個更輕量的傳輸客戶端可以發送請求到遠程集羣。它本身不加入集羣,只是簡單轉發請求給集羣中的節點。 兩個Java客戶端都經過9300端口與集羣交互,使用Elasticsearch傳輸協議(Elasticsearch Transport Protocol)。集羣中的節點之間也經過9300端口進行通訊。若是此端口未開放,你的節點將不能組成集羣。

服務說明

使用本地ElasticSearch服務(application-dev.properties)
spring.data.elasticsearch.cluster-name=elasticsearch
#默認就是本機,若是要使用遠程服務器,或者局域網服務器,那就須要在這裏配置ip:prot;能夠配置多個,以逗號分隔,至關於集羣。
#Java客戶端:經過9300端口與集羣進行交互
#其餘全部程序語言:均可以使用RESTful API,經過9200端口的與Elasticsearch進行通訊。
#spring.data.elasticsearch.cluster-nodes=192.168.1.180:9300
複製代碼
使用遠程ElasticSearch服務(application-dev.properties)
  • 須要自行安裝ElasticSearch,注意ElasticSearch版本儘可能要與JAR包一致。

  • 下載地址:https://www.elastic.co/downloads/past-releases/elasticsearch-2-4-6

  • 安裝說明:http://www.52itstyle.com/thread-20114-1-1.html

  • 新版本不建議使用root用戶啓動,須要自建ElasticSearch用戶,也可使用如下命令啓動 elasticsearch -Des.insecure.allow.root=true -d 或者在elasticsearch中加入ES_JAVA_OPTS="-Des.insecure.allow.root=true"。

項目結構

├─src
│  ├─main
│  │  ├─java
│  │  │  └─com
│  │  │      └─itstyle
│  │  │          └─es
│  │  │              │  Application.java
│  │  │              │  
│  │  │              ├─common
│  │  │              │  ├─constant
│  │  │              │  │      PageConstant.java
│  │  │              │  │      
│  │  │              │  └─interceptor
│  │  │              │          MyAdapter.java
│  │  │              │          
│  │  │              └─log
│  │  │                  ├─controller
│  │  │                  │      LogController.java
│  │  │                  │      
│  │  │                  ├─entity
│  │  │                  │      Pages.java
│  │  │                  │      SysLogs.java
│  │  │                  │      
│  │  │                  ├─repository
│  │  │                  │      ElasticLogRepository.java
│  │  │                  │      
│  │  │                  └─service
│  │  │                      │  LogService.java
│  │  │                      │  
│  │  │                      └─impl
│  │  │                              LogServiceImpl.java
│  │  │                              
│  │  ├─resources
│  │  │  │  application-dev.properties
│  │  │  │  application-prod.properties
│  │  │  │  application-test.properties
│  │  │  │  application.yml
│  │  │  │  spring-context-dubbo.xml
│  │  │  │  
│  │  │  ├─static
│  │  │  │  ├─iview
│  │  │  │  │  │  iview.css
│  │  │  │  │  │  iview.min.js
│  │  │  │  │  │  
│  │  │  │  │  └─fonts
│  │  │  │  │          ionicons.eot
│  │  │  │  │          ionicons.svg
│  │  │  │  │          ionicons.ttf
│  │  │  │  │          ionicons.woff
│  │  │  │  │          
│  │  │  │  ├─jquery
│  │  │  │  │      jquery-3.2.1.min.js
│  │  │  │  │      
│  │  │  │  └─vue
│  │  │  │          vue.min.js
│  │  │  │          
│  │  │  └─templates
│  │  │      └─log
│  │  │              index.html
│  │  │              
│  │  └─webapp
│  │      │  index.jsp
│  │      │  
│  │      └─WEB-INF
│  │              web.xml
│  │              
│  └─test
│      └─java
│          └─com
│              └─itstyle
│                  └─es
│                      └─test
│                              Logs.java
│                              


複製代碼

項目演示

演示網址:http://es.52itstyle.com

項目截圖

搜索頁面
ES_index.png

分頁查詢

使用ElasticsearchTemplate模板插入了20萬條數據,本地向外網服務器(1核1G),用時60s+,一分鐘左右的時間。雖然索引庫容量有增長,可是等了大約 10分鐘左右的時間才能搜索出來。

分頁查詢到10000+的時候系統報錯,Result window is too large,修改config下的elasticsearch.yml 追加如下代碼便可:

# 自行定義數量
index.max_result_window : '10000000'
複製代碼

參考:https://www.elastic.co/guide/en/elasticsearch/reference/current/index-modules.html

Java API

Elasticsearch爲Java用戶提供了兩種內置客戶端:

  • 節點客戶端(node client):

節點客戶端,顧名思義,其自己也是Elasticsearch集羣的一個組成部分。以無數據節點(none data node)身份加入集羣,換言之,它本身不存儲任何數據,可是它知道數據在集羣中的具體位置,而且可以直接轉發請求到對應的節點上。

  • 傳輸客戶端(Transport client):

這個更輕量的傳輸客戶端可以發送請求到遠程集羣。它本身不加入集羣,只是簡單轉發請求給集羣中的節點。兩個Java客戶端都經過9300端口與集羣交互,使用Elasticsearch傳輸協議(Elasticsearch Transport Protocol)。集羣中的節點之間也經過9300端口進行通訊。若是此端口未開放,你的節點將不能組成集羣。

安裝Elasticsearch-Head

elasticsearch-head是一個界面化的集羣操做和管理工具,能夠對集羣進行傻瓜式操做。你能夠經過插件把它集成到es(首選方式),也能夠安裝成一個獨立webapp。

es-head主要有三個方面的操做:

  • 顯示集羣的拓撲,而且可以執行索引和節點級別操做
  • 搜索接口可以查詢集羣中原始json或表格格式的檢索數據
  • 可以快速訪問並顯示集羣的狀態

插件安裝方式、參考:https://github.com/mobz/elasticsearch-head

  • for Elasticsearch 5.x: site plugins are not supported. Run as a standalone server
  • for Elasticsearch 2.x: sudo elasticsearch/bin/plugin install mobz/elasticsearch-head
  • for Elasticsearch 1.x: sudo elasticsearch/bin/plugin -install mobz/elasticsearch-head/1.x
  • for Elasticsearch 0.x: sudo elasticsearch/bin/plugin -install mobz/elasticsearch-head/0.9

安裝成功之後會在plugins目錄下出現一個head目錄,代表安裝已經成功。

瀏覽截圖:

Elasticsearch-Head
ES_head.png

x-pack監控

Elasticsearch、Logstash 隨着 Kibana 的命名升級直接從2.4跳躍到了5.0,5.x版本的 ELK 在版本對應上要求相對較高,再也不支持5.x和2.x的混搭,同時 Elastic 作了一個 package ,對本來的 marvel、watch、alert 作了一個封裝,造成了 x-pack 。

安裝:https://www.elastic.co/guide/en/elasticsearch/reference/6.1/installing-xpack-es.html

用戶管理

x-pack安裝以後有一個超級用戶elastic ,其默認的密碼是changeme,擁有對全部索引和數據的控制權,可使用該用戶建立和修改其餘用戶,固然這裏能夠經過kibana的web界面進行用戶和用戶組的管理。

修改elastic用戶的密碼:

curl -XPUT -u elastic 'localhost:9200/_xpack/security/user/elastic/_password' -d '{ "password" : "123456" }'
複製代碼

IK Analysis for Elasticsearch

下載安裝:

  • 方式一 - download pre-build package from here: https://github.com/medcl/elasticsearch-analysis-ik/releases unzip plugin to folder your-es-root/plugins/

  • 方式一二 - use elasticsearch-plugin to install ( version > v5.5.1 ): ./bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.0.0/elasticsearch-analysis-ik-6.0.0.zip

因爲Elasticsearch版本是2.4.6,這裏選擇IK版本爲1.10.6

wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v1.10.6/elasticsearch-analysis-ik-1.10.6.zip
複製代碼

下載解壓之後在 Elasticsearch 的config下的elasticsearch.yml文件中,添加以下代碼(2.0以上能夠不設置)。

index:  
      analysis:                     
        analyzer:        
          ik:  
              alias: [ik_analyzer]  
              type: org.elasticsearch.index.analysis.IkAnalyzerProvider  
          ik_max_word:  
              type: ik  
              use_smart: false  
          ik_smart:  
              type: ik  
              use_smart: true
複製代碼

或者

index.analysis.analyzer.ik.type : 「ik」
複製代碼

安裝前:

http://192.168.1.180:9200/_analyze?analyzer=standard&pretty=true&text=我愛你中國
{
  "tokens" : [ {
    "token" : "我",
    "start_offset" : 0,
    "end_offset" : 1,
    "type" : "<IDEOGRAPHIC>",
    "position" : 0
  }, {
    "token" : "愛",
    "start_offset" : 1,
    "end_offset" : 2,
    "type" : "<IDEOGRAPHIC>",
    "position" : 1
  }, {
    "token" : "你",
    "start_offset" : 2,
    "end_offset" : 3,
    "type" : "<IDEOGRAPHIC>",
    "position" : 2
  }, {
    "token" : "中",
    "start_offset" : 3,
    "end_offset" : 4,
    "type" : "<IDEOGRAPHIC>",
    "position" : 3
  }, {
    "token" : "國",
    "start_offset" : 4,
    "end_offset" : 5,
    "type" : "<IDEOGRAPHIC>",
    "position" : 4
  } ]
}
複製代碼

安裝後:

http://121.42.155.213:9200/_analyze?analyzer=ik&pretty=true&text=我愛你中國
{
  "tokens" : [ {
    "token" : "我愛你",
    "start_offset" : 0,
    "end_offset" : 3,
    "type" : "CN_WORD",
    "position" : 0
  }, {
    "token" : "愛你",
    "start_offset" : 1,
    "end_offset" : 3,
    "type" : "CN_WORD",
    "position" : 1
  }, {
    "token" : "中國",
    "start_offset" : 3,
    "end_offset" : 5,
    "type" : "CN_WORD",
    "position" : 2
  } ]
}

複製代碼

數據同步

使用第三方工具類庫elasticsearch-jdbc實現MySql到elasticsearch的同步。

同步架構圖
elasticsearch-jdbc.png

運行環境:

centos7.五、JDK八、elasticsearch-jdbc-2.3.2.0

安裝步驟:

  • 這裏是列表文本第一步:下載(可能很卡、請耐心等待) wget http://xbib.org/repository/org/xbib/elasticsearch/importer/elasticsearch-jdbc/2.3.2.0/elasticsearch-jdbc-2.3.2.0-dist.zip
  • 這裏是列表文本第二步:解壓 unzip elasticsearch-jdbc-2.3.2.0-dist.zip
  • 這裏是列表文本第三步:配置腳本mysql_import_es.sh
#!/bin/sh
# elasticsearch-jdbc 安裝路徑
bin=/home/elasticsearch-jdbc-2.3.2.0/bin
lib=/home/elasticsearch-jdbc-2.3.2.0/lib
echo '{ "type" : "jdbc", "jdbc": { # 若是數據庫中存在Json文件 這裏設置成false,不然會同步出錯 "detect_json":false, "url":"jdbc:mysql://127.0.0.1:3306/itstyle_log??useUnicode=true&characterEncoding=utf-8&useSSL=false&allowMultiQueries=true", "user":"root", "password":"root", # 若是想自動生成_id,去掉第一個獲取字段便可;若是想Id做爲主鍵,把id設置爲_id便可 "sql":"SELECT id AS _id,id,user_id AS userId ,username,operation,time,method,params,ip,device_type AS deviceType,log_type AS logType,exception_detail AS exceptionDetail, gmt_create AS gmtCreate,plat_from AS platFrom FROM sys_log", "elasticsearch" : { "host" : "127.0.0.1",#elasticsearch服務地址 "port" : "9300" #遠程elasticsearch服務 此端口必定要開放 }, "index" : "elasticsearch",# 索引名至關於庫 "type" : "sysLog" # 類型名至關於表 } }' | java \
       -cp "${lib}/*" \
       -Dlog4j.configurationFile=${bin}/log4j2.xml \
       org.xbib.tools.Runner \
       org.xbib.tools.JDBCImporter

複製代碼
  • 這裏是列表文本第四部:受權並執行
chmod +x mysql_import_es.sh
./mysql_import_es.sh
複製代碼

ElasticSearchRepository和ElasticSearchTemplate

Spring-data-elasticsearch是Spring提供的操做ElasticSearch的數據層,封裝了大量的基礎操做,經過它能夠很方便的操做ElasticSearch的數據。

ElasticSearchRepository的基本使用

/**
 * @param <T>
 * @param <ID>
 * @author Rizwan Idrees
 * @author Mohsin Husen
 */
@NoRepositoryBean
public interface ElasticsearchRepository<T, ID extends Serializable> extends ElasticsearchCrudRepository<T, ID> {

	<S extends T> S index(S entity);

	Iterable<T> search(QueryBuilder query);

	Page<T> search(QueryBuilder query, Pageable pageable);

	Page<T> search(SearchQuery searchQuery);

	Page<T> searchSimilar(T entity, String[] fields, Pageable pageable);

	void refresh();

	Class<T> getEntityClass();
}
複製代碼

ElasticsearchRepository裏面有幾個特殊的search方法,這些是ES特有的,和普通的JPA區別的地方,用來構建一些ES查詢的。 主要是看QueryBuilder和SearchQuery兩個參數,要完成一些特殊查詢就主要看構建這兩個參數。

輸入圖片說明
1.png

通常狀況下,咱們不是直接是new NativeSearchQuery,而是使用NativeSearchQueryBuilder。 經過NativeSearchQueryBuilder.withQuery(QueryBuilder1).withFilter(QueryBuilder2).withSort(SortBuilder1).withXXXX().build();這樣的方式來完成NativeSearchQuery的構建。

輸入圖片說明
2.png
輸入圖片說明
3.png

ElasticSearchTemplate的使用

ElasticSearchTemplate更可能是對ESRepository的補充,裏面提供了一些更底層的方法。

這裏咱們主要實現快讀批量插入的功能,插入20萬條數據,本地向外網服務器(1核1G),用時60s+,一分鐘左右的時間。雖然索引庫容量有增長,可是等了大約10分鐘左右的時間才能搜索出來。

//批量同步或者插入數據
public void bulkIndex(List<SysLogs> logList) {  
	long start = System.currentTimeMillis();
    int counter = 0;  
    try {  
        List<IndexQuery> queries = new ArrayList<>();  
        for (SysLogs log : logList) {  
            IndexQuery indexQuery = new IndexQuery();  
            indexQuery.setId(log.getId()+ "");  
            indexQuery.setObject(log);  
            indexQuery.setIndexName("elasticsearch");  
            indexQuery.setType("sysLog");  
            //也可使用IndexQueryBuilder來構建  
            //IndexQuery index = new IndexQueryBuilder().withId(person.getId() + "").withObject(person).build();  
            queries.add(indexQuery);  
            if (counter % 1000 == 0) {  
            	elasticSearchTemplate.bulkIndex(queries);  
                queries.clear();  
                System.out.println("bulkIndex counter : " + counter);  
            }  
            counter++;  
        }  
        if (queries.size() > 0) {  
        	elasticSearchTemplate.bulkIndex(queries);  
        }
        long end = System.currentTimeMillis();
        System.out.println("bulkIndex completed use time:"+ (end-start));  
        
    } catch (Exception e) {  
        System.out.println("IndexerService.bulkIndex e;" + e.getMessage());  
        throw e;  
    }  
} 
複製代碼

Redis日誌隊列

見包:com.itstyle.es.common.redis

監聽配置 RedisListener:

@Component
public class RedisListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(RedisListener.class);
	@Bean
	RedisMessageListenerContainer container(
			RedisConnectionFactory connectionFactory,
			MessageListenerAdapter listenerAdapter) {
		LOGGER.info("啓動監聽"); 
		RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.addMessageListener(listenerAdapter, new PatternTopic("itstyle_log"));
		return container;
	}

	@Bean
	MessageListenerAdapter listenerAdapter(Receiver receiver) {
		return new MessageListenerAdapter(receiver, "receiveMessage");
	}

	@Bean
	Receiver receiver(CountDownLatch latch) {
		return new Receiver(latch);
	}

	@Bean
	CountDownLatch latch() {
		return new CountDownLatch(1);
	}

	@Bean
	StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
		return new StringRedisTemplate(connectionFactory);
	}
}
複製代碼

日誌接收Receiver:

public class Receiver {
    private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
    @Autowired
	private  ElasticLogRepository elasticLogRepository;
    private CountDownLatch latch;

    @Autowired
    public Receiver(CountDownLatch latch) {
        this.latch = latch;
    }

    public void receiveMessage(String message) {
        LOGGER.info("接收log消息 <{}>",message);
        if(message == null){
            LOGGER.info("接收log消息 <" + null + ">");
        }else {
        	ObjectMapper mapper = new ObjectMapper();  
			try {
				SysLogs log = mapper.readValue(message, SysLogs.class);
				elasticLogRepository.save(log);
				LOGGER.info("接收log消息內容 <{}>",log.getOperation());
			} catch (JsonParseException e) {
				e.printStackTrace();
			} catch (JsonMappingException e) {
				e.printStackTrace();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (Exception e) {
				e.printStackTrace();
			}
        }
        latch.countDown();
    }
}
複製代碼

測試 LogController:http://lip:port/redisLog

Kafka日誌隊列

見包: com.itstyle.es.common.kafka

碼雲下載:SpringBoot開發案例構建分佈式日誌處理系統

相關文章
相關標籤/搜索