之前在作項目的時候接觸到了rpc,感受頗有意思,可是那個框架用了rabbitmq來進行channel的管理。正好前幾天看到了netty,一個高效的JAVA NIO框架,因此萌生了想本身寫一個rpc框架的想法。java
RPC(Remote Procedure Call)指遠程過程調用。它的做用大概能夠這麼描述一下:B 程序員 程序想要調用 A 程序的某個函數,可是因爲 A 與 B 是兩個獨立的項目,B 不可能直接調用 A 中的任何一個類裏的任何一個函數。這時 RPC 就能起到它的做用了。 爲了完成 B 程序的需求, A 程序 對 B 程序進行規定,若是 B 想要調用 A 的方法,須要給 A 一個規定的數據格式,而後 A 在本地執行完 B 所想要使用的函數後將結果 封裝成一個規定好的數據格式後發送給 B。這樣 B 就達到了不拷貝 A 的代碼的狀況下完成其所須要的業務功能。 程序員
一個rpc底層應該支持io/nio,這種實現方法大體有兩種,一是經過代碼徹底有本身實現,可是這種方法對技術要求比較高,並且容易出現隱藏的bug,另外一種就是利用現有的開源框架,Netty 是個不錯的選擇,它是一個利用 Java 的高級網絡的能力,隱藏其背後的複雜性而提供一個易於使用的 API 的客戶端/服務器框架。它能大大的簡化咱們的開發流程,使得代碼更加牢靠。在此次的RPC中,咱們使用 Netty 來做爲鏈接客戶端與服務端的橋樑。redis
一個好的rpc不該該受到語言的限制,因此client端到server端的數據交換格式應該有一個良好的定義,好比json、xml。如今這方面成熟的框架有不少好比Thrift、Protobuf等等。咱們不用本身去定義以及實現一個交換格式,這些成熟的框架都是久經考驗的。在本例中因爲是抱着學習的目的,本人採用java自帶的序列化與反序列化方法。json
client的端想要調用服務端的某個方法,須要得知這個方法的某些信息,而如今問題就來了,得知這個信息的時候是由寫 A 程序的人去直接告訴 B 程序的人,仍是由 B 程序主動去發現 A 的服務。很明顯,第一種方法很不牢靠,如果採用這種方法的話,A服務的每次改動都要通知到 B ,B也要每次根據 A服務的改變,而重寫本身的代碼。相比之下,第二種方法更顯得可行。 其實實現服務的註冊與發現的方法也有不少,好比zookeeper,redis等等。大體原理就是,A 服務將本身暴露出的方法信息存在zookeeper或者redis上,每次更改由A主動通知或由 B 去zookeeper或redis上自動去拉取最新的信息。對於zookeeper來講存儲方法信息的是一個個固定的節點,對於reids來講就是一個key值。用zookeeper還解決了在分佈式的部署方案下,某個服務down機的問題。由於zookeeper與生俱來的容災能力(好比leader選舉),能夠確保服務註冊表的高可用性。在本例中,我並未實現服務的註冊於發現。服務器
client端與server端有各自須要處理的發送格式與接受格式。對於client端來講須要封裝好本身所要請求的方法信息發送給server端,並等待server端返回的結果。server端則是接收client的請求數據,處理完成後返回給client端結果數據。 其實RPC在調用的時候應該讓調用者像調用本地服務通常的去完成業務邏輯。這種實如今java中就應該用代理來實現。網絡
client請求格式 利用java自帶的序列化方法要繼承Serializable方法而且要實現無參構造方法。app
package com.example.nettyrpcfirst.netty.entity;
import java.io.Serializable;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Request implements Serializable {
private static final long serialVersionUID = -1L;
private String clientId;
private String className;
private String methodName;
private Class[] paramterTypes;
private Object[] parameters;
public Request(String clientId, String className, String methodName, Class[] paramterTypes, Object[] parameters) {
this.clientId = clientId;
this.className = className;
this.methodName = methodName;
this.paramterTypes = paramterTypes;
this.parameters = parameters;
}
public Request() {
}
//getter and setter
}
複製代碼
server 響應數據格式 具體要求與client端相同框架
package com.example.nettyrpcfirst.netty.entity;
import java.io.Serializable;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Response implements Serializable {
private static final long serialVersionUID = -1L;
private String clientId;
private Throwable err;
private Object result;
public Response() {
}
// getter and setter
}
複製代碼
注意 clientId字段的設置是爲了保證返回的數據是本身想要的。dom
netty不熟悉的能夠去官網寫寫幾個例子socket
package com.example.nettyrpcfirst.netty.server;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class ServerHandler extends SimpleChannelInboundHandler {
Logger logger = LoggerFactory.getLogger(ServerHandler.class);
private final Map<String, Object> services;
public ServerHandler(Map<String, Object> services) {
this.services = services;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("{} has created a channel",ctx.channel().remoteAddress());
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
Runnable r = () ->{
Request request = (Request) o;
Response response = new Response();
response.setClientId(request.getClientId());
try {
Object service = services.get(request.getClassName());
FastClass serviceFastClass = FastClass.create(service.getClass());
FastMethod serviceFastMethod = serviceFastClass.getMethod(request.getMethodName(), request.getParamterTypes());
Object result = serviceFastMethod.invoke(service, request.getParameters());
response.setResult(result);
}catch (Exception e){
response.setErr(e);
}
channelHandlerContext.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
logger.info("send response for request: "+request.getClientId());
}
});
};
Server.submit(r);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
logger.error("rpc server err occur" + cause.getMessage()+" | "+ctx.channel().remoteAddress());
ctx.close();
}
}
複製代碼
最主要的就是channelRead0方法,這裏定義了在接收到客戶端的數據後如何去調用本地方法,具體是用cglib代理完成。 server具體代碼
package com.example.nettyrpcfirst.netty.server;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
@Configuration
public class Server implements BeanNameAware, BeanFactoryAware, ApplicationContextAware,InitializingBean {
private static final org.slf4j.Logger logger = LoggerFactory.getLogger(Server.class);
private Map<String,Object> services = new ConcurrentHashMap<>();
private static ExecutorService threadPoolExecutor;
public Server(){
}
/** * 啓動netty server * @throws Exception */
@Override
public void afterPropertiesSet() throws Exception {
logger.info("afterPropertiesSet");
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try{
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
.addLast(new ServerHandler(services));
}
})
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = b.bind(8080).sync();
future.channel().closeFuture().sync();
}catch (Exception e){
e.printStackTrace();
logger.error("an error occur ----------->"+e.getMessage());
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
/** * 經過掃描全部帶有@RPCServer註解的類進行註冊 * @param applicationContext * @throws BeansException */
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
logger.info("setApplicationContext");
Map<String,Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(RPCServer.class);
if(!serviceBeanMap.isEmpty()){
for (Object service :serviceBeanMap.values()){
String interfaceName = service.getClass().getAnnotation(RPCServer.class).value().getName();
logger.info("RPCService: {}" , interfaceName);
this.services.put(interfaceName,service);
}
}
}
public static void submit(Runnable task){
if(threadPoolExecutor == null){
synchronized (RPCServer.class){
if(threadPoolExecutor == null){
threadPoolExecutor = Executors.newFixedThreadPool(16);
}
}
}
threadPoolExecutor.submit(task);
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
logger.info("setBeanFactory()");
}
@Override
public void setBeanName(String s) {
logger.info("setBeanName() {}", s);
}
}
複製代碼
RPCServer 自定義註解
package com.example.nettyrpcfirst.netty.annoations;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface RPCServer {
Class<?> value();
}
複製代碼
client代理類
package com.example.nettyrpcfirst.netty.client;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class ClientProxy {
@SuppressWarnings("unchecked")
public <T> T create(Class<?> interfaceClass){
return (T)Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Request request = new Request();
request.setClientId(UUID.randomUUID().toString());
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParamterTypes(method.getParameterTypes());
request.setParameters(args);
Client client = new Client("127.0.0.1",8080);
Response response = client.send(request);
if(response.getErr()!=null){
throw response.getErr();
}else{
return response.getResult();
}
}
});
}
}
複製代碼
client 與server 鏈接發送數據並等待數據返回
package com.example.nettyrpcfirst.netty.client;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class Client extends SimpleChannelInboundHandler<Response> {
private static org.slf4j.Logger logger = LoggerFactory.getLogger(Client.class);
private Response response;
private final static Object obj = new Object();
private String host;
private int port;
ChannelFuture future;
public Client(String host,int port){
this.host = host;
this.port = port;
}
/** * 接收到消息後喚醒線程 * @param channelHandlerContext * @param response * @throws Exception */
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Response response) throws Exception {
this.response = response;
synchronized (obj){
obj.notifyAll();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("client caught exception", cause);
ctx.close();
}
/** * 鏈接server端channel,發送完數據後鎖定線程,等待數據返回 * @param request * @return * @throws Exception */
public Response send(Request request) throws Exception{
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap b = new Bootstrap();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
.addLast(new ObjectDecoder(1024,ClassResolvers.cacheDisabled(this.getClass().getClassLoader())))
.addLast(new ObjectEncoder())
.addLast(Client.this);
}
})
.option(ChannelOption.SO_KEEPALIVE,true);
ChannelFuture future = b.connect(host,port).sync();
future.channel().writeAndFlush(request).sync();
System.out.println("2 "+Thread.currentThread().getName());
synchronized (obj){
System.out.println("1111111111111111");
obj.wait();
}
if(response != null){
System.out.println("3333333333333");
}
return response;
}finally {
if(future!=null){
future.channel().closeFuture().sync();
}
eventLoopGroup.shutdownGracefully();
}
}
}
複製代碼
package com.example.nettyrpcfirst.netty.client;
import com.example.nettyrpcfirst.netty.entity.TestService;
/** * @auther lichaobao * @date 2018/9/21 * @QQ 1527563274 */
public class TestMain {
public static void main(String[] args){
TestService testService = new ClientProxy().create(TestService.class);
String result = testService.play();
System.out.println("收到消息 ------------> "+result);
}
}
複製代碼
不出意外的話 控制檯會成功打印