消息中間件 二 之AMQP實戰

實例分析

前面咱們看了AMQP說明文檔, 對AMQP有了大致的瞭解, 本文從實例出發再過一遍AMQP的基本操做.css

準備

環境
RabbitMQ server 3.7.16
RabbitMQ client 5.7.3java

客戶端代碼使用的是RabbitMQ官網教程, 以下:網絡

public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel();) {

            boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
            String message = String.join(" ", "dMessage.......");
            channel.exchangeDeclare("mind", "direct");
            channel.basicPublish("mind", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

            System.out.println(" [x] Sent '" + message + "'");
        }
    }

下面是用wireshark抓包結果併發

imageapp

咱們後面以編碼No來具體分析less

抓包分析

imagesocket

1-6是tcp建立鏈接的三次握手步驟, 這裏不作過多分析tcp

imageoop

7-24是amqp建立鏈接的過程, 咱們能夠對照前面的博客中的說明文檔來分析這裏. 每次一端向另外一端發送信息, 另外一端在接收到後都會發送一個ack表示接收到了.學習

image

1 在tcp鏈接建立後, 客戶端會向服務端發送協議版本信息, 這裏是amqp的0.9.1版本, 服務端會校驗版本是否接受, 若是不符合要求會返回錯誤信息, 這裏只有正確信息, 後面咱們本身實現客戶端的時候能夠實現一個錯誤例子.

image

2 服務端校驗協議經過後, 會向客戶端發送建立鏈接請求Connection.Start, 客戶端在準備好後會返回一個Connection.Start-Ok. 接着服務端發送Connection.Tune與客戶端進行參數調試, 調試的內容有Channel最大數量, Frame最大長度等. 客戶端在調試後發送Connection.Tune-OK. 這個階段就是對鏈接的參數調試.

image

3 參數調試以後, 客戶端請求服務端打開鏈接Connection.Open, 服務端打開以後會返回Connection.Open-Ok. Connection打開成功後, 客戶端請求打開通道Channel.Open, 服務端打開以後返回Channel.Open-Ok. 至此鏈接建立成功.

image

4 鏈接建立成功以後, 客戶端進行隊列和exchange的聲明, Queue.Declare -> Queue.Declare-Ok, Exchange.Declare -> Exchange.Declare-Ok.

image

5 有了Exchange後, 客戶端向Exchange發送信息, 咱們能夠看到發送的Exchange, 和發送的內容

image

image

6 發送內容結束後, 客戶端關閉, 先關閉通道Channel, 而後關閉Connection.

image

7 最後是tcp關閉鏈接

代碼分析

下面咱們從代碼層面分析這個過程, 下面是一個整體的時序圖, 你們能夠參考

image

下面咱們仍是按照抓包中看到的順序來分析代碼

建立tcp鏈接

代碼很簡單

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();

咱們重點看factory.newConnection(); 順着方法咱們很快找到了AutorecoveringConnection的init()方法

public void init() throws IOException, TimeoutException {
        this.delegate = this.cf.newConnection();
        this.addAutomaticRecoveryListener(delegate);
    }

在this.cf.newConnection()中重點看下

FrameHandler frameHandler = factory.create(addr, connectionName());
                RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                conn.start();
                metricsCollector.newConnection(conn);
                return conn;

你們debug代碼能夠看到factory是SocketFrameHandlerFactory的實例, 因此create中的代碼以下:

public FrameHandler create(Address addr, String connectionName) throws IOException {
        String hostName = addr.getHost();
        int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), ssl);
        Socket socket = null;
        try {
            socket = createSocket(connectionName);
            configurator.configure(socket);
            socket.connect(new InetSocketAddress(hostName, portNumber),
                    connectionTimeout);
            return create(socket);
        } catch (IOException ioe) {
            quietTrySocketClose(socket);
            throw ioe;
        }
    }

這裏咱們能夠看到java網絡的底層代碼Socket,
socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout);
這句代碼完成了tcp的鏈接的建立工做.
(準備看這裏源碼的時候就想着確定有個地方在作這步操做, 但就是找不到, 最後一點一點debug找到的...)

