im-cloud分佈式中間件分析(三)-job節點實現

github: http://github.com/brewlin/im-...

1.概述

job 節點 做爲消費端,消費logic生產的數據,而後經過grpc推送至cloud節點,cloud點真正處理客戶端數據,job節點默認多進程消費啓動4個worker進程,以及默認10個grpc鏈接池
  • 數據流程圖

圖片描述

2.@Consumer 消費中心

默認啓動4個worker進程消費logic請求,耗時處理投放至task進程處理,並轉發至cloud節點

監聽worker啓動事件

須要在config/queue.php ,config/event.php 註冊相應的事件和相關配置php

use App\Consumer\Consumer;
use Core\App;
use Core\Swoole\WorkerStartInterface;
use Swoole\Server as SwooleServer;

class WorkerStartListener implements WorkerStartInterface
{
    const INIT_LOGIC = 1;

    public function onWorkerStart(SwooleServer $server, int $workerId): void
    {
        if(App::isWorkerStatus()){
            //啓動的n個 worker進程 分別做爲消費者進程消費,每一個進程會直接阻塞直到消費到數據
            consumer()->consume(new Consumer());
        }
    }

}

消費主流程

  • 1.爲每一個消費數據請求創建一個協程,處理相關數據
  • 2.將每一個數據投遞至worker進程進行真正的grpc與cloud推送請求
Co::create(function()use($data){
    if(empty(CloudClient::$serviceList)){
        Log::error("cancle task deliver discovery cloud node is empty");
        return;
    }
    Task::deliver(Job::class,"push",[CloudClient::$serviceList,$data]);
},false);
return Result::ACK;
  • 3.經過以上作法能加快併發是消費速度,task進程也進行協程處理,增長並行處理能力,若是task進程阻塞也會形成task任務投遞阻塞,因此在worker進程也須要加一個協程處理
相關文章
相關標籤/搜索