延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。 那麼,是在什麼場景下我才須要這樣的隊列呢?php
先看看一下業務場景:mysql
一般解決以上問題,最簡單直接的辦法就是定時去掃表。git
掃表存在的問題是:github
延時隊列能對於上述需求能很好的解決web
調研了市場上一些開源的方案,如下:redis
1.有贊科技:只有原理,沒有開源代碼sql
2.github我的的:github.com/ouqiang/del…數據庫
1.基於redis實現,redis只能配置一個,若是redis掛了整個服務不可用,可用性差點
2.消費端實現的是拉模式,接入成本大,每一個項目都得去實現一遍接入代碼
3.在star使用的人數很少,放在生產環境,存在風險,加之對go語言不瞭解,出了問題難以維護
複製代碼
3.SchedulerX-阿里開源的: 功能很強大,可是運維複雜,依賴組件多,不夠輕量性能優化
4.RabbitMQ-延時任務: 自己沒有延時功能,須要藉助一特性本身實現,並且公司沒有部署這個隊列,去單獨部署一個這個來作延時隊列成本有點高,並且還須要專門的運維來維護,目前團隊不支持bash
基本以上緣由打算本身寫一個,日常使用php多,項目基本redis的zset結構做爲存儲,用php語言實現 ,實現原理參考了有贊團隊:tech.youzan.com/queuing_del…
整體架構
採用master-work架構模式,主要包括6個模塊:
環境依賴:PHP 5.4+ 安裝sockets,redis,pcntl,pdo_mysql 拓展
create database dq;
#存放告警信息
CREATE TABLE `dq_alert` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`host` varchar(255) NOT NULL DEFAULT '',
`port` int(11) NOT NULL DEFAULT '0',
`user` varchar(255) NOT NULL DEFAULT '',
`pwd` varchar(255) NOT NULL DEFAULT '',
`ext` varchar(2048) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
#存放redis信息
CREATE TABLE `dq_redis` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`t_name` varchar(200) NOT NULL DEFAULT '',
`t_content` varchar(2048) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
#存儲註冊信息
CREATE TABLE `dq_topic` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`t_name` varchar(1024) NOT NULL DEFAULT '',
`delay` int(11) NOT NULL DEFAULT '0',
`callback` varchar(1024) NOT NULL DEFAULT '',
`timeout` int(11) NOT NULL DEFAULT '3000',
`email` varchar(1024) NOT NULL DEFAULT '',
`topic` varchar(255) NOT NULL DEFAULT '',
`createor` varchar(1024) NOT NULL DEFAULT '',
`status` tinyint(4) NOT NULL DEFAULT '1',
`method` varchar(32) NOT NULL DEFAULT 'GET',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
複製代碼
在DqConf.php文件中修改php了路徑 $logPath
命令:
php DqHttpServer.php --port 8088
訪問:http://127.0.0.1:8088,出現配置界面
php DqInit.php --port 6789 看到以下信息說明啓動成功
![]()
<?php
include_once 'DqLoader.php';
date_default_timezone_set("PRC");
//可配置多個
$server=array(
'127.0.0.1:6789',
);
$dqClient = new DqClient();
$dqClient->addServer($server);
$topic ='order_openvip_checker'; //topic在後臺註冊
$id = uniqid();
$data=array(
'id'=>$id,
'body'=>array(
'a'=>1,
'b'=>2,
'c'=>3,
'ext'=>str_repeat('a',64),
),
//可選,設置後以這個通知時間爲準,默認延時時間在註冊topic的時候指定
'fix_time'=>date('Y-m-d 23:50:50'),
);
//添加
$boolRet = $dqClient->add($topic, $data);
echo 'add耗時:'.(msectime() - $time)."ms\n";
//查詢
$time = msectime();
$result = $dqClient->get($topic, $id);
echo 'get耗時:'.(msectime() - $time)."ms\n";
//刪除
$time = msectime();
$boolRet = $dqClient->del($topic,$id);
echo 'del耗時:'.(msectime() - $time)."ms\n";
複製代碼
執行php test.php
默認日誌目錄在項目目錄的logs目錄下,在DqConf.php修改$logPath
ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2
須要安裝pthreads拓展:
測試原理:使用多線程模擬併發,在1s內能成功返回請求成功的個數
php DqBench concurrency requests
concurrency:併發數
requests: 每一個併發產生的請求數
測試環境:內存 8G ,8核cpu,2個redis和1個dq-server 部署在一個機器上,數據包64字節
qps:2400
複製代碼
若是調用通知接口在超時時間內,沒有收到回覆認爲通知失敗,系統會從新把數據放入隊列,從新通知,系統默認最大通知10次(能夠在Dqconf.php文件中修改$notify_exp_nums)通知間隔爲2n+1,好比第一次1分鐘,通知失敗,第二次3分鐘後,直到收到回覆,超出最大通知次數後系統自動丟棄,同時發郵件通知
ps:網絡抖動在所不免,通知接口若是涉及到核心的服務,必定要保證冪等!!
線上部署了兩個實例每一個機房部一個,4個redis做存儲,服務穩定運行數月,各項指標均符合預期
主要接入業務:
項目地址: github.com/chenlinzhon…