消息隊列(五)RocketMQ-RPC通訊

「消息隊列的本質在於消息的發送、存儲和接收」。那麼,對於一款消息隊列來講,如何作到消息的高效發送與接收是重點和關鍵。api

1、RocketMQ中Remoting通訊模塊概覽數組

RocketMQ集羣的一部分通訊以下:緩存

  • (1)Broker啓動後須要完成一次將本身註冊至NameServer的操做;隨後每隔30s時間按期向NameServer上報Topic路由信息;
  • (2)消息生產者Producer做爲客戶端發送消息時候,須要根據Msg的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。若是沒有則更新路由信息會從NameServer上從新拉取;
  • (3)消息生產者Producer根據(2)中獲取的路由信息選擇一個隊列(MessageQueue)進行消息發送;Broker做爲消息的接收者收消息並落盤存儲;

從上面(1)~(3)中能夠看出在消息生產者, Broker和NameServer之間都會發生通訊(這裏只說了MQ的部分通訊),所以如何設計一個良好的網絡通訊模塊在MQ中相當重要,它將決定RocketMQ集羣總體的消息傳輸能力與最終的性能。bash

rocketmq-remoting 模塊是 RocketMQ消息隊列中負責網絡通訊的模塊,它幾乎被其餘全部須要網絡通訊的模塊(諸如rocketmq-client、rocketmq-server、rocketmq-namesrv)所依賴和引用。爲了實現客戶端與服務器之間高效的數據請求與接收,RocketMQ消息隊列自定義了通訊協議並在Netty的基礎之上擴展了通訊模塊。服務器

ps:鑑於RocketMQ的通訊模塊是創建在Netty基礎之上的,所以在閱讀RocketMQ的源碼以前,讀者最好先對Netty的多線程模型、JAVA NIO模型均有必定的瞭解,這樣子理解RocketMQ源碼會較爲快一些。網絡

源碼部分主要能夠分爲rocketmq-broker,rocketmq-client,rocketmq-common,rocketmq-filterSrv,rocketmq-namesrv和rocketmq-remoting等模塊,通訊框架就封裝在rocketmq-remoting模塊中。 本文主要從RocketMQ的協議格式,消息編解碼,通訊方式(同步/異步/單向)、通訊流程和Remoting模塊的Netty多線程處理架構等方面介紹RocketMQ的通訊模塊。數據結構

2、RocketMQ中Remoting通訊模塊的具體實現多線程

從類層次結構來看:架構

(1)RemotingService:爲最上層的接口,提供了三個方法:框架

void start();
void shutdown();
void registerRPCHook(RPCHook rpcHook);
複製代碼

(2)RemotingClient/RemotingSever:兩個接口繼承了最上層接口—RemotingService,分別各自爲Client和Server提供所必需的方法,下面所列的是RemotingServer的方法:

/**
     * 同RemotingClient端同樣
     *
     * @param requestCode
     * @param processor
     * @param executor
     */
    void registerProcessor(final int requestCode, final NettyRequestProcessor processor,
        final ExecutorService executor);

    /**
     * 註冊默認的處理器
     *
     * @param processor
     * @param executor
     */
    void registerDefaultProcessor(final NettyRequestProcessor processor, final ExecutorService executor);

    int localListenPort();

    /**
     * 根據請求code來獲取不一樣的處理Pair
     *
     * @param requestCode
     * @return
     */
    Pair<NettyRequestProcessor, ExecutorService> getProcessorPair(final int requestCode);

    /**
     * 同RemotingClient端同樣,同步通訊,有返回RemotingCommand
     * @param channel
     * @param request
     * @param timeoutMillis
     * @return
     * @throws InterruptedException
     * @throws RemotingSendRequestException
     * @throws RemotingTimeoutException
     */
    RemotingCommand invokeSync(final Channel channel, final RemotingCommand request,
        final long timeoutMillis) throws InterruptedException, RemotingSendRequestException,
        RemotingTimeoutException;

    /**
     * 同RemotingClient端同樣,異步通訊,無返回RemotingCommand
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeAsync(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback) throws InterruptedException,
        RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException;

    /**
     * 同RemotingClient端同樣,單向通訊,諸如心跳包
     *
     * @param channel
     * @param request
     * @param timeoutMillis
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    void invokeOneway(final Channel channel, final RemotingCommand request, final long timeoutMillis)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException,
        RemotingSendRequestException;
複製代碼

(3)NettyRemotingAbstract:Netty通訊處理的抽象類,定義並封裝了Netty處理的公共處理方法;

(4)NettyRemotingClient/NettyRemotingServer:分別實現了RemotingClient和RemotingServer, 都繼承了NettyRemotingAbstract抽象類。RocketMQ中其餘的組件(如client、nameServer、broker在進行消息的發送和接收時均使用這兩個組件)

二、消息的協議設計與編碼解碼

在Client和Server之間完成一次消息發送時,須要對發送的消息進行一個協議約定,所以就有必要自定義RocketMQ的消息協議。同時,爲了高效地在網絡中傳輸消息和對收到的消息讀取,就須要對消息進行編解碼。在RocketMQ中,RemotingCommand這個類在消息傳輸過程當中對全部數據內容的封裝,不但包含了全部的數據結構,還包含了編碼解碼操做。 RemotingCommand類的部分紅員變量以下:

這裏展現下Broker向NameServer發送一次心跳註冊的報文:

[
code=103,//這裏的103對應的code就是broker向nameserver註冊本身的消息
language=JAVA,
version=137,
opaque=58,//這個就是requestId
flag(B)=0,
remark=null,
extFields={
    brokerId=0,
    clusterName=DefaultCluster,
    brokerAddr=ip1: 10911,
    haServerAddr=ip1: 10912,
    brokerName=LAPTOP-SMF2CKDN
},
serializeTypeCurrentRPC=JSON
複製代碼

RocketMQ通訊協議:

傳輸內容主要能夠分爲如下4部分:

1)消息長度:總長度,四個字節存儲,佔用一個int類型;

2)序列化類型&消息頭長度:一樣佔用一個int類型,第一個字節表示序列化類型,後面三個字節表示消息頭長度;

3)消息頭數據:通過序列化後的消息頭數據;

4)消息主體數據:消息主體的二進制字節數據內容;

消息的編碼和解碼分別在RemotingCommand類的encode和decode方法中完成,下面是消息編碼encode方法的具體實現:

public ByteBuffer encode() {
    // 1> header length size
    int length = 4;    //消息總長度

    // 2> header data length
    //將消息頭編碼成byte[]
    byte[] headerData = this.headerEncode(); 
    //計算頭部長度 
    length += headerData.length;              

    // 3> body data length
    if (this.body != null) {
        //消息主體長度
        length += body.length;                
    }
    //分配ByteBuffer, 這邊加了4, 
    //這是由於在消息總長度的計算中沒有將存儲頭部長度的4個字節計算在內
    ByteBuffer result = ByteBuffer.allocate(4 + length);  

    // length
    //將消息總長度放入ByteBuffer
    result.putInt(length);   

    // header length
    //將消息頭長度放入ByteBuffer
    result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC)); 

    // header data
    //將消息頭數據放入ByteBuffer
    result.put(headerData);    

    // body data;
    if (this.body != null) {
        //將消息主體放入ByteBuffer
        result.put(this.body); 
    }
    //重置ByteBuffer的position位置
    result.flip();     

    return result;
}

    /**
     * markProtocolType方法是將RPC類型和headerData長度編碼放到一個byte[4]數組中
     *
     * @param source
     * @param type
     * @return
     */
    public static byte[] markProtocolType(int source, SerializeType type) {
        byte[] result = new byte[4];

        result[0] = type.getCode();
        //右移16位後再和255與->「16-24位」
        result[1] = (byte) ((source >> 16) & 0xFF);
        //右移8位後再和255與->「8-16位」
        result[2] = (byte) ((source >> 8) & 0xFF);
        //右移0位後再和255與->「8-0位」
        result[3] = (byte) (source & 0xFF);
        return result;
    }
複製代碼

消息解碼decode方法是編碼的逆向過程,其具體實現以下:

public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        //獲取byteBuffer的總長度
        int length = byteBuffer.limit();

        //獲取前4個字節,組裝int類型,該長度爲總長度
        int oriHeaderLen = byteBuffer.getInt();

        //獲取消息頭的長度,這裏和0xFFFFFF作與運算,編碼時候的長度即爲24位
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }
複製代碼

