阿里開源分佈式事務組件 seata :seata server 通訊層解析

RPC ?

seata client 和 seata server 間是須要經過網絡通訊來傳遞信息的,client 發送請求消息給 server,server 根據實際的處理邏輯,可能會給 client 發送相應的響應消息,或者不響應任何消息。在 seata 中,客戶端和服務端的通訊實現,被抽象成來公共的模塊,它的 package 位於 io.seata.core.rpc 中。java

這個包名叫 rpc,這個包下的不少類名也有 rpc 相關的字眼,而實際上在我看來,這個通訊框架並非一個常規意義的 rpc 框架,若是硬要揪書本知識,那麼 rpc 的解釋以下:程序員

遠程過程調用(英語:Remote Procedure Call,縮寫爲 RPC)是一個計算機通訊協議。該協議容許運行於一臺計算機的程序調用另外一臺計算機的子程序,而程序員無需額外地爲這個交互做用編程。若是涉及的軟件採用面向對象編程,那麼遠程過程調用亦可稱做遠程調用或遠程方法調用。

在以 dubbo 爲表明的微服務時代下,dubbo 常規意義上咱們都稱之爲 rpc 框架,rpc 的理論原則是:程序員無需額外地爲這個交互做用編程。那麼對於像 dubbo 這樣的 rpc 實現,它能讓 client 像調用本地代碼 api 同樣,來調用遠程 server 上的某個 method。
在 client 這一層直接面向 interface 編程,經過動態代理的方式,對上層屏蔽掉通訊細節,在底層,將方法調用,經過序列化方式,封裝成一個二進制數據串發送給 server,server 層解析該消息,經過反射的方式,將 interface 對應的 implemention 執行起來,將執行結果,扁平化成一個二進制數據串,回送給 client,client 收到數據後,拼裝成 interface api 所定義的返回值類型的一個實例,做爲方法調用的返回值。整個底層的細節,應用層面並不須要瞭解,應用層只須要以 interface.method 的方式,就像代碼在本地執行同樣,就能把遠端 interface_implemention.method 給調用起來。
rpc編程

而 seata 的 rpc 框架上,實際上僅僅是一個普通的基於 netty 的網絡通訊框架,client 與 server 之間經過發送 request 和 response 來達到相互通訊的目的,在 seata 中的每一個 request 和 response 類,都實現瞭如何把本身序列化的邏輯。
各類消息類型,都實現了 io.seata.core.protocol.MessageCodec 接口bootstrap

public interface MessageCodec {
    /**
     * Gets type code.
     *
     * @return the type code
     */
    short getTypeCode();

    /**
     * Encode byte [ ].
     *
     * @return the byte [ ]
     */
    byte[] encode();

    /**
     * Decode boolean.
     *
     * @param in the in
     * @return the boolean
     */
    boolean decode(ByteBuf in);
}

MessageCodec

io.seata.core.protocol.GlobalBeginRequest 爲例,它都 decode 和 encode 實現以下所示:api

@Override
public byte[] encode() {
    ByteBuffer byteBuffer = ByteBuffer.allocate(256);
    byteBuffer.putInt(timeout);

    if (this.transactionName != null) {
        byte[] bs = transactionName.getBytes(UTF8);
        byteBuffer.putShort((short)bs.length);
        if (bs.length > 0) {
            byteBuffer.put(bs);
        }
    } else {
        byteBuffer.putShort((short)0);
    }

    byteBuffer.flip();
    byte[] content = new byte[byteBuffer.limit()];
    byteBuffer.get(content);
    return content;
}

@Override
public void decode(ByteBuffer byteBuffer) {
    this.timeout = byteBuffer.getInt();

    short len = byteBuffer.getShort();
    if (len > 0) {
        byte[] bs = new byte[len];
        byteBuffer.get(bs);
        this.setTransactionName(new String(bs, UTF8));
    }
}

這意味着,發送方先對 message 作 encode 動做造成字節數組,將字節數組發往接收方,接收方收到字節數組後,對字節數組先判斷 message type,再用對應的 message 類型對字節數組作 decode 動做。數組

類的組織形式

從 seata server 的入口類 io.seata.server.Server 分析,main 方法以下所示:微信

/**
 * The entry point of application.
 *
 * @param args the input arguments
 * @throws IOException the io exception
 */
