若是事件處理的邏輯能迅速完成,而且不會發起新的 IO 請求,好比只是在內存中記個標識,
則直接在 IO 線程上處理更快,由於減小了線程池調度。數據庫
但若是事件處理邏輯較慢,或者須要發起新的 IO 請求,好比須要查詢數據庫,則必須派發到
線程池,不然 IO 線程阻塞,將致使不能接收其它請求。api
若是用 IO 線程處理事件,又在事件處理過程當中發起新的 IO 請求,好比在鏈接事件中發起登
<dubbo:provider dispatcher="xxx" />
<dubbo:protocol name="dubbo" dispatcher="xxx" />源碼分析
all=com.alibaba.dubbo.remoting.transport.dispatcher.all.AllDispatcher direct=com.alibaba.dubbo.remoting.transport.dispatcher.direct.DirectDispatcher message=com.alibaba.dubbo.remoting.transport.dispatcher.message.MessageOnlyDispatcher execution=com.alibaba.dubbo.remoting.transport.dispatcher.execution.ExecutionDispatcher connection=com.alibaba.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedDispatcher
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { //調用服務端實現方法,返回結果。 public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; //獲取暴露的服務代理 Invoker<?> invoker = getInvoker(channel, inv); //若是是callback 須要處理高版本調用低版本的問題 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); //經過代理服務,執行方法 return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } @Override //接受消息處理方法 public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { //調用消息,調用replay reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } @Override //客戶端鏈接後處理方法 public void connected(Channel channel) throws RemotingException { invoke(channel, Constants.ON_CONNECT_KEY); } @Override //斷開鏈接後處理方法 public void disconnected(Channel channel) throws RemotingException { if (logger.isInfoEnabled()) { logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, Constants.ON_DISCONNECT_KEY); } //調用過程 private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } //從url中建立調用對象 private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]); invocation.setAttachment(Constants.PATH_KEY, url.getPath()); invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY)); invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY)); invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY)); if (url.getParameter(Constants.STUB_EVENT_KEY, false)) { invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } };
try { //Exchangers是門面類,裏面封裝了具體交換層實現 server = Exchangers.bind(url, requestHandler); } catch (RemotingException e) { throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e); }
//跟到Exchangers.bind方法 public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); //經過spi會走HeaderExchanger的bind邏輯 return getExchanger(url).bind(url, handler); } //HeaderExchanger的bind方法 public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException { //能夠看到這時原始handler第一被裝飾 return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
public HeaderExchangeHandler(ExchangeHandler handler) { if (handler == null) { throw new IllegalArgumentException("handler == null"); } this.handler = handler; }
//鏈接處理邏輯 public void connected(Channel channel) throws RemotingException { //添加一些心跳時間參數 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //經過被包裝類對應方法處理 handler.connected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } //斷開邏輯, public void disconnected(Channel channel) throws RemotingException { //添加一些心跳時間參數 channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //經過被包裝類對應方法處理 handler.disconnected(exchangeChannel); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } //發送數據 public void sent(Channel channel, Object message) throws RemotingException { Throwable exception = null; try { channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //調用被包裝類對應方法處理。 handler.sent(exchangeChannel, message); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } catch (Throwable t) { exception = t; } //發送消息,如果請求消息,有個異步發送重試邏輯 if (message instanceof Request) { Request request = (Request) message; DefaultFuture.sent(channel, request); } if (exception != null) { if (exception instanceof RuntimeException) { throw (RuntimeException) exception; } else if (exception instanceof RemotingException) { throw (RemotingException) exception; } else { throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(), exception.getMessage(), exception); } } } /*** * 接受請求數據,經過handleRequest方法處理後獲得處理結果。 * @param channel channel. * @param message message. * @throws RemotingException */ public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { // handle request. Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) {//是往返消息,調用私有方法handleRequest處理獲得結果 Response response = handleRequest(exchangeChannel, request); channel.send(response); } else {//不須要回復的消息調用 handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { //處理響應消息的邏輯 handleResponse(channel, (Response) message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = handler.telnet(channel, (String) message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } //異常處理 public void caught(Channel channel, Throwable exception) throws RemotingException { if (exception instanceof ExecutionException) { ExecutionException e = (ExecutionException) exception; Object msg = e.getRequest(); if (msg instanceof Request) { Request req = (Request) msg; if (req.isTwoWay() && !req.isHeartbeat()) {//有往返要求,就回復消息 Response res = new Response(req.getId(), req.getVersion()); res.setStatus(Response.SERVER_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); return; } } } ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { //調用對應被裝飾類方法 handler.caught(exchangeChannel, exception); } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } }
public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Decodeable) { decode(message); } if (message instanceof Request) { decode(((Request) message).getData()); } if (message instanceof Response) { decode(((Response) message).getResult()); } handler.received(channel, message); }
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
//Transporters.bind方法 public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } //根據spi 這裏具體走NettyTransporter.bind方法 return getTransporter().bind(url, handler); } //NettyTransporter的bind方法 public Server bind(URL url, ChannelHandler listener) throws RemotingException { //這裏是建立NettyServer實例 return new NettyServer(url, listener); } //NettyServer構造器 public NettyServer(URL url, ChannelHandler handler) throws RemotingException { //這裏看下ChannelHandlers.wrap方法 super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME))); } //ChannelHandlers.wrap方法 public static ChannelHandler wrap(ChannelHandler handler, URL url) { //調用內部wrapInternal方法 return ChannelHandlers.getInstance().wrapInternal(handler, url); } //wrapInternal方法 protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) { //這裏終於看到經過spi獲取Dispatcher實現的代碼 //還能看到經過Dispatcher.dispatch方法返回的handler後又通過了兩層裝飾,HeartbeatHandler而後MultiMessageHandler類 return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class) .getAdaptiveExtension().dispatch(handler, url))); }
public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher { public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; //默認是all實現方案 String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all"))); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])"); com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName); //調用接口Dispatcher實現的dispatch方法返回ChannelHandler對象 return extension.dispatch(arg0, arg1); } }
1, all實現,用戶手冊說,全部消息都派發到線程池,包括請求,響應,鏈接事件,斷開事件,心跳等。
public class AllDispatcher implements Dispatcher { public static final String NAME = "all"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //使用AllChannelHandler類實現 return new AllChannelHandler(handler, url); } }
//鏈接事件放入線程池 public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } //斷開事件鏈接放入線程池 public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } } //接受請求(包含回覆消息處理)消息放入線程池 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO 臨時解決線程池滿後異常信息沒法發送到對端的問題。待重構 //fix 線程池滿了拒絕調用不返回,致使消費者一直等待超時 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //異常處理線程池 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } }
2,direct 分配實現,文檔上說,全部消息都不派發到線程池,所有在 IO 線程上直接執行。
public class DirectDispatcher implements Dispatcher { public static final String NAME = "direct"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //直接返回原生的handler不進行另外裝飾 return handler; } }
3,message 手冊上說,只有請求響應消息派發到線程池,其它鏈接斷開事件,心跳等消息,直接在 IO 線程上執行。
public class MessageOnlyDispatcher implements Dispatcher { public static final String NAME = "message"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //經過MessageOnlyChannelHandler裝飾類處理 return new MessageOnlyChannelHandler(handler, url); } }
public class MessageOnlyChannelHandler extends WrappedChannelHandler { public MessageOnlyChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //接收請求(包括響應)消息放在線程池。 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } }
4,execution 手冊上說,只請求消息派發到線程池,不含響應,響應和其它鏈接斷開事件,心跳等消息,直接在 IO 線程上執行。
public class ExecutionDispatcher implements Dispatcher { public static final String NAME = "execution"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //經過裝飾類ExecutionChannelHandler實現 return new ExecutionChannelHandler(handler, url); } }
public class ExecutionChannelHandler extends WrappedChannelHandler { public ExecutionChannelHandler(ChannelHandler handler, URL url) { super(handler, url); } //鏈接事件放入線程池 public void connected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } //斷開事件鏈接放入線程池 public void disconnected(Channel channel) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } //消息接受(響應消息發送)放入線程池 public void received(Channel channel, Object message) throws RemotingException { try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //TODO 臨時解決線程池滿後異常信息沒法發送到對端的問題。待重構 //fix 線程池滿了拒絕調用不返回,致使消費者一直等待超時 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side("+url.getIp()+","+url.getPort()+") threadpool is exhausted ,detail msg:"+t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //異常消息放入線程池 public void caught(Channel channel, Throwable exception) throws RemotingException { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } }
5,connection實現,手冊說,在 IO 線程上,將鏈接斷開事件放入隊列,有序逐個執行,其它消息派發到線程池。
public class ConnectionOrderedDispatcher implements Dispatcher { public static final String NAME = "connection"; public ChannelHandler dispatch(ChannelHandler handler, URL url) { //裝飾類ConnectionOrderedChannelHandler實現 return new ConnectionOrderedChannelHandler(handler, url); } }
public class ConnectionOrderedChannelHandler extends WrappedChannelHandler { protected final ThreadPoolExecutor connectionExecutor; private final int queuewarninglimit; public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) { super(handler, url); String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); //經過定義只有一個線程的線程池,保證執行的順序 //用LinkedBlockingQueue保存待處理的任務 connectionExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)), new NamedThreadFactory(threadName, true), new AbortPolicyWithReport(threadName, url) ); // FIXME 沒有地方釋放connectExecutor! //這是等待隊列報警大小 queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE); } //鏈接事件放入隊列 public void connected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } //斷開事件放入隊列 public void disconnected(Channel channel) throws RemotingException { try { checkQueueLength(); connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t); } } //放入線程池 public void received(Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { //fix 線程池滿了拒絕調用不返回,致使消費者一直等待超時 if(message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if(request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return; } } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } } //放入線程池 public void caught(Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = executor; if (cexecutor == null || cexecutor.isShutdown()) { cexecutor = SHARED_EXECUTOR; } try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t); } } private void checkQueueLength() { if (connectionExecutor.getQueue().size() > queuewarninglimit) { logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit)); } } }
public class HeartbeatHandler extends AbstractChannelHandlerDelegate { private static final Logger logger = LoggerFactory.getLogger(HeartbeatHandler.class); public static String KEY_READ_TIMESTAMP = "READ_TIMESTAMP"; public static String KEY_WRITE_TIMESTAMP = "WRITE_TIMESTAMP"; public HeartbeatHandler(ChannelHandler handler) { super(handler); } public void connected(Channel channel) throws RemotingException { setReadTimestamp(channel); setWriteTimestamp(channel); handler.connected(channel); } public void disconnected(Channel channel) throws RemotingException { clearReadTimestamp(channel); clearWriteTimestamp(channel); handler.disconnected(channel); } public void sent(Channel channel, Object message) throws RemotingException { setWriteTimestamp(channel); handler.sent(channel, message); } /*** * 心跳消息的接受和發送 * * @param channel * @param message * @throws RemotingException */ public void received(Channel channel, Object message) throws RemotingException { setReadTimestamp(channel); if (isHeartbeatRequest(message)) { Request req = (Request) message; if (req.isTwoWay()) { Response res = new Response(req.getId(), req.getVersion()); res.setEvent(Response.HEARTBEAT_EVENT); channel.send(res); if (logger.isInfoEnabled()) { int heartbeat = channel.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0); if (logger.isDebugEnabled()) { logger.debug("Received heartbeat from remote channel " + channel.getRemoteAddress() + ", cause: The channel has no data-transmission exceeds a heartbeat period" + (heartbeat > 0 ? ": " + heartbeat + "ms" : "")); } } } return; } if (isHeartbeatResponse(message)) { if (logger.isDebugEnabled()) { logger.debug( new StringBuilder(32) .append("Receive heartbeat response in thread ") .append(Thread.currentThread().getName()) .toString()); } return; } //非心跳消息的接受,走派發裝飾類 handler.received(channel, message); } }
public class MultiMessageHandler extends AbstractChannelHandlerDelegate { public MultiMessageHandler(ChannelHandler handler) { super(handler); } @SuppressWarnings("unchecked") @Override public void received(Channel channel, Object message) throws RemotingException { //多個消息類型時,循環接受 if (message instanceof MultiMessage) { MultiMessage list = (MultiMessage) message; for (Object obj : list) { handler.received(channel, obj); } } else { handler.received(channel, message); } } }