三、消息的通訊方式和通訊流程

在RocketMQ消息隊列中支持通訊的方式主要有如下三種:

(1)同步(sync)

(2)異步(async)

(3)單向(oneway)

其中「同步」通訊模式相對簡單,通常用在發送心跳包場景下,無需關注其Response。本文將主要介紹RocketMQ的異步通訊流程(限於篇幅,讀者能夠按照一樣的模式進行分析同步通訊流程)。

下面兩小節內容主要介紹了Client端發送請求消息和Server端接收消息的具體實現,其中對於Client端的回調能夠參考RocketMQ的源碼來分析這裏就不作詳細介紹。

3.一、Client發送請求消息的具體實現

當客戶端調用異步通訊接口—invokeAsync時候,先由RemotingClient的實現類—NettyRemotingClient根據addr獲取相應的channel(若是本地緩存中沒有則建立),隨後調用invokeAsyncImpl方法,將數據流轉給抽象類NettyRemotingAbstract處理(真正作完發送請求動做的是在NettyRemotingAbstract抽象類的invokeAsyncImpl方法裏面)。具體發送請求消息的源代碼以下所示:

/**
     * invokeAsync(異步調用)
     * 
     * @param channel
     * @param request
     * @param timeoutMillis
     * @param invokeCallback
     * @throws InterruptedException
     * @throws RemotingTooMuchRequestException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     */
    public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis,
        final InvokeCallback invokeCallback)
        throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
        //至關於request ID, RemotingCommand會爲每個request產生一個request ID, 從0開始, 每次加1

        final int opaque = request.getOpaque();
        boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
        if (acquired) {
            final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreAsync);
            //根據request ID構建ResponseFuture
            final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, invokeCallback, once);
            //將ResponseFuture放入responseTable
            this.responseTable.put(opaque, responseFuture);
            try {
                //使用Netty的channel發送請求數據
                channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
                    //消息發送後執行
                    @Override
                    public void operationComplete(ChannelFuture f) throws Exception {
                        if (f.isSuccess()) {
                            //若是發送消息成功給Server,那麼這裏直接Set後return
                            responseFuture.setSendRequestOK(true);
                            return;
                        } else {
                            responseFuture.setSendRequestOK(false);
                        }

                        responseFuture.putResponse(null);
                        responseTable.remove(opaque);
                        try {
                            //執行回調
                            executeInvokeCallback(responseFuture);
                        } catch (Throwable e) {
                            log.warn("excute callback in writeAndFlush addListener, and callback throw", e);
                        } finally {
                            //釋放信號量
                            responseFuture.release();
                        }

                        log.warn("send a request command to channel <{}> failed.", RemotingHelper.parseChannelRemoteAddr(channel));
                    }
                });
            } catch (Exception e) {
                //異常處理
                responseFuture.release();
                log.warn("send a request command to channel <" + RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
                throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e);
            }
        } else {
            if (timeoutMillis <= 0) {
                throw new RemotingTooMuchRequestException("invokeAsyncImpl invoke too fast");
            } else {
                String info =
                    String.format("invokeAsyncImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d",
                        timeoutMillis,
                        this.semaphoreAsync.getQueueLength(),
                        this.semaphoreAsync.availablePermits()
                    );
                log.warn(info);
                throw new RemotingTimeoutException(info);
            }
        }
    }
複製代碼

在Client端發送請求消息時有個比較重要的數據結構須要注意下:

(1)responseTable—保存請求碼與響應關聯映射

protected final ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable 
複製代碼

opaque表示請求發起方在同個鏈接上不一樣的請求標識代碼,每次發送一個消息的時候,能夠選擇同步阻塞/異步非阻塞的方式。不管是哪一種通訊方式,都會保存請求操做碼至ResponseFuture的Map映射—responseTable中。

(2)ResponseFuture—保存返回響應(包括回調執行方法和信號量)

public ResponseFuture(int opaque, long timeoutMillis, InvokeCallback invokeCallback,
        SemaphoreReleaseOnlyOnce once) {
        this.opaque = opaque;
        this.timeoutMillis = timeoutMillis;
        this.invokeCallback = invokeCallback;
        this.once = once;
    }
