In the previous article I (散仙) described how to integrate Apache Pig with Lucene. Readers who have not seen it yet may want to read that post first to get familiar with the overall workflow.
While integrating with Lucene, we found that the generated Lucene index still has to be copied to local disk before it can serve searches. That is cumbersome and has the following drawbacks:
(1) Before the index can actually serve queries, it is written to disk several times, which puts considerable pressure on disk and network I/O.
(2) The Lucene Field configuration is tightly coupled to the UDF code, and the configuration it exposes is fairly limited, so it is hard to satisfy flexible, changing search requirements; any change to the index configuration may require recompiling the source.
(3) It depends too heavily on HDFS. With the Lucene integration, the web server that serves searches must sit on the same machine as a Hadoop storage node, otherwise it cannot pull the index down from HDFS, unless you write your own transfer program or copy the index again with scp, which only adds more complexity to the system.
Given these drawbacks, I recommend using Solr or ElasticSearch, which wrap Lucene in higher-level APIs. So what do Solr and ElasticSearch offer compared with raw Lucene?
(1) When writing out the final data, we can write the results directly into Solr or ES, and optionally keep a copy on HDFS as a backup.
(2) With Solr or ES, the field configuration is completely decoupled from the UDF code: any change to the field configuration no longer touches the Pig UDF, whose only job is to hand the final data over to the Solr or ES service.
(3) Both Solr and ES expose RESTful HTTP APIs, so the search cluster can be separated entirely from the Hadoop cluster and each can focus on its own job (see the small client sketch after this list).
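As a quick illustration of point (3), here is a minimal SolrJ sketch of a client that talks to Solr purely over HTTP, so it can run on any machine that can reach the Solr server. The URL and the field names (sword, scount) simply mirror the data used later in this article; treat this as a sketch, not part of the downloaded code:

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class SolrHttpClientDemo {
    public static void main(String[] args) throws Exception {
        // Plain HTTP connection; no co-location with Hadoop nodes is required.
        SolrServer server = new HttpSolrServer("http://localhost:8983/solr/collection1");

        // Index one document and make it visible to searches.
        SolrInputDocument doc = new SolrInputDocument();
        doc.addField("sword", "hadoop");
        doc.addField("scount", 100);
        server.add(doc);
        server.commit();

        server.shutdown();
    }
}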
Below I describe concretely how to integrate Pig with Solr:
(1) As before, download the source archive from this address.
(2) Extract the parts you need and, in an Eclipse project, adapt the code to your own environment (is the Solr version compatible? the Hadoop version? the Pig version?).
(3) Repackage it into a jar with ant.
(4) In Pig, register the required jars and use the function to store the index.
Note that the archive downloaded from GitHub only ships a function for SolrCloud mode and does not provide one for a standalone Solr server. With a small modification it can also support a standalone Solr service. The code is as follows:
The SolrOutputFormat class:
package com.pig.support.solr;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

/**
 * @author qindongliang
 *
 * A SolrOutputFormat that writes documents to a standalone Solr server.
 *
 * If you want to learn more about this topic, join our groups:
 *
 * Search technology group (2000 members): 324714439
 * Big data technology group 1 (2000 members): 376932160 (full)
 * Big data technology group 2 (2000 members): 415886155
 * WeChat public account: 我是攻城師 (woshigcs)
 *
 * */
public class SolrOutputFormat extends
        FileOutputFormat<Writable, SolrInputDocument> {

    final String address;
    final String collection;

    public SolrOutputFormat(String address, String collection) {
        this.address = address;
        this.collection = collection;
    }

    @Override
    public RecordWriter<Writable, SolrInputDocument> getRecordWriter(
            TaskAttemptContext ctx) throws IOException, InterruptedException {
        return new SolrRecordWriter(ctx, address, collection);
    }

    @Override
    public synchronized OutputCommitter getOutputCommitter(
            TaskAttemptContext arg0) throws IOException {
        // No output files are written, so a no-op committer is sufficient.
        return new OutputCommitter() {
            @Override
            public void abortTask(TaskAttemptContext ctx) throws IOException {
            }

            @Override
            public void commitTask(TaskAttemptContext ctx) throws IOException {
            }

            @Override
            public boolean needsTaskCommit(TaskAttemptContext arg0)
                    throws IOException {
                return true;
            }

            @Override
            public void setupJob(JobContext ctx) throws IOException {
            }

            @Override
            public void setupTask(TaskAttemptContext ctx) throws IOException {
            }
        };
    }

    /**
     * Writes documents to the Solr server in batches; on close the remaining
     * documents are added and a commit is issued.
     */
    static class SolrRecordWriter extends RecordWriter<Writable, SolrInputDocument> {

        /** The Solr server connection */
        SolrServer server;
        /** Number of documents per batched add */
        int batch = 5000;

        TaskAttemptContext ctx;

        List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>(batch);

        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();

        /**
         * Opens a connection to the Solr server and starts a heartbeat that
         * reports progress so the task is not killed for inactivity.
         *
         * @param address
         */
        public SolrRecordWriter(final TaskAttemptContext ctx, String address, String collection) {
            try {
                this.ctx = ctx;
                server = new HttpSolrServer(address);
                exec.scheduleWithFixedDelay(new Runnable() {
                    public void run() {
                        ctx.progress();
                    }
                }, 1000, 1000, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            }
        }

        /**
         * On close we flush any remaining documents and commit
         */
        @Override
        public void close(final TaskAttemptContext ctx) throws IOException,
                InterruptedException {
            try {
                if (docs.size() > 0) {
                    server.add(docs);
                    docs.clear();
                }
                server.commit();
            } catch (SolrServerException e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            } finally {
                server.shutdown();
                exec.shutdownNow();
            }
        }

        /**
         * We add the documents without committing; the commit happens on close
         */
        @Override
        public void write(Writable key, SolrInputDocument doc)
                throws IOException, InterruptedException {
            try {
                docs.add(doc);
                if (docs.size() >= batch) {
                    server.add(docs);
                    docs.clear();
                }
            } catch (SolrServerException e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            }
        }
    }
}
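For reference, if you want to keep the original SolrCloud mode rather than a standalone server, the substantive change is in the writer's construction: build a CloudSolrServer from a ZooKeeper address instead of an HttpSolrServer. A minimal sketch follows; the ZooKeeper ensemble string and the helper name createCloudServer are illustrative, not part of the downloaded code:

import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.CloudSolrServer;

public class CloudModeExample {
    // Builds a SolrCloud-aware client; "zk1:2181,zk2:2181" is an example ZooKeeper ensemble.
    static SolrServer createCloudServer(String zkHosts, String collection) {
        CloudSolrServer cloud = new CloudSolrServer(zkHosts);
        cloud.setDefaultCollection(collection);
        cloud.connect();
        return cloud;
    }
}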
The SolrStore class:
package com.pig.support.solr;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.StoreFunc;
import org.apache.pig.StoreMetadata;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.solr.common.SolrInputDocument;

/**
 *
 * A Pig StoreFunc that writes tuples into a Solr index
 *
 */
public class SolrStore extends StoreFunc implements StoreMetadata {

    private static final String SCHEMA_SIGNATURE = "solr.output.schema";

    ResourceSchema schema;
    String udfSignature;
    RecordWriter<Writable, SolrInputDocument> writer;

    String address;
    String collection;

    public SolrStore(String address, String collection) {
        this.address = address;
        this.collection = collection;
    }

    public void storeStatistics(ResourceStatistics stats, String location,
            Job job) throws IOException {
    }

    public void storeSchema(ResourceSchema schema, String location, Job job)
            throws IOException {
    }

    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // Remember the schema so prepareToWrite can recover it on the backend.
        UDFContext udfc = UDFContext.getUDFContext();
        Properties p = udfc.getUDFProperties(this.getClass(),
                new String[] { udfSignature });
        p.setProperty(SCHEMA_SIGNATURE, s.toString());
    }

    public OutputFormat<Writable, SolrInputDocument> getOutputFormat()
            throws IOException {
        return new SolrOutputFormat(address, collection);
    }

    /**
     * Not used for Solr output; kept so Pig has an output path to manage
     */
    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        FileOutputFormat.setOutputPath(job, new Path(location));
    }

    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        this.udfSignature = signature;
    }

    @SuppressWarnings({ "unchecked", "rawtypes" })
    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
        UDFContext udc = UDFContext.getUDFContext();
        String schemaStr = udc.getUDFProperties(this.getClass(),
                new String[] { udfSignature }).getProperty(SCHEMA_SIGNATURE);
        if (schemaStr == null) {
            throw new RuntimeException("Could not find udf signature");
        }
        schema = new ResourceSchema(Utils.getSchemaFromString(schemaStr));
    }

    /**
     * Shamelessly copied from: https://issues.apache.org/jira/secure/attachment/12484764/NUTCH-1016-2.0.patch
     * @param input
     * @return
     */
    private static String stripNonCharCodepoints(String input) {
        StringBuilder retval = new StringBuilder(input.length());
        char ch;
        for (int i = 0; i < input.length(); i++) {
            ch = input.charAt(i);
            // Strip all non-characters
            // http://unicode.org/cldr/utility/list-unicodeset.jsp?a=[:Noncharacter_Code_Point=True:]
            // and non-printable control characters except tabulator, new line
            // and carriage return
            if (ch % 0x10000 != 0xffff && // 0xffff - 0x10ffff range step 0x10000
                    ch % 0x10000 != 0xfffe && // 0xfffe - 0x10fffe range
                    (ch <= 0xfdd0 || ch >= 0xfdef) && // 0xfdd0 - 0xfdef
                    (ch > 0x1F || ch == 0x9 || ch == 0xa || ch == 0xd)) {
                retval.append(ch);
            }
        }
        return retval.toString();
    }

    @Override
    public void putNext(Tuple t) throws IOException {
        // Map each non-null tuple field to a Solr field of the same name.
        final SolrInputDocument doc = new SolrInputDocument();
        final ResourceFieldSchema[] fields = schema.getFields();
        int docfields = 0;
        for (int i = 0; i < fields.length; i++) {
            final Object value = t.get(i);
            if (value != null) {
                docfields++;
                doc.addField(fields[i].getName().trim(), stripNonCharCodepoints(value.toString()));
            }
        }
        try {
            if (docfields > 0)
                writer.write(null, doc);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
}
The Pig script is as follows:
--Register the jar with dependent files
REGISTER ./dependfiles/tools.jar;
--Register the Solr-related jars
REGISTER ./solrdependfiles/pigudf.jar;
REGISTER ./solrdependfiles/solr-core-4.10.2.jar;
REGISTER ./solrdependfiles/solr-solrj-4.10.2.jar;
REGISTER ./solrdependfiles/httpclient-4.3.1.jar;
REGISTER ./solrdependfiles/httpcore-4.3.jar;
REGISTER ./solrdependfiles/httpmime-4.3.1.jar;
REGISTER ./solrdependfiles/noggit-0.5.jar;
--Load the data from HDFS and define its schema
a = load '/tmp/data' using PigStorage(',') as (sword:chararray,scount:int);
--Store into Solr, providing the Solr address and collection name
store a into '/user/search/solrindextemp' using com.pig.support.solr.SolrStore('http://localhost:8983/solr/collection1','collection1');
Once everything is configured, we can run the job: it loads the data from HDFS, processes it, and stores the final results into Solr.
After that, we can conveniently run millisecond-level operations in Solr, such as all kinds of full-text queries, filtering, sorting, and statistics! In the same way, we can also store the index in ElasticSearch; how to integrate Pig with ElasticSearch will be covered in a later article, so stay tuned!
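As a taste of the kind of queries this enables, here is a minimal SolrJ sketch of a full-text query with a filter and a sort against the collection used above. The field names sword and scount come from the script's schema; the query values and row count are purely illustrative:

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.common.SolrDocument;

public class SolrQueryDemo {
    public static void main(String[] args) throws Exception {
        SolrServer server = new HttpSolrServer("http://localhost:8983/solr/collection1");

        // Full-text query on sword, filtered by scount, sorted by scount descending.
        SolrQuery query = new SolrQuery("sword:hadoop");
        query.addFilterQuery("scount:[10 TO *]");
        query.setSort("scount", SolrQuery.ORDER.desc);
        query.setRows(10);

        QueryResponse rsp = server.query(query);
        for (SolrDocument doc : rsp.getResults()) {
            System.out.println(doc.getFieldValue("sword") + " -> " + doc.getFieldValue("scount"));
        }
        server.shutdown();
    }
}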