public static void main(String[] args) throws IOException {
    RpcServer rpcServer = new RpcServer(WORKING_THREADS);

    int port = SERVER_DEFAULT_PORT;
    //server port
    if (args.length > 0) {
        try {
            port = Integer.parseInt(args[0]);
        } catch (NumberFormatException e) {
            System.err.println("Usage: sh services-server.sh $LISTEN_PORT $PATH_FOR_PERSISTENT_DATA");
            System.exit(0);
        }
    }
    rpcServer.setListenPort(port);

    //log store mode : file、db
    String storeMode = null;
    if (args.length > 1) {
        storeMode = args[1];
    }

    UUIDGenerator.init(1);
    SessionHolder.init(storeMode);

    DefaultCoordinator coordinator = new DefaultCoordinator(rpcServer);
    coordinator.init();
    rpcServer.setHandler(coordinator);
    // register ShutdownHook
    ShutdownHook.getInstance().addDisposable(coordinator);

    if (args.length > 2) {
        XID.setIpAddress(args[2]);
    } else {
        XID.setIpAddress(NetUtil.getLocalIp());
    }
    XID.setPort(rpcServer.getListenPort());

    rpcServer.init();

    System.exit(0);
}

能夠看到 seata server 使用一個 RpcServer 類來啓動它的服務監聽端口,這個端口用來接收 seata client 的消息,RpcServer 這個類是通訊層的實現分析的入口。
在這裏,SessionHolder 用來作全局事務樹的管理,DefaultCoordinator 用來處理事務執行邏輯,而 RpcServer 是這二者能夠正常運行的基礎,這篇文章的重點在於剖析 RpcServer 的實現,進而延伸到 seata 整個通訊框架的細節。
若是先從 RpcServer 的類繼承圖看的話,那麼咱們能發現一些與常規思惟不太同樣的地方,類繼承圖以下:網絡

繼承體系

褐色部分是 netty 的類,灰色部分是 seata 的類。
在通常常規的思惟中,依賴 netty 作一個 server,大體的思路是:app

  1. 定義一個 xxx server 類
  2. 在這個類中設置初始化 netty bootstrap,eventloop,以及設置相應的 ChannelHandler

在這種思惟下,很容易想到,server 與 ChannelHandler 之間的關係應該是一個「組合」的關係,即在咱們構建 server 的過程當中,應該把 ChannelHandler 看成參數傳遞給 server,成爲 server 類的成員變量。
沒錯,這是咱們通常狀況下的思惟。不過 seata 在這方面卻不那麼「常規」,從上面的類繼承圖中能夠看到,從 RpcServer 這個類開始向上追溯,發現它實際上是 ChannelDuplexHandler 的一個子類或者實例。這種邏輯讓人一時很困惑,一個問題在我腦海裏浮現:「當我啓動一個 RpcServer 的時候,我是真的在啓動一個 server 嗎?看起來我好像在啓動一個 ChannelHandler,但是 ChannelHandler 怎麼談得上‘啓動’呢?」框架

異步轉同步的 Future 機制

首先分析 AbstractRpcRemoting 這個類,它直接繼承自 ChannelDuplexHandler 類,而 ChannelDuplexHandler 是 netty 中 inbound handler 和 outbound handler 的結合體。
AbstractRpcRemoting 的 init 方法裏,僅僅經過 Java 中的定時任務執行線程池啓動了一個定時執行的任務:

/**
 * Init.
 */
public void init() {
    timerExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            List<MessageFuture> timeoutMessageFutures = new ArrayList<MessageFuture>(futures.size());

            for (MessageFuture future : futures.values()) {
                if (future.isTimeout()) {
                    timeoutMessageFutures.add(future);
                }
            }

            for (MessageFuture messageFuture : timeoutMessageFutures) {
                futures.remove(messageFuture.getRequestMessage().getId());
                messageFuture.setResultMessage(null);
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("timeout clear future : " + messageFuture.getRequestMessage().getBody());
                }
            }
            nowMills = System.currentTimeMillis();
        }
    }, TIMEOUT_CHECK_INTERNAL, TIMEOUT_CHECK_INTERNAL, TimeUnit.MILLISECONDS);
}

