個人架構演化筆記 9:ElasticSearch的分詞器IK Analyzer動態添加分詞

需求:動態添加詞彙java

先設計架構吧,這裏用了單機Redis.node

 

計劃把詞彙放在Redis裏,而後ES裏利用redis的pub/sub功能獲取詞彙。redis

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~算法

 下面開始安裝Redis,express

吐個槽:Redis的版本更新還蠻快,我這1.0.2版本還不知道何時可以升級到最新版。。。apache

~~~~~~~服務器

要想修改代碼,須要對Lucene的原理有個大概的瞭解,知道分詞器是何時以怎樣的方式介入到索引過程當中的。架構

爲了描述方便,這裏拿Lucene 4.0.0的源碼做爲例子,類---org.apache.lucene.index.DocumentWriter.app

文件的140行有less

// Tokenize field and add to postingTable
          TokenStream stream = analyzer.tokenStream(fieldName, reader);
          try {
            for (Token t = stream.next(); t != null; t = stream.next()) {
              position += (t.getPositionIncrement() - 1);
              addPosition(fieldName, t.termText(), position++);
              if (++length > maxFieldLength) break;
            }
          } finally {
            stream.close();
          }

 

那麼我們的IK分詞器的Analyzer是哪一個類呢?

看config/elasticsearch.yml配置來講,

index:
  analysis:
    analyzer:
      ik:
          alias: [ik_analyzer]
          type: org.elasticsearch.index.analysis.IkAnalyzerProvider

 

而後我找到org.elasticsearch.index.analysis.IkAnalyzerProvider這個類的源代碼 

package org.elasticsearch.index.analysis;

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.assistedinject.Assisted;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.wltea.analyzer.cfg.Configuration;
import org.wltea.analyzer.dic.Dictionary;
import org.wltea.analyzer.lucene.IKAnalyzer;

public class IkAnalyzerProvider extends AbstractIndexAnalyzerProvider<IKAnalyzer> {
    private final IKAnalyzer analyzer;

    @Inject
    public IkAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env, @Assisted String name, @Assisted Settings settings) {
        super(index, indexSettings, name, settings);
        Dictionary.initial(new Configuration(env));
        analyzer=new IKAnalyzer(indexSettings, settings, env);
    }

    @Override public IKAnalyzer get() {
        return this.analyzer;
    }
}

 因此我認爲真正的分詞類是:org.wltea.analyzer.lucene.IKAnalyzer

 接下來去看這個類的源碼,看看有什麼發現!

/**
 * IK 中文分詞  版本 5.0.1
 * IK Analyzer release 5.0.1
 * 
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * 源代碼由林良益(linliangyi2005@gmail.com)提供
 * 版權聲明 2012,烏龍茶工做室
 * provided by Linliangyi and copyright 2012 by Oolong studio
 * 
 */
package org.wltea.analyzer.lucene;

import java.io.Reader;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.Tokenizer;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;

/**
 * IK分詞器,Lucene Analyzer接口實現
 * 兼容Lucene 4.0版本
 */
public final class IKAnalyzer extends Analyzer{
	
	private boolean useSmart;
	
	public boolean useSmart() {
		return useSmart;
	}

	public void setUseSmart(boolean useSmart) {
		this.useSmart = useSmart;
	}

	/**
	 * IK分詞器Lucene  Analyzer接口實現類
	 * 
	 * 默認細粒度切分算法
	 */
	public IKAnalyzer(){
		this(false);
	}
	
	/**
	 * IK分詞器Lucene Analyzer接口實現類
	 * 
	 * @param useSmart 當爲true時,分詞器進行智能切分
	 */
	public IKAnalyzer(boolean useSmart){
		super();
		this.useSmart = useSmart;
	}

    Settings settings;
    Environment environment;

    public IKAnalyzer(Settings indexSetting,Settings settings, Environment environment) {
        super();
        this.settings=settings;
        this.environment= environment;
    }

	/**
	 * 重載Analyzer接口,構造分詞組件
	 */
	@Override
	protected TokenStreamComponents createComponents(String fieldName, final Reader in) {
		Tokenizer _IKTokenizer = new IKTokenizer(in , settings, environment);
		return new TokenStreamComponents(_IKTokenizer);
	}

}

能夠看到這個類直接繼承了org.apache.lucene.analysis.Analyzer

尋找tokenStream方法沒有找到,正在疑惑之際,看到了這個:

/**
	 * 重載Analyzer接口,構造分詞組件
	 */
	@Override
	protected TokenStreamComponents createComponents(String fieldName, final Reader in) {
		Tokenizer _IKTokenizer = new IKTokenizer(in , settings, environment);
		return new TokenStreamComponents(_IKTokenizer);
	}

看來Lucene1.4.3的版本和Lucene後續版本的接口已經有不少不一樣,可是原理確定同樣。

打開Lucene4.0.0版本的Analyser類一看,果真有一個方法:

protected abstract TokenStreamComponents createComponents(String fieldName,
      Reader reader);

~~~~~~~~~~

也就是說,本質在於:

有多是org.wltea.analyzer.lucene.IKAnalyzer.createComponents().getTokenStream()

而根據代碼

@Override
	protected TokenStreamComponents createComponents(String fieldName, final Reader in) {
		Tokenizer _IKTokenizer = new IKTokenizer(in , settings, environment);
		return new TokenStreamComponents(_IKTokenizer);
	}

知道關鍵在於IKTokenizer類。

~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

IKTokenizer的源代碼以下:

/**
 * IK 中文分詞  版本 5.0.1
 * IK Analyzer release 5.0.1
 * 
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * 源代碼由林良益(linliangyi2005@gmail.com)提供
 * 版權聲明 2012,烏龍茶工做室
 * provided by Linliangyi and copyright 2012 by Oolong studio
 * 

 * 
 */
package org.wltea.analyzer.lucene;

import org.apache.lucene.analysis.Tokenizer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
import org.apache.lucene.analysis.tokenattributes.OffsetAttribute;
import org.apache.lucene.analysis.tokenattributes.TypeAttribute;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.Reader;

/**
 * IK分詞器 Lucene Tokenizer適配器類
 * 兼容Lucene 4.0版本
 */
public final class IKTokenizer extends Tokenizer {
	
	//IK分詞器實現
	private IKSegmenter _IKImplement;
	
	//詞元文本屬性
	private final CharTermAttribute termAtt;
	//詞元位移屬性
	private final OffsetAttribute offsetAtt;
	//詞元分類屬性(該屬性分類參考org.wltea.analyzer.core.Lexeme中的分類常量)
	private final TypeAttribute typeAtt;
	//記錄最後一個詞元的結束位置
	private int endPosition;
	
	/**
	 * Lucene 4.0 Tokenizer適配器類構造函數
	 * @param in
     */
	public IKTokenizer(Reader in , Settings settings, Environment environment){
	    super(in);
	    offsetAtt = addAttribute(OffsetAttribute.class);
	    termAtt = addAttribute(CharTermAttribute.class);
	    typeAtt = addAttribute(TypeAttribute.class);

		_IKImplement = new IKSegmenter(input , settings, environment);
	}

	/* (non-Javadoc)
	 * @see org.apache.lucene.analysis.TokenStream#incrementToken()
	 */
	@Override
	public boolean incrementToken() throws IOException {
		//清除全部的詞元屬性
		clearAttributes();
		Lexeme nextLexeme = _IKImplement.next();
		if(nextLexeme != null){
			//將Lexeme轉成Attributes
			//設置詞元文本
			termAtt.append(nextLexeme.getLexemeText().toLowerCase());
			//設置詞元長度
			termAtt.setLength(nextLexeme.getLength());
			//設置詞元位移
			offsetAtt.setOffset(nextLexeme.getBeginPosition(), nextLexeme.getEndPosition());
			//記錄分詞的最後位置
			endPosition = nextLexeme.getEndPosition();
			//記錄詞元分類
			typeAtt.setType(nextLexeme.getLexemeTypeString());			
			//返會true告知還有下個詞元
			return true;
		}
		//返會false告知詞元輸出完畢
		return false;
	}
	
	/*
	 * (non-Javadoc)
	 * @see org.apache.lucene.analysis.Tokenizer#reset(java.io.Reader)
	 */
	@Override
	public void reset() throws IOException {
		super.reset();
		_IKImplement.reset(input);
	}	
	
	@Override
	public final void end() {
	    // set final offset
		int finalOffset = correctOffset(this.endPosition);
		offsetAtt.setOffset(finalOffset, finalOffset);
	}
}

 

代碼中有這麼一句:

_IKImplement = new IKSegmenter(input , settings, environment);

再來看看IKSegmenter的類源代碼:

/**
	 * 分詞,獲取下一個詞元
	 * @return Lexeme 詞元對象
	 * @throws java.io.IOException
	 */
	public synchronized Lexeme next()throws IOException{
		Lexeme l = null;
		while((l = context.getNextLexeme()) == null ){
			/*
			 * 從reader中讀取數據,填充buffer
			 * 若是reader是分次讀入buffer的,那麼buffer要  進行移位處理
			 * 移位處理上次讀入的但未處理的數據
			 */
			int available = context.fillBuffer(this.input);
			if(available <= 0){
				//reader已經讀完
				context.reset();
				return null;
				
			}else{
				//初始化指針
				context.initCursor();
				do{
        			//遍歷子分詞器
        			for(ISegmenter segmenter : segmenters){
        				segmenter.analyze(context);
        			}
        			//字符緩衝區接近讀完,須要讀入新的字符
        			if(context.needRefillBuffer()){
        				break;
        			}
   				//向前移動指針
				}while(context.moveCursor());
				//重置子分詞器,爲下輪循環進行初始化
				for(ISegmenter segmenter : segmenters){
					segmenter.reset();
				}
			}
			//對分詞進行歧義處理
			this.arbitrator.process(context, useSmart);
			//將分詞結果輸出到結果集,並處理未切分的單個CJK字符
			context.outputToResult();
			//記錄本次分詞的緩衝區位移
			context.markBufferOffset();			
		}
		return l;
	}

終於找到了久違的next()函數,一切奧義盡在這個函數裏。

~~~~~~~~

每一個詞元都是經過while((l = context.getNextLexeme()) == null ){...}

來完成的,繼續看context.getNextLexeme()函數。

/**
	 * 返回lexeme 
	 * 
	 * 同時處理合並
	 * @return
	 */
	Lexeme getNextLexeme(){
		//從結果集取出,並移除第一個Lexme
		Lexeme result = this.results.pollFirst();
		while(result != null){
    		//數量詞合併
    		this.compound(result);
    		if(Dictionary.getSingleton().isStopWord(this.segmentBuff ,  result.getBegin() , result.getLength())){
       			//是中止詞繼續取列表的下一個
    			result = this.results.pollFirst(); 				
    		}else{
	 			//不是中止詞, 生成lexeme的詞元文本,輸出
	    		result.setLexemeText(String.valueOf(segmentBuff , result.getBegin() , result.getLength()));
	    		break;
    		}
		}
		return result;
	}

剩下的就是看

Dictionary.getSingleton()

看來使用了全局single instance模式。

那麼詞典是何時加載的呢?

/**
	 * 詞典初始化
	 * 因爲IK Analyzer的詞典採用Dictionary類的靜態方法進行詞典初始化
	 * 只有當Dictionary類被實際調用時,纔會開始載入詞典,
	 * 這將延長首次分詞操做的時間
	 * 該方法提供了一個在應用加載階段就初始化字典的手段
	 * @return Dictionary
	 */
	public static Dictionary initial(Configuration cfg){
		if(singleton == null){
			synchronized(Dictionary.class){
				if(singleton == null){
					singleton = new Dictionary();
                    singleton.configuration=cfg;
                    singleton.loadMainDict();
                    singleton.loadSurnameDict();
                    singleton.loadQuantifierDict();
                    singleton.loadSuffixDict();
                    singleton.loadPrepDict();
                    singleton.loadStopWordDict();
                    return singleton;
				}
			}
		}
		return singleton;
	}

也就是調用了Dictionary.initial()函數來加載詞典。

那這個函數又是何時執行的呢?

~~~~~~~~

有好幾個地方執行,固然每次執行都會判斷以前是否執行過,這個經過

