yls 2020/5/23java
大體流程:git
- netty搭建rpc框架;
- 建立服務消費者和服務提供者的公共接口和類
- 建立服務提供者,啓動netty框架的服務端
- 建立服務消費者,啓動netty框架的客戶端,而後獲取調用結果
/** * rpc調用時傳輸類的信息 * 客戶端與服務端之間通訊,傳遞信息的媒介 */ public class ClassInfo { //自定義name,通常一個接口有多個實現類的時候使用自定義 // 或者默認使用接口名稱 private String name; private String methodName; //參數類型 private Class[] types; //參數列表 private Object[] params; //自定義rpc協議 private String protocol="#rpc#"; public String getProtocol() { return protocol; } public void setProtocol(String protocol) { this.protocol = protocol; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public Class[] getTypes() { return types; } public void setTypes(Class<?>[] types) { this.types = types; } public Object[] getParams() { return params; } public void setParams(Object[] params) { this.params = params; } }
/** * 編碼器 * MyMessageEncoder MyMessageDecoder解決粘包拆包問題 */ public class MyMessageEncoder extends MessageToByteEncoder<String> { @Override protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception { //先發送內容長度 out.writeInt(msg.getBytes().length); //發送具體的內容 out.writeBytes(msg.getBytes()); } }
/** * 解碼器 */ public class MyMessageDecoder extends ReplayingDecoder<Void> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //先讀取要接收的字節長度 final int len = in.readInt(); final byte[] bytes = new byte[len]; //再根據長度讀取真正的字節數組 in.readBytes(bytes); String s = new String(bytes); out.add(s); } }
public class NettyClient { private static NettyClientHandler nettyClientHandler; static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10)); public static <T> T getBean(Class<T> service) { String simpleName = service.getSimpleName(); return getBean(service, simpleName); } //獲取一個動態代理對象 public static <T> T getBean(Class<T> service, String name) { T o = (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, ((proxy, method, args1) -> { //先創建鏈接 if (nettyClientHandler == null) { start(ClientBootStrap.getHost() , ClientBootStrap.getPort()); } //組裝傳輸類的屬性值 ClassInfo classInfo = new ClassInfo(); classInfo.setName(name); classInfo.setMethodName(method.getName()); Class<?>[] parameterTypes = method.getParameterTypes(); classInfo.setTypes(parameterTypes); classInfo.setParams(args1); nettyClientHandler.setClassInfo(classInfo); //運行線程,發送數據 Future future = threadPool.submit(nettyClientHandler); //返回結果 String o1 = (String) future.get(); ObjectMapper objectMapper = new ObjectMapper(); //獲取返回類型,並將服務端返回的json數據轉化爲對應的類型 Type returnType = method.getAnnotatedReturnType().getType(); Object o2 = objectMapper.readValue(o1, (Class<?>) returnType); return o2; })); return o; } //啓動netty客戶端 public static void start(String host, int port) { nettyClientHandler = new NettyClientHandler(); //客戶端須要一個事件循環組就能夠 NioEventLoopGroup group = new NioEventLoopGroup(1); try { //建立客戶端的啓動對象 bootstrap ,不是 serverBootStrap Bootstrap bootstrap = new Bootstrap(); //設置相關參數 bootstrap.group(group) //設置線程組 .channel(NioSocketChannel.class) //設置客戶端通道的實現數 (反射) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline() .addLast(new MyMessageDecoder()) .addLast(new MyMessageEncoder()) .addLast(nettyClientHandler); //加入本身的處理器 } }); System.out.println("客戶端 ready is ok.."); //鏈接服務器 final ChannelFuture channelFuture = bootstrap.connect(host, port).sync(); //對關閉通道進行監聽 // channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // group.shutdownGracefully(); } } }
/** * 因爲須要在 handler 中發送消息給服務端,而且將服務端返回的消息讀取後返回給消費者 * 因此實現了 Callable 接口,這樣能夠運行有返回值的線程 */ public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable { private ClassInfo classInfo; //傳遞數據的類 private ChannelHandlerContext context;//上下文 private Object result;//服務端返回的結果 private Lock lock = new ReentrantLock();//使用鎖將 channelRead和 call 函數同步 private Condition condition = lock.newCondition();//精準喚醒 call中的等待 public void setClassInfo(ClassInfo classInfo) { this.classInfo = classInfo; } //通道鏈接時,就將上下文保存下來,由於這樣其餘函數也能夠用 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.context = ctx; } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive 被調用。。。"); } //當服務端返回消息時,將消息複製到類變量中,而後喚醒正在等待結果的線程,返回結果 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { lock.lock(); System.out.println(ctx.channel().hashCode()); System.out.println("收到服務端發送的消息 " + msg); result = msg; //喚醒等待的線程 condition.signal(); lock.unlock(); } //這裏面發送數據到服務端,等待channelRead方法接收到返回的數據時,將數據返回給服務消費者 @Override public Object call() throws Exception { lock.lock(); ObjectMapper objectMapper = new ObjectMapper(); final String s = objectMapper.writeValueAsString(classInfo); context.writeAndFlush(s); System.out.println("發出數據 " + s); //向服務端發送消息後等待channelRead中接收到消息後喚醒 condition.await(); lock.unlock(); return result; } //異常處理 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); } }
public class NettyServer { //啓動netty服務端 public static void start(int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { //建立服務端的啓動對象,並使用鏈式編程來設置參數 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) //設置兩個線程組 .channel(NioServerSocketChannel.class)//使用NioServerSocketChannel 做爲服務器的通道實現 .option(ChannelOption.SO_BACKLOG, 128)//設置線程隊列的鏈接個數 .childOption(ChannelOption.SO_KEEPALIVE, true) //設置一直保持活動鏈接狀態 .childHandler(new ChannelInitializer<SocketChannel>() {//設置一個通道測試對象 @Override protected void initChannel(SocketChannel ch) throws Exception { //給pipeline設置通道處理器 ch.pipeline() .addLast(new MyMessageDecoder()) .addLast(new MyMessageEncoder()) .addLast(new NettyServerHandler()); } });//給 workerGroup 的EventLoop對應的管道設置處理器 //啓動服務器,並綁定端口而且同步 ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); //給 channelFuture 註冊監聽器,監聽關心的事件,異步的時候使用 // channelFuture.addListener((future) -> { // if (future.isSuccess()) { // System.out.println("監聽端口成功。。。"); // } else { // System.out.println("監聽端口失敗。。。"); // } // }); //對關閉通道進行監聽,監聽到通道關閉後,往下執行 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
public class NettyServerHandler extends ChannelInboundHandlerAdapter { public static Map<String, Class<?>> classNameMapping = new HashMap(); public static void setClassNameMapping(Object object) { Class<?> clazz = object.getClass(); Class<?>[] interfaces = clazz.getInterfaces(); Class<?> anInterface = interfaces[0]; setClassNameMapping(anInterface.getSimpleName(), object); } //爲實現類定義標識,方便客戶端和服務端通訊調用 public static void setClassNameMapping(String name, Object object) { Class<?> clazz = object.getClass(); classNameMapping.put(name, clazz); } //接收客戶端傳入的值,將值解析爲類對象,獲取其中的屬性,而後反射調用實現類的方法 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String s = (String) msg; System.out.println("接收到數據 " + s); ObjectMapper objectMapper = new ObjectMapper(); ClassInfo classInfo = objectMapper.readValue(s, ClassInfo.class); //確認是rpc調用才往下執行 if(classInfo!=null && "#rpc#".equals(classInfo.getProtocol())){ //反射調用實現類的方法 String name = classInfo.getName(); //獲取指定名稱的實現類 Class<?> aClass = classNameMapping.get(name); Object o = aClass.newInstance(); if (classInfo.getTypes().length > 0) { Method method = aClass.getMethod(classInfo.getMethodName(), classInfo.getTypes()); method.setAccessible(true); Object invoke = method.invoke(o, classInfo.getParams()); String s1 = objectMapper.writeValueAsString(invoke); ctx.writeAndFlush(s1); } else { Method method = aClass.getMethod(classInfo.getMethodName()); method.setAccessible(true); Object invoke = method.invoke(o); String s1 = objectMapper.writeValueAsString(invoke); ctx.writeAndFlush(s1); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
public interface HelloService { Result hello(String s); String str(); }
/** * 測試返回結果爲java bean時使用的類 */ public class Result { private int id; private String content; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
public class HelloServiceImpl implements HelloService { @Override public Result hello(String s) { System.out.println("收到消費者的請求。。" + s); Result result=new Result(); result.setId(1); result.setContent("你好,我已經收到了你的消費請求"); return result; } @Override public String str() { return "我是一個字符串。。。"; } }
public class ServerBootStrap { public static void main(String[] args) { NettyServerHandler.setClassNameMapping(new HelloServiceImpl()); NettyServer.start(9999); } }
/** * 消費者 */ public class ClientBootStrap { private static String host = "127.0.0.1"; private static int port = 9999; public static String getHost() { return host; } public static int getPort() { return port; } public static void main(String[] args) { //鏈接netty,並得到一個代理對象 HelloService bean = NettyClient.getBean(HelloService.class); //測試返回結果爲java bean Result res = bean.hello("ffafa"); System.out.println("res=====" + res.getContent()); //測試返回結果爲 String String str = bean.str(); System.out.println("str=====" + str); } }