github: http://github.com/brewlin/im-...
im-cloud分佈式中間件分析(四)-logic節點實現
logic 節點 做爲生產者和client端,做爲業務節點,提供push推送resetapi接口,能夠擴容多個節點作nginx負載均衡
默認啓動10個消息隊列鏈接池,在task進程爲每一個人物建立協程異步生產任務
直接調用組件task接口進行投遞到task進程執行,task投遞爲非阻塞操做,執行完畢會直接返回,大大的提高了worker處理併發請求的能力,惟一的影響是,若是多個task進程的消費能力更不上worker的投遞速度也會影響worker的處理能力,因此須要作取捨nginx
use Task\Task; /** * @var LogicPush */ Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]);
App\Task
下單個請求流程執行的生命週期會調用生成多個對象,多達10多個。併發大的狀況下GC 幾乎會首先掛掉,並且會耗時等待,因此new對象也有優化的空間git
項目在初始化也就是主進程啓動期間就掃描相關代碼,有註解的就進行收集,而後實例化到容器container中,之後再屢次使用的時候直接複用代碼,而無需屢次new對象,大大節省空間和時間,以下圖爲建立一個協程去執行任務,相關的對象都從容器中獲取github
Co::create(function ()use($op,$mids,$msg){ /** @var RedisDao $servers */ $servers = \container()->get(RedisDao::class)->getKeysByMids($mids); $keys = []; foreach($servers as $key => $server){ $keys[$server][] = $key; } foreach($keys as $server => $key){ //丟到隊列裏去操作,讓job去處理 \container()->get(QueueDao::class)->pushMsg($op,$server,$key,$msg); } },true); //第二個參數爲true 表示使用Context::waitGroup() 等待任務執行完成
如上圖所示能夠調用組件提供的多個方法獲取容器對象json
container()->get(class)
bean(class)
即便將主要的耗時任務放到task進程中執行,worker進程中依然會有少許的等待時間,如今採起的方式,是請求到來時獲取數據後,直接回復結束當前鏈接,而後在繼續執行任務,這樣就不用等到投遞task任務後再結束當前鏈接,大大提升併發能力,雖然可能耗時性能沒有發生太大的改變,可是併發能力大大的提高。以下所示:segmentfault
/** * @return \Core\Http\Response\Response|static */ public function mids() { Context::get()->getResponse()->end(); $post = Context::get()->getRequest()->input(); if(empty($post["operation"]) || empty($post["mids"]) ||empty($post["msg"])){ return $this->error("缺乏參數"); } $arg = [ "op" => $post["operation"], "mids" => is_array($post["mids"])?$post["mids"]:[$post["mids"]], "msg" => $post["msg"] ]; Log::debug("push mids post data:".json_encode($arg)); /** * @var LogicPush */ Task::deliver(LogicPush::class,"pushMids",[(int)$arg["op"],$arg["mids"],$arg["msg"]]); }
Context::get()->getResponse()->end();
經過協程上下文獲取reponse對象直接結束當前鏈接,而後在繼續執行當前任務,並釋放內存