建立Connection

在上一步的最後, 把socket對象封裝到一個FrameHandler實例中, 從這裏咱們能夠猜測, 後面全部消息的通訊都跟這個FrameHandler分不開.
咱們繼續看, 返回以後

FrameHandler frameHandler = factory.create(addr, connectionName());
                RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
                conn.start();

使用FrameHandler實例構造了一個Connection對象, 而後調用了start()方法, 實際調用的是父類AMQConnection方法, 這個也是整個鏈接過程的重點
這裏代碼比較長, 咱們選擇一些重要的一點一點看

initializeConsumerWorkService(); // 初始化工做線程
initializeHeartbeatSender(); // 初始化心跳線程
// Make sure that the first thing we do is to send the header,
// which should cause any socket errors to show up for us, rather
// than risking them pop out in the MainLoop
// 確保咱們在最開始發送的信息頭在發生錯誤的時候不會出如今MainLoop中
// 這個實體就是爲了接收在發送給服務端版本後接收服務端的Connection.Start方法的
AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
    new AMQChannel.SimpleBlockingRpcContinuation();
// We enqueue an RPC continuation here without sending an RPC
// request, since the protocol specifies that after sending
// the version negotiation header, the client (connection
// initiator) is to wait for a connection.start method to
// arrive.
// 咱們這裏沒有經過發送請求獲取響應, 是由於服務端在接收到版本信息後會主動發送信息
_channel0.enqueueRpc(connStartBlocker);

enqueueRpc裏面以下, 就是在循環中等待服務端信息接收成功的通知

private void doEnqueueRpc(Supplier<RpcWrapper> rpcWrapperSupplier) {
    synchronized (_channelMutex) {
        boolean waitClearedInterruptStatus = false;
        while (_activeRpc != null) {
            try {
                _channelMutex.wait(); // 後面在接收到Connection.Start方法後會通知
            } catch (InterruptedException e) { //NOSONAR
                waitClearedInterruptStatus = true;
                // No Sonar: we re-interrupt the thread later
            }
        }
        if (waitClearedInterruptStatus) {
            Thread.currentThread().interrupt();
        }
        // 獲取到通知後更新實體信息
        _activeRpc = rpcWrapperSupplier.get();
    }
}

_frameHandler.sendHeader(); // 發送版本信息, 對應抓包7

this._frameHandler.initialize(this); // 初始化, 主要是啓動了一個MainLoop線程用於獲取服務端信息

MainLoop線程的核心代碼代碼

Frame frame = _frameHandler.readFrame();
readFrame(frame);

_frameHandler.readFrame() 內部代碼以下, 這裏你們能夠查看譯文中的2.3.5 Frame Details 幀的細節部分, 對照客戶端是如何構造的, Frame結構以下

image

public static Frame readFrom(DataInputStream is) throws IOException {
    int type;
    int channel;

    try {
        type = is.readUnsignedByte(); // 一個字節的類型信息
    } catch (SocketTimeoutException ste) {
        // System.err.println("Timed out waiting for a frame.");
        return null; // failed
    }

    if (type == 'A') { // 這裏是處理, 若是服務端不支持客戶端的版本, 會發送支持的版本信息, 開頭是'A'
        /*
         * Probably an AMQP.... header indicating a version
         * mismatch.
         */
        /*
         * Otherwise meaningless, so try to read the version,
         * and throw an exception, whether we read the version
         * okay or not.
         */
        protocolVersionMismatch(is); // 這裏面會拋出異常
    }

    channel = is.readUnsignedShort(); // 兩個個字節的channel編號
    int payloadSize = is.readInt(); // 4個字節的payload大小
    byte[] payload = new byte[payloadSize];
    is.readFully(payload); // 讀取payloadSize大小的字節

    int frameEndMarker = is.readUnsignedByte(); // 一個字節的尾部
    if (frameEndMarker != AMQP.FRAME_END) {
        throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
    }

    // 構造對象並返回 
    return new Frame(type, channel, payload);
}

