Hive
做爲一個sql
查詢引擎,自帶了一些基本的函數,好比count(計數)
,sum(求和)
,有時候這些基本函數知足不了咱們的需求,這時候就要寫hive hdf(user defined funation)
,又叫用戶自定義函數。編寫Hive UDF
的步驟:html
hadoop version
和hive --version
來分別查看版本)org.apache.hadoop.hive.ql.exec.UDF
類,實現evaluate方法,而後打包;add
方法添加jar 包到分佈式緩存,若是jar包是上傳到$HIVE_HOME/lib/目錄如下,就不須要執行add命令了;create temporary function
建立臨時函數,不加temporary
就建立了一個永久函數;這個是一個比較常見的場景,例如公司的產品有天天都會產生大量的彈幕或者評論,這個時候咱們可能會想去分析一下你們最關心的熱點話題是什麼,或者是咱們會分析最近一段時間的網絡趨勢是什麼,可是這裏有一個問題就是你的詞庫建設的問題,由於你使用通用的詞庫可能不能達到很好的分詞效果,尤爲有不少網絡流行用語它是不在詞庫裏的,還有一個就是停用詞的問題了,由於不少時候停用詞是沒有意義的,因此這裏咱們須要將其過濾,而過濾的方式就是經過停用詞詞表進行過濾。java
這個時候咱們的解決方案主要有兩種,一種是使用第三方提供的一些詞庫,還有一種是自建詞庫,而後有專人去維護,這個也是比較常見的一種狀況。算法
最後一個就是咱們使用的分詞工具,由於目前主流的分詞器不少,選擇不一樣的分詞工具可能對咱們的分詞結果有不少影響。sql
1:Elasticsearch的開源中文分詞器 IK Analysis(Star:2471)數據庫
IK中文分詞器在Elasticsearch上的使用。原生IK中文分詞是從文件系統中讀取詞典,es-ik自己可擴展成從不一樣的源讀取詞典。目前提供從sqlite3數據庫中讀取。es-ik-plugin-sqlite3使用方法: 1. 在elasticsearch.yml中設置你的sqlite3詞典的位置: ik_analysis_db_path: /opt/ik/dictionary.dbapache
2:開源的java中文分詞庫 IKAnalyzer(Star:343)緩存
IK Analyzer 是一個開源的,基於java語言開發的輕量級的中文分詞工具包。從2006年12月推出1.0版開始, IKAnalyzer已經推出了4個大版本。最初,它是以開源項目Luence爲應用主體的,結合詞典分詞和文法分析算法的中文分詞組件。從3.0版本開始,IK發展爲面向Java的公用分詞組件,獨立於Lucene項目網絡
3:java開源中文分詞 Ansj(Star:3019)數據結構
Ansj中文分詞 這是一個ictclas的java實現.基本上重寫了全部的數據結構和算法.詞典是用的開源版的ictclas所提供的.而且進行了部分的人工優化 分詞速度達到每秒鐘大約200萬字左右,準確率能達到96%以上。app
目前實現了.中文分詞. 中文姓名識別 . 詞性標註、用戶自定義詞典,關鍵字提取,自動摘要,關鍵字標記等功能。
能夠應用到天然語言處理等方面,適用於對分詞效果要求高的各類項目.
4:結巴分詞 ElasticSearch 插件(Star:188)
elasticsearch官方只提供smartcn這個中文分詞插件,效果不是很好,好在國內有medcl大神(國內最先研究es的人之一)寫的兩個中文分詞插件,一個是ik的,一個是mmseg的
5:Java分佈式中文分詞組件 - word分詞(Star:672)
word分詞是一個Java實現的分佈式的中文分詞組件,提供了多種基於詞典的分詞算法,並利用ngram模型來消除歧義。能準確識別英文、數字,以及日期、時間等數量詞,能識別人名、地名、組織機構名等未登陸詞
6:Java開源中文分詞器jcseg(Star:400)
Jcseg是什麼? Jcseg是基於mmseg算法的一個輕量級開源中文分詞器,同時集成了關鍵字提取,關鍵短語提取,關鍵句子提取和文章自動摘要等功能,而且提供了最新版本的lucene, solr, elasticsearch的分詞接口, Jcseg自帶了一個 jcseg.properties文件...
庖丁中文分詞庫是一個使用Java開發的,可結合到Lucene應用中的,爲互聯網、企業內部網使用的中文搜索引擎分詞組件。Paoding填補了國內中文分詞方面開源組件的空白,致力於此並希翼成爲互聯網網站首選的中文分詞開源組件。 Paoding中文分詞追求分詞的高效率和用戶良好體驗。
mmseg4j 用 Chih-Hao Tsai 的 MMSeg 算法(http://technology.chtsai.org/mmseg/ )實現的中文分詞器,並實現 lucene 的 analyzer 和 solr 的TokenizerFactory 以方便在Lucene和Solr中使...
9:中文分詞Ansj(Star:3015)
Ansj中文分詞 這是一個ictclas的java實現.基本上重寫了全部的數據結構和算法.詞典是用的開源版的ictclas所提供的.而且進行了部分的人工優化 內存中中文分詞每秒鐘大約100萬字(速度上已經超越ictclas) 文件讀取分詞每秒鐘大約30萬字 準確率能達到96%以上 目前實現了....
ictclas4j中文分詞系統是sinboy在中科院張華平和劉羣老師的研製的FreeICTCLAS的基礎上完成的一個java開源分詞項目,簡化了原分詞程序的複雜度,旨在爲廣大的中文分詞愛好者一個更好的學習機會。
這裏咱們引入了兩個依賴,實際上是兩個不一樣分詞工具
<dependency> <groupId>org.ansj</groupId> <artifactId>ansj_seg</artifactId> <version>5.1.6</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.janeluo</groupId> <artifactId>ikanalyzer</artifactId> <version>2012_u6</version> </dependency>
在開始以前咱們先寫一個demo 玩玩,讓你們有個基本的認識
@Test public void testAnsjSeg() { String str = "我叫李太白,我是一個詩人,我生活在唐朝" ; // 選擇使用哪一種分詞器 BaseAnalysis ToAnalysis NlpAnalysis IndexAnalysis Result result = ToAnalysis.parse(str); System.out.println(result); KeyWordComputer kwc = new KeyWordComputer(5); Collection<Keyword> keywords = kwc.computeArticleTfidf(str); System.out.println(keywords); }
輸出結果
我/r,叫/v,李太白/nr,,/w,我/r,是/v,一個/m,詩人/n,,/w,我/r,生活/vn,在/p,唐朝/t [李太白/24.72276098504223, 詩人/3.0502185968368885, 唐朝/0.8965677022546215, 生活/0.6892230219652541]
由於是停用詞詞庫,自己也不是很大,因此我直接放在項目裏了,固然你也能夠放在其餘地方,例如HDFS 上
代碼很簡單我就不不作詳細解釋了,須要注意的是GenericUDF
裏面的一些方法的使用規則,至於代碼設計的好壞以及還有什麼改進的方案咱們後面再說,下面兩套實現的思路幾乎是一致的,不同的是在使用的分詞工具上的不同
ansj的實現
/** * Chinese words segmentation with user-dict in com.kingcall.dic * use Ansj(a java open source analyzer) */ // 這個信息就是你每次使用desc 進行獲取函數信息的時候返回的 @Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using ansj. Return list of words.", extended = "Example: select _FUNC_('我是測試字符串') from src limit 1;\n" + "[\"我\", \"是\", \"測試\", \"字符串\"]") public class AnsjSeg extends GenericUDF { private transient ObjectInspectorConverters.Converter[] converters; private static final String userDic = "/app/stopwords/com.kingcall.dic"; //load userDic in hdfs static { try { FileSystem fs = FileSystem.get(new Configuration()); FSDataInputStream in = fs.open(new Path(userDic)); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String line = null; String[] strs = null; while ((line = br.readLine()) != null) { line = line.trim(); if (line.length() > 0) { strs = line.split("\t"); strs[0] = strs[0].toLowerCase(); DicLibrary.insert(DicLibrary.DEFAULT, strs[0]); //ignore nature and freq } } MyStaticValue.isNameRecognition = Boolean.FALSE; MyStaticValue.isQuantifierRecognition = Boolean.TRUE; } catch (Exception e) { System.out.println("Error when load userDic" + e.getMessage()); } } @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { if (arguments.length < 1 || arguments.length > 2) { throw new UDFArgumentLengthException( "The function AnsjSeg(str) takes 1 or 2 arguments."); } converters = new ObjectInspectorConverters.Converter[arguments.length]; converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector); if (2 == arguments.length) { converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector); } return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector); } @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { boolean filterStop = false; if (arguments[0].get() == null) { return null; } if (2 == arguments.length) { IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get()); if (1 == filterParam.get()) filterStop = true; } Text s = (Text) converters[0].convert(arguments[0].get()); ArrayList<Text> result = new ArrayList<>(); if (filterStop) { for (Term words : DicAnalysis.parse(s.toString()).recognition(StopLibrary.get())) { if (words.getName().trim().length() > 0) { result.add(new Text(words.getName().trim())); } } } else { for (Term words : DicAnalysis.parse(s.toString())) { if (words.getName().trim().length() > 0) { result.add(new Text(words.getName().trim())); } } } return result; } @Override public String getDisplayString(String[] children) { return getStandardDisplayString("ansj_seg", children); } }
ikanalyzer的實現
@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using Iknalyzer. Return list of words.", extended = "Example: select _FUNC_('我是測試字符串') from src limit 1;\n" + "[\"我\", \"是\", \"測試\", \"字符串\"]") public class IknalyzerSeg extends GenericUDF { private transient ObjectInspectorConverters.Converter[] converters; //用來存放停用詞的集合 Set<String> stopWordSet = new HashSet<String>(); @Override public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { if (arguments.length < 1 || arguments.length > 2) { throw new UDFArgumentLengthException( "The function AnsjSeg(str) takes 1 or 2 arguments."); } //讀入停用詞文件 BufferedReader StopWordFileBr = null; try { StopWordFileBr = new BufferedReader(new InputStreamReader(new FileInputStream(new File("stopwords/baidu_stopwords.txt")))); //初如化停用詞集 String stopWord = null; for(; (stopWord = StopWordFileBr.readLine()) != null;){ stopWordSet.add(stopWord); } } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } converters = new ObjectInspectorConverters.Converter[arguments.length]; converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector); if (2 == arguments.length) { converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector); } return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector); } @Override public Object evaluate(DeferredObject[] arguments) throws HiveException { boolean filterStop = false; if (arguments[0].get() == null) { return null; } if (2 == arguments.length) { IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get()); if (1 == filterParam.get()) filterStop = true; } Text s = (Text) converters[0].convert(arguments[0].get()); StringReader reader = new StringReader(s.toString()); IKSegmenter iks = new IKSegmenter(reader, true); List<Text> list = new ArrayList<>(); if (filterStop) { try { Lexeme lexeme; while ((lexeme = iks.next()) != null) { if (!stopWordSet.contains(lexeme.getLexemeText())) { list.add(new Text(lexeme.getLexemeText())); } } } catch (IOException e) { } } else { try { Lexeme lexeme; while ((lexeme = iks.next()) != null) { list.add(new Text(lexeme.getLexemeText())); } } catch (IOException e) { } } return list; } @Override public String getDisplayString(String[] children) { return "Usage: evaluate(String str)"; } }
GenericUDF 給咱們提供了一些方法,這些方法能夠用來構建測試須要的環境和參數,這樣咱們就能夠測試這些代碼了
@Test public void testAnsjSegFunc() throws HiveException { AnsjSeg udf = new AnsjSeg(); ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector; ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector; ObjectInspector[] init_args = {valueOI0, valueOI1}; udf.initialize(init_args); Text str = new Text("我是測試字符串"); GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str); GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0); GenericUDF.DeferredObject[] args = {valueObj0, valueObj1}; ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args); System.out.println(res); } @Test public void testIkSegFunc() throws HiveException { IknalyzerSeg udf = new IknalyzerSeg(); ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector; ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector; ObjectInspector[] init_args = {valueOI0, valueOI1}; udf.initialize(init_args); Text str = new Text("我是測試字符串"); GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str); GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0); GenericUDF.DeferredObject[] args = {valueObj0, valueObj1}; ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args); System.out.println(res); }
咱們看到加載停用詞沒有找到,可是總體仍是跑起來了,由於讀取不到HDFS 上的文件
可是咱們第二個樣例是不須要從HDFS 上加載停用詞信息,因此能夠完美的測試運行
注 後來爲了能在外部更新文件,我將其放在了HDFS 上,和AnsjSeg 中的代碼同樣
add jar /Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar; create temporary function ansjSeg as 'com.kingcall.bigdata.HiveUDF.AnsjSeg'; select ansjSeg("我是字符串,你是啥"); -- 開啓停用詞過濾 select ansjSeg("我是字符串,你是啥",1); create temporary function ikSeg as 'com.kingcall.bigdata.HiveUDF.IknalyzerSeg'; select ikSeg("我是字符串,你是啥"); select ikSeg("我是字符串,你是啥",1);
上面方法的第二個參數,就是是否開啓停用詞過濾,咱們使用ikSeg函數演示一下
下面咱們嘗試獲取一下函數的描述信息
若是沒有寫的話,就是下面的這樣的
經過編寫Hive UDF
能夠輕鬆幫咱們實現大量常見需求,其它應該場景還有:
ip
地址轉地區
:將上報的用戶日誌中的ip
字段轉化爲國家-省-市
格式,便於作地域分佈統計分析;Hive SQL
計算的標籤數據,不想編寫Spark
程序,能夠經過UDF
在靜態代碼塊中初始化鏈接池,利用Hive
啓動的並行MR
任務,並行快速導入大量數據到codis
中,應用於一些推薦業務;sql
實現相對複雜的任務,均可以編寫永久Hive UDF
進行轉化;GenericUDF
抽象類來實現,這一節的重點在於代碼的實現以及對GenericUDF
類中方法的理解