這個定時任務的邏輯也比較簡單:掃描 ConcurrentHashMap<Long, MessageFuture> futures 這個成員變量裏的 MessageFuture,若是這個 Future 超時了,就將 Future 的結果設置爲 null。邏輯雖然簡單,但這個功能涉及到了異步通訊裏一個很常見的功能,即異步轉同步的功能。
在 netty 這種基於 NIO 的通訊方式中,數據的發送,接收,所有是非阻塞的,所以判斷一個動做完成與否,並不能像傳統的 Java 同步代碼同樣,代碼執行完了就認爲相應的動做也真正完成了,例如,在 netty 中,若是經過 channel.write(); 方法往對端發送一個數據,這個方法執行完了,並不表明數據發送出去了,channel.write() 方法會返回一個 future,應用代碼應該利用這個 future ,經過這個 future 能夠知道數據到底發送出去了沒有,也能夠爲這個 future 添加動做完成後的回調邏輯,也能夠阻塞等待這個 future 所關聯的動做執行完畢。
在 seata 中,存在着發送一個請求,並等待相應這樣的使用場景,上層的 api 多是這麼定義的:
public Response request(Request request) {}
而基於 nio 的底層數據發送邏輯倒是這樣的:

1. send request message
2. 爲業務的請求構建一個業務層面的 future 實例
3. 阻塞等待在這個 future 上
4. 當收到對應的 response message 後,喚醒上面的 future,阻塞等待在這個 future 上的線程繼續執行
5. 拿到結果,request 方法結束

AbstractRpcRemoting 定義了幾個數據發送相關的方法,分別是:

/**
 * Send async request with response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg) throws TimeoutException;

/**
 * Send async request with response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @param timeout the timeout
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithResponse(String address, Channel channel, Object msg, long timeout) throws
    TimeoutException;

/**
 * Send async request without response object.
 *
 * @param address the address
 * @param channel the channel
 * @param msg     the msg
 * @return the object
 * @throws TimeoutException the timeout exception
 */
protected Object sendAsyncRequestWithoutResponse(String address, Channel channel, Object msg) throws
    TimeoutException;

這幾個方法就符合上面說到的發送一個請求,並等待相應這樣的使用場景,上面這三個方法,其實都委託給了 sendAsyncRequest 來實現,這個方法的代碼是這樣子的:

private Object sendAsyncRequest(String address, Channel channel, Object msg, long timeout)
    throws TimeoutException {
    if (channel == null) {
        LOGGER.warn("sendAsyncRequestWithResponse nothing, caused by null channel.");
        return null;
    }
    final RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setId(RpcMessage.getNextMessageId());
    rpcMessage.setAsync(false);
    rpcMessage.setHeartbeat(false);
    rpcMessage.setRequest(true);
    rpcMessage.setBody(msg);

    final MessageFuture messageFuture = new MessageFuture();
    messageFuture.setRequestMessage(rpcMessage);
    messageFuture.setTimeout(timeout);
    futures.put(rpcMessage.getId(), messageFuture);

    if (address != null) {
        ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
        BlockingQueue<RpcMessage> basket = map.get(address);
        if (basket == null) {
            map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
            basket = map.get(address);
        }
        basket.offer(rpcMessage);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("offer message: " + rpcMessage.getBody());
        }
        if (!isSending) {
            synchronized (mergeLock) {
                mergeLock.notifyAll();
            }
        }
    } else {
        ChannelFuture future;
        channelWriteableCheck(channel, msg);
        future = channel.writeAndFlush(rpcMessage);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (!future.isSuccess()) {
                    MessageFuture messageFuture = futures.remove(rpcMessage.getId());
                    if (messageFuture != null) {
                        messageFuture.setResultMessage(future.cause());
                    }
                    destroyChannel(future.channel());
                }
            }
        });
    }
    if (timeout > 0) {
        try {
            return messageFuture.get(timeout, TimeUnit.MILLISECONDS);
        } catch (Exception exx) {
            LOGGER.error("wait response error:" + exx.getMessage() + ",ip:" + address + ",request:" + msg);
            if (exx instanceof TimeoutException) {
                throw (TimeoutException)exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    } else {
        return null;
    }
}

先拋開方法的其它細節,好比說同步寫仍是異步寫,以及發送頻率控制。咱們能夠發現,這個方法其實從大角度來劃分,就是以下的步驟:

  1. 構造請求 message
  2. 爲這個請求 message 構造一個 message future
  3. 發送數據
  4. 阻塞等待在 message future

不過 AbstractRpcRemoting 也定義了方法用於僅發送消息,不接收響應的使用場景,以下所示:

/**
 * Send request.
 *
 * @param channel the channel
 * @param msg     the msg
 */
protected void sendRequest(Channel channel, Object msg) {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setAsync(true);
    rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
    rpcMessage.setRequest(true);
    rpcMessage.setBody(msg);
    rpcMessage.setId(RpcMessage.getNextMessageId());
    if (msg instanceof MergeMessage) {
        mergeMsgMap.put(rpcMessage.getId(), (MergeMessage)msg);
    }
    channelWriteableCheck(channel, msg);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?"
            + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen());
    }
    channel.writeAndFlush(rpcMessage);
}

