Requirement: dynamically add words to the dictionary.
Let's design the architecture first. A single standalone Redis instance is used here.
The plan is to keep the words in Redis and have ES fetch them using Redis's pub/sub feature.
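For the publishing side, here is a minimal sketch of how words could be pushed into Redis, assuming the Jedis client. The set keys ik_main / ik_stop and the host 192.168.56.200:6379 match the pull thread shown later in this post; the example words and the WordPublisher class name are only illustrative.

import redis.clients.jedis.Jedis;

// Sketch: push new words into the Redis sets that the ES-side pull thread
// (shown later) reads with SMEMBERS. Key names and host match that thread;
// the sample words are placeholders.
public class WordPublisher {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("192.168.56.200", 6379);
        try {
            // new main-dictionary words
            jedis.sadd("ik_main", "示例新詞", "動態詞彙");
            // new stop words
            jedis.sadd("ik_stop", "的", "了");
        } finally {
            jedis.disconnect();
        }
    }
}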
~~~~~~~~~~~~~~~~
Next, install Redis.
A quick gripe: Redis releases new versions quite fast; I have no idea when my 1.0.2 here will ever get upgraded to the latest...
~~~~~~~
To modify the code, you need a rough understanding of how Lucene works, in particular when and how the analyzer gets involved in the indexing process.
For ease of explanation, let's take the Lucene 1.4.3 source as an example, specifically the class org.apache.lucene.index.DocumentWriter.
Around line 140 of that file there is:
// Tokenize field and add to postingTable
TokenStream stream = analyzer.tokenStream(fieldName, reader);
try {
    for (Token t = stream.next(); t != null; t = stream.next()) {
        position += (t.getPositionIncrement() - 1);
        addPosition(fieldName, t.termText(), position++);
        if (++length > maxFieldLength) break;
    }
} finally {
    stream.close();
}
So which class is the Analyzer of our IK plugin?
According to the configuration in config/elasticsearch.yml:
index:
  analysis:
    analyzer:
      ik:
        alias: [ik_analyzer]
        type: org.elasticsearch.index.analysis.IkAnalyzerProvider
I then found the source of the class org.elasticsearch.index.analysis.IkAnalyzerProvider:
package org.elasticsearch.index.analysis;

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.assistedinject.Assisted;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.wltea.analyzer.cfg.Configuration;
import org.wltea.analyzer.dic.Dictionary;
import org.wltea.analyzer.lucene.IKAnalyzer;

public class IkAnalyzerProvider extends AbstractIndexAnalyzerProvider<IKAnalyzer> {

    private final IKAnalyzer analyzer;

    @Inject
    public IkAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env,
                              @Assisted String name, @Assisted Settings settings) {
        super(index, indexSettings, name, settings);
        Dictionary.initial(new Configuration(env));
        analyzer = new IKAnalyzer(indexSettings, settings, env);
    }

    @Override
    public IKAnalyzer get() {
        return this.analyzer;
    }
}
So I believe the actual analyzer class is org.wltea.analyzer.lucene.IKAnalyzer.
Next, let's read this class's source and see what we can find!
/**
 * IK Chinese word segmenter, version 5.0.1
 * IK Analyzer release 5.0.1
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Source code provided by Lin Liangyi (linliangyi2005@gmail.com)
 * Copyright 2012, Oolong Studio
 * provided by Linliangyi and copyright 2012 by Oolong studio
 */
package org.wltea.analyzer.lucene;

import java.io.Reader;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Tokenizer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;

/**
 * IK analyzer, implementation of the Lucene Analyzer interface.
 * Compatible with Lucene 4.0.
 */
public final class IKAnalyzer extends Analyzer {

    private boolean useSmart;

    public boolean useSmart() {
        return useSmart;
    }

    public void setUseSmart(boolean useSmart) {
        this.useSmart = useSmart;
    }

    /**
     * IK analyzer Lucene Analyzer implementation.
     * Uses fine-grained segmentation by default.
     */
    public IKAnalyzer() {
        this(false);
    }

    /**
     * IK analyzer Lucene Analyzer implementation.
     *
     * @param useSmart when true, the analyzer uses smart segmentation
     */
    public IKAnalyzer(boolean useSmart) {
        super();
        this.useSmart = useSmart;
    }

    Settings settings;
    Environment environment;

    public IKAnalyzer(Settings indexSetting, Settings settings, Environment environment) {
        super();
        this.settings = settings;
        this.environment = environment;
    }

    /**
     * Overrides the Analyzer interface and builds the tokenization components.
     */
    @Override
    protected TokenStreamComponents createComponents(String fieldName, final Reader in) {
        Tokenizer _IKTokenizer = new IKTokenizer(in, settings, environment);
        return new TokenStreamComponents(_IKTokenizer);
    }
}
As you can see, this class extends org.apache.lucene.analysis.Analyzer directly.
I searched for a tokenStream method but could not find one; while puzzling over this, I noticed the following:
/**
 * Overrides the Analyzer interface and builds the tokenization components.
 */
@Override
protected TokenStreamComponents createComponents(String fieldName, final Reader in) {
    Tokenizer _IKTokenizer = new IKTokenizer(in, settings, environment);
    return new TokenStreamComponents(_IKTokenizer);
}
Apparently the Lucene 1.4.3 API already differs quite a bit from later Lucene versions, but the underlying principle must be the same.
Opening the Analyzer class of Lucene 4.0.0, sure enough there is this method:
protected abstract TokenStreamComponents createComponents(String fieldName, Reader reader);
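For comparison with the 1.4.3 loop above, here is a small sketch of my own (not code from Lucene or the plugin) showing how a Lucene 4.0-style TokenStream is consumed: the old next() loop is replaced by incrementToken() plus attributes.

import java.io.IOException;
import java.io.StringReader;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

// Sketch: how indexing code consumes a TokenStream in Lucene 4.x.
// 'analyzer' could be the IKAnalyzer discussed below.
public final class TokenStreamDemo {
    public static void printTokens(Analyzer analyzer, String field, String text) throws IOException {
        TokenStream stream = analyzer.tokenStream(field, new StringReader(text));
        CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);
        try {
            stream.reset();                   // must be called before incrementToken()
            while (stream.incrementToken()) { // replaces the old next() loop
                System.out.println(termAtt.toString());
            }
            stream.end();
        } finally {
            stream.close();
        }
    }
}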
~~~~~~~~~~
In other words, the essence is this: the token stream most likely comes from
org.wltea.analyzer.lucene.IKAnalyzer.createComponents().getTokenStream()
And from this code:
@Override
protected TokenStreamComponents createComponents(String fieldName, final Reader in) {
    Tokenizer _IKTokenizer = new IKTokenizer(in, settings, environment);
    return new TokenStreamComponents(_IKTokenizer);
}
we can tell the key lies in the IKTokenizer class.
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The source of IKTokenizer is as follows:
/**
 * IK Chinese word segmenter, version 5.0.1
 * IK Analyzer release 5.0.1
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * Source code provided by Lin Liangyi (linliangyi2005@gmail.com)
 * Copyright 2012, Oolong Studio
 * provided by Linliangyi and copyright 2012 by Oolong studio
 */
package org.wltea.analyzer.lucene;

import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.analysis.tokenattributes.TypeAttribute;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.Reader;

/**
 * IK analyzer, Lucene Tokenizer adapter class.
 * Compatible with Lucene 4.0.
 */
public final class IKTokenizer extends Tokenizer {

    // IK segmenter implementation
    private IKSegmenter _IKImplement;

    // lexeme text attribute
    private final CharTermAttribute termAtt;
    // lexeme offset attribute
    private final OffsetAttribute offsetAtt;
    // lexeme type attribute (see the type constants in org.wltea.analyzer.core.Lexeme)
    private final TypeAttribute typeAtt;
    // end position of the last lexeme
    private int endPosition;

    /**
     * Lucene 4.0 Tokenizer adapter constructor
     * @param in
     */
    public IKTokenizer(Reader in, Settings settings, Environment environment) {
        super(in);
        offsetAtt = addAttribute(OffsetAttribute.class);
        termAtt = addAttribute(CharTermAttribute.class);
        typeAtt = addAttribute(TypeAttribute.class);
        _IKImplement = new IKSegmenter(input, settings, environment);
    }

    /* (non-Javadoc)
     * @see org.apache.lucene.analysis.TokenStream#incrementToken()
     */
    @Override
    public boolean incrementToken() throws IOException {
        // clear all lexeme attributes
        clearAttributes();
        Lexeme nextLexeme = _IKImplement.next();
        if (nextLexeme != null) {
            // convert the Lexeme into attributes
            // set the lexeme text
            termAtt.append(nextLexeme.getLexemeText().toLowerCase());
            // set the lexeme length
            termAtt.setLength(nextLexeme.getLength());
            // set the lexeme offsets
            offsetAtt.setOffset(nextLexeme.getBeginPosition(), nextLexeme.getEndPosition());
            // record the end position of this lexeme
            endPosition = nextLexeme.getEndPosition();
            // record the lexeme type
            typeAtt.setType(nextLexeme.getLexemeTypeString());
            // return true to signal there are more lexemes
            return true;
        }
        // return false to signal that all lexemes have been emitted
        return false;
    }

    /*
     * (non-Javadoc)
     * @see org.apache.lucene.analysis.Tokenizer#reset(java.io.Reader)
     */
    @Override
    public void reset() throws IOException {
        super.reset();
        _IKImplement.reset(input);
    }

    @Override
    public final void end() {
        // set final offset
        int finalOffset = correctOffset(this.endPosition);
        offsetAtt.setOffset(finalOffset, finalOffset);
    }
}
The code contains this line:
_IKImplement = new IKSegmenter(input , settings, environment);
Next, let's look at the source of the IKSegmenter class:
/**
 * Segment and get the next lexeme.
 * @return Lexeme the lexeme object
 * @throws java.io.IOException
 */
public synchronized Lexeme next() throws IOException {
    Lexeme l = null;
    while ((l = context.getNextLexeme()) == null) {
        /*
         * Read data from the reader and fill the buffer.
         * If the reader is read into the buffer in chunks, the buffer
         * has to be shifted so that previously read but unprocessed
         * data is carried over.
         */
        int available = context.fillBuffer(this.input);
        if (available <= 0) {
            // the reader has been fully consumed
            context.reset();
            return null;
        } else {
            // initialize the cursor
            context.initCursor();
            do {
                // run each sub-segmenter
                for (ISegmenter segmenter : segmenters) {
                    segmenter.analyze(context);
                }
                // the character buffer is nearly consumed; new characters must be read in
                if (context.needRefillBuffer()) {
                    break;
                }
                // advance the cursor
            } while (context.moveCursor());
            // reset the sub-segmenters to prepare for the next round
            for (ISegmenter segmenter : segmenters) {
                segmenter.reset();
            }
        }
        // resolve segmentation ambiguities
        this.arbitrator.process(context, useSmart);
        // write the results to the result set and handle unsegmented single CJK characters
        context.outputToResult();
        // record the buffer offset of this segmentation round
        context.markBufferOffset();
    }
    return l;
}
We have finally found the long-sought next() method; all the magic lives in this function.
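Just to make the calling pattern concrete, here is a small driver sketch of my own. It assumes the stock IK Analyzer distribution, whose IKSegmenter has a (Reader, boolean useSmart) constructor, rather than the (Reader, Settings, Environment) constructor of the ES fork shown above; the sample text is illustrative.

import java.io.IOException;
import java.io.StringReader;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

// Sketch: calling IKSegmenter.next() in a loop and printing each lexeme.
// Uses the stock IK constructor (Reader, useSmart); the ES fork above
// takes (Reader, Settings, Environment) instead.
public class IkNextDemo {
    public static void main(String[] args) throws IOException {
        IKSegmenter seg = new IKSegmenter(new StringReader("動態添加詞彙測試"), true);
        for (Lexeme lex = seg.next(); lex != null; lex = seg.next()) {
            System.out.println(lex.getLexemeText() + " [" + lex.getBeginPosition()
                    + "," + lex.getEndPosition() + ")");
        }
    }
}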
~~~~~~~~
Each lexeme is obtained through the loop while ((l = context.getNextLexeme()) == null) {...}, so let's continue with the context.getNextLexeme() method.
/**
 * Return a lexeme.
 *
 * Also handles merging.
 * @return
 */
Lexeme getNextLexeme() {
    // take (and remove) the first Lexeme from the result set
    Lexeme result = this.results.pollFirst();
    while (result != null) {
        // merge numerals and quantifiers
        this.compound(result);
        if (Dictionary.getSingleton().isStopWord(this.segmentBuff, result.getBegin(), result.getLength())) {
            // it is a stop word: continue with the next one in the list
            result = this.results.pollFirst();
        } else {
            // not a stop word: generate the lexeme text and output it
            result.setLexemeText(String.valueOf(segmentBuff, result.getBegin(), result.getLength()));
            break;
        }
    }
    return result;
}
What remains is to look at
Dictionary.getSingleton()
Apparently a global singleton (single-instance) pattern is used.
So when does the dictionary get loaded?
/**
 * Dictionary initialization.
 * Because IK Analyzer initializes its dictionary through static methods of the Dictionary class,
 * the dictionary is only loaded when the Dictionary class is actually used for the first time,
 * which lengthens the first segmentation call.
 * This method provides a way to initialize the dictionary during application startup.
 * @return Dictionary
 */
public static Dictionary initial(Configuration cfg) {
    if (singleton == null) {
        synchronized (Dictionary.class) {
            if (singleton == null) {
                singleton = new Dictionary();
                singleton.configuration = cfg;
                singleton.loadMainDict();
                singleton.loadSurnameDict();
                singleton.loadQuantifierDict();
                singleton.loadSuffixDict();
                singleton.loadPrepDict();
                singleton.loadStopWordDict();
                return singleton;
            }
        }
    }
    return singleton;
}
In other words, the dictionary is loaded by calling Dictionary.initial().
So when does that function get executed?
~~~~~~~~
It is called from several places; of course, each call first checks whether initialization has already happened, as you can see from the double-checked locking:

if (singleton == null) {
    synchronized (Dictionary.class) {
        if (singleton == null) {
~~~~~~~~~~~~~~~~
My guess is that the first call happens here:
@Inject
public IkAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env,
                          @Assisted String name, @Assisted Settings settings) {
    super(index, indexSettings, name, settings);
    Dictionary.initial(new Configuration(env));
    analyzer = new IKAnalyzer(indexSettings, settings, env);
}
Now back to the task: publishing words and stop words through Redis.
The approach:
1) Modify elasticsearch-analysis-ik-1.2.6.jar in the ES installation: unpack it, decompile Dictionary.class back into a Java file, and add the following two methods:
public void addMainWords(Collection<String> words) {
    if (words != null) {
        for (String word : words) {
            if (word != null) {
                singleton._MainDict.fillSegment(word.trim().toLowerCase().toCharArray());
            }
        }
    }
}

public void addStopWords(Collection<String> words) {
    if (words != null) {
        for (String word : words) {
            if (word != null) {
                singleton._StopWords.fillSegment(word.trim().toLowerCase().toCharArray());
            }
        }
    }
}
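For reference, a quick usage sketch of the two methods just added; the word lists and the class name are only illustrative, and it assumes Dictionary.initial() has already been called (the provider constructor does this).

import java.util.Arrays;
import org.wltea.analyzer.dic.Dictionary;

// Sketch: feeding words into the running dictionary through the methods added above;
// in the real setup the Redis pull thread (shown later) does this.
public class DictUpdateDemo {
    public static void addSampleWords() {
        // assumes Dictionary.initial(...) has already run
        Dictionary.getSingleton().addMainWords(Arrays.asList("新詞A", "新詞B")); // illustrative words
        Dictionary.getSingleton().addStopWords(Arrays.asList("某個停詞"));        // illustrative stop word
    }
}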
Then repackage it into elasticsearch-analysis-ik-1.2.6.jar and put it back on the server.
Everything still runs fine, which shows this part of the code is OK: we now have a handle to the global dictionary collections.
How do we fetch the data from Redis?
One way is to write a thread, with the following content:
package org.wltea.analyzer.dic;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;

import redis.clients.jedis.Jedis;
//import redis.clients.jedis.HostAndPort;
//import redis.clients.jedis.JedisCluster;

public class DictThread extends Thread {

    private static DictThread dt = null;

    // Only the first caller gets a non-null instance, so the thread is started at most once.
    public static DictThread getInstance() {
        DictThread result = null;
        synchronized (DictThread.class) {
            if (null == dt) {
                dt = new DictThread();
                result = dt;
            }
        }
        return result;
    }

    private DictThread() {
        this.logger = Loggers.getLogger("redis-thread");
        this.init();
    }

    private Jedis jc = null;
    private ESLogger logger = null;

    public void run() {
        while (true) {
            this.logger.info("[Redis Thread]" + "*****cycle of redis");
            init();
            pull();
            sleep();
        }
    }

    public void pull() {
        if (null == jc)
            return;
        // pull from the Redis sets
        ArrayList<String> words = new ArrayList<String>();
        Set set = null;

        // 1. pull main-dictionary words
        set = jc.smembers("ik_main");
        Iterator t = set.iterator();
        while (t.hasNext()) {
            Object obj = t.next();
            words.add(obj.toString());
        }
        Dictionary.getSingleton().addMainWords(words);
        words.clear();

        // 2. pull stop words
        set = jc.smembers("ik_stop");
        t = set.iterator();
        while (t.hasNext()) {
            Object obj = t.next();
            words.add(obj.toString());
        }
        Dictionary.getSingleton().addStopWords(words);
        words.clear();
    }

    private void init() {
        if (null == jc) {
            //Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
            // Jedis Cluster will attempt to discover cluster nodes automatically
            //jedisClusterNodes.add(new HostAndPort("192.168.56.200", 6379));
            //jc = new JedisCluster(jedisClusterNodes);
            jc = new Jedis("192.168.56.200", 6379);
        }
    }

    private void sleep() {
        // sleep 5 seconds, then loop to the next round
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
The remaining question is how to start this thread!
package org.elasticsearch.index.analysis;

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.assistedinject.Assisted;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.wltea.analyzer.dic.DictThread;

public class SpecificIkAnalyzerProvider extends IkAnalyzerProvider {

    private ESLogger logger = null;

    @Inject
    public SpecificIkAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env,
                                      @Assisted String name, @Assisted Settings settings) {
        super(index, indexSettings, env, name, settings);

        // here, let us start our pull thread... :)
        this.logger = Loggers.getLogger("SpecificIkAnalyzerProvider");
        this.logger.info("[SpecificIkAnalyzerProvider] system start, begin to start pull redis thread...");

        //new DictThread().start();
        DictThread dt = DictThread.getInstance();
        if (null != dt) {
            dt.start();
        }
    }
}
The system runs successfully; it works well enough and is fairly simple.
The Redis-fetching part of the code above still needs some optimization; I'll deal with it when I have time.
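One possible direction for that optimization is to go back to the pub/sub idea from the original plan, so new words are pushed instead of being polled every 5 seconds. A rough sketch only: the channel name ik_words and the "main:<word>" / "stop:<word>" message format are assumptions, not part of the current implementation.

import java.util.Arrays;
import org.wltea.analyzer.dic.Dictionary;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

// Sketch: a subscriber that reacts to published words instead of polling.
// Channel name "ik_words" and the message prefixes are illustrative assumptions.
public class DictSubscriber extends JedisPubSub {

    @Override
    public void onMessage(String channel, String message) {
        if (message.startsWith("main:")) {
            Dictionary.getSingleton().addMainWords(Arrays.asList(message.substring(5)));
        } else if (message.startsWith("stop:")) {
            Dictionary.getSingleton().addStopWords(Arrays.asList(message.substring(5)));
        }
    }

    // empty overrides so this also compiles against older Jedis versions
    // where these callbacks are abstract
    @Override public void onPMessage(String pattern, String channel, String message) {}
    @Override public void onSubscribe(String channel, int subscribedChannels) {}
    @Override public void onUnsubscribe(String channel, int subscribedChannels) {}
    @Override public void onPUnsubscribe(String pattern, int subscribedChannels) {}
    @Override public void onPSubscribe(String pattern, int subscribedChannels) {}

    public static void listen() {
        Jedis jedis = new Jedis("192.168.56.200", 6379);
        // subscribe() blocks the calling thread, so this would run inside its own thread
        jedis.subscribe(new DictSubscriber(), "ik_words");
    }
}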
Then modify the configuration file, changing the analyzer definition from:
index:
  analysis:
    analyzer:
      ik:
        alias: [ik_analyzer]
        type: org.elasticsearch.index.analysis.IkAnalyzerProvider
修改成:
index:
  analysis:
    analyzer:
      ik:
        alias: [ik_analyzer]
        type: org.elasticsearch.index.analysis.SpecificIkAnalyzerProvider
PS: one more gripe: the jar decompiler I downloaded off the internet caused me a world of pain and wasted a lot of time!