-- 提供方 <dubbo:application name="demo-provider"/> <dubbo:registry address="zookeeper://"/> <dubbo:protocol name="dubbo" port="20880"/> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> -- 消費方 <dubbo:application name="demo-consumer"/> <dubbo:registry address="zookeeper://"/> <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
/**
 * Builds a proxy object that implements the given interfaces and routes every
 * call through an {@link InvokerInvocationHandler} wrapping {@code invoker}.
 *
 * @param invoker    invoker handed to the invocation handler
 * @param interfaces interfaces passed to {@code Proxy.getProxy}
 * @param <T>        type the caller expects the proxy to have
 * @return the newly created proxy instance, cast to {@code T}
 */
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
    final Object proxyInstance =
            Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    return (T) proxyInstance;
}
/**
 * Proxy stub for {@code sayHello}: wraps the single argument in an array, passes
 * it to the handler together with {@code methods[0]}, and returns the result as
 * a {@code String}.
 */
public String sayHello(String paramString) {
    Object[] callArgs = new Object[] { paramString };
    return (String) this.handler.invoke(this, methods[0], callArgs);
}
/**
 * Dispatches a call made on the proxy.
 *
 * <ul>
 *   <li>Methods declared by {@code Object} are invoked reflectively on the invoker.</li>
 *   <li>No-arg {@code toString} / {@code hashCode} and one-arg {@code equals} are
 *       answered by the invoker itself.</li>
 *   <li>Everything else is wrapped in an {@code RpcInvocation}, sent through the
 *       invoker, and the result is unwrapped with {@code recreate()}.</li>
 * </ul>
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }

    final String name = method.getName();
    final int arity = method.getParameterTypes().length;

    if (arity == 0) {
        if ("toString".equals(name)) {
            return invoker.toString();
        }
        if ("hashCode".equals(name)) {
            return invoker.hashCode();
        }
    } else if (arity == 1 && "equals".equals(name)) {
        return invoker.equals(args[0]);
    }

    return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
/**
 * Joins the directory through the wrapped cluster and decorates the resulting
 * invoker with a {@code MockClusterInvoker}.
 *
 * @param directory directory to join
 * @return a {@code MockClusterInvoker} around the wrapped cluster's invoker
 * @throws RpcException declared by the signature; any thrown by the wrapped
 *                      cluster's {@code join} is not caught here
 */
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
    final Invoker<T> clusterInvoker = this.cluster.join(directory);
    return new MockClusterInvoker<T>(directory, clusterInvoker);
}
a. 若是mock爲null或false,直接調用FailoverClusterInvoker b. 若是mock以force開頭,強制執行mock調用 c. 以上都不是,則先調用FailoverClusterInvoker,調用失敗再執行mock調用
// 執行Directory.list 和 router.route List<Invoker<T>> invokers = list(invocation); // 執行LoadBalance.select Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
a. 通過Directory目錄服務的list方法獲取訂閱返回的服務提供者的Invoker對象集合,若是存在路由,路由服務根據策略過濾Invoker對象集合 b. 根據負載均衡策略LoadBalance來選擇一個Invoker
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) { Invoker<T> last = invoker; List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { // 倒序循環,順序執行 for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker<T> next = last; last = new Invoker<T>() { public Class<T> getInterface() { return invoker.getInterface(); } public URL getUrl() { return invoker.getUrl(); } public boolean isAvailable() { return invoker.isAvailable(); } public Result invoke(Invocation invocation) throws RpcException { return filter.invoke(next, invocation); } public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; }
/**
 * Refers to a remote service through the wrapped protocol.
 *
 * <p>For URLs whose protocol equals {@code Constants.REGISTRY_PROTOCOL} the wrapped
 * protocol's invoker is returned as is. Otherwise it is wrapped in a
 * {@code ListenerInvokerWrapper} together with an unmodifiable list of the
 * {@code InvokerListener} extensions activated for the URL.
 *
 * @param type service interface
 * @param url  service URL
 * @return the (possibly listener-wrapped) invoker
 */
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    final boolean registryUrl = Constants.REGISTRY_PROTOCOL.equals(url.getProtocol());
    final Invoker<T> referred = protocol.refer(type, url);
    if (registryUrl) {
        return referred;
    }
    return new ListenerInvokerWrapper<T>(
            referred,
            Collections.unmodifiableList(
                    ExtensionLoader.getExtensionLoader(InvokerListener.class)
                            .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
}
/**
 * Field layout of the invocation object (only the fields are shown in this snippet).
 */
public class RpcInvocation implements Invocation, Serializable {
    private static final long serialVersionUID = -4355285085441097045L;
    // Method name
    private String methodName;
    // Parameter types
    private Class<?>[] parameterTypes;
    // Argument values
    private Object[] arguments;
    // Attachments
    private Map<String, String> attachments;
    // Declared transient, so it is left out of default Java serialization
    private transient Invoker<?> invoker;
}
HeaderExchangeChannel public void send(Object message, boolean sent) throws RemotingException { if (message instanceof Request || message instanceof Response || message instanceof String) { channel.send(message, sent); } else { Request request = new Request(); request.setVersion("2.0.0"); request.setTwoWay(false); request.setData(message); channel.send(request, sent); } }
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
// Statement labelled "AllChannelHandler" in this snippet: submits a ChannelEventRunnable
// carrying the channel, handler, the RECEIVED state and the message to cexecutor.
AllChannelHandler cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[world], attachments={path=com.alibaba.dubbo.demo.DemoService, input=201, dubbo=2.0.0, interface=com.alibaba.dubbo.demo.DemoService, version=0.0.0}]
public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; // 根據invocation匹配Invoker Invoker<?> invoker = getInvoker(channel, inv); // 部分省略 RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } } Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); // 組裝serviceKey String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); // 根據serviceKey從exporterMap中獲取DubboExporter,再得到Invoker DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); return exporter.getInvoker(); }
// Filter annotated for activation in the Constants.CONSUMER group with order -10000
// (the class body is not shown in this snippet).
@Activate(group = Constants.CONSUMER, order = -10000) public class ConsumerContextFilter implements Filter {}
// Filter annotated for activation in the Constants.PROVIDER group with order -10000
// (the class body is not shown in this snippet).
@Activate(group = Constants.PROVIDER, order = -10000) public class ContextFilter implements Filter {}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
/**
 * Executes the invocation against the wrapped proxy via {@code doInvoke}.
 *
 * <p>An {@code InvocationTargetException} is unwrapped and its target exception is
 * returned inside the {@code RpcResult}; any other throwable is rethrown as an
 * {@code RpcException} that keeps the original as its cause.
 *
 * @param invocation the call to perform
 * @return an {@code RpcResult} holding either the return value or the target exception
 * @throws RpcException if anything other than an {@code InvocationTargetException} is thrown
 */
public Result invoke(Invocation invocation) throws RpcException {
    try {
        final String methodName = invocation.getMethodName();
        final Class<?>[] parameterTypes = invocation.getParameterTypes();
        final Object[] arguments = invocation.getArguments();
        final Object returned = doInvoke(proxy, methodName, parameterTypes, arguments);
        return new RpcResult(returned);
    } catch (InvocationTargetException wrapped) {
        return new RpcResult(wrapped.getTargetException());
    } catch (Throwable failure) {
        throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + failure.getMessage(), failure);
    }
}
public class Wrapper0 extends Wrapper { public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException { com.alibaba.dubbo.demo.provider.DemoServiceImpl w; try { w = ((com.alibaba.dubbo.demo.provider.DemoServiceImpl) $1); } catch (Throwable e) { throw new IllegalArgumentException(e); } try { if ("sayHello".equals($2) && ($3.length == 1)) { return ($w) w.sayHello((java.lang.String) $4[0]); } } catch (Throwable e) { throw new java.lang.reflect.InvocationTargetException(e); } throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException( "Not found method \"" + $2 + "\" in class com.alibaba.dubbo.demo.provider.DemoServiceImpl."); } } }
public ResponseFuture request(Object request, int timeout) throws RemotingException { // create request. Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); DefaultFuture future = new DefaultFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); // find handler by message class. Object msg = req.getData(); try { // handle data. Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; }
/**
 * Converts the stored response into a return value or an exception.
 *
 * <ul>
 *   <li>no response: {@code IllegalStateException}</li>
 *   <li>status {@code Response.OK}: the response result is returned</li>
 *   <li>status {@code CLIENT_TIMEOUT} or {@code SERVER_TIMEOUT}: a
 *       {@code TimeoutException} whose first argument tells whether the timeout
 *       was on the server side</li>
 *   <li>any other status: {@code RemotingException} with the response's error message</li>
 * </ul>
 *
 * @return the result carried by an OK response
 * @throws RemotingException for any non-OK status
 */
private Object returnFromResponse() throws RemotingException {
    final Response received = response;
    if (received == null) {
        throw new IllegalStateException("response cannot be null");
    }
    if (received.getStatus() == Response.OK) {
        return received.getResult();
    }

    final boolean timedOut = received.getStatus() == Response.CLIENT_TIMEOUT
            || received.getStatus() == Response.SERVER_TIMEOUT;
    if (timedOut) {
        final boolean serverSide = received.getStatus() == Response.SERVER_TIMEOUT;
        throw new TimeoutException(serverSide, channel, received.getErrorMessage());
    }
    throw new RemotingException(channel, received.getErrorMessage());
}