sphinx與gearman結合應用

一.說明

本文涉及到sphinx分佈式配置,XML數據源,mysql觸發器,任務分發系統gearman以及sphinx屬性更改。數據依賴以前寫的 sphinx應用(一),sphinx應用(二) ,具體請參見: http://www.ttlsa.com/html/category/os/store-system/sphinx-web-application 同時 Gearman的用法 參見: http://www.ttlsa.com/html/category/os/distributed-software-systems/distributed-processing-systems/gearman-distributed-processing-systems

若有疑問,煩請跟貼說明或加入Q羣: 39514058

MVA的屬性更改不會刷新到磁盤上。

二.分佈式索引

對於大量的數據,分佈式索引將會是非常有用的。如果是單一索引,性能將急劇下降,索引查詢時間增大,每秒處理數量降低。
同時,如果索引數據大於服務器內存大小,將會導致sphinx無法啟動。
啟用分佈式索引,需將index塊的type選項設置為distributed。

三.sphinx配置

# vi main_ttlsa.conf //mysql數據源
# MySQL data source for the main index (posts table in ttlsa_com)
source ttlsa
{
	type = mysql
	sql_host = 192.168.1.25
	sql_user = root
	sql_pass =
	sql_db = ttlsa_com
	# main document fetch; UNIX_TIMESTAMP() converts DATETIME for sql_attr_timestamp
	sql_query = SELECT id, title, content, UNIX_TIMESTAMP(publish_date) AS publish_date, author_id ,tags FROM posts
	sql_attr_uint = author_id
	# multi-valued attribute parsed out of the fetched 'tags' column
	sql_attr_multi = uint tags from field
	sql_attr_timestamp = publish_date
	# used by the CLI 'search' tool to display matching rows
	sql_query_info = SELECT id, title, content, publish_date, tags, author_id FROM posts WHERE ID=$id
}

# Main (on-disk) index built from the MySQL source. sphinx.conf only supports
# '#' comments, so the original trailing '//主索引' on the declaration line is
# moved up here to keep the directive parseable.
index ttlsa_main
{
	source = ttlsa
	path = /data/sphinx/ttlsa/main
	docinfo = extern
	mlock = 0
	morphology = none
	html_strip = 0
	min_word_len = 1
	min_prefix_len = 0
	min_infix_len = 1
	ngram_len = 0
	charset_type = utf-8
	charset_table = 0..9, A..Z->a..z, _, a..z, \
	U+410..U+42F->U+430..U+44F, U+430..U+44F
	ngram_chars = U+3000..U+2FA1F
}

# Distributed index: merges the local main index with the remote XML agent
# served by the searchd instance on port 3313. ('//' comments are not valid
# in sphinx.conf, so the note is moved to a '#' line.)
index master
{
	type = distributed
	local = ttlsa_main
	agent = 127.0.0.1:3313:ttlsa_xml
	agent_connect_timeout = 1000	# ms
	agent_query_timeout = 3000	# ms
}

indexer
{
	# memory ceiling for index builds
	mem_limit = 512M
}

searchd
{
	listen = 3312
	# SphinxQL (MySQL protocol) listener address/port. sphinx.conf only
	# supports '#' comments; the original trailing '//...' would be parsed
	# as part of the listen value, so the note is moved up here.
	listen = 0.0.0.0:4306:mysql41
	log = /data/sphinx/ttlsa/log/searchd_3312.log
	query_log = /data/sphinx/ttlsa/log/query_3312.log
	read_timeout = 5
	max_children = 200
	max_matches = 1000
	seamless_rotate = 1
	preopen_indexes = 1
	unlink_old = 1
	max_filter_values = 10000
	pid_file = /data/sphinx/ttlsa/log/searchd_3312.pid
	# pool for in-memory MVA attribute updates (changes are not flushed to disk)
	mva_updates_pool = 16M
	compat_sphinxql_magics = 0
}

# vi ttlsa.pl //構造xml數據

use strict;
use warnings;
use XML::Writer;
use Sphinx::Search;

# searchd instance that serves the main (MySQL-backed) index.
my $sphinx_server="127.0.0.1";
my $sphinx_port="3312";
my $sph=Sphinx::Search->new();

