RPC(Remote Procedure Call),即遠程過程調用,它是一種經過網絡從遠程計算機程序 上請求服務,而不須要了解底層網絡實現的技術。常見的RPC 框架有: 源自阿里的Dubbo, Spring 旗下的Spring Cloud,Google 出品的grpc 等等。java
將上面的12個步驟整理爲下面9個步驟:git
1,服務消費方(Client)以本地調用方式調用服務
2. client stub 接收到調用後負責將方法、參數等封裝成可以進行網絡傳輸的消息體
3. client stub 將消息進行編碼併發送到服務端
4. server stub 收到消息後進行解碼
5. server stub 根據解碼結果調用本地的服務
6. 本地服務執行並將結果返回給server stub
7. server stub 將返回導入結果進行編碼併發送至消費方
8. client stub 接收到消息並進行解碼
9. 服務消費方(client)獲得結果
複製代碼
RPC 的目標就是將2-8 這些步驟都封裝起來,用戶無需關心這些細節,能夠像調用本地 方法同樣便可完成遠程服務調用。接下來咱們基於Netty 本身動手搞定一個RPC。github
Client(服務的調用方): 兩個接口+ 一個包含main 方法的測試類
Client Sub: 一個客戶端代理類+ 一個客戶端業務處理類
Server(服務的提供方): 兩個接口+ 兩個實現類
Server Sub: 一個網絡處理服務器+ 一個服務器業務處理類
注意:服務調用方的接口必須跟服務提供方的接口保持一致(包路徑能夠不一致),最終要實現的目標是:在TestNettyRPC 中遠程調用HelloRPCImpl 或HelloNettyImpl 中的方法
複製代碼
public interface HelloNetty {
String hello();
}
public class HelloNettyImpl implements HelloNetty {
@Override
public String hello() {
return "----> hello,netty <---";
}
}
public interface HelloRPC {
String hello(String name);
}
public class HelloRpcImpl implements HelloRPC {
@Override
public String hello(String name) {
return "hello," + name;
}
}
複製代碼
上述代碼做爲服務的提供方,咱們分別編寫了兩個接口和兩個實現類,供消費方遠程調用編程
public class ClassInfo implements Serializable {
private static final long serialVersionUID = -7821682294197810003L;
private String className;//類名
private String methodName;//返回值
private Class<?>[] types; //參數類型
private Object[] objects; //參數列表
// ..getter
// ..setter
}
複製代碼
上述代碼做爲實體類用來封裝消費方發起遠程調用時傳給服務方的數據bootstrap
public class InvokeHandler extends ChannelInboundHandlerAdapter {
//獲得某接口下某個實現類的名字
private String getImplClassName(ClassInfo classInfo) throws ClassNotFoundException {
//服務方接口和實現類所在的包路徑
String interfacePath = "cn.haoxiaoyong.record.rpc.server";
int lastDot = classInfo.getClassName().lastIndexOf(".");
String interfaceName = classInfo.getClassName().substring(lastDot);
Class<?> superClass = Class.forName(interfacePath + interfaceName);
Reflections reflections = new Reflections(interfacePath);
//獲得某接口下的全部實現類
Set<Class<?>> ImplClassSet = (Set<Class<?>>) reflections.getSubTypesOf(superClass);
if (ImplClassSet.size() == 0) {
System.out.println("未找到實現類");
return null;
} else if (ImplClassSet.size() > 1) {
System.out.println("找到多個實現類,未明確使用哪個");
return null;
} else {
//把集合轉換爲數組
Class[] classes = ImplClassSet.toArray(new Class[0]);
return classes[0].getName();//獲得實現類的名字
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ClassInfo classInfo = (ClassInfo) msg;
Object clazz = Class.forName(getImplClassName(classInfo)).newInstance();
Method method = clazz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());
//經過反射調用實現類的方法
Object result = method.invoke(clazz, classInfo.getObjects());
ctx.writeAndFlush(result);
}
}
複製代碼
上述代碼做爲業務處理類,讀取消費方發來的數據,並根據獲得的數據進行本地調用,而後把結果返回給消費方.數組
public class NettyRpcServer {
private int port;
public NettyRpcServer(int port) {
this.port = port;
}
public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup wprkGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, wprkGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//編碼器
pipeline.addLast("encoder", new ObjectEncoder())
//解碼器
.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
.addLast(new InvokeHandler());
}
});
ChannelFuture future = serverBootstrap.bind(port).sync();
System.out.println("......server is ready......");
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
bossGroup.shutdownGracefully();
wprkGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new NettyRpcServer(9999).start();
}
}
複製代碼
上述代碼是用Netty實現的網絡服務器,採用Netty自帶的ObjectEncoder 和ObjectDecoder 做爲編碼解碼(爲了下降複雜度,這裏並無使用 第三方的編碼解碼器),固然實際開發中也能夠採用JSON或XML.服務器
public class ResultHandler extends ChannelInboundHandlerAdapter {
private Object response;
public Object getResponse() {
return response;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
response = msg;
ctx.close();
}
}
複製代碼
上述代碼做爲客戶端的業務處理類讀取遠程調用返回的數據網絡
public class NettyRpcProxy {
//根據結構建立代理對象
public static Object create(Class target) {
return Proxy.newProxyInstance(target.getClassLoader(), new Class[]{target}, new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//封裝ClassInfo
ClassInfo classInfo = new ClassInfo();
classInfo.setClassName(target.getName());
classInfo.setMethodName(method.getName());
classInfo.setObjects(args);
classInfo.setTypes(method.getParameterTypes());
//開始用Netty發送數據
EventLoopGroup group = new NioEventLoopGroup();
ResultHandler resultHandler = new ResultHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//編碼器
pipeline.addLast("encoder", new ObjectEncoder())
//解碼器,構造方法第一個參數設置二進制的最大字節數,第二個參數設置具體使用哪一個類解析器
.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)))
//客戶端業務處理類
.addLast("handler", resultHandler);
}
});
ChannelFuture future = bootstrap.connect("127.0.0.1", 9999).sync();
future.channel().writeAndFlush(classInfo).sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
return resultHandler.getResponse();
}
});
}
}
複製代碼
上述代碼是用Netty 實現的客戶端代理類,採用Netty 自帶的ObjectEncoder 和ObjectDecoder 做爲編解碼器(爲了下降複雜度,這裏並無使用第三方的編解碼器),固然實際開發時也 能夠採用JSON 或XML。多線程
public interface HelloNetty {
String hello();
}
public interface HelloRPC {
String hello(String name);
}
複製代碼
上述代碼定義了兩個接口做爲服務的調用方併發
public class TestNettyRpc {
public static void main(String[] args) {
//第一次遠程調用
HelloNetty helloNetty = (HelloNetty) NettyRpcProxy.create(HelloNetty.class);
System.out.println(helloNetty.hello());
//第二次遠程調用
HelloRPC helloRPC = (HelloRPC) NettyRpcProxy.create(HelloRPC.class);
System.out.println(helloRPC.hello("RPC"));
}
}
複製代碼
消費方不須要知道底層的網絡實現細節,就像調用本地方法同樣成功發起了兩次遠程調用。
1,首先啓動服務方: NettyRpcServer 中的main方法;
2,而後啓動服務調用方: TestNettyRpc中的測試方法;
當前項目:自定義RPC(項目中rpc包下)
Netty框架--netty編碼解碼(項目中proto包下)
歡迎給個小星星鼓勵一下(Star,害羞臉)....
加 QQ: 3521467848 獲取關於 Netty視頻