/**
 * Send response.
 *
 * @param msgId   the msg id
 * @param channel the channel
 * @param msg     the msg
 */
protected void sendResponse(long msgId, Channel channel, Object msg) {
    RpcMessage rpcMessage = new RpcMessage();
    rpcMessage.setAsync(true);
    rpcMessage.setHeartbeat(msg instanceof HeartbeatMessage);
    rpcMessage.setRequest(false);
    rpcMessage.setBody(msg);
    rpcMessage.setId(msgId);
    channelWriteableCheck(channel, msg);
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("send response:" + rpcMessage.getBody() + ",channel:" + channel);
    }
    channel.writeAndFlush(rpcMessage);
}

這樣的場景就不須要引入 future 機制,直接調用 netty 的 api 把數據發送出去就完事了。
分析思路回到有 future 的場景,發送數據後,要在 future 上進行阻塞等待,即調用 get 方法,那 get 方法什麼返回呢,咱們上面說到 future 被喚醒的時候,咱們先不討論 future 的實現細節,一個 future 何時被喚醒呢,在這種 請求-響應 的模式下,顯然是收到了響應的時候。因此咱們須要查看一下 AbstractRpcRemoting 的 channelRead 方法

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof RpcMessage) {
        final RpcMessage rpcMessage = (RpcMessage)msg;
        if (rpcMessage.isRequest()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
            }
            try {
                AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                    "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                if (allowDumpStack) {
                    String name = ManagementFactory.getRuntimeMXBean().getName();
                    String pid = name.split("@")[0];
                    int idx = new Random().nextInt(100);
                    try {
                        Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                    } catch (IOException exx) {
                        LOGGER.error(exx.getMessage());
                    }
                    allowDumpStack = false;
                }
            }
        } else {
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String
                    .format("%s msgId:%s, future :%s, body:%s", this, rpcMessage.getId(), messageFuture,
                        rpcMessage.getBody()));
            }
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                try {
                    AbstractRpcRemoting.this.messageExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                dispatch(rpcMessage.getId(), ctx, rpcMessage.getBody());
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                }
            }
        }
    }
}

能夠看到調用了 messageFuture 當 setResultMessage() 方法,設置 future 的結果,也就是說,喚醒了 future,那麼阻塞在 future 的 get 方法上的線程就被喚醒了,獲得結果,繼續往下執行。

接下來咱們討論 MessageFuture 的實現細節,其實 seata 裏面有不少種 future 相關的類,實現方式也不太同樣,不過都大同小異,有的是基於 CompletableFuture 實現,有的是基於 CountDownLatch 實現。好比說,MessageFuture 就是基於 CompletableFuture 實現的,先看看它的成員變量:

private RpcMessage requestMessage;
private long timeout;
private long start = System.currentTimeMillis();
private transient CompletableFuture origin = new CompletableFuture();

CompletableFuture 是它的一個成員變量,它被利用來阻塞當前線程。MessageFuture 的 get 方法,依賴於 CompletableFuture 的 get 方法,來實現有必定時間限制的等待,直到另外一個線程喚醒 CompletableFuture。以下所示:

/**
 * Get object.
 *
 * @param timeout the timeout
 * @param unit    the unit
 * @return the object
 * @throws TimeoutException the timeout exception
 * @throws InterruptedException the interrupted exception
 */
public Object get(long timeout, TimeUnit unit) throws TimeoutException,
    InterruptedException {
    Object result = null;
    try {
        result = origin.get(timeout, unit);
    } catch (ExecutionException e) {
        throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e);
    } catch (TimeoutException e) {
        throw new TimeoutException("cost " + (System.currentTimeMillis() - start) + " ms");
    }

    if (result instanceof RuntimeException) {
        throw (RuntimeException)result;
    } else if (result instanceof Throwable) {
        throw new RuntimeException((Throwable)result);
    }

    return result;
}

/**
 * Sets result message.
 *
 * @param obj the obj
 */
