RPC (Remote Procedure Call) is a protocol for requesting a service from a program on a remote computer over the network, without having to understand the underlying network technology. Put plainly: the client invokes an object running on a remote machine as if it were a local object, without knowing any of the call's details. Typical RPC frameworks include Thrift (open-sourced by Facebook) and Dubbo (open-sourced by Alibaba). An RPC framework encapsulates the network protocol and the network I/O model transparently: to the calling client it looks like an ordinary local call. Whether the transport layer uses TCP, UDP, or HTTP, and whether the network I/O is carried by select, poll, epoll, or IOCP (I/O Completion Ports), is of no concern to the caller.
Most mainstream RPC frameworks support cross-language calls through an IDL (Interface Definition Language). Strictly speaking, though, RPC itself does not require one: if your framework has no cross-language requirement, you can leave the IDL out.
Finally, it is worth noting that the choice of network I/O model is crucial to an RPC framework's performance. An RPC server built on this foundation may support blocking synchronous I/O, non-blocking synchronous I/O, multiplexed I/O, or asynchronous I/O, and under high concurrency these models differ greatly in throughput. Another yardstick is the transport protocol: TCP, HTTP, or UDP, each of which affects performance to some degree. As far as I can tell, most open-source RPC frameworks are based on TCP or HTTP; I have not seen one that uses UDP as its primary transport.
Having understood how RPC works and what its performance demands are, can we set the open-source frameworks aside and build a high-performance RPC server ourselves? I believe we can. In this article I will implement one in Java, on top of Netty.
How is it implemented, on what principles, and how does it perform under concurrency? Read on.
To raise the communication throughput and performance of a single node on a Java back end, the usual first choice is an NIO (non-blocking I/O) framework. The catch is that Java NIO takes considerable skill and accumulated experience to use well. For an average developer, writing a back-end TCP/HTTP server directly on NIO, while also handling TCP message framing (the "sticky packet" problem), network errors, connection management, and all the other communication details, sets the bar too high. The sensible choice is to build on a mainstream NIO framework, chiefly Netty or Mina. Both are designed around TCP, non-blocking I/O, and flexible I/O thread pools, and handle high-concurrency loads with room to spare. As Netty and Mina have matured, they have given Java back-end development solid support for high-performance servers, breaking the near-monopoly C++ once held on the server back end. Before them, Java NIO had a poor reputation and most people kept their distance.
Since this RPC server is built on Netty, a few words about Netty. Netty is a further encapsulation of the Java NIO framework; it is open source at http://netty.io/. This article uses Netty 4.0, which can be downloaded from http://dl.bintray.com/netty/downloads/netty-4.0.37.Final.tar.bz2. How, then, do you build an RPC server with Netty? It is not hard; the principle, in brief:
1. Define the RPC request and response message structures. The request must carry the interface definition: the class name of the remote call, the method name, the parameter types, and the parameter values.
2. At startup, the server loads, via a container, the mapping from RPC interface definitions to their implementation objects, then waits for client calls.
3. The client's RPC message contains the class name, method name, parameter types, and parameter values of the remote call; it is sent to the RPC server as a byte stream over the network. On receiving the request, the server looks up the concrete implementation object mapped to the client's interface in its container.
4. Using the parameter information, the server invokes the target method on that implementation object through reflection, wraps the result in an RPC response message, and sends it back to the client.
5. The client receives the RPC response as a byte stream over the network, unpacks and parses it, and presents the result of the remote call. (A minimal sketch of such a request follows this list.)
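To make step 3 concrete, here is a minimal sketch of what the client-side proxy assembles for a call like calc.add(1, 2), using the MessageRequest class defined later in this article; the concrete values are illustrative only:

// Hedged sketch: the request a proxy would build for Calculate.add(1, 2).
MessageRequest request = new MessageRequest();
request.setMessageId(java.util.UUID.randomUUID().toString()); // correlates request and response
request.setClassName("newlandframework.netty.rpc.servicebean.Calculate"); // interface name, the container lookup key
request.setMethodName("add");
request.setTypeParameters(new Class<?>[]{int.class, int.class});
request.setParameters(new Object[]{1, 2});
// The request travels as a byte stream; the server reflects on it and replies with a
// MessageResponse carrying the same messageId and the result 3.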
That all sounds simple enough, but the implementation has to face the following questions:
1. The RPC server's transport layer is TCP, so what about sticky packets (message framing)? Won't the server fail to parse client requests otherwise? Fortunately, Netty already ships a decoder that solves TCP framing: LengthFieldBasedFrameDecoder makes short work of it.
2. Which Netty server threading model should we use: single-threaded; multi-threaded (one thread accepts client connections and hands them to a back-end I/O thread pool); or master/worker (both connection acceptance and back-end I/O run on thread pools)? For performance, this article uses Netty's master/worker thread-pool model.
3. What if Netty's I/O thread pool hits a long-running piece of business logic? Won't that easily stall or block the back-end NIO threads? The approach taken here is to dispatch complex business work to a dedicated business thread pool and handle the result via asynchronous callback.
4. RPC messages travel as byte streams between NIO channels, but how, concretely? This article uses Netty's encoder and decoder based on native Java object serialization (ObjectEncoder, ObjectDecoder). For performance this is probably not the optimal choice; a better design makes the codec configurable, for example protobuf or JBoss Marshalling, to improve transfer efficiency over the wire (see the Marshalling sketch after this list).
5. An RPC server faces multi-threaded, highly concurrent use, so thread safety is a must. In addition, rather than locking with synchronized, this article prefers the lighter-weight ReentrantLock for conditional locking of code blocks; the RPC message callback below is one example.
6. The server's service interface objects and their implementation objects should be easy to configure, load, and unload. Here, a Spring container manages the objects uniformly.
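As promised in point 4, here is a hedged sketch of swapping the Java-serialization codec for JBoss Marshalling, using Netty's built-in marshalling handlers. It assumes the jboss-marshalling-serial artifact is on the classpath, and it is an illustration rather than part of the NettyRPC code below:

// Hedged sketch: a pluggable codec based on JBoss Marshalling instead of ObjectEncoder/ObjectDecoder.
package newlandframework.netty.rpc.core;

import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;

public final class MarshallingCodecFactory {

    public static MarshallingDecoder buildDecoder() {
        // "serial" selects the JDK-serialization-compatible protocol; requires jboss-marshalling-serial on the classpath
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(factory, configuration);
        return new MarshallingDecoder(provider, 1024 * 1024); // 1 MB max object size
    }

    public static MarshallingEncoder buildEncoder() {
        MarshallerFactory factory = Marshalling.getProvidedMarshallerFactory("serial");
        MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(factory, configuration);
        return new MarshallingEncoder(provider);
    }
}

In that variant you would register buildEncoder()/buildDecoder() in the channel initializers shown later in place of ObjectEncoder/ObjectDecoder; since MarshallingDecoder itself extends LengthFieldBasedFrameDecoder, the explicit frame decoder and LengthFieldPrepender would no longer be needed.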
Putting all this together, the call flow of the RPC server designed in this article is shown below:
Clients issue concurrent RPC requests; the Netty acceptor on the RPC server dispatches N NIO connection threads, at which point the acceptor's job is done. The NIO connections are then managed by Netty's NIO worker thread pool, which performs message decoding, encoding, and handling for each RPC connection. In the final handling step (the Handler), for performance, the design hands the potentially expensive message processing straight to a dedicated RPC business thread pool, so the Handler's NIO thread returns immediately and never blocks. At that point the RPC call returns, and the client waits asynchronously for the server's result, delivered via a message callback mechanism (MessageCallBack).
Next, the modules and flow of Netty's decoding, encoding, and handling of RPC messages, as shown in the figure below:
The figure shows the modules involved in encoding, decoding, and handling RPC messages on the client and server, and the order they are invoked in. Netty strings these handlers together into a pipeline, a chain of responsibility, and calls them in sequence.
With that said, here is the package layout of my NettyRPC implementation:
The newlandframework.netty.rpc.core package is NettyRPC's core implementation. newlandframework.netty.rpc.model wraps the RPC request and response message structures and defines the container that binds RPC service interfaces to their implementations. newlandframework.netty.rpc.config defines the NettyRPC server's configuration properties.
First, the contents of the newlandframework.netty.rpc.model package: the RPC request and response message structures.
RPC request message structure:
/**
 * @filename: MessageRequest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC request message structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageRequest implements Serializable {

    private String messageId;
    private String className;
    private String methodName;
    private Class<?>[] typeParameters;
    private Object[] parametersVal;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?>[] getTypeParameters() {
        return typeParameters;
    }

    public void setTypeParameters(Class<?>[] typeParameters) {
        this.typeParameters = typeParameters;
    }

    public Object[] getParameters() {
        return parametersVal;
    }

    public void setParameters(Object[] parametersVal) {
        this.parametersVal = parametersVal;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("className", className)
                .append("methodName", methodName).toString();
    }
}
RPC response message structure:
/**
 * @filename: MessageResponse.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC response message structure
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.io.Serializable;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;

public class MessageResponse implements Serializable {

    private String messageId;
    private String error;
    private Object resultDesc;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getError() {
        return error;
    }

    public void setError(String error) {
        this.error = error;
    }

    public Object getResult() {
        return resultDesc;
    }

    public void setResult(Object resultDesc) {
        this.resultDesc = resultDesc;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("messageId", messageId).append("error", error).toString();
    }
}
The container that binds RPC service interface definitions to their implementations, exposed to Spring as a plain bean:
/**
 * @filename: MessageKeyVal.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC service mapping container
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.model;

import java.util.Map;

public class MessageKeyVal {

    private Map<String, Object> messageKeyVal;

    public void setMessageKeyVal(Map<String, Object> messageKeyVal) {
        this.messageKeyVal = messageKeyVal;
    }

    public Map<String, Object> getMessageKeyVal() {
        return messageKeyVal;
    }
}
With the core model in place, here are the key parts of the NettyRPC core package, newlandframework.netty.rpc.core, starting with the business thread pool classes:
Thread factory implementation:
/**
 * @filename: NamedThreadFactory.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: thread factory
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class NamedThreadFactory implements ThreadFactory {

    private static final AtomicInteger threadNumber = new AtomicInteger(1);

    private final AtomicInteger mThreadNum = new AtomicInteger(1);

    private final String prefix;

    private final boolean daemoThread;

    private final ThreadGroup threadGroup;

    public NamedThreadFactory() {
        this("rpcserver-threadpool-" + threadNumber.getAndIncrement(), false);
    }

    public NamedThreadFactory(String prefix) {
        this(prefix, false);
    }

    public NamedThreadFactory(String prefix, boolean daemo) {
        this.prefix = prefix + "-thread-";
        daemoThread = daemo;
        SecurityManager s = System.getSecurityManager();
        threadGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup();
    }

    public Thread newThread(Runnable runnable) {
        String name = prefix + mThreadNum.getAndIncrement();
        Thread ret = new Thread(threadGroup, runnable, name, 0);
        ret.setDaemon(daemoThread);
        return ret;
    }

    public ThreadGroup getThreadGroup() {
        return threadGroup;
    }
}
Business thread pool implementation:
/**
 * @filename: RpcThreadPool.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC thread pool wrapper
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RpcThreadPool {

    //This standalone pool exists for complex, I/O-heavy business work, so that it never blocks Netty's handler threads.
    //If the business logic is simple enough, running it directly inside a Netty handler (ChannelInboundHandlerAdapter) is also fine.
    public static Executor getExecutor(int threads, int queues) {
        String name = "RpcThreadPool";
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>()
                        : (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name));
    }
}
/**
 * @filename: AbortPolicyWithReport.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: thread pool rejection policy
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    private final String threadName;

    public AbortPolicyWithReport(String threadName) {
        this.threadName = threadName;
    }

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("RpcServer["
                + " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d),"
                + " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s)]",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
                e.getLargestPoolSize(), e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(),
                e.isTerminated(), e.isTerminating());
        System.out.println(msg);
        throw new RejectedExecutionException(msg);
    }
}
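A quick usage sketch of the pool; the arguments 16 and -1 (a fixed pool of 16 threads with an unbounded queue, since queues < 0) mirror the values used later in RpcServerLoader and MessageRecvExecutor, and the demo class and task body are hypothetical:

package newlandframework.netty.rpc.core;

import java.util.concurrent.Executor;

public class RpcThreadPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        Executor pool = RpcThreadPool.getExecutor(16, -1);
        pool.execute(new Runnable() {
            public void run() {
                //stand-in for real RPC business logic
                System.out.println("RPC business task running");
            }
        });
        //the pool's threads are daemons (see getExecutor), so give the task a moment before the JVM exits
        Thread.sleep(100);
    }
}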
RPC client executor implementation:
/**
 * @filename: MessageSendExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client execution module
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.Proxy;

public class MessageSendExecutor {

    private RpcServerLoader loader = RpcServerLoader.getInstance();

    public MessageSendExecutor(String serverAddress) {
        loader.load(serverAddress);
    }

    public void stop() {
        loader.unLoad();
    }

    public static <T> T execute(Class<T> rpcInterface) {
        return (T) Proxy.newProxyInstance(
                rpcInterface.getClassLoader(),
                new Class<?>[]{rpcInterface},
                new MessageSendProxy<T>(rpcInterface)
        );
    }
}
The RPC client here is in fact a dynamic proxy over MessageSendProxy, using the JDK's native dynamic proxy mechanism; you could switch to CGLIB (Code Generation Library) instead. In my tests, though, the CGLIB variant threw NullPointerExceptions under high concurrency while the JDK proxies did not; at modest concurrency both ran fine. Something to dig into later. Without further ado, here is MessageSendProxy, followed by a hedged CGLIB sketch:
/**
 * @filename: MessageSendProxy.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client message handling
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.UUID;

import newlandframework.netty.rpc.model.MessageRequest;

public class MessageSendProxy<T> implements InvocationHandler {

    private Class<T> cls;

    public MessageSendProxy(Class<T> cls) {
        this.cls = cls;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        MessageRequest request = new MessageRequest();
        request.setMessageId(UUID.randomUUID().toString());
        request.setClassName(method.getDeclaringClass().getName());
        request.setMethodName(method.getName());
        request.setTypeParameters(method.getParameterTypes());
        request.setParameters(args);

        MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
        MessageCallBack callBack = handler.sendRequest(request);
        return callBack.start();
    }
}
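And the promised CGLIB alternative, as a hedged sketch only (it assumes cglib is on the classpath; the factory class is hypothetical and mirrors MessageSendProxy rather than being part of the NettyRPC code):

// Hedged sketch: the same proxy logic expressed with CGLIB instead of JDK dynamic proxies.
package newlandframework.netty.rpc.core;

import java.lang.reflect.Method;
import java.util.UUID;

import net.sf.cglib.proxy.Enhancer;
import net.sf.cglib.proxy.MethodInterceptor;
import net.sf.cglib.proxy.MethodProxy;
import newlandframework.netty.rpc.model.MessageRequest;

public class CglibSendProxyFactory {

    @SuppressWarnings("unchecked")
    public static <T> T create(Class<T> rpcInterface) {
        Enhancer enhancer = new Enhancer();
        enhancer.setInterfaces(new Class<?>[]{rpcInterface}); // proxy the RPC interface
        enhancer.setCallback(new MethodInterceptor() {
            public Object intercept(Object obj, Method method, Object[] args,
                                    MethodProxy proxy) throws Throwable {
                //same wire protocol as MessageSendProxy: build a MessageRequest and wait on the callback
                MessageRequest request = new MessageRequest();
                request.setMessageId(UUID.randomUUID().toString());
                request.setClassName(method.getDeclaringClass().getName());
                request.setMethodName(method.getName());
                request.setTypeParameters(method.getParameterTypes());
                request.setParameters(args);

                MessageSendHandler handler = RpcServerLoader.getInstance().getMessageSendHandler();
                return handler.sendRequest(request).start();
            }
        });
        return (T) enhancer.create();
    }
}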
A closer look shows that MessageSendProxy hands the message off to the RpcServerLoader module, whose code follows:
/**
 * @filename: RpcServerLoader.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server configuration loader
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import java.net.InetSocketAddress;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class RpcServerLoader {

    private volatile static RpcServerLoader rpcServerLoader;
    private final static String DELIMITER = ":";

    //number of processors available to the JVM, doubled for the NIO pool
    private final static int parallel = Runtime.getRuntime().availableProcessors() * 2;

    //netty NIO thread pool
    private EventLoopGroup eventLoopGroup = new NioEventLoopGroup(parallel);
    private static ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);

    private MessageSendHandler messageSendHandler = null;

    //signals that the Netty channel to the server has been established
    private Lock lock = new ReentrantLock();
    private Condition signal = lock.newCondition();

    private RpcServerLoader() {
    }

    //double-checked locking singleton
    public static RpcServerLoader getInstance() {
        if (rpcServerLoader == null) {
            synchronized (RpcServerLoader.class) {
                if (rpcServerLoader == null) {
                    rpcServerLoader = new RpcServerLoader();
                }
            }
        }
        return rpcServerLoader;
    }

    public void load(String serverAddress) {
        String[] ipAddr = serverAddress.split(RpcServerLoader.DELIMITER);
        if (ipAddr.length == 2) {
            String host = ipAddr[0];
            int port = Integer.parseInt(ipAddr[1]);
            final InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
            threadPoolExecutor.submit(new MessageSendInitializeTask(eventLoopGroup, remoteAddr, this));
        }
    }

    public void setMessageSendHandler(MessageSendHandler messageInHandler) {
        try {
            lock.lock();
            this.messageSendHandler = messageInHandler;
            //wake up all client RPC threads waiting for the channel
            signal.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public MessageSendHandler getMessageSendHandler() throws InterruptedException {
        try {
            lock.lock();
            //suspend until the Netty channel has been fully established
            //(a while loop guards against spurious wakeups)
            while (messageSendHandler == null) {
                signal.await();
            }
            return messageSendHandler;
        } finally {
            lock.unlock();
        }
    }

    public void unLoad() {
        messageSendHandler.close();
        threadPoolExecutor.shutdown();
        eventLoopGroup.shutdownGracefully();
    }
}
Now, in one go, the client-side message encoding, decoding, and handling modules:
/**
 * @filename: MessageSendInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client connection task
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;

public class MessageSendInitializeTask implements Runnable {

    private EventLoopGroup eventLoopGroup = null;
    private InetSocketAddress serverAddress = null;
    private RpcServerLoader loader = null;

    MessageSendInitializeTask(EventLoopGroup eventLoopGroup, InetSocketAddress serverAddress, RpcServerLoader loader) {
        this.eventLoopGroup = eventLoopGroup;
        this.serverAddress = serverAddress;
        this.loader = loader;
    }

    public void run() {
        Bootstrap b = new Bootstrap();
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true);
        b.handler(new MessageSendChannelInitializer());

        ChannelFuture channelFuture = b.connect(serverAddress);
        channelFuture.addListener(new ChannelFutureListener() {
            public void operationComplete(final ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    //once the channel is up, publish the handler so waiting client threads can proceed
                    MessageSendHandler handler = channelFuture.channel().pipeline().get(MessageSendHandler.class);
                    MessageSendInitializeTask.this.loader.setMessageSendHandler(handler);
                }
            }
        });
    }
}
/**
 * @filename: MessageSendChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client pipeline initialization
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class MessageSendChannelInitializer extends ChannelInitializer<SocketChannel> {

    //ObjectDecoder builds on the length-field frame decoder LengthFieldBasedFrameDecoder to handle TCP framing:
    //each message starts with a 4-byte length field. MESSAGE_LENGTH keeps this format compatible.
    final public static int MESSAGE_LENGTH = 4;

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //stay wire-compatible with ObjectDecoder's base class LengthFieldBasedFrameDecoder,
        //whose constructor parameters are effectively super(maxObjectSize, 0, 4, 0, 4)
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                MessageSendChannelInitializer.MESSAGE_LENGTH, 0, MessageSendChannelInitializer.MESSAGE_LENGTH));
        //LengthFieldPrepender back-fills the 4-byte length header that ObjectDecoder expects
        pipeline.addLast(new LengthFieldPrepender(MessageSendChannelInitializer.MESSAGE_LENGTH));
        pipeline.addLast(new ObjectEncoder());
        //weakCachingConcurrentResolver is chosen for concurrency; for ordinary cases cacheDisabled is sufficient
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
        pipeline.addLast(new MessageSendHandler());
    }
}
/**
 * @filename: MessageSendHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC client handler module
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;

import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageSendHandler extends ChannelInboundHandlerAdapter {

    //pending calls, keyed by messageId, waiting for their server responses
    private ConcurrentHashMap<String, MessageCallBack> mapCallBack = new ConcurrentHashMap<String, MessageCallBack>();

    private volatile Channel channel;
    private SocketAddress remoteAddr;

    public Channel getChannel() {
        return channel;
    }

    public SocketAddress getRemoteAddr() {
        return remoteAddr;
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.remoteAddr = this.channel.remoteAddress();
    }

    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
        this.channel = ctx.channel();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageResponse response = (MessageResponse) msg;
        String messageId = response.getMessageId();
        MessageCallBack callBack = mapCallBack.get(messageId);
        if (callBack != null) {
            mapCallBack.remove(messageId);
            callBack.over(response);
        }
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public void close() {
        channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    public MessageCallBack sendRequest(MessageRequest request) {
        MessageCallBack callBack = new MessageCallBack(request);
        mapCallBack.put(request.getMessageId(), callBack);
        channel.writeAndFlush(request);
        return callBack;
    }
}
Finally, the RPC server side. At startup it uses Spring to load the RPC service interfaces and bind them to their implementation objects in a container, and it initializes Netty's boss/worker thread pools. This is handled by the MessageRecvExecutor module:
/**
 * @filename: MessageRecvExecutor.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server execution module
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.logging.Level;

import newlandframework.netty.rpc.model.MessageKeyVal;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class MessageRecvExecutor implements ApplicationContextAware, InitializingBean {

    private String serverAddress;
    private final static String DELIMITER = ":";

    //maps an RPC interface name to its implementation object
    private Map<String, Object> handlerMap = new ConcurrentHashMap<String, Object>();

    private static ThreadPoolExecutor threadPoolExecutor;

    public MessageRecvExecutor(String serverAddress) {
        this.serverAddress = serverAddress;
    }

    public static void submit(Runnable task) {
        //lazily create the business thread pool, with double-checked locking
        if (threadPoolExecutor == null) {
            synchronized (MessageRecvExecutor.class) {
                if (threadPoolExecutor == null) {
                    threadPoolExecutor = (ThreadPoolExecutor) RpcThreadPool.getExecutor(16, -1);
                }
            }
        }
        threadPoolExecutor.submit(task);
    }

    public void setApplicationContext(ApplicationContext ctx) throws BeansException {
        try {
            MessageKeyVal keyVal = (MessageKeyVal) ctx.getBean(Class.forName("newlandframework.netty.rpc.model.MessageKeyVal"));
            Map<String, Object> rpcServiceObject = keyVal.getMessageKeyVal();

            Set<Map.Entry<String, Object>> s = rpcServiceObject.entrySet();
            Iterator<Map.Entry<String, Object>> it = s.iterator();
            Map.Entry<String, Object> entry;

            while (it.hasNext()) {
                entry = it.next();
                handlerMap.put(entry.getKey(), entry.getValue());
            }
        } catch (ClassNotFoundException ex) {
            java.util.logging.Logger.getLogger(MessageRecvExecutor.class.getName()).log(Level.SEVERE, null, ex);
        }
    }

    public void afterPropertiesSet() throws Exception {
        //Netty's threading model is set to boss/worker (master/slave) pools to cope with highly concurrent requests.
        //Netty also supports single-threaded and multi-threaded network I/O models, configurable to the business need.
        ThreadFactory threadRpcFactory = new NamedThreadFactory("NettyRPC ThreadFactory");

        //number of processors available to the JVM, doubled for the worker pool
        int parallel = Runtime.getRuntime().availableProcessors() * 2;

        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup(parallel, threadRpcFactory, SelectorProvider.provider());

        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    .childHandler(new MessageRecvChannelInitializer(handlerMap))
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            String[] ipAddr = serverAddress.split(MessageRecvExecutor.DELIMITER);

            if (ipAddr.length == 2) {
                String host = ipAddr[0];
                int port = Integer.parseInt(ipAddr[1]);
                ChannelFuture future = bootstrap.bind(host, port).sync();
                System.out.printf("[author tangjie] Netty RPC Server start success ip:%s port:%d\n", host, port);
                //block until the server channel is closed
                future.channel().closeFuture().sync();
            } else {
                System.out.printf("[author tangjie] Netty RPC Server start fail!\n");
            }
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
As usual, here are the core server-side encoding, decoding, and handling modules:
/**
 * @filename: MessageRecvChannelInitializer.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server pipeline initialization
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
import java.util.Map;

public class MessageRecvChannelInitializer extends ChannelInitializer<SocketChannel> {

    //ObjectDecoder builds on the length-field frame decoder LengthFieldBasedFrameDecoder to handle TCP framing:
    //each message starts with a 4-byte length field. MESSAGE_LENGTH keeps this format compatible.
    final public static int MESSAGE_LENGTH = 4;

    private Map<String, Object> handlerMap = null;

    MessageRecvChannelInitializer(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();
        //stay wire-compatible with ObjectDecoder's base class LengthFieldBasedFrameDecoder,
        //whose constructor parameters are effectively super(maxObjectSize, 0, 4, 0, 4)
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0,
                MessageRecvChannelInitializer.MESSAGE_LENGTH, 0, MessageRecvChannelInitializer.MESSAGE_LENGTH));
        //LengthFieldPrepender back-fills the 4-byte length header that ObjectDecoder expects
        pipeline.addLast(new LengthFieldPrepender(MessageRecvChannelInitializer.MESSAGE_LENGTH));
        pipeline.addLast(new ObjectEncoder());
        //weakCachingConcurrentResolver is chosen for concurrency; for ordinary cases cacheDisabled is sufficient
        pipeline.addLast(new ObjectDecoder(Integer.MAX_VALUE,
                ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
        pipeline.addLast(new MessageRecvHandler(handlerMap));
    }
}
/**
 * @filename: MessageRecvHandler.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server message handler
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Map;

import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageRecvHandler extends ChannelInboundHandlerAdapter {

    private final Map<String, Object> handlerMap;

    public MessageRecvHandler(Map<String, Object> handlerMap) {
        this.handlerMap = handlerMap;
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRequest request = (MessageRequest) msg;
        MessageResponse response = new MessageResponse();
        MessageRecvInitializeTask recvTask = new MessageRecvInitializeTask(request, response, handlerMap, ctx);
        //never block the NIO thread: complex business logic is handed to the dedicated thread pool
        MessageRecvExecutor.submit(recvTask);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        //close the channel on a network error
        ctx.close();
    }
}
/**
 * @filename: MessageRecvInitializeTask.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC server message task
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.util.Map;

import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;
import org.apache.commons.beanutils.MethodUtils;

public class MessageRecvInitializeTask implements Runnable {

    private MessageRequest request = null;
    private MessageResponse response = null;
    private Map<String, Object> handlerMap = null;
    private ChannelHandlerContext ctx = null;

    public MessageResponse getResponse() {
        return response;
    }

    public MessageRequest getRequest() {
        return request;
    }

    public void setRequest(MessageRequest request) {
        this.request = request;
    }

    MessageRecvInitializeTask(MessageRequest request, MessageResponse response, Map<String, Object> handlerMap, ChannelHandlerContext ctx) {
        this.request = request;
        this.response = response;
        this.handlerMap = handlerMap;
        this.ctx = ctx;
    }

    public void run() {
        response.setMessageId(request.getMessageId());
        try {
            Object result = reflect(request);
            response.setResult(result);
        } catch (Throwable t) {
            response.setError(t.toString());
            t.printStackTrace();
            System.err.printf("RPC Server invoke error!\n");
        }

        ctx.writeAndFlush(response).addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("RPC Server Send message-id response:" + request.getMessageId());
            }
        });
    }

    private Object reflect(MessageRequest request) throws Throwable {
        //look up the implementation bean by interface name and invoke the target method reflectively
        String className = request.getClassName();
        Object serviceBean = handlerMap.get(className);
        String methodName = request.getMethodName();
        Object[] parameters = request.getParameters();
        return MethodUtils.invokeMethod(serviceBean, methodName, parameters);
    }
}
Then the RPC message callback module:
/**
 * @filename: MessageCallBack.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: RPC message callback
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.core;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import newlandframework.netty.rpc.model.MessageRequest;
import newlandframework.netty.rpc.model.MessageResponse;

public class MessageCallBack {

    private MessageRequest request;
    private MessageResponse response;
    private Lock lock = new ReentrantLock();
    private Condition finish = lock.newCondition();

    public MessageCallBack(MessageRequest request) {
        this.request = request;
    }

    public Object start() throws InterruptedException {
        try {
            lock.lock();
            //apply a timeout: if the RPC server takes too long to respond, return null by default
            finish.await(10 * 1000, TimeUnit.MILLISECONDS);
            if (this.response != null) {
                return this.response.getResult();
            } else {
                return null;
            }
        } finally {
            lock.unlock();
        }
    }

    public void over(MessageResponse response) {
        try {
            lock.lock();
            //store the result before signalling the waiting caller
            this.response = response;
            finish.signal();
        } finally {
            lock.unlock();
        }
    }
}
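Putting the client pieces together, here is a minimal usage sketch (Calculate is the demo service interface defined in the test section further below; the address matches rpc-server.properties, and the demo class itself is hypothetical):

package newlandframework.netty.rpc.servicebean;

import newlandframework.netty.rpc.core.MessageSendExecutor;

public class CalcClientDemo {
    public static void main(String[] args) {
        MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");
        //the JDK dynamic proxy turns this interface call into a MessageRequest on the wire
        Calculate calc = MessageSendExecutor.execute(Calculate.class);
        //blocks in MessageCallBack.start() for up to 10 seconds; yields null if the server times out
        int sum = calc.add(1, 2);
        System.out.println("calc add result:[" + sum + "]");
        executor.stop();
    }
}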
That covers NettyRPC's key parts: the server and client modules are now fully implemented on Netty. Here is the Spring configuration it loads, rpc-invoke-config.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">
  <context:component-scan base-package="newlandframework.netty.rpc.core"/>
  <context:property-placeholder location="classpath:newlandframework/netty/rpc/config/rpc-server.properties"/>
  <bean id="rpcbean" class="newlandframework.netty.rpc.model.MessageKeyVal">
    <property name="messageKeyVal">
      <map>
        <entry key="newlandframework.netty.rpc.servicebean.Calculate">
          <ref bean="calc"/>
        </entry>
      </map>
    </property>
  </bean>
  <bean id="calc" class="newlandframework.netty.rpc.servicebean.CalculateImpl"/>
  <bean id="rpcServer" class="newlandframework.netty.rpc.core.MessageRecvExecutor">
    <constructor-arg name="serverAddress" value="${rpc.server.addr}"/>
  </bean>
</beans>
And the configuration file that binds the RPC server's IP address, rpc-server.properties:
#rpc server's ip address config
rpc.server.addr=127.0.0.1:18888
Finally, the NettyRPC server can be started like this:
new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");
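For instance, wrapped in a minimal launcher (the class name RpcServerStart is hypothetical):

package newlandframework.netty.rpc.servicebean;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class RpcServerStart {
    public static void main(String[] args) {
        //loading the Spring context triggers MessageRecvExecutor.afterPropertiesSet(),
        //which binds the Netty server to the address in rpc-server.properties and blocks until shutdown
        new ClassPathXmlApplicationContext("newlandframework/netty/rpc/config/rpc-invoke-config.xml");
    }
}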
If all goes well, the console will show output like the screenshot below:
If it does, the NettyRPC server has started successfully!
So how does this Netty-based RPC server handle concurrency? Practice is the sole criterion of truth, so let's put it to the test.
The test case below makes remote RPC calls to an add-two-numbers function and returns the computed result. The client spins up 10,000 threads that fire their calculation requests at the same instant, and we then observe whether the Netty RPC server responds to every request and whether every client call returns a correct result. Note that the test is designed around 10,000 threads issuing one request each simultaneously, not 10,000 requests issued in a loop; these are very different measures of an RPC server's concurrent processing capacity, and the former is far more demanding.
First, the RPC calculation interface and its implementation class:
/**
 * @filename: Calculate.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: calculator interface
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

public interface Calculate {
    //add two numbers
    int add(int a, int b);
}
/**
 * @filename: CalculateImpl.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: calculator interface implementation
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

public class CalculateImpl implements Calculate {
    //add two numbers
    public int add(int a, int b) {
        return a + b;
    }
}
Next, the instantaneous-concurrency RPC test:
/**
 * @filename: CalcParallelRequestThread.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: concurrent client thread simulation
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

import newlandframework.netty.rpc.core.MessageSendExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CalcParallelRequestThread implements Runnable {

    private CountDownLatch signal;
    private CountDownLatch finish;
    private MessageSendExecutor executor;
    private int taskNumber = 0;

    public CalcParallelRequestThread(MessageSendExecutor executor, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
        this.signal = signal;
        this.finish = finish;
        this.taskNumber = taskNumber;
        this.executor = executor;
    }

    public void run() {
        try {
            //wait until all threads are ready, so the requests fire at the same instant
            signal.await();

            Calculate calc = executor.execute(Calculate.class);
            int add = calc.add(taskNumber, taskNumber);
            System.out.println("calc add result:[" + add + "]");

            finish.countDown();
        } catch (InterruptedException ex) {
            Logger.getLogger(CalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
/**
 * @filename: RpcParallelTest.java
 *
 * Newland Co. Ltd. All rights reserved.
 *
 * @Description: concurrent RPC test code
 * @author tangjie
 * @version 1.0
 */
package newlandframework.netty.rpc.servicebean;

import java.util.concurrent.CountDownLatch;
import newlandframework.netty.rpc.core.MessageSendExecutor;
import org.apache.commons.lang.time.StopWatch;

public class RpcParallelTest {

    public static void main(String[] args) throws Exception {
        final MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:18888");

        //degree of parallelism: 10000
        int parallel = 10000;

        //start the stopwatch
        StopWatch sw = new StopWatch();
        sw.start();

        CountDownLatch signal = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(parallel);

        for (int index = 0; index < parallel; index++) {
            CalcParallelRequestThread client = new CalcParallelRequestThread(executor, signal, finish, index);
            new Thread(client).start();
        }

        //10000 concurrent threads fire their requests at this instant
        signal.countDown();
        finish.await();
        sw.stop();

        String tip = String.format("Total RPC elapsed time: [%s] ms", sw.getTime());
        System.out.println(tip);

        executor.stop();
    }
}
Now start the NettyRPC server, confirm it is up, then run the concurrent RPC client and watch the computed results and elapsed time it prints.
As shown above, 10,000 instantaneous RPC calculation requests took just under 11 seconds in total. Now look at the NettyRPC server side, shown below:
It is clear that the RPC server received every calculation request the clients issued and returned a response for each.
Finally, let's verify separately: did the RPC server drop packets, suffer framing (sticky-packet) problems, or block on I/O? Were all 10,000 concurrent calculation requests received, processed, and answered? The facts speak for themselves, as shown below:
Impressive: the RPC server did successfully receive the client's 10,000 instantaneous concurrent calculation requests and answered every one of them, with no packet loss, no framing problems, and no I/O blocking. And did the RPC client get all the computed results back?
It did: the client received the server's results for all 10,000 addition requests, in roughly 11 seconds. Evidently the NettyRPC server, built on Netty plus a business thread pool, handles concurrent multi-threaded RPC requests with ease.
In closing: this article used the Netty NIO framework to implement a simple "high-performance" RPC server. The code works, but several things could still be improved, for example:
1. Object serialization could support today's mainstream serialization frameworks: protobuf, JBoss Marshalling, Avro, and so on.
2. Netty's threading model could be tailored to the business need, since not every workload requires this much concurrent processing power.
3. RPC currently supports binding one service interface to exactly one implementation; one-to-many bindings should follow.
4. The business thread pool's startup parameters, its blocking-queue type, and so on could be made configurable.
5. Netty's Handler stage currently dispatches complex business logic to a dedicated thread pool for asynchronous background processing. You could also decouple it with JMS (a message queue), dispatching work to queue subscribers for uniform processing; plenty of open-source messaging systems exist, such as ActiveMQ and RocketMQ.
No doubt you will find many more places where this NettyRPC can be polished and improved; I leave that optimization work to you.
My technical ability and understanding have their limits, so if anything in this article is wrong, I sincerely welcome corrections from fellow readers. Finally, thank you for patiently reading this far; I hope you now have a deeper picture of building high-performance server applications in Java. This article is a milestone summary of my Netty studies; when time allows, I will follow up with more articles on industrial-grade Netty development. Stay tuned!
PS: if you are interested, you may also want to read my other article, 「Netty實現高性能RPC服務器優化篇之消息序列化」 (the message-serialization optimization follow-up). Since publishing these two articles on building a high-performance RPC server with Netty on cnblogs, I have received many requests from readers for the source code. For everyone's convenience, I have open-sourced NettyRPC on GitHub; you are welcome to study and explore it together!
The NettyRPC project can be downloaded here: https://github.com/tang-jie/NettyRPC
The advanced installment of this industrial-grade Netty development series: 「Netty構建分佈式消息隊列(AvatarMQ)設計指南之架構篇」 (building a distributed message queue, AvatarMQ, with Netty: the architecture design guide).
The two articles so far, 「談談如何使用Netty開發實現高性能的RPC服務器」 and 「Netty實現高性能RPC服務器優化篇之消息序列化」, were designed around building a high-performance RPC server on Netty, and that groundwork paves the way for designing and implementing a Netty-based distributed message queue system, which I have named AvatarMQ. As the advanced installment of this industrial-grade Netty series, the AvatarMQ architecture design guide linked above is worth a look for anyone interested; it will not disappoint!
AvatarMQ is open source at: https://github.com/tang-jie/AvatarMQ.