分佈式消息通訊ActiveMQ原理-3-筆記

unconsumedMessage 源碼分析java

  • unconsumedMessages 獲取數據過程
    • 在建立鏈接的時候:
      • ActiveMQConnectionFactory.createConnection
        1. 動態建立一個傳輸協議
        2. 建立一個鏈接
        3. 經過transport.start()
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
    if (brokerURL == null) {
        throw new ConfigurationException("brokerURL not set.");
    }
    ActiveMQConnection connection = null;
    try {
        Transport transport = createTransport();
        connection = createActiveMQConnection(transport, factoryStats);

        connection.setUserName(userName);
        connection.setPassword(password);

        configureConnection(connection);

        transport.start();

        if (clientID != null) {
            connection.setDefaultClientID(clientID);
        }

        return connection;
    } catch (JMSException e) {
        // Clean up!
        try {
            connection.close();
        } catch (Throwable ignore) {
        }
        throw e;
    } catch (Exception e) {
        // Clean up!
        try {
            connection.close();
        } catch (Throwable ignore) {
        }
        throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
    }
}

 transport.start();apache

  • transport是一個鏈式的調用,是一個多層包裝的對象。
    • ResponseCorrelator(MutexTransport(WireFormatNegotiator(InactivityMonitor(TcpTransport())))
    • 最終調用TcpTransport.start()方法,
      • 然而這個類中並無start,
      • 而是在父類ServiceSupport.start()中。
public void start() throws Exception {
        if (started.compareAndSet(false, true)) {
            boolean success = false;
            stopped.set(false);
            try {
                preStart();
                doStart();
                success = true;
            } finally {
                started.set(success);
            }
            for(ServiceListener l:this.serviceListeners) {
                l.started(this);
            }
        }
    }
  • 咱們以前看過的中間件的源碼,通訊層都是獨立來實現及解耦的
    • 而ActiveMQ也是同樣,提供了Transport接口和TransportSupport類。
    • 這個接口的主要做用是爲了讓客戶端有消息被異步發送、同步發送和被消費的能力。
    • 接下來沿着doStart()往下看,又調用TcpTransport.doStart() ,
    • 接着經過super.doStart(),調用TransportThreadSupport.doStart().
    • 建立了一個線程,傳入的是this,調用子類的run方法,也就是TcpTransport.run().

TcpTransport.run().session

  • run方法主要是從socket中讀取數據包,
  • 只要TcpTransport沒有中止,它就會不斷去調用doRun。
public void run() {
        LOG.trace("TCP consumer thread for " + this + " starting");
        this.runnerThread=Thread.currentThread();
        try {
            while (!isStopped()) {
                doRun();
            }
        } catch (IOException e) {
            stoppedLatch.get().countDown();
            onException(e);
        } catch (Throwable e){
            stoppedLatch.get().countDown();
            IOException ioe=new IOException("Unexpected error occurred: " + e);
            ioe.initCause(e);
            onException(ioe);
        }finally {
            stoppedLatch.get().countDown();
        }
    }

TcpTransport.doRun()異步

protected void doRun() throws IOException {
        try {
            Object command = readCommand();
            doConsume(command);
        } catch (SocketTimeoutException e) {
        } catch (InterruptedIOException e) {
        }
    }

TcpTransport.readCommand()socket

  • 經過wireFormat對數據進行格式化,
    • 能夠認爲這是一個反序列化過程。
  • wireFormat默認實現是OpenWireFormat
    • activeMQ自定義的跨語言的wire協議
protected Object readCommand() throws IOException {
        return wireFormat.unmarshal(dataIn);
    }
  • 傳輸層的主要工做是得到數據而且把數據轉換爲對象,
    • 再把對象對象傳給ActiveMQConnection

TcpTransport.doConsume()async

public void doConsume(Object command) {
        if (command != null) {
            if (transportListener != null) {
                transportListener.onCommand(command);
            } else {
                LOG.error("No transportListener available to process inbound command: " + command);
            }
        }
    }
  • TransportSupport類中惟一的成員變量是TransportListener transportListener
  • 這也意味着一個Transport支持類綁定一個傳送監聽器類,
    • 傳送監聽器接口TransportListener 最重要的方法就是 void onCommand(Object command);
    • 它用來處理命令,
    • 這個transportListener是在哪裏賦值的呢?
      • 再回到ActiveMQConnection的構造方法中。->246行
  • 傳遞了ActiveMQConnection本身自己,
    • ActiveMQConnection是TransportListener接口的實現類之一
  • 因而,消息就這樣從傳送層到達了咱們的鏈接層上。
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

        this.transport = transport;
        this.clientIdGenerator = clientIdGenerator;
        this.factoryStats = factoryStats;

        // Configure a single threaded executor who's core thread can timeout if
        // idle
        executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
                //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796
                //thread.setDaemon(true);
                return thread;
            }
        });
        // asyncConnectionThread.allowCoreThreadTimeOut(true);
        String uniqueId = connectionIdGenerator.generateId();
        this.info = new ConnectionInfo(new ConnectionId(uniqueId));
        this.info.setManageable(true);
        this.info.setFaultTolerant(transport.isFaultTolerant());
        this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

        this.transport.setTransportListener(this);//當transport綁定爲本身

        this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
        this.factoryStats.addConnection(this);
        this.timeCreated = System.currentTimeMillis();
        this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
    }
  • 從構造函數能夠看出,
    • 建立ActiveMQConnection對象時,
      • 除了和Transport相互綁定,
      • 還對線程池執行器executor進行了初始化。
  • 下面咱們看看該類的核心方法