上一步主要是對信息的封裝, 下面是客戶端對封裝對象的處理

private void readFrame(Frame frame) throws IOException {
    if (frame != null) {
        _missedHeartbeats = 0;
        if (frame.type == AMQP.FRAME_HEARTBEAT) {
            // Ignore it: we've already just reset the heartbeat counter.
        } else {
            if (frame.channel == 0) { // the special channel 0通道是在建立鏈接過程當中使用的
                _channel0.handleFrame(frame); // 這一步就是將Connection.Start內容放到了channel提早設置的實體中
            } else {
                if (isOpen()) {
                    // If we're still _running, but not isOpen(), then we
                    // must be quiescing, which means any inbound frames
                    // for non-zero channels (and any inbound commands on
                    // channel zero that aren't Connection.CloseOk) must
                    // be discarded.
                    ChannelManager cm = _channelManager;
                    if (cm != null) {
                        ChannelN channel;
                        try {
                            channel = cm.getChannel(frame.channel);
                        } catch(UnknownChannelException e) {
                            // this can happen if channel has been closed,
                            // but there was e.g. an in-flight delivery.
                            // just ignoring the frame to avoid closing the whole connection
                            LOGGER.info("Received a frame on an unknown channel, ignoring it");
                            return;
                        }
                        channel.handleFrame(frame);
                    }
                }
            }
        }
    } else {
        // Socket timeout waiting for a frame.
        // Maybe missed heartbeat.
        handleSocketTimeout();
    }
}

咱們回到start()方法中, 獲取Connection.Start方法, 而後設置一些服務單發過來的參數
connStart = (AMQP.Connection.Start) connStartBlocker.getReply(handshakeTimeout/2).getMethod();

而後按照是響應Start.Ok, Tune方法, 對應抓包9-16

do {
    Method method = (challenge == null)
                            ? new AMQP.Connection.StartOk.Builder()
                                      .clientProperties(_clientProperties)
                                      .mechanism(sm.getName())
                                      .response(response)
                                      .build()
                            : new AMQP.Connection.SecureOk.Builder().response(response).build();

    try {
        Method serverResponse = _channel0.rpc(method, handshakeTimeout/2).getMethod();
        if (serverResponse instanceof AMQP.Connection.Tune) {
            connTune = (AMQP.Connection.Tune) serverResponse;
        } else {
            challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
            response = sm.handleChallenge(challenge, username, password);
        }
    } catch (ShutdownSignalException e) {
        Method shutdownMethod = e.getReason();
        if (shutdownMethod instanceof AMQP.Connection.Close) {
            AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;
            if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                throw new AuthenticationFailureException(shutdownClose.getReplyText());
            }
        }
        throw new PossibleAuthenticationFailureException(e);
    }
} while (connTune == null);

獲取到調試信息, 設置本地參數

int channelMax = negotiateChannelMax(this.requestedChannelMax,
                        connTune.getChannelMax());
_channelManager = instantiateChannelManager(channelMax, threadFactory);

int frameMax =
    negotiatedMaxValue(this.requestedFrameMax,
                       connTune.getFrameMax());
this._frameMax = frameMax;

int heartbeat =
    negotiatedMaxValue(this.requestedHeartbeat,
                       connTune.getHeartbeat());

setHeartbeat(heartbeat); 啓動心跳線程
發送調整完畢方法TuneOk, 並請求打開鏈接Open

_channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                                .channelMax(channelMax)
                                .frameMax(frameMax)
                                .heartbeat(heartbeat)
                              .build());
_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                          .virtualHost(_virtualHost)
                        .build());

至此Connection的鏈接已經建立並打開

建立Channel

接下來是Channel的建立, 咱們前面代碼中使用的Channel是特殊的, 專門用於建立Connection, 下面建立的是爲了後面發送隊列消息使用的Channel.
Channel channel = connection.createChannel() // 入口
根據AMQP文檔, 建立Channel須要客戶端發送Channel.Open方法而後接收服務端的Channel.OpenOk, 咱們從抓包中也能夠觀察到. 咱們一步一步跟蹤代碼, 代碼層級比較深, 這裏給出調用邏輯, 從下到上(對, 沒錯, 這就是建立Channel報錯日誌截取了部分)