$sph->SetServer($sphinx_server,$sphinx_port);
$sph->SetConnectTimeout(1);
$sph->SetConnectRetries(3);
$sph->SetSelect("id");
# Sort by id descending and keep a single match: the current maximum doc id.
$sph->SetSortMode(SPH_SORT_EXTENDED,'@id desc');
$sph->SetLimits(0,1);

my $results = $sph->Query("", 'ttlsa_main');
my $max_id=$results->{'matches'}->[0]->{'id'};
$sph->Close();

# NOTE(review): $max_id is queried above but never used below — the emitted
# document id is hard-coded to 1000. Presumably the intent was to base new
# ids on $max_id; confirm before relying on this script.

# Emit a minimal xmlpipe2 document set on STDOUT for the ttlsa_xml source.
my $writer = XML::Writer->new(DATA_MODE => 'true', DATA_INDENT => 2);
$writer->xmlDecl('utf-8');
$writer->startTag('sphinx:docset');
# The schema mirrors the fields/attributes of the main index so the
# distributed 'master' index can merge results from both agents.
$writer->startTag('sphinx:schema');
$writer->emptyTag('sphinx:field','name'=>'title');
$writer->emptyTag('sphinx:field','name'=>'content');
$writer->emptyTag('sphinx:attr','name'=>'publish_date','type'=>'timestamp',);
$writer->emptyTag('sphinx:attr','name'=>'author_id','type'=>'int','bits'=>'32',);
$writer->endTag('sphinx:schema');
# Single placeholder document: every field/attribute is '0' so the gearman
# worker can later locate it via zero-valued attribute filters and update it.
$writer->startTag('sphinx:document','id'=>1000);
$writer->startTag('title');
$writer->characters('0');
$writer->endTag('title');
$writer->startTag('content');
$writer->characters('0');
$writer->endTag('content');
$writer->startTag('publish_date');
$writer->characters('0');
$writer->endTag('publish_date');
$writer->startTag('author_id');
$writer->characters('0');
$writer->endTag('author_id');
$writer->endTag('sphinx:document');
$writer->endTag('sphinx:docset');
$writer->end();

# vi xml_ttlsa.conf //XML數據源

source ttlsa_xml
{
	type = xmlpipe2
	# XML data stream. The generator script is created above as ttlsa.pl, so
	# the command must point at the .pl file (original said ttlsa.xml). A
	# trailing '//' note here would become part of the shell command, so it
	# is moved to this '#' comment instead.
	xmlpipe_command = perl /usr/local/coreseek4/etc/ttlsa.pl
}

# Plain (on-disk) index built from the xmlpipe2 source above; served by the
# searchd instance on port 3313 and queried remotely as agent 'ttlsa_xml'.
index ttlsa_xml
{
	type = plain
	source = ttlsa_xml
	path = /data/sphinx/ttlsa/xml
	docinfo = extern
	mlock = 0
	morphology = none
	html_strip = 0
	min_word_len = 1
	min_prefix_len = 0
	min_infix_len = 1
	ngram_len = 0
	charset_type = utf-8
	# charset/ngram settings kept identical to ttlsa_main so tokenization
	# matches across the distributed index
	charset_table = 0..9, A..Z->a..z, _, a..z, \
	U+410..U+42F->U+430..U+44F, U+430..U+44F
	ngram_chars = U+3000..U+2FA1F
	}

# indexer settings for the XML index build
indexer
{
	mem_limit = 512M
}

# searchd serving the XML-based index on port 3313 — the agent that the
# distributed 'master' index (on the 3312 instance) queries.
searchd
{
	listen = 3313
	# SphinxQL (MySQL protocol) listener
	listen = 0.0.0.0:9307:mysql41
	log = /data/sphinx/ttlsa/log/searchd_3313.log
	query_log = /data/sphinx/ttlsa/log/query_3313.log
	read_timeout = 5
	max_children = 200
	max_matches = 1000
	seamless_rotate = 1
	preopen_indexes = 1
	unlink_old = 1
	max_filter_values = 10000
	pid_file = /data/sphinx/ttlsa/log/searchd_3313.pid
	# pool for in-memory MVA attribute updates
	mva_updates_pool = 16M
	compat_sphinxql_magics = 0
}