onCommandide

  • 這裏面會針對不一樣的消息作分發,
    • 好比傳入的command是MessageDispatch,
    • 那麼這個command的visit方法就會調用processMessageDispatch方法
public void onCommand(final Object o) {
        final Command command = (Command)o;
        if (!closed.get() && command != null) {
            try {
                command.visit(new CommandVisitorAdapter() {
                    @Override
                    public Response processMessageDispatch(MessageDispatch md) throws Exception {
                        // 等待Transport中斷處理完成
                        waitForTransportInterruptionProcessingToComplete();
                        // 這裏經過消費者ID來獲取消費者對象
                        //(ActiveMQMessageConsumer實現了ActiveMQDispatcher接口),因此MessageDispatch 
                        //包含了消息應該被分配到那個消費者的映射信息 //在建立MessageConsumer的時候,調用 
                       //ActiveMQMessageConsumer的第282行,調用ActiveMQSession的1798行將當前的消費者綁定 
                       //到dispatchers中 因此這裏拿到的是ActiveMQSession
                        ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
                        if (dispatcher != null) {
                            // Copy in case a embedded broker is dispatching via
                            // vm://
                            // md.getMessage() == null to signal end of queue
                            // browse.
                            Message msg = md.getMessage();
                            if (msg != null) {
                                msg = msg.copy();
                                msg.setReadOnlyBody(true);
                                msg.setReadOnlyProperties(true);
                                msg.setRedeliveryCounter(md.getRedeliveryCounter());
                                msg.setConnection(ActiveMQConnection.this);
                                msg.setMemoryUsage(null);
                                md.setMessage(msg);
                            }
                            dispatcher.dispatch(md);
                             // 調用會話ActiveMQSession本身的dispatch方法來處理這條消息
                        } else {
                            LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
                        }
                        return null;
                    }
                    //若是傳入的是ProducerAck,則調用的是下面這個方法,這裏咱們僅僅關注MessageDispatch就好了

                    @Override
                    public Response processProducerAck(ProducerAck pa) throws Exception {
                        if (pa != null && pa.getProducerId() != null) {
                            ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
                            if (producer != null) {
                                producer.onProducerAck(pa);
                            }
                        }
                        return null;
                    }

                    @Override
                    public Response processBrokerInfo(BrokerInfo info) throws Exception {
                        brokerInfo = info;
                        brokerInfoReceived.countDown();
                        optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
                        getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
                        return null;
                    }

                    @Override
                    public Response processConnectionError(final ConnectionError error) throws Exception {
                        executor.execute(new Runnable() {
                            @Override
                            public void run() {
                                onAsyncException(error.getException());
                            }
                        });
                        return null;
                    }

                    @Override
                    public Response processControlCommand(ControlCommand command) throws Exception {
                        return null;
                    }

                    @Override
                    public Response processConnectionControl(ConnectionControl control) throws Exception {
                        onConnectionControl((ConnectionControl)command);
                        return null;
                    }

                    @Override
                    public Response processConsumerControl(ConsumerControl control) throws Exception {
                        onConsumerControl((ConsumerControl)command);
                        return null;
                    }

                    @Override
                    public Response processWireFormat(WireFormatInfo info) throws Exception {
                        onWireFormatInfo((WireFormatInfo)command);
                        return null;
                    }
                });
            } catch (Exception e) {
                onClientInternalException(e);
            }
        }

        for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
            TransportListener listener = iter.next();
            listener.onCommand(command);
        }
    }
  • 在如今這個場景中,
    • 咱們只關注processMessageDispatch方法,
    • 在這個方法中,只是簡單的去調用ActiveMQSession的dispatch方法來處理消息,
  • Ø tips: command.visit, 這裏使用了適配器模式,
    • 若是command是一個MessageDispatch,
    • 那麼它就會調用processMessageDispatch方法,其餘方法他不會關心,
    • 代碼以下:MessageDispatch.visit
public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processMessageDispatch(this);
    }
相關文章
相關標籤/搜索