com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:295)
com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:182)
com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:555)
com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:165)

privateRpc的代碼咱們看下

private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException{
    SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
    rpc(m, k); // 發送Channel.Open 方法
    // At this point, the request method has been sent, and we
    // should wait for the reply to arrive.
    // 這裏咱們已經發送了請求, 咱們應該等待響應
    // Calling getReply() on the continuation puts us to sleep
    // until the connection's reader-thread throws the reply over
    // the fence or the RPC times out (if enabled)
    // 調用getReply()方法會阻塞直到獲取到結果或者超時
    if(_rpcTimeout == NO_RPC_TIMEOUT) {
        return k.getReply();
    } else {
        try {
            return k.getReply(_rpcTimeout);
        } catch (TimeoutException e) {
            throw wrapTimeoutException(m, e);
        }
    }
}

接收Channel.OpenOk方法是由MainLoop線程完成的, 方式相似以前獲取Connection.Start方法.

消息發送

至此AMQP鏈接算是徹底建立完畢, 下面就是消息隊列相關. 首先是隊列和Exchange的聲明, 這裏隊列的聲明其實沒有什麼用, 代碼這麼寫就是爲了看下聲明過程

channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
channel.exchangeDeclare("mind", "direct");

channel.basicPublish("mind", "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

這裏聲明的方式很是簡單你們跟着代碼很容易明白, queue和Exchange的聲明過程基本同樣, 不一樣的是queue在聲明前會校驗下隊列的合法性(長度). 它們獲取響應結果的方式和Channel.OpenOk的獲取方式一毛同樣.
消息的發送過程也是發送一個AMQPCommand, 可是細節不少, 準備在後面實現客戶端的部分再詳細看.

關閉鏈接

程序執行結束, 執行try-with-resources部分, 自動執行close()方法, 執行順序從下到上, 也就是先執行Channel的close(), 而後Connection的close(); 從抓包中也能夠看到先發送的Channel.close方法, 再發送Connection.close方法. 代碼細節的部分這裏就不展開了, 會放到後面代碼實現上.

總結

總體過了一遍主要流程, 後面咱們會本身實現一個簡單客戶端加深下理解; 這個過程當中除了瞭解了客戶端的操做流程外, 對java的部分知識也學習了一下
try-with-resources 在關閉時, 執行關閉的順序和聲明順序相反;
try-with-resources 也能夠有catch和finally塊, 它們是在try-with-resources聲明關閉以後執行的.

java線程狀態流轉

image

客戶端實現(待完成~)

今天咱們的目標是實現rabbitmq客戶端, 並使用該客戶端發送消息到指定Exchange中.

tcp鏈接建立

超級簡單

socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
// 保存鏈接輸入輸出流
inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
outputStream = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));

抓包

image

發送信息頭

咱們經過抓包和源碼知道, 發送頭就是發送了"AMQP0091"

private int major = 0;
private int minor = 9;
private int revision = 1;
outputStream.write("AMQP".getBytes());
outputStream.write(0);
outputStream.write(major);
outputStream.write(minor);
outputStream.write(revision);
outputStream.flush();

抓包結果

image

能夠看到服務端已經承認協議, 併發送了Connection.Start方法過來.
若是咱們發送的協議服務端不認識會怎麼樣, 咱們把major改成2試試
抓包結果

image

本身看下咱們發的內容以下

image

咱們是發送了0291, 抓包是支持AMQP協議的, 因此這裏應該是不認識了, 因此顯示爲unknown version, 可是讓我不理解的是服務端返回的結果也是unknown version, 根據AMQP文檔中的說明, 服務端這時應該返回支持的協議, 咱們點開看下

image

的確是0091正常的協議, 可是抓包軟件沒有顯示出來, 很奇怪~

Connection.StartOk

相關文章
相關標籤/搜索