PRC(Remote Procedure Call) 遠程過程調用。通俗的講就是程序經過RPC
框架調用遠程主機的方法就如同調用本地方法同樣。Dubbo
就是這樣一個Rpc
框架,本文主要參考Dubbo的設計思路,簡易實現了Rpc框架。 本文涉及到知識點包括:java
Rpc 框架通常分爲三個部分,Registry(註冊中心)、Provider(提供者)、Consumer(消費者)。git
java中常見的代理有JDK動態代理、Cglib動態代理、靜態代理(ASM等字節碼技術)。github
舉個例子redis
@Override
@SuppressWarnings("unchecked")
public <T> T createProxyBean(Class<T> rpcServiceInterface) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{rpcServiceInterface}, new AryaRpcInvocationHandler());
}
複製代碼
JDK代理生成代理對象主要經過java.lang.reflect.Proxy
類的newProxyInstance
方法。JDK代理須要被代理對象必須實現接口。bootstrap
Cglib其實是對ASM的易用性封裝,Cglib不須要目標對象必須實現某一個接口,相對JDK動態代理更加靈活。bash
Enhancer en = new Enhancer();
en.setSuperclass(clazz);
en.setCallback(new MethodInterceptor() {
@Override
public Object intercept(Object arg0, Method method, Object[] args, MethodProxy arg3) throws Throwable {
Object o = method.invoke(object, args);
return o;
}
});
return en.create();
複製代碼
經過字節碼技術對class文件進行修改,使用和學習成本相對較高,須要對Class的文件結構以及各類符號引用有比較深的認識,才能較好的使用,由於是對字節碼的修改因此相對的性能上也比動態代理要好一些。服務器
咱們知道數據在網絡上傳輸都是經過二進制流的形式進行進行的。當Consumer調用Provider時傳輸的參數須要先進行序列化,provider接收到參數時須要進行反序列化才能拿到須要的參數數據,因此序列化的性能對RPC的調用性能有很大的影響。目前主流的序列化方式有不少包括:Kryo、Protostuff、hessian。等網絡
Protostuff是google序列化Protosbuff的開源實現,項目中咱們用到它的序列化方式app
/** * @author HODO */
public class ProtostuffSerializer implements Serializer {
@Override
public byte[] serialize(Object object) {
Class targetClass = object.getClass();
RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
LinkedBuffer linkedBuffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
return ProtostuffIOUtil.toByteArray(object, schema, linkedBuffer);
}
@SuppressWarnings("unchecked")
@Override
public <T> T deserialize(byte[] bytes, Class<T> targetClass) {
RuntimeSchema schema = RuntimeSchema.createFrom(targetClass);
T object = (T) schema.newMessage();
ProtostuffIOUtil.mergeFrom(bytes, object, schema);
return object;
}
}
複製代碼
Netty是一個高性能、異步事件驅動的NIO框架,它提供了對TCP、UDP和文件傳輸的支持。舉個例子: Netty 服務端代碼框架
public class NettyServer {
private ApplicationContext applicationContext;
public NettyServer(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
public void init(int port) {
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.option(ChannelOption.SO_BACKLOG, 1024);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
bootstrap.localAddress(port);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new NettyServerHandler(applicationContext));
}
});
ChannelFuture f = bootstrap.bind().sync();
if (f.isSuccess()) {
System.out.println("Netty端口號:" + port);
}
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
複製代碼
Netty 客服端代碼
public class NettyClient {
private int port;
private String host;
private final CountDownLatch countDownLatch = new CountDownLatch(1);
SerializerFactory serializerFactory = new SerializerFactory();
Serializer serializer = serializerFactory.getSerialize(ProtostuffSerializer.class);
public NettyClient(String host, int port) {
this.port = port;
this.host = host;
}
public NettyClient(String inetAddress) {
if (inetAddress != null && inetAddress.length() != 0) {
String[] strings = inetAddress.split(":");
this.host = strings[0];
this.port = Integer.valueOf(strings[1]);
}
}
public RpcResponse invoker(RpcRequest rpcRequest) throws InterruptedException {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
final NettyClientHandler clientHandler = new NettyClientHandler();
bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(clientHandler);
}});
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)).sync();
serializer.serialize(rpcRequest);
future.channel().writeAndFlush(Unpooled.buffer().writeBytes(serializer.serialize(rpcRequest)));
countDownLatch.await();
// 等待連接關閉
//future.channel().closeFuture().sync();
return clientHandler.getRpcResponse();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private RpcResponse rpcResponse;
/** * 接收 Rpc 調用結果 * * @param ctx netty 容器 * @param msg 服務端答覆消息 * @throws Exception */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
rpcResponse = serializer.deserialize(req, RpcResponse.class);
countDownLatch.countDown();
}
RpcResponse getRpcResponse() {
return rpcResponse;
}
}
}
複製代碼
選用了zookeeper
做爲註冊中心,在建議Rpc框架中提供了註冊中心的擴展。只要實現RegistryManager
接口便可。zookeeper
經常使用的命令行:
一、客服端腳本鏈接zookeeper服務器不指定-server
默認鏈接本地服務
./zkCli -service ip:port
二、建立
create [-s] [-e] path data acl
建立一個節點-s
-e
分別指定節點的類型和特性:順序和臨時節點默認建立的是臨時節點,acl用於權限控制
三、讀取
ls path
只能看指定節點下的一級節點
get path
查看指定節點的數據和屬性信息
四、更新
set path data [version]
能夠指定更新操做是基於哪個版本當更新的 path
不存在時報 Node does not exist
五、刪除
`delete path [version]``
在框架中還提供了兩個註解@RpcConsumer
和RpcProvider
在項目中只要引入
<dependency>
<groupId>com.yoku.arya</groupId>
<artifactId>arya</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
複製代碼
在provider端容器注入
@Bean
public RpcProviderProcessor rpcProviderProcessor() {
return new RpcProviderProcessor();
}
複製代碼
在comsumer端容器注入
@Bean
public RpcConsumerProcessor rpcConsumerProcessor() {
return new RpcConsumerProcessor();
}
複製代碼
項目完整的代碼 arya github.com/hoodoly/ary…
框架使用Demo github.com/hoodoly/ary…
歡迎 star
聯繫方式:gunriky@163.com 有問題能夠直接聯繫