remoting是rpc的基礎,今天簡單略讀一下remoting。異步
首先是兩個核心對Request和Response。ide
public class Request { public static final String HEARTBEAT_EVENT = null; public static final String READONLY_EVENT = "R"; private static final AtomicLong INVOKE_ID = new AtomicLong(0); private final long mId; private String mVersion; private boolean mTwoWay = true; private boolean mEvent = false; private boolean mBroken = false; private Object mData;
public class Response { public static final String HEARTBEAT_EVENT = null; public static final String READONLY_EVENT = "R"; /** * ok. */ public static final byte OK = 20; /** * clien side timeout. */ public static final byte CLIENT_TIMEOUT = 30; /** * server side timeout. */ public static final byte SERVER_TIMEOUT = 31; /** * request format error. */ public static final byte BAD_REQUEST = 40; /** * response format error. */ public static final byte BAD_RESPONSE = 50; /** * service not found. */ public static final byte SERVICE_NOT_FOUND = 60; /** * service error. */ public static final byte SERVICE_ERROR = 70; /** * internal server error. */ public static final byte SERVER_ERROR = 80; /** * internal server error. */ public static final byte CLIENT_ERROR = 90; /** * server side threadpool exhausted and quick return. */ public static final byte SERVER_THREADPOOL_EXHAUSTED_ERROR = 100; private long mId = 0; private String mVersion; private byte mStatus = OK; private boolean mEvent = false; private String mErrorMsg; private Object mResult;
經過Channel維持一個endpoint鏈接。ui
public interface Channel extends Endpoint { /** * get remote address. * * @return remote address. */ InetSocketAddress getRemoteAddress(); /** * is connected. * * @return connected */ boolean isConnected(); /** * has attribute. * * @param key key. * @return has or has not. */ boolean hasAttribute(String key); /** * get attribute. * * @param key key. * @return value. */ Object getAttribute(String key); /** * set attribute. * * @param key key. * @param value value. */ void setAttribute(String key, Object value); /** * remove attribute. * * @param key key. */ void removeAttribute(String key); }
心跳this
public HeaderExchangeClient(Client client, boolean needHeartbeat) { if (client == null) { throw new IllegalArgumentException("client == null"); } this.client = client; this.channel = new HeaderExchangeChannel(client); String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0); this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3); if (heartbeatTimeout < heartbeat * 2) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2"); } if (needHeartbeat) { startHeatbeatTimer(); } }
異步結果方面會大量使用Future模式。線程
private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); public interface ResponseFuture { /** * get result. * * @return result. */ Object get() throws RemotingException; /** * get result with the specified timeout. * * @param timeoutInMillis timeout. * @return result. */ Object get(int timeoutInMillis) throws RemotingException; /** * set callback. * * @param callback */ void setCallback(ResponseCallback callback); /** * check is done. * * @return done or not. */ boolean isDone(); }
針對於超時響應,會在類內建立一個後臺線程經過死循環進行result超時驗證。code
private static class RemotingInvocationTimeoutScan implements Runnable { public void run() { while (true) { try { for (DefaultFuture future : FUTURES.values()) { if (future == null || future.isDone()) { continue; } if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { // create exception response. Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // handle response. DefaultFuture.received(future.getChannel(), timeoutResponse); } } Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } }
經過不停比較timeout時間判斷響應時間是否超時,orm