整個RpcContext經過ThreadLocal維持。redis
public class RpcContext { private static final ThreadLocal<RpcContext> LOCAL = new ThreadLocal<RpcContext>() { @Override protected RpcContext initialValue() { return new RpcContext(); } }; private final Map<String, String> attachments = new HashMap<String, String>(); private final Map<String, Object> values = new HashMap<String, Object>(); private Future<?> future; private List<URL> urls; private URL url; private String methodName; private Class<?>[] parameterTypes; private Object[] arguments; private InetSocketAddress localAddress; private InetSocketAddress remoteAddress; @Deprecated private List<Invoker<?>> invokers; @Deprecated private Invoker<?> invoker; @Deprecated private Invocation invocation;
經過isProviderSide,isConsumerSide標記Context屬於服務提供方仍是服務使用方。app
經過Future從Context中獲取異步調用結果:異步
public <T> Future<T> asyncCall(Callable<T> callable) { try { try { setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); final T o = callable.call(); //local invoke will return directly if (o != null) { FutureTask<T> f = new FutureTask<T>(new Callable<T>() { public T call() throws Exception { return o; } }); f.run(); return f; } else { } } catch (Exception e) { throw new RpcException(e); } finally { removeAttachment(Constants.ASYNC_KEY); } } catch (final RpcException e) { return new Future<T>() { public boolean cancel(boolean mayInterruptIfRunning) { return false; } public boolean isCancelled() { return false; } public boolean isDone() { return true; } public T get() throws InterruptedException, ExecutionException { throw new ExecutionException(e.getCause()); } public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return get(); } }; } return ((Future<T>) getContext().getFuture()); }
public class RpcInvocation implements Invocation, Serializable { private static final long serialVersionUID = -4355285085441097045L; private String methodName; private Class<?>[] parameterTypes; private Object[] arguments; private Map<String, String> attachments; private transient Invoker<?> invoker;
public Result invoke(Invocation invocation) throws RpcException { try { return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method " + invocation.getMethodName() + " to " + getUrl() + ", cause: " + e.getMessage(), e); } } protected abstract Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable; // doInvoke具體實現交由各自實現類實現。 // Javassist實現 public class JavassistProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); } public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // TODO Wrapper cannot handle this scenario correctly: the classname contains '$' final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } } // JDK實現 public class JdkProxyFactory extends AbstractProxyFactory { @SuppressWarnings("unchecked") public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker)); } public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { Method method = proxy.getClass().getMethod(methodName, parameterTypes); return method.invoke(proxy, arguments); } }; } }
比較有意思,是經過對invocation.Arguments做爲key放到redis中取值,將值反序列化出來做爲執行結果。async
resource = jedisPool.getResource(); if (get.equals(invocation.getMethodName())) { if (invocation.getArguments().length != 1) { throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url); } byte[] value = resource.get(String.valueOf(invocation.getArguments()[0]).getBytes()); if (value == null) { return new RpcResult(); } ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value)); return new RpcResult(oin.readObject());
對於每次Context中的Invoke代理執行時,能夠將定義的Filter注入,對代理調用進行攔截。ide
public void testFilter() throws Exception { MonitorFilter monitorFilter = new MonitorFilter(); monitorFilter.setMonitorFactory(monitorFactory); Invocation invocation = new RpcInvocation("aaa", new Class<?>[0], new Object[0]); RpcContext.getContext().setRemoteAddress(NetUtils.getLocalHost(), 20880).setLocalAddress(NetUtils.getLocalHost(), 2345); monitorFilter.invoke(serviceInvoker, invocation); // intercepting invocation public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) { RpcContext context = RpcContext.getContext(); // provider must fetch context before invoke() gets called String remoteHost = context.getRemoteHost(); long start = System.currentTimeMillis(); // record start timestamp getConcurrent(invoker, invocation).incrementAndGet(); // count up try { Result result = invoker.invoke(invocation); // proceed invocation chain collect(invoker, invocation, result, remoteHost, start, false); return result; } catch (RpcException e) { collect(invoker, invocation, null, remoteHost, start, true); throw e; } finally { getConcurrent(invoker, invocation).decrementAndGet(); // count down } } else { return invoker.invoke(invocation); } }