if(singleton == null){
			synchronized(Dictionary.class){
				if(singleton == null){

能夠看出來。

~~~~~~~~~~~~~~~~

我猜想第1次應該是:

@Inject
    public IkAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env, @Assisted String name, @Assisted Settings settings) {
        super(index, indexSettings, name, settings);
        Dictionary.initial(new Configuration(env));
        analyzer=new IKAnalyzer(indexSettings, settings, env);
    }

 

下面回到任務:經過redis來發布詞彙和停詞。

思路:

1)修改es下面的elasticsearch-analysis-ik-1.2.6.jar,解壓縮,將Dictionary.class反編譯成java文件,添加以下2個函數

public void addMainWords(Collection<String> words)
  {
    if (words != null) {
      for (String word : words) {
        if (word != null) {
          singleton._MainDict.fillSegment(word.trim().toLowerCase().toCharArray());
        }
      }
    }
  }
  
  public void addStopWords(Collection<String> words)
  {
    if (words != null) {
      for (String word : words) {
        if (word != null) {
          singleton._StopWords.fillSegment(word.trim().toLowerCase().toCharArray());
        }
      }
    }
  }

再打包回elasticsearch-analysis-ik-1.2.6.jar放到服務器裏。

能夠正常運行,說明這部分代碼沒有問題。得到了全局的詞典集合句柄。

如何從Redis裏獲取數據呢?

一個辦法是寫一個線程

內容以下:

package org.wltea.analyzer.dic;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.Date;
import java.lang.Thread;
import redis.clients.jedis.Jedis;

import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.wltea.analyzer.dic.Dictionary;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class DictThread extends Thread {	
	
	private static DictThread dt=null;
	
	public static DictThread getInstance(){
		DictThread result = null;
		synchronized(DictThread.class){
			if(null==dt){
				dt = new DictThread();
				result = dt;
			}
		}
		return result;
	}
	private DictThread() {
		this.logger = Loggers.getLogger("redis-thread");
		this.init();
	}
	
	private Jedis jc=null;
	private ESLogger logger = null;	

	public void run() {
		while (true) {
			this.logger.info("[Redis Thread]" + "*****cycle of redis");
			init();
			pull();
			sleep();
		}
	}

	public void pull() {
		if (null == jc)
			return;
		// 從集合里拉取
		
		ArrayList<String> words = new ArrayList<String>();
		Set set = null;

		// 1拉取主詞
		set = jc.smembers("ik_main");
		Iterator t = set.iterator();
		while (t.hasNext()) {
			Object obj = t.next();
			words.add(obj.toString());
		}
		Dictionary.getSingleton().addMainWords(words);
		words.clear();

		// 2拉取停詞
		set = jc.smembers("ik_stop");
		t = set.iterator();
		while (t.hasNext()) {
			Object obj = t.next();
			words.add(obj.toString());
		}
		Dictionary.getSingleton().addStopWords(words);
		words.clear();
	}

	private void init() {
		if (null == jc) {
			////Set<HostAndPort> jedisClusterNodes = new HashSet<HostAndPort>();
			// Jedis Cluster will attempt to discover cluster nodes
			// automatically
			//jedisClusterNodes.add(new HostAndPort("192.168.56.200", 6379));
			//jc = new JedisCluster(jedisClusterNodes);
			jc = new Jedis("192.168.56.200",6379);
		}

	}

	private void sleep() {
		// sleep 5 seconds,then loop to next
		try {
			Thread.sleep(5 * 1000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

 

 

下面的問題就是啓動這個線程!

 

package org.elasticsearch.index.analysis;

import org.elasticsearch.index.analysis.IkAnalyzerProvider;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.assistedinject.Assisted;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.settings.IndexSettings;
import org.wltea.analyzer.cfg.Configuration;
import org.wltea.analyzer.dic.Dictionary;
import org.wltea.analyzer.lucene.IKAnalyzer;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.wltea.analyzer.dic.DictThread;

public class SpecificIkAnalyzerProvider extends IkAnalyzerProvider {

	private ESLogger logger = null;
	 @Inject
	    public SpecificIkAnalyzerProvider(Index index, @IndexSettings Settings indexSettings, Environment env, @Assisted String name, @Assisted Settings settings) {
	        super(index, indexSettings, env, name, settings);
	        //here, let us start our pull thread... :)	        
	        this.logger = Loggers.getLogger("SpecificIkAnalyzerProvider");
	        this.logger.info("[SpecificIkAnalyzerProvider] system start,begin to start pull redis thread...");
	        //new DictThread().start();
	        DictThread dt = DictThread.getInstance();
	        if(null!=dt){
	        	dt.start();
	        }
	        
	        
	    }
}

 

系統成功運行,還能夠,比較簡單。

上面的代碼在獲取redis那部分其實還有一部分工做要優化,之後有時間再弄吧。

而後修改配置文件,修改分詞器爲:

index:
  analysis:
    analyzer:
      ik:
          alias: [ik_analyzer]
          type: org.elasticsearch.index.analysis.IkAnalyzerProvider

修改成:

index:
  analysis:
    analyzer:
      ik:
          alias: [ik_analyzer]
          type: org.elasticsearch.index.analysis.SpecificIkAnalyzerProvider

 

PS:吐槽,這網上下的jar反編譯工具把我坑慘了,浪費了不少時間!

相關文章
相關標籤/搜索