將以前的數據表結構更改下。 sql

mysql> alter table posts add `tags` text NOT NULL;
mysql> update posts set `tags`="0,0";


建索引
shell

# /usr/local/coreseek4/bin/indexer --config /usr/local/coreseek4/etc/main_ttlsa.conf --all
# /usr/local/coreseek4/bin/indexer --config /usr/local/coreseek4/etc/xml_ttlsa.conf --all

啓動服務
數據庫

# /usr/local/coreseek4/bin/searchd --config /usr/local/coreseek4/etc/main_ttlsa.conf
# /usr/local/coreseek4/bin/searchd --config /usr/local/coreseek4/etc/xml_ttlsa.conf

四.Gearman配置

# vi WORKER_UPDATEATTRIBUTES.pl //執行gearman任務
###################################
### author: www.ttlsa.com		###
### QQ group: 39514058			###
### E-mail: service@ttlsa.com	###
###################################
# Gearman worker: registers the attribute-update job and loops forever.

use strict;
use warnings;
use XML::Writer;
use Sphinx::Search;
use DBI;
use Class::Date qw (date);
use Gearman::Worker;
use Time::HiRes qw(gettimeofday);

# Connection settings shared with the UPDATE_ATTRIBUTES handler below.
my $sphinx_server="127.0.0.1";
my $sphinx_port="3312";
my $driver="DBI:mysql";
my $host="192.168.1.25:3306";
my $dbname="ttlsa_com";
my $user="root";
my $passwd="";
# Direct method call instead of indirect object syntax ('new Gearman::Worker').
my $worker=Gearman::Worker->new;

$worker->job_servers('192.168.1.60:4731');
# NOTE: 'UPDAT_EATTRIBUTES' looks like a typo for 'UPDATE_ATTRIBUTES', but the
# MySQL trigger enqueues jobs under this exact name — keep the two in sync and
# do not rename one without the other.
$worker->register_function(UPDAT_EATTRIBUTES=>\&UPDATE_ATTRIBUTES);
$worker->work while 1;

# Gearman job handler: finds the lowest "placeholder" document id in the
# 'master' distributed index (docs indexed with author_id=0/publish_date=0)
# and refreshes the author_id/publish_date attributes of every document at or
# above it from the current MySQL data via UpdateAttributes().
sub UPDATE_ATTRIBUTES{
	my $sph=Sphinx::Search->new();
	$sph->SetServer($sphinx_server,$sphinx_port);
	$sph->SetConnectTimeout(1);
	$sph->SetConnectRetries(3);
	# Placeholder docs were generated with both attributes set to 0 (see the
	# XML generator), so filtering on 0/0 selects not-yet-updated documents.
	$sph->SetFilter('publish_date',[0]);
	$sph->SetFilter('author_id',[0]);
	$sph->SetSelect('id');
	# Ascending id + limit 1 => the smallest matching document id.
	$sph->SetSortMode(SPH_SORT_EXTENDED,'@id asc');
	$sph->SetLimits(0,1);
	my $start_time=gettimeofday();
	my $rt = $sph->Query("", 'master');
	my $min_id = $rt->{'matches'}->[0]->{'id'};
	my $ct=gettimeofday() - $start_time;
	print "查詢當前最小document ID($min_id)耗時: $ct\n";
	$sph->Close();

	if($min_id){
		my $dbh=DBI->connect("$driver:$dbname:$host","$user","$passwd") or die DBI->errstr;
		# $min_id comes from searchd as a numeric id, so interpolating it
		# here is not an injection vector.
		my $sql="select id,author_id,publish_date,tags from posts where id >=$min_id";
		my $sth=$dbh->prepare($sql);
		my $rv=$sth->execute;
		my $attrs=['author_id','publish_date'];
		my $values={};

		# Build doc_id => [author_id, epoch(publish_date)], the shape
		# UpdateAttributes() expects; Class::Date converts DATETIME to epoch.
		while(my $hash_ref=$sth->fetchrow_hashref){
			$values->{$hash_ref->{'id'}}=[$hash_ref->{'author_id'},date($hash_ref->{'publish_date'})->epoch];
		}

		my $start_time=gettimeofday();
		# NOTE(review): the handle was Close()d above; this presumably relies
		# on Sphinx::Search opening a fresh connection per request — verify.
		my $num=$sph->UpdateAttributes('master',$attrs,$values);
		my $ct=gettimeofday() - $start_time;
		print "更改屬性耗時: $ct\n";

		if(defined($num)){
			print "更改屬性成功數量: $num\n\n\n";
		}else{
			print "!!!更改屬性失敗!!!\n\n\n";
		}
	}
}