複製代碼

對於同步通訊來講,第3、四個參數爲null;而對於異步通訊來講,invokeCallback是在收到消息響應的時候可以根據responseTable找到請求碼對應的回調執行方法,semaphore參數用做流控,當多個線程同時往一個鏈接寫數據時能夠經過信號量控制permit同時寫許可的數量。

(3)異常發送流程處理—定時掃描responseTable本地緩存

在發送消息時候,若是遇到異常狀況(好比服務端沒有response返回給客戶端或者response因網絡而丟失),上面所述的responseTable的本地緩存Map將會出現堆積狀況。這個時候須要一個定時任務來專門作responseTable的清理回收。在RocketMQ的客戶端/服務端啓動時候會產生一個頻率爲1s調用一次來的定時任務檢查全部的responseTable緩存中的responseFuture變量,判斷是否已經獲得返回, 並進行相應的處理。

public void scanResponseTable() {
        final List<ResponseFuture> rfList = new LinkedList<ResponseFuture>();
        Iterator<Entry<Integer, ResponseFuture>> it = this.responseTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Integer, ResponseFuture> next = it.next();
            ResponseFuture rep = next.getValue();

            if ((rep.getBeginTimestamp() + rep.getTimeoutMillis() + 1000) <= System.currentTimeMillis()) {
                rep.release();
                it.remove();
                rfList.add(rep);
                log.warn("remove timeout request, " + rep);
            }
        }

        for (ResponseFuture rf : rfList) {
            try {
                executeInvokeCallback(rf);
            } catch (Throwable e) {
                log.warn("scanResponseTable, operationComplete Exception", e);
            }
        }
    }
複製代碼

3.二、Server端接收消息並進行處理的具體實現

Server端接收消息的處理入口在NettyServerHandler類的channelRead0方法中,其中調用了processMessageReceived方法(這裏省略了Netty服務端消息流轉的大部分流程和邏輯)。其中服務端最爲重要的處理請求方法實現以下:

public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //根據RemotingCommand中的code獲取processor和ExecutorService
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //rpc hook
                    RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
                    if (rpcHook != null) {
                        rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    }
                    //processor處理請求
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //rpc hook
                    if (rpcHook != null) {
                        rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
                    }

                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                PLOG.error("process request over, but response failed", e);
                                PLOG.error(cmd.toString());
                                PLOG.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    if (!"com.aliyun.openservices.ons.api.impl.authority.exception.AuthenticationException"
                        .equals(e.getClass().getCanonicalName())) {
                        PLOG.error("process request exception", e);
                        PLOG.error(cmd.toString());
                    }

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, //
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        try {
            //封裝requestTask
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            //想線程池提交requestTask
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                PLOG.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) //
                    + ", too many requests and system thread pool busy, RejectedExecutionException " //
                    + pair.getObject2().toString() //
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        //構建response
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        PLOG.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}
複製代碼

上面的請求處理方法中根據RemotingCommand的請求業務碼來匹配到相應的業務處理器;而後生成一個新的線程提交至對應的業務線程池進行異步處理。

1)processorTable—請求業務碼與業務處理、業務線程池的映射變量

protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);
複製代碼

我想RocketMQ這種作法是爲了給不一樣類型的請求業務碼指定不一樣的處理器Processor處理,同時消息實際的處理並非在當前線程,而是被封裝成task放到業務處理器Processor對應的線程池中完成異步執行。(在RocketMQ中能看到不少地方都是這樣的處理,這樣的設計可以最大程度的保證異步,保證每一個線程都專一處理本身負責的東西)

3、總結

剛開始看RocketMQ源碼—RPC通訊模塊可能以爲略微有點複雜,可是隻要可以抓住Client端發送請求消息、Server端接收消息並處理的流程以及回調過程來分析和梳理,那麼總體來講並不複雜。RPC通訊部分也是RocketMQ源碼中最重要的部分之一,想要對其中的全過程和細節有更爲深入的理解,還須要多在本地環境Debug和分析對應的日誌。

note:在漫長的人生道路上,不管是暴風驟雨,或是激流險灘。最要緊的是人生的慾火不能熄滅。

相關文章
相關標籤/搜索