unconsumedMessage 源碼分析
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException { if (brokerURL == null) { throw new ConfigurationException("brokerURL not set."); } ActiveMQConnection connection = null; try { Transport transport = createTransport(); connection = createActiveMQConnection(transport, factoryStats); connection.setUserName(userName); connection.setPassword(password); configureConnection(connection); transport.start(); if (clientID != null) { connection.setDefaultClientID(clientID); } return connection; } catch (JMSException e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw e; } catch (Exception e) { // Clean up! try { connection.close(); } catch (Throwable ignore) { } throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e); } }
transport.start();
/**
 * Starts this service if it is not already marked as started.
 *
 * <p>The {@code started} flag is claimed with a compare-and-set, so a call
 * that finds the flag already set does nothing. Otherwise {@code preStart()}
 * and {@code doStart()} run; the flag is then set to whether both completed,
 * and on success every registered {@code ServiceListener} is notified.
 *
 * @throws Exception whatever {@code preStart()} or {@code doStart()} throws;
 *         in that case the started flag is reset to {@code false} and no
 *         listener is notified
 */
public void start() throws Exception {
    if (!started.compareAndSet(false, true)) {
        // The flag was already set; nothing to do.
        return;
    }

    boolean startedOk = false;
    stopped.set(false);
    try {
        preStart();
        doStart();
        startedOk = true;
    } finally {
        // Roll the flag back when start-up did not complete.
        started.set(startedOk);
    }

    for (ServiceListener listener : serviceListeners) {
        listener.started(this);
    }
}
TcpTransport.run()
/**
 * Read loop of the transport: records the current thread as the runner
 * thread, then calls {@code doRun()} repeatedly until {@code isStopped()}
 * returns {@code true}.
 *
 * <p>An {@code IOException} is passed to {@code onException} as is; any other
 * {@code Throwable} is first wrapped in an {@code IOException} that keeps it
 * as the cause. In both failure paths the stopped latch is counted down
 * before {@code onException} is called, and it is counted down again in the
 * {@code finally} block on every exit.
 */
public void run() {
    LOG.trace("TCP consumer thread for " + this + " starting");
    this.runnerThread = Thread.currentThread();
    try {
        while (!isStopped()) {
            doRun();
        }
    } catch (IOException ioFailure) {
        stoppedLatch.get().countDown();
        onException(ioFailure);
    } catch (Throwable unexpected) {
        stoppedLatch.get().countDown();
        // Wrap so that onException always receives an IOException.
        IOException wrapped = new IOException("Unexpected error occurred: " + unexpected, unexpected);
        onException(wrapped);
    } finally {
        stoppedLatch.get().countDown();
    }
}
TcpTransport.doRun()
/**
 * Performs one iteration of the read loop: reads a single command and hands
 * it to {@code doConsume}.
 *
 * @throws IOException if reading fails for a reason other than an
 *         {@code InterruptedIOException}
 */
protected void doRun() throws IOException {
    try {
        doConsume(readCommand());
    } catch (InterruptedIOException ignored) {
        // Deliberately ignored so the caller can simply try again. This also
        // covers SocketTimeoutException, which is a subclass of
        // InterruptedIOException.
    }
}
TcpTransport.readCommand()
/**
 * Reads the next command by asking the wire format to unmarshal it from
 * {@code dataIn}.
 *
 * @return the object produced by {@code wireFormat.unmarshal}
 * @throws IOException if unmarshalling fails
 */
protected Object readCommand() throws IOException {
    final Object decoded = wireFormat.unmarshal(dataIn);
    return decoded;
}
TcpTransport.doConsume()
/**
 * Forwards an inbound command to the registered transport listener.
 *
 * <p>A {@code null} command is ignored. When no listener is registered the
 * command is not delivered and an error is logged instead.
 *
 * @param command the inbound command; may be {@code null}
 */
public void doConsume(Object command) {
    if (command == null) {
        return;
    }
    if (transportListener == null) {
        LOG.error("No transportListener available to process inbound command: " + command);
        return;
    }
    transportListener.onCommand(command);
}
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception { this.transport = transport; this.clientIdGenerator = clientIdGenerator; this.factoryStats = factoryStats; // Configure a single threaded executor who's core thread can timeout if // idle executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport); //Don't make these daemon threads - see https://issues.apache.org/jira/browse/AMQ-796 //thread.setDaemon(true); return thread; } }); // asyncConnectionThread.allowCoreThreadTimeOut(true); String uniqueId = connectionIdGenerator.generateId(); this.info = new ConnectionInfo(new ConnectionId(uniqueId)); this.info.setManageable(true); this.info.setFaultTolerant(transport.isFaultTolerant()); this.connectionSessionId = new SessionId(info.getConnectionId(), -1); this.transport.setTransportListener(this);//當transport綁定爲本身 this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection); this.factoryStats.addConnection(this); this.timeCreated = System.currentTimeMillis(); this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant()); }
onCommand
/**
 * Handles a command delivered by the transport.
 *
 * <p>The object is cast to {@code Command}. If this connection is not closed
 * and the command is non-null, the command is visited with an anonymous
 * {@code CommandVisitorAdapter} whose callbacks react to the individual
 * command types; any exception thrown while visiting is passed to
 * {@code onClientInternalException}. Afterwards the command is forwarded to
 * every registered {@code TransportListener}; that loop sits outside the
 * closed/null check, so it runs in every case.
 *
 * @param o the inbound object; must be a {@code Command} (it is cast
 *        unconditionally)
 */
public void onCommand(final Object o) {
    final Command command = (Command)o;
    if (!closed.get() && command != null) {
        try {
            command.visit(new CommandVisitorAdapter() {
                @Override
                public Response processMessageDispatch(MessageDispatch md) throws Exception {
                    // Wait until transport interruption processing has completed.
                    waitForTransportInterruptionProcessingToComplete();
                    // Look up the dispatcher registered for the consumer id carried
                    // by this MessageDispatch.
                    // Original author's notes (not verifiable from this snippet):
                    // ActiveMQMessageConsumer implements ActiveMQDispatcher, and the
                    // MessageDispatch carries the mapping that says which consumer
                    // the message belongs to. When a MessageConsumer is created,
                    // ActiveMQMessageConsumer (around line 282) calls ActiveMQSession
                    // (around line 1798) to register the consumer in dispatchers, so
                    // the object obtained here is said to be the ActiveMQSession.
                    ActiveMQDispatcher dispatcher = dispatchers.get(md.getConsumerId());
                    if (dispatcher != null) {
                        // Copy in case a embedded broker is dispatching via
                        // vm://
                        // md.getMessage() == null to signal end of queue
                        // browse.
                        Message msg = md.getMessage();
                        if (msg != null) {
                            msg = msg.copy();
                            msg.setReadOnlyBody(true);
                            msg.setReadOnlyProperties(true);
                            msg.setRedeliveryCounter(md.getRedeliveryCounter());
                            msg.setConnection(ActiveMQConnection.this);
                            msg.setMemoryUsage(null);
                            md.setMessage(msg);
                        }
                        // Hand the dispatch to the dispatcher (per the original
                        // notes, the ActiveMQSession's own dispatch method).
                        dispatcher.dispatch(md);
                    } else {
                        // NOTE(review): `this` here is the anonymous visitor, not
                        // the enclosing connection - confirm that is what should
                        // be logged.
                        LOG.debug("{} no dispatcher for {} in {}", this, md, dispatchers);
                    }
                    return null;
                }

                // Invoked instead when the inbound command is a ProducerAck; this
                // walkthrough only follows the MessageDispatch path.
                @Override
                public Response processProducerAck(ProducerAck pa) throws Exception {
                    if (pa != null && pa.getProducerId() != null) {
                        ActiveMQMessageProducer producer = producers.get(pa.getProducerId());
                        if (producer != null) {
                            producer.onProducerAck(pa);
                        }
                    }
                    return null;
                }

                // Stores the broker info, releases the brokerInfoReceived latch and
                // clears optimizeAcknowledge when the broker reports a
                // fault-tolerant configuration.
                @Override
                public Response processBrokerInfo(BrokerInfo info) throws Exception {
                    brokerInfo = info;
                    brokerInfoReceived.countDown();
                    optimizeAcknowledge &= !brokerInfo.isFaultTolerantConfiguration();
                    getBlobTransferPolicy().setBrokerUploadUrl(info.getBrokerUploadUrl());
                    return null;
                }

                // The error is reported through the connection's executor rather
                // than directly from this callback.
                @Override
                public Response processConnectionError(final ConnectionError error) throws Exception {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            onAsyncException(error.getException());
                        }
                    });
                    return null;
                }

                // Control commands are ignored here. Note that this parameter
                // shadows the outer `command` local.
                @Override
                public Response processControlCommand(ControlCommand command) throws Exception {
                    return null;
                }

                // NOTE(review): this and the two callbacks below cast the outer
                // `command` instead of using their own parameter; presumably both
                // refer to the same object - confirm.
                @Override
                public Response processConnectionControl(ConnectionControl control) throws Exception {
                    onConnectionControl((ConnectionControl)command);
                    return null;
                }

                @Override
                public Response processConsumerControl(ConsumerControl control) throws Exception {
                    onConsumerControl((ConsumerControl)command);
                    return null;
                }

                @Override
                public Response processWireFormat(WireFormatInfo info) throws Exception {
                    onWireFormatInfo((WireFormatInfo)command);
                    return null;
                }
            });
        } catch (Exception e) {
            onClientInternalException(e);
        }
    }

    // Forward the command to every additional transport listener. This runs
    // even when the connection is closed or the command is null.
    for (Iterator<TransportListener> iter = transportListeners.iterator(); iter.hasNext();) {
        TransportListener listener = iter.next();
        listener.onCommand(command);
    }
}
/**
 * Passes this command to the visitor's {@code processMessageDispatch}
 * callback and returns whatever that callback returns.
 *
 * @param visitor the visitor to call back
 * @return the visitor's response
 * @throws Exception if the visitor callback throws
 */
public Response visit(CommandVisitor visitor) throws Exception {
    final Response visitorResult = visitor.processMessageDispatch(this);
    return visitorResult;
}