使用MySQL UDFs來調用gearman分佈式任務分發系統,具體參見: http://www.ttlsa.com/html/1269.html

如果向數據庫插入大量的數據,瞬間將會添加與插入數據量相當的任務。

mysql> SELECT gman_servers_set("192.168.1.60:4731") AS gman_server;
+-------------------+
| gman_server |
+-------------------+
| 192.168.1.60:4731 |
+-------------------+
1 row in set (0.11 sec)

mysql> CREATE TRIGGER insert_posts after INSERT ON posts FOR each row SET @RETURN=gman_do_background('UPDAT_EATTRIBUTES','undef');

Query OK, 0 rows affected (0.08 sec)

//建立觸發器,當向表posts插入數據的時候,添加任務。

五.測試

mysql> insert into posts values ('','xxx','xxxxxxx',1, CURRENT_TIMESTAMP(),"2,2");

gearman任務輸出內容以下:

查詢當前最小document ID(6)耗時: 0.00577688217163086

更改屬性耗時: 0.203788042068481

更改屬性成功數量: 1

# mysql -h127.0.0.1 -P4306
mysql> show tables;
+------------+-------------+
| Index | Type |
+------------+-------------+
| master | distributed |
| ttlsa_main | local |
+------------+-------------+

上面我提到過,使用觸發器來添加任務,若插入大量的數據的話,瞬間將添加與插入的數據量相當的任務,屆時將阻塞之後的任務執行。下面的腳本是插入大量的數據,大家可以測測。

任務數量可以通過以下命令查看

# telnet 192.168.1.60 4731
status

# vi insert_data.php

<?php
/*
###################################
### author: www.ttlsa.com		###
### QQ group: 39514058			###
### E-mail: service@ttlsa.com	###
###################################
Bulk-insert test script: builds one multi-row INSERT into `posts` so the
insert_posts trigger enqueues a gearman job per statement.
*/

$host="192.168.1.25";
$user="root";
$passwd="";
$dbname="ttlsa_com";

// NOTE(review): the mysql_* extension is deprecated (removed in PHP 7);
// migrate to mysqli or PDO when possible.
$conn=mysql_connect($host,$user,$passwd) or die(mysql_error());
mysql_select_db($dbname) or die(mysql_error());

$i=1;
$count=1000;
$val=array();
while($i<=$count){
	$publish_date=date("Y-m-d G:i:s");
	$author_id=$i;
	$tags=$i.",".$i;
	$title=$i;
	// zero-pad the content to 10 chars, e.g. 1 => "0000000001"
	$content=str_pad($i,10,0,STR_PAD_LEFT);
	// plain append; the original '$val[].=' only worked because PHP creates
	// the new element as an empty string before concatenating.
	$val[]="('{$publish_date}','{$author_id}','{$tags}','{$title}','{$content}')";
	$i++;
}

$st=microtime(true);
$sql=" insert into posts (`publish_date`,`author_id`,`tags`,`title`,`content`) values " . implode(",",$val);
// Left commented out on purpose: uncomment to actually run the insert.
#mysql_query($sql,$conn) or die(mysql_error());
$num=mysql_affected_rows($conn);
$ct=microtime(true) - $st;

print "$sql\n";
print "執行耗時: $ct\n";
print "插入行數: $num\n";
// memory_get_usage() reports bytes, not kilobytes — label fixed accordingly.
print "所耗內存: ".memory_get_usage(true)." bytes\n";
print "-----------------------------------\n\n\n";

留下兩個問題共你們思索: 1. 若是是delete操做呢?索引又該如何操做? 2. 若是是update操做呢?索引又該如何操做?

如需轉載請註明出處:http://www.ttlsa.com/html/1440.html

相關文章
相關標籤/搜索