public void setResultMessage(Object obj) {
    origin.complete(obj);
}

既然說到了 future 機制,這裏也順便把 io.seata.config.ConfigFuture 提一下,它就是上面提到的基於 CountDownLatch 實現的一種 future 機制,雖然實現方式二者不同,但完成的功能和做用是同樣的。

private final CountDownLatch latch = new CountDownLatch(1);

/**
 * Get object.
 *
 * @param timeout the timeout
 * @param unit    the unit
 * @return the object
 * @throws InterruptedException the interrupted exception
 */
public Object get(long timeout, TimeUnit unit) {
    this.timeoutMills = unit.toMillis(timeout);
    try {
        boolean success = latch.await(timeout, unit);
        if (!success) {
            LOGGER.error(
                "config operation timeout,cost:" + (System.currentTimeMillis() - start) + " ms,op:" + operation
                    .name()
                    + ",dataId:" + dataId);
            return getFailResult();
        }
    } catch (InterruptedException exx) {
        LOGGER.error("config operate interrupted,error:" + exx.getMessage());
        return getFailResult();
    }
    if (operation == ConfigOperation.GET) {
        return result == null ? content : result;
    } else {
        return result == null ? Boolean.FALSE : result;
    }
}

/**
 * Sets result.
 *
 * @param result the result
 */
public void setResult(Object result) {
    this.result = result;
    latch.countDown();
}

阻塞操做調用了 CountDownLatch 的 await 方法,而喚醒操做則調用 countDown 方法,核心在於須要把 CountDownLatch 的 latch 值設置爲 1。
實際上,Java 語言自己已經提供了 java.util.concurrent.Future 這個類來提供 Future 機制,但 Java 原生的 Future 機制功能過於單一,好比說不能主動設置 future 的結果,也不能爲它添加 listener,全部有許多像 seata 這樣的軟件,會選擇去從新實現一種 future 機制來知足異步轉同步的需求。也有像 netty 這樣的軟件,它不會藉助相似於 countdownlatch 來實現,而是直接擴展 java.util.concurrent.Future,在它的基礎上添加功能。

防洪機制

在 AbstractRpcRemoting 中,往外發數據的時候,它都會先進行一個檢查,即檢查當前的 channel 是否可寫。

private void channelWriteableCheck(Channel channel, Object msg) {
    int tryTimes = 0;
    synchronized (lock) {
        while (!channel.isWritable()) {
            try {
                tryTimes++;
                if (tryTimes > NettyClientConfig.getMaxNotWriteableRetry()) {
                    destroyChannel(channel);
                    throw new FrameworkException("msg:" + ((msg == null) ? "null" : msg.toString()),
                        FrameworkErrorCode.ChannelIsNotWritable);
                }
                lock.wait(NOT_WRITEABLE_CHECK_MILLS);
            } catch (InterruptedException exx) {
                LOGGER.error(exx.getMessage());
            }
        }
    }
}

這要從 netty 的內部機制提及,當調用 ChannelHandlerContext 或者 Channel 的 write 方法時,netty 只是把要寫的數據放入了自身的一個環形隊列裏面,再由後臺線程真正往鏈路上發。若是接受方的處理速度慢,也就是說,接收的速度慢,那麼根據 tcpip 協議的滑動窗口機制,它也會致使發送方發送得慢。
咱們能夠把 netty 的環形隊列想像成一個水池,調用 write 方法往池子里加水,netty 經過後臺線程,慢慢把池子的水流走。這就有可能出現一種狀況,因爲池子水流走的速度遠遠慢於往池子里加水的速度,這樣會致使池子的總水量隨着時間的推移愈來愈多。因此往池子里加水時應該考慮當前池子裏的水量,不然最終會致使應用的內存溢出。
netty 對於水池提供了兩個設置,一個是高水位,一個是低水位,當池子裏的水高於高水位時,這個時候 channel.isWritable() 返回 false,而且直到水位慢慢降回到低水位時,這個方法纔會返回 true。

高低水位

上述的 channelWriteableCheck 方法,發現channel 不可寫的時候,進入循環等待,等待的目的是讓池子的水位降低到 low water mark,若是等待超過最大容許等待的時間,那麼將會拋出異常並關閉鏈接。

消息隊列

