目標:介紹基於Grizzly的來實現的遠程通訊、介紹dubbo-remoting-grizzly內的源碼解析。java
Grizzly NIO框架的設計初衷是幫助開發者更好地利用Java NIO API,構建強大的可擴展的服務器應用。關於Grizzly我也沒有很熟悉,因此只能根據grizzly在dubbo的遠程通信中應用稍微講解一下。git
下面是dubbo-remoting-grizzly下的包結構:github
private static final Logger logger = LoggerFactory.getLogger(GrizzlyChannel.class);
/** * 通道key */
private static final String CHANNEL_KEY = GrizzlyChannel.class.getName() + ".CHANNEL";
/** * 通道屬性 */
private static final Attribute<GrizzlyChannel> ATTRIBUTE = Grizzly.DEFAULT_ATTRIBUTE_BUILDER.createAttribute(CHANNEL_KEY);
/** * Grizzly的鏈接實例 */
private final Connection<?> connection;
複製代碼
能夠看到,該類中的ATTRIBUTE和connection都是Grizzly涉及到的屬性,ATTRIBUTE中封裝了GrizzlyChannel的實例還有Connection實例。Grizzly把鏈接的一些鏈接的方法定義在了Connection接口中,包括得到遠程地址、檢測通道是否鏈接等方法。segmentfault
@Override
@SuppressWarnings("rawtypes")
public void send(Object message, boolean sent) throws RemotingException {
super.send(message, sent);
int timeout = 0;
try {
// 發送消息,得到GrizzlyFuture實例
GrizzlyFuture future = connection.write(message);
if (sent) {
// 得到延遲多少時間得到響應
timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
// 得到請求的值
future.get(timeout, TimeUnit.MILLISECONDS);
}
} catch (TimeoutException e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
+ "in timeout(" + timeout + "ms) limit", e);
} catch (Throwable e) {
throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
}
}
複製代碼
該方法是發送消息的方法,調用了connection的write方法,它會返回一個GrizzlyFuture實例,GrizzlyFuture繼承了java.util.concurrent.Future。緩存
其餘方法比較簡單,實現的方法都基本調用connection的方法或者Grizzly.DEFAULT_ATTRIBUTE_BUILDER的方法。服務器
該類是Grizzly的通道處理器,它繼承了BaseFilter。app
/** * url */
private final URL url;
/** * 通道處理器 */
private final ChannelHandler handler;
複製代碼
該類有兩個屬性,這兩個屬性應該很熟悉了。而該類有handleConnect、handleClose、handleRead、handleWrite、exceptionOccurred的實現方法,分別調用了ChannelHandler中封裝的五個方法。舉個例子:框架
@Override
public NextAction handleConnect(FilterChainContext ctx) throws IOException {
// 得到Connection鏈接實例
Connection<?> connection = ctx.getConnection();
// 得到GrizzlyChannel通道
GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
try {
// 鏈接
handler.connected(channel);
} catch (RemotingException e) {
throw new IOException(StringUtils.toString(e));
} finally {
GrizzlyChannel.removeChannelIfDisconnected(connection);
}
return ctx.getInvokeAction();
}
複製代碼
能夠看到得到GrizzlyChannel通道就調用了handler.connected進行鏈接。而其餘四個方法也差很少,感興趣的能夠自行查看代碼。less
該類是Grizzly的客戶端實現類,繼承了AbstractClient類ide
/** * Grizzly中的傳輸對象 */
private TCPNIOTransport transport;
/** * 鏈接實例 */
private volatile Connection<?> connection; // volatile, please copy reference to use
複製代碼
能夠看到屬性中有TCPNIOTransport的實例,在該類中實現的客戶端方法都調用了transport中的方法,TCPNIOTransport中封裝了建立,鏈接,斷開鏈接等方法。
咱們來看建立客戶端的邏輯:
@Override
protected void doOpen() throws Throwable {
// 作一些過濾,用於處理消息
FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
filterChainBuilder.add(new TransportFilter());
filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
// 傳輸構建者
TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
// 得到線程池配置實例
ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
// 設置線程池的配置,包括核心線程數等
config.setPoolName(CLIENT_THREAD_POOL_NAME)
.setQueueLimit(-1)
.setCorePoolSize(0)
.setMaxPoolSize(Integer.MAX_VALUE)
.setKeepAliveTime(60L, TimeUnit.SECONDS);
// 設置建立屬性
builder.setTcpNoDelay(true).setKeepAlive(true)
.setConnectionTimeout(getConnectTimeout())
.setIOStrategy(SameThreadIOStrategy.getInstance());
// 建立一個transport
transport = builder.build();
transport.setProcessor(filterChainBuilder.build());
// 建立客戶端
transport.start();
}
複製代碼
能夠看到首先是設置及一些過濾,在上面對信息先處理,而後設置了線程池的配置,建立transport,而且用transport來進行建立客戶端。
該類是Grizzly的服務器實現類,繼承了AbstractServer類。
/** * 鏈接該服務器的客戶端通道集合 */
private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>
/** * 傳輸實例 */
private TCPNIOTransport transport;
複製代碼
該類中有兩個屬性,其中transport用法跟GrizzlyClient中的同樣。
我也就講解一個doOpen方法:
@Override
protected void doOpen() throws Throwable {
// 增長過濾器,來處理信息
FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
filterChainBuilder.add(new TransportFilter());
filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
// 得到線程池配置
ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);
// 得到url配置中線程池類型
String threadpool = getUrl().getParameter(Constants.THREADPOOL_KEY, Constants.DEFAULT_THREADPOOL);
if (Constants.DEFAULT_THREADPOOL.equals(threadpool)) {
// 優先從url得到線程池的線程數,默認線程數爲200
int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 設置線程池配置
config.setCorePoolSize(threads).setMaxPoolSize(threads)
.setKeepAliveTime(0L, TimeUnit.SECONDS);
// 若是是cached類型的線程池
} else if ("cached".equals(threadpool)) {
int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// 設置核心線程數爲0、最大線程數爲threads
config.setCorePoolSize(0).setMaxPoolSize(threads)
.setKeepAliveTime(60L, TimeUnit.SECONDS);
} else {
throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
}
builder.setKeepAlive(true).setReuseAddress(false)
.setIOStrategy(SameThreadIOStrategy.getInstance());
// 建立transport
transport = builder.build();
transport.setProcessor(filterChainBuilder.build());
transport.bind(getBindAddress());
// 開啓服務器
transport.start();
}
複製代碼
該方法是建立服務器,能夠看到操做跟GrizzlyClient中的差很少。
該類實現了Transporter接口,是基於Grizzly的傳輸層實現。
public class GrizzlyTransporter implements Transporter {
public static final String NAME = "grizzly";
@Override
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 返回GrizzlyServer實例
return new GrizzlyServer(url, listener);
}
@Override
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// // 返回GrizzlyClient實例
return new GrizzlyClient(url, listener);
}
}
複製代碼
能夠看到,bind和connect方法分別就是建立了GrizzlyServer和GrizzlyClient實例。這裏我建議查看一下《dubbo源碼解析(九)遠程通訊——Transport層》。
該類是Grizzly編解碼類,繼承了BaseFilter。
/** * 編解碼器 */
private final Codec2 codec;
/** * url */
private final URL url;
/** * 通道處理器 */
private final ChannelHandler handler;
/** * 緩存大小 */
private final int bufferSize;
/** * 空緩存區 */
private ChannelBuffer previousData = ChannelBuffers.EMPTY_BUFFER;
public GrizzlyCodecAdapter(Codec2 codec, URL url, ChannelHandler handler) {
this.codec = codec;
this.url = url;
this.handler = handler;
int b = url.getPositiveParameter(Constants.BUFFER_KEY, Constants.DEFAULT_BUFFER_SIZE);
// 若是緩存區大小在16k之內,則設置配置大小,若是不是,則設置8k的緩衝區大小
this.bufferSize = b >= Constants.MIN_BUFFER_SIZE && b <= Constants.MAX_BUFFER_SIZE ? b : Constants.DEFAULT_BUFFER_SIZE;
}
複製代碼
@Override
public NextAction handleWrite(FilterChainContext context) throws IOException {
Connection<?> connection = context.getConnection();
GrizzlyChannel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
try {
// 分配一個1024的動態緩衝區
ChannelBuffer channelBuffer = ChannelBuffers.dynamicBuffer(1024); // Do not need to close
// 得到消息
Object msg = context.getMessage();
// 編碼
codec.encode(channel, channelBuffer, msg);
// 檢測是否鏈接
GrizzlyChannel.removeChannelIfDisconnected(connection);
// 分配緩衝區
Buffer buffer = connection.getTransport().getMemoryManager().allocate(channelBuffer.readableBytes());
// 把channelBuffer的數據寫到buffer
buffer.put(channelBuffer.toByteBuffer());
buffer.flip();
buffer.allowBufferDispose(true);
// 設置到上下文
context.setMessage(buffer);
} finally {
GrizzlyChannel.removeChannelIfDisconnected(connection);
}
return context.getInvokeAction();
}
複製代碼
該方法是寫數據,能夠發現編碼調用的是 codec.encode,其餘的我都在註釋裏寫明瞭,關鍵仍是對前面兩篇文章的一些內容須要理解。
@Override
public NextAction handleRead(FilterChainContext context) throws IOException {
Object message = context.getMessage();
Connection<?> connection = context.getConnection();
Channel channel = GrizzlyChannel.getOrAddChannel(connection, url, handler);
try {
// 若是接收的是一個數據包
if (message instanceof Buffer) { // receive a new packet
Buffer grizzlyBuffer = (Buffer) message; // buffer
ChannelBuffer frame;
// 若是緩衝區可讀
if (previousData.readable()) {
// 若是該緩衝區是動態的緩衝區
if (previousData instanceof DynamicChannelBuffer) {
// 寫入數據
previousData.writeBytes(grizzlyBuffer.toByteBuffer());
frame = previousData;
} else {
// 得到須要的緩衝區大小
int size = previousData.readableBytes() + grizzlyBuffer.remaining();
// 新建一個動態緩衝區
frame = ChannelBuffers.dynamicBuffer(size > bufferSize ? size : bufferSize);
// 寫入previousData中的數據
frame.writeBytes(previousData, previousData.readableBytes());
// 寫入grizzlyBuffer中的數據
frame.writeBytes(grizzlyBuffer.toByteBuffer());
}
} else {
// 不然是基於Java NIO的ByteBuffer生成的緩衝區
frame = ChannelBuffers.wrappedBuffer(grizzlyBuffer.toByteBuffer());
}
Object msg;
int savedReadIndex;
do {
savedReadIndex = frame.readerIndex();
try {
// 解碼
msg = codec.decode(channel, frame);
} catch (Exception e) {
previousData = ChannelBuffers.EMPTY_BUFFER;
throw new IOException(e.getMessage(), e);
}
// 拆包
if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
frame.readerIndex(savedReadIndex);
// 結束調用鏈
return context.getStopAction();
} else {
if (savedReadIndex == frame.readerIndex()) {
// 沒有可讀內容
previousData = ChannelBuffers.EMPTY_BUFFER;
throw new IOException("Decode without read data.");
}
if (msg != null) {
// 把解碼後信息放入上下文
context.setMessage(msg);
// 繼續下面的調用鏈
return context.getInvokeAction();
} else {
return context.getInvokeAction();
}
}
} while (frame.readable());
} else { // Other events are passed down directly
return context.getInvokeAction();
}
} finally {
GrizzlyChannel.removeChannelIfDisconnected(connection);
}
}
複製代碼
該方法是讀數據,直接調用了codec.decode進行解碼。
該部分相關的源碼解析地址:github.com/CrazyHZM/in…
該文章講解了基於grizzly的來實現的遠程通訊、介紹dubbo-remoting-grizzly內的源碼解析,關鍵須要對grizzly有所瞭解。下一篇我會講解基於http實現遠程通訊部分。