在 AbstractRpcRemoting 中,發送數據有兩種方式,一種是直接調用 channel 往外寫,另外一種是先把數據放進「數據籃子」裏,它其實是一個 map, key 爲遠端地址,value爲一個消息隊列。數據放隊列後,再由其它線程往外發。下面是 sendAsycRequest 方法的一部分代碼,顯示了這種機制:

ConcurrentHashMap<String, BlockingQueue<RpcMessage>> map = basketMap;
BlockingQueue<RpcMessage> basket = map.get(address);
if (basket == null) {
    map.putIfAbsent(address, new LinkedBlockingQueue<RpcMessage>());
    basket = map.get(address);
}
basket.offer(rpcMessage);
if (LOGGER.isDebugEnabled()) {
    LOGGER.debug("offer message: " + rpcMessage.getBody());
}
if (!isSending) {
    synchronized (mergeLock) {
        mergeLock.notifyAll();
    }
}

但咱們在 AbstractRpcRemoting 裏面沒有看有任何額外的線程在晴空這個 basketMap。回顧一下上面的 RpcServer 的類繼承體系,接下來咱們要分析一下,AbstractRpcRemotingServer 這個類。

繼承體系

AbstractRpcRemotingServer 這個類主要定義了於netty 啓動一個 server bootstrap 相關的類,可見真正啓動服務監聽端口的是在這個類,先看一下它的start方法

@Override
public void start() {
    this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
        .channel(nettyServerConfig.SERVER_CHANNEL_CLAZZ)
        .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
        .option(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true)
        .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
        .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
        .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
            new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                nettyServerConfig.getWriteBufferHighWaterMark()))
        .localAddress(new InetSocketAddress(listenPort))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                    .addLast(new MessageCodecHandler());
                if (null != channelHandlers) {
                    addChannelPipelineLast(ch, channelHandlers);
                }

            }
        });

    if (nettyServerConfig.isEnableServerPooledByteBufAllocator()) {
        this.serverBootstrap.childOption(ChannelOption.ALLOCATOR, NettyServerConfig.DIRECT_BYTE_BUF_ALLOCATOR);
    }

    try {
        ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
        LOGGER.info("Server started ... ");
        RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
        initialized.set(true);
        future.channel().closeFuture().sync();
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}

這個類很常規,就是遵循 netty 的使用規範,用合適的配置啓動一個 server,並調用註冊中心 api 把本身做爲一個服務發佈出去。
咱們能夠看到,配置中確實也出現了咱們上文中提到過的上下水位的配置。
另外,channelpipeline 中,除了添加一個保持鏈路有效性探測的 IdleStateHandler,和一個 MessageCodec,處理事務邏輯相關的 Handler 還須要由參數傳入。
接下來咱們看 RpcServer 這個類,從它的 init 方法裏,咱們能夠看到,它把本身作爲一個 ChannelHandler,加入到了 channel pipeline 中

/**
 * Init.
 */
@Override
public void init() {
    super.init();
    setChannelHandlers(RpcServer.this);
    DefaultServerMessageListenerImpl defaultServerMessageListenerImpl = new DefaultServerMessageListenerImpl(
        transactionMessageHandler);
    defaultServerMessageListenerImpl.init();
    defaultServerMessageListenerImpl.setServerMessageSender(this);
    this.setServerMessageListener(defaultServerMessageListenerImpl);
    super.start();

}

RpcServer 自身也實現了 channelRead 方法,但它只處理心跳相關的信息和註冊相關的信息,其它的業務消息,它交給父類處理,而先前咱們也已經看到,父類的channelRead
方法裏,反過來會調用 dispatch 這個抽象方法去作消息的分發,而 RpcServer 類實現了這個抽象方法,在接收到不一樣的消息類型是,採起不一樣的處理流程。
關於事務的處理流程的細節,本篇文章暫不涉及,後續文章再慢慢分析。

行文至此,回想咱們先前提到的一個疑惑:
「當我啓動一個 RpcServer 的時候,我是真的在啓動一個 server 嗎?看起來我好像在啓動一個 ChannelHandler,但是 ChannelHandler 怎麼談得上‘啓動’呢?」
是的,咱們既在啓動一個 server,這個 server 也實現了事務處理邏輯,它同時也是個 ChannelHandler。
沒有必定的事實標準去衡量這樣寫的代碼是好是壞,咱們也不必去爭論 Effective Java 提到的何時該用組合,何時該用繼承。
本文到此結束。
掃一掃關注個人微信公衆號

相關文章
相關標籤/搜索