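In the Internet era, distributed frameworks of every kind are popular and the application layer keeps changing, but the core stays the same: RPC (remote procedure call). Features such as service registration and discovery, load balancing, data HA, and call-chain tracing are all built on top of it.
Distributed systems are very complex. Today we only take a narrow look and spend about an hour building a basic system. Its components can be summarized as the following three:
Service registry, which manages metadata (Registry)
Service provider (Provider)
Service caller (Consumer)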
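To make the three roles concrete before any networking is involved, here is a minimal in-process sketch. It is not the framework built below: the names Greeter, InProcessRegistry and InProcessConsumer are hypothetical, and the "remote" call is a plain reflective call inside the same JVM.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical service contract shared by provider and consumer.
interface Greeter {
    String greet(String name);
}

// Registry role: maps an interface name to whoever currently provides it.
class InProcessRegistry {
    private final Map<String, Object> providers = new ConcurrentHashMap<>();

    // Provider role: an implementation announces itself under its interface name.
    void register(String interfaceName, Object provider) {
        providers.put(interfaceName, provider);
    }

    Object lookup(String interfaceName) {
        return providers.get(interfaceName);
    }
}

// Consumer role: obtains a proxy and calls it like a local object.
class InProcessConsumer {
    @SuppressWarnings("unchecked")
    static <T> T proxyFor(final Class<T> iface, final InProcessRegistry registry) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Object provider = registry.lookup(iface.getName());
                if (provider == null) {
                    throw new IllegalStateException("no provider for " + iface.getName());
                }
                // A real consumer would serialize this call and send it over the network.
                return method.invoke(provider, args);
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }
}
```

The consumer only ever sees the interface; swapping the reflective call for a network round trip is what the rest of the article does.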
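0. A look at popular systems
0.1 HBase
HBase is the database of Apache Hadoop and provides random, real-time read/write access to large data sets. It is extremely complex, so we won't dig in; we only look at its basic framework.
ZooKeeper manages metadata (Registry)
RegionServer, storing its data on HDFS DataNodes (Provider)
Client (Consumer)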
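0.2 Kafka
Kafka is a distributed messaging system developed at LinkedIn and written in Scala. It is widely used for its horizontal scalability and high throughput.
ZooKeeper manages metadata (Registry)
Producer, Consumer (Consumer)
Broker (Provider)
0.3 Dubbo
Dubbo is a distributed service framework developed by Alibaba. It aims to provide high-performance, transparent RPC remote service calls together with an SOA service governance solution.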
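1. Building the basic framework
We won't go into why the architecture looks the way it does; we simply build along the framework below. Netty is used in many distributed systems, so we build on Netty as well. The result is a teaching skeleton: the code comments point out where production use needs more work, such as timeouts, thread pools, and multi-service support.
1.0 Overall structure
The project is split into four modules, described in turn below.
common (basic class definitions)
registry (the registry)
client (service caller, Consumer)
server (service provider, Provider)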
1.1 common
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
1.1.1 Command.java
Every call is a command.
package com.parsecode.common;

import java.util.concurrent.atomic.AtomicLong;

public class Command {
    // Register a server
    public final static int REGISTER_SERVER = 1;
    // Unregister a server
    public final static int UNREGISTER_SERVER = 2;
    // Invoke a method
    public final static int INVOKE_REQUEST = 3;
    // Method result
    public final static int INVOKE_RESPONSE = 4;
    // Get the server list
    public final static int GET_SERVER_LIST = 5;
    public final static int GET_SERVER_LIST_RESPONSE = 6;

    private static final AtomicLong IDS = new AtomicLong(0);

    private int type;
    private long requestId;
    // Holds the method-call details, the call result, and so on
    private byte[] body;

    public Command(int type, byte[] body) {
        this.type = type;
        this.body = body;
        this.requestId = IDS.incrementAndGet();
    }

    // Command length = type(4) + requestId(8) + body.length
    public int length() {
        return 12 + (body == null ? 0 : body.length);
    }

    public int getType() { return type; }
    public void setType(int type) { this.type = type; }

    public byte[] getBody() { return body; }
    public void setBody(byte[] body) { this.body = body; }

    public long getRequestId() { return requestId; }
    public void setRequestId(long requestId) { this.requestId = requestId; }
}
1.1.2 Invocation.java
Represents one method call: which interface, which method, the parameter types, and the argument values. It is serialized to byte[] as JSON.
package com.parsecode.common;

import java.io.Serializable;

public class Invocation implements Serializable {
    private String interfaceName;
    private String methodName;
    private String[] parameterTypes;
    private Object[] arguments;

    public String getInterfaceName() { return interfaceName; }
    public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; }

    public String getMethodName() { return methodName; }
    public void setMethodName(String methodName) { this.methodName = methodName; }

    public String[] getParameterTypes() { return parameterTypes; }
    public void setParameterTypes(String[] parameterTypes) { this.parameterTypes = parameterTypes; }

    public Object[] getArguments() { return arguments; }
    public void setArguments(Object[] arguments) { this.arguments = arguments; }
}
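The four fields of Invocation are exactly what reflection needs to find and call a method. The sketch below shows that step in isolation, using only the JDK. EchoService, EchoServiceImpl and ReflectiveDispatch are hypothetical names for illustration, and checked reflection exceptions are wrapped so the example stays short.

```java
import java.lang.reflect.Method;

// Hypothetical service used only for this illustration.
interface EchoService {
    String echo(String text, Integer times);
}

class EchoServiceImpl implements EchoService {
    @Override
    public String echo(String text, Integer times) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < times; i++) {
            sb.append(text);
        }
        return sb.toString();
    }
}

class ReflectiveDispatch {
    // Parameters mirror Invocation: interface name, method name, parameter type names, arguments.
    static Object dispatch(Object service, String interfaceName, String methodName,
                           String[] parameterTypes, Object[] arguments) {
        try {
            Class<?> iface = Class.forName(interfaceName);
            Class<?>[] types = new Class<?>[parameterTypes.length];
            for (int i = 0; i < parameterTypes.length; i++) {
                // Works for class names; primitive names such as "int" are not handled here.
                types[i] = Class.forName(parameterTypes[i]);
            }
            Method method = iface.getMethod(methodName, types);
            return method.invoke(service, arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot dispatch " + interfaceName + "." + methodName, e);
        }
    }
}
```

Sending type names as strings keeps the wire format language-neutral at the cost of a Class.forName lookup on every call; caching the resolved Method would be a natural next step.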
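1.1.3 Message encoding and decoding
This is what people usually call the wire protocol. The design is simple: each frame starts with a 4-byte length, followed by a 4-byte type, an 8-byte requestId, and then the body. The length counts everything after the length field itself.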
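The frame layout used by the encoder and decoder in this section can be tried out without Netty. The sketch below writes and reads one frame with java.nio.ByteBuffer, which is big-endian by default just like Netty's ByteBuf. FrameSketch and its method names are hypothetical.

```java
import java.nio.ByteBuffer;

class FrameSketch {
    // Layout: length(4) | type(4) | requestId(8) | body.
    // The length field counts the bytes that follow it: 12 + body.length.
    static byte[] encode(int type, long requestId, byte[] body) {
        int length = 12 + body.length;
        ByteBuffer buf = ByteBuffer.allocate(4 + length);
        buf.putInt(length);
        buf.putInt(type);
        buf.putLong(requestId);
        buf.put(body);
        return buf.array();
    }

    // The type sits right after the 4-byte length field.
    static int typeOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getInt(4);
    }

    // The requestId sits after length(4) + type(4).
    static long requestIdOf(byte[] frame) {
        return ByteBuffer.wrap(frame).getLong(8);
    }

    // Reads the fields in the order they were written and returns the body.
    static byte[] bodyOf(byte[] frame) {
        ByteBuffer buf = ByteBuffer.wrap(frame);
        int length = buf.getInt();
        buf.getInt();   // skip type
        buf.getLong();  // skip requestId
        byte[] body = new byte[length - 12];
        buf.get(body);
        return body;
    }
}
```

Carrying the requestId in every frame is what lets a caller match a response to its request when several calls share one connection.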
Message decoding
package com.parsecode.common;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import java.nio.ByteBuffer;

public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    public NettyDecoder() {
        // Max frame 65536 bytes; 4-byte length field at offset 0, stripped before decoding
        super(65536, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }
            ByteBuffer byteBuffer = frame.nioBuffer();
            int length = byteBuffer.limit();
            int type = byteBuffer.getInt();
            long requestId = byteBuffer.getLong();
            byte[] bodyData = null;
            if ((length - 12) > 0) {
                bodyData = new byte[length - 12];
                byteBuffer.get(bodyData);
            }
            Command cmd = new Command(type, bodyData);
            cmd.setRequestId(requestId);
            return cmd;
        } catch (Exception e) {
            ctx.channel().close().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("closeChannel");
                }
            });
        } finally {
            if (null != frame) {
                frame.release();
            }
        }
        return null;
    }
}
Message encoding
package com.parsecode.common;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class NettyEncoder extends MessageToByteEncoder<Command> {
    @Override
    public void encode(ChannelHandlerContext ctx, Command cmd, ByteBuf out) {
        try {
            int length = cmd.length();
            out.writeInt(length);
            out.writeInt(cmd.getType());
            out.writeLong(cmd.getRequestId());
            // Command.length() allows a null body, so guard against it here as well
            if (cmd.getBody() != null) {
                out.writeBytes(cmd.getBody());
            }
        } catch (Exception e) {
            ctx.channel().close();
        }
    }
}
1.2 registry
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>distributed-communication-registry</artifactId>
<name>distributed-communication-registry ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
</dependencies>
</project>
Registry.java
package com.parsecode.registry;

import com.parsecode.common.Command;
import com.parsecode.common.NettyDecoder;
import com.parsecode.common.NettyEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import org.apache.commons.lang3.StringUtils;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class Registry {
    private final ServerBootstrap serverBootstrap;
    private final EventLoopGroup eventLoopGroupWorker;
    private final EventLoopGroup eventLoopGroupBoss;
    // Registered providers: <interface, servers>
    // Note: the ArrayList values are not thread-safe; a production registry needs a concurrent collection here.
    private final ConcurrentHashMap<String, List<String>> servers = new ConcurrentHashMap<String, List<String>>();

    public Registry() {
        this.serverBootstrap = new ServerBootstrap();
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });
        final int threadCount = Runtime.getRuntime().availableProcessors() * 2;
        this.eventLoopGroupWorker = new NioEventLoopGroup(threadCount, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = threadCount;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerWorker_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    public void start() {
        serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024) // connection backlog size
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress("127.0.0.1", 8200)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, 60), // idle-connection detection
                                new NettyServerHandler());
                    }
                });
        try {
            serverBootstrap.bind().sync().addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    System.out.println("registry started");
                }
            });
        } catch (InterruptedException e1) {
            throw new RuntimeException("serverBootstrap.bind().sync() InterruptedException", e1);
        }
    }

    class NettyServerHandler extends SimpleChannelInboundHandler<Command> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception {
            processMessageReceived(ctx, msg);
        }
    }

    // Handle each command according to its type
    public void processMessageReceived(ChannelHandlerContext ctx, Command msg) {
        final Command cmd = msg;
        if (cmd != null) {
            switch (cmd.getType()) {
                case Command.GET_SERVER_LIST:
                    try {
                        String interfaceName = new String(msg.getBody(), "UTF-8");
                        List<String> ips = servers.get(interfaceName);
                        if (ips == null) {
                            ips = new ArrayList<String>();
                            servers.put(interfaceName, ips);
                        }
                        // Format: ip:port,ip:port
                        String str = StringUtils.join(ips, ",");
                        byte[] body = String.valueOf(str).getBytes("UTF-8");
                        Command response = new Command(Command.GET_SERVER_LIST_RESPONSE, body);
                        response.setRequestId(msg.getRequestId());
                        ctx.channel().writeAndFlush(response);
                    } catch (Exception e) {
                        System.out.println("error" + e.getMessage());
                    }
                    break;
                case Command.REGISTER_SERVER:
                case Command.UNREGISTER_SERVER:
                    // Format: interface,ip:port
                    try {
                        String str = new String(msg.getBody(), "UTF-8");
                        System.out.println("service registration: " + str);
                        String[] aStr = str.split(",");
                        List<String> ips = servers.get(aStr[0]);
                        if (ips == null) {
                            ips = new ArrayList<String>();
                            servers.put(aStr[0], ips);
                        }
                        if (msg.getType() == Command.REGISTER_SERVER) {
                            // Providers re-register on every heartbeat, so a repeated
                            // registration must be a no-op rather than a removal
                            if (!ips.contains(aStr[1])) {
                                ips.add(aStr[1]);
                            }
                        } else {
                            ips.remove(aStr[1]);
                        }
                    } catch (Exception e) {
                        System.out.println("error" + e.getMessage());
                    }
                    break;
                default:
                    break;
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.start();
    }
}
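The registry's bookkeeping is a map from interface name to a list of ip:port strings. Because providers send REGISTER_SERVER repeatedly as a heartbeat, registration has to be idempotent, and because several worker threads touch the map, the lists have to be safe for concurrent use. The following is a minimal sketch of just that bookkeeping, assuming Java 8; ServerTable is a hypothetical name and is not part of the project.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

class ServerTable {
    // <interface name, "ip:port" addresses of its providers>
    private final ConcurrentHashMap<String, CopyOnWriteArrayList<String>> servers =
            new ConcurrentHashMap<>();

    // Idempotent: registering the same address twice leaves one entry.
    void register(String interfaceName, String address) {
        servers.computeIfAbsent(interfaceName, k -> new CopyOnWriteArrayList<>())
               .addIfAbsent(address);
    }

    // Removing an unknown interface or address is a no-op.
    void unregister(String interfaceName, String address) {
        List<String> list = servers.get(interfaceName);
        if (list != null) {
            list.remove(address);
        }
    }

    // Same wire format as GET_SERVER_LIST_RESPONSE: ip:port,ip:port
    String list(String interfaceName) {
        List<String> list = servers.get(interfaceName);
        return list == null ? "" : String.join(",", list);
    }
}
```

CopyOnWriteArrayList suits this workload because reads (consumers polling the list) far outnumber writes (providers joining or leaving).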
1.3 server
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>distributed-communication-server</artifactId>
<name>distributed-communication-server ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
</project>
ServiceProvider.java
package com.parsecode.server;

import com.alibaba.fastjson.JSON;
import com.parsecode.common.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ServiceProvider {
    // Serves incoming calls
    private final ServerBootstrap serverBootstrap;
    // Connects to the registry
    private final Bootstrap registry = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private final EventLoopGroup eventLoopGroupBoss;
    private final EventLoopGroup eventLoopGroupRegistry;
    // Service objects exposed to callers: <interface, implementation>
    private final Map<String, Object> services = new HashMap<String, Object>();
    // Sends periodic heartbeats to the registry
    private final Timer timer = new Timer("registry-heartbeat", true);

    public ServiceProvider() {
        this.serverBootstrap = new ServerBootstrap();
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });
        final int threadCount = Runtime.getRuntime().availableProcessors() * 2;
        this.eventLoopGroupWorker = new NioEventLoopGroup(threadCount, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = threadCount;

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerWorker_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
        eventLoopGroupRegistry = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientRegistry_%d", this.threadIndex.incrementAndGet()));
            }
        });
    }

    public void addService(String interfaceName, Object service) {
        services.put(interfaceName, service);
    }

    public void start() {
        serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024) // connection backlog size
                .option(ChannelOption.SO_REUSEADDR, true)
                // TCP keep-alive is a per-connection option, so it is set on the child channels; false disables it
                .childOption(ChannelOption.SO_KEEPALIVE, false)
                .childOption(ChannelOption.TCP_NODELAY, true)
                .localAddress(new InetSocketAddress(8080))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, 60), // idle-connection detection
                                new NettyServerHandler());
                    }
                });
        registry.group(this.eventLoopGroupRegistry)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, 60),
                                new NettyServerHandler());
                    }
                });
        try {
            serverBootstrap.bind().sync();
        } catch (InterruptedException e1) {
            throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
        }
        // Register this provider with the registry, then repeat as a heartbeat
        timer.scheduleAtFixedRate(new TimerTask() {
            private volatile boolean registryOK = false;
            private volatile Channel channel;

            @Override
            public void run() {
                try {
                    if (!registryOK) {
                        // The registry listens on port 8200
                        InetSocketAddress registryAddress = new InetSocketAddress("127.0.0.1", 8200);
                        ChannelFuture channelFuture = registry.connect(registryAddress);
                        // syncUninterruptibly() rethrows a failed connect, so reaching the
                        // next line means the connection is established
                        channel = channelFuture.syncUninterruptibly().channel();
                        registryOK = true;
                    }
                    if (registryOK && channel.isActive()) {
                        for (String key : services.keySet()) {
                            // ip and port this service is reachable on
                            byte[] body = (key + ",127.0.0.1:8080").getBytes("UTF-8");
                            Command cmd = new Command(Command.REGISTER_SERVER, body);
                            channel.writeAndFlush(cmd);
                            System.out.println("registering service > " + channel.toString());
                        }
                    } else {
                        registryOK = false;
                        channel.close();
                    }
                } catch (Exception e) {
                    // Connection failed or broke; the next timer tick retries
                    registryOK = false;
                    if (null != channel) {
                        channel.close();
                    }
                }
            }
        }, 10, 1000);
    }

    public void shutdown() {
        timer.cancel();
        eventLoopGroupBoss.shutdownGracefully();
        eventLoopGroupWorker.shutdownGracefully();
        eventLoopGroupRegistry.shutdownGracefully();
    }

    class NettyServerHandler extends SimpleChannelInboundHandler<Command> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception {
            final Command cmd = msg;
            switch (cmd.getType()) {
                case Command.INVOKE_REQUEST:
                    try {
                        Invocation invoke = JSON.parseObject(cmd.getBody(), Invocation.class);
                        // Look up the service
                        Object service = services.get(invoke.getInterfaceName());
                        Class cls = Class.forName(invoke.getInterfaceName());
                        List<Class> argsTypeList = new ArrayList<Class>(invoke.getParameterTypes().length);
                        for (String s : invoke.getParameterTypes()) {
                            argsTypeList.add(Class.forName(s));
                        }
                        Method method = cls.getMethod(invoke.getMethodName(), argsTypeList.toArray(new Class[argsTypeList.size()]));
                        Object result = method.invoke(service, invoke.getArguments());
                        Command response = new Command(Command.INVOKE_RESPONSE, JSON.toJSONBytes(result));
                        response.setRequestId(cmd.getRequestId());
                        ctx.channel().writeAndFlush(response);
                    } catch (Exception e) {
                        // The caller gets no response in this case; at least log the cause
                        System.out.println("invoke failed: " + e);
                    }
                    break;
                default:
                    break;
            }
        }

        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state().equals(IdleState.ALL_IDLE)) {
                    System.out.println("NETTY SERVER PIPELINE: IDLE exception");
                    ctx.channel().close();
                }
            }
            ctx.fireUserEventTriggered(evt);
        }
    }

    public static void main(String[] args) {
        ServiceProvider sp = new ServiceProvider();
        sp.addService(IService.class.getCanonicalName(), new ServiceImpl());
        sp.start();
    }
}
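One limitation of the handler above is worth spelling out. The consumer sends parameter types as canonical names, and for a primitive parameter such as int that name is "int", which Class.forName cannot load. A minimal sketch of a resolver that handles primitives follows; TypeNames is a hypothetical helper, not part of the project, and array types are still not covered.

```java
import java.util.HashMap;
import java.util.Map;

class TypeNames {
    // Primitive names map to their Class objects; Class.forName("int") would throw.
    private static final Map<String, Class<?>> PRIMITIVES = new HashMap<>();

    static {
        Class<?>[] primitives = {
            boolean.class, byte.class, char.class, short.class,
            int.class, long.class, float.class, double.class, void.class
        };
        for (Class<?> c : primitives) {
            PRIMITIVES.put(c.getName(), c);
        }
    }

    // Resolves a type name sent over the wire to a Class, primitives first.
    static Class<?> resolve(String name) {
        Class<?> primitive = PRIMITIVES.get(name);
        if (primitive != null) {
            return primitive;
        }
        try {
            return Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("unknown parameter type: " + name, e);
        }
    }
}
```

In the handler, each Class.forName(s) call on a parameter type could be replaced by a call to a resolver of this kind.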
1.4 client
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>distributed-communication-client</artifactId>
<name>distributed-communication-client ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
</project>
ServiceConsumer.java
package com.parsecode.client;

import com.alibaba.fastjson.JSON;
import com.parsecode.common.*;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ServiceConsumer implements InvocationHandler {
    private final Bootstrap bootstrap = new Bootstrap();
    private final Bootstrap registry = new Bootstrap();
    // <ip:port, channel>. This example supports a single interface only;
    // production code needs to support multiple services (interfaces).
    private final ConcurrentHashMap<String, Channel> channelTables = new ConcurrentHashMap<String, Channel>();
    // Responses to outstanding calls, keyed by requestId
    private final ConcurrentHashMap<Long, Command> responses = new ConcurrentHashMap<Long, Command>();
    private final Timer timer = new Timer("registry-heartbeat", true);
    private final EventLoopGroup eventLoopGroupWorker;
    private final EventLoopGroup eventLoopGroupRegistry;
    // The remote service interface
    private Class interfaceClass;

    public ServiceConsumer(Class interfaceClass) {
        this.interfaceClass = interfaceClass;
        // Thread group for connections to providers
        eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientWorker_%d", this.threadIndex.incrementAndGet()));
            }
        });
        // Thread group for the connection to the registry
        eventLoopGroupRegistry = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientRegistry_%d", this.threadIndex.incrementAndGet()));
            }
        });
    }

    /**
     * A plain JDK dynamic proxy is enough here.
     * Production code would generate classes dynamically with a bytecode tool such as javassist.
     */
    public <T> T getTarget() {
        final Class<?>[] interfaces = new Class[]{interfaceClass};
        return (T) Proxy.newProxyInstance(ServiceConsumer.class.getClassLoader(), interfaces, this);
    }

    public void start() {
        // Connection settings for remote providers
        bootstrap.group(this.eventLoopGroupWorker)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, 6),
                                new NettyClientHandler());
                    }
                });
        // Connection settings for the registry
        registry.group(this.eventLoopGroupRegistry)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(
                                new NettyEncoder(),
                                new NettyDecoder(),
                                new IdleStateHandler(0, 0, 60),
                                new NettyClientHandler());
                    }
                });
        // Periodically pull the providers' ip and port from the registry
        timer.scheduleAtFixedRate(new TimerTask() {
            volatile boolean registryOK = false;
            volatile Channel channel;

            @Override
            public void run() {
                try {
                    if (!registryOK) {
                        // The registry listens on port 8200
                        ChannelFuture channelFuture = registry.connect(new InetSocketAddress("127.0.0.1", 8200));
                        // syncUninterruptibly() rethrows a failed connect, so reaching the
                        // next line means the connection is established
                        channel = channelFuture.syncUninterruptibly().channel();
                        registryOK = true;
                    }
                    if (registryOK && channel.isActive()) {
                        byte[] body = interfaceClass.getCanonicalName().getBytes("UTF-8");
                        Command cmd = new Command(Command.GET_SERVER_LIST, body);
                        channel.writeAndFlush(cmd);
                    } else {
                        channel.close();
                        registryOK = false;
                    }
                } catch (Exception e) {
                    // Connection failed or broke; the next timer tick retries
                    registryOK = false;
                }
            }
        }, 10, 1000);
    }

    class NettyClientHandler extends SimpleChannelInboundHandler<Command> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception {
            final Command cmd = msg;
            if (cmd != null) {
                switch (cmd.getType()) {
                    case Command.INVOKE_RESPONSE:
                        responses.put(msg.getRequestId(), msg);
                        break;
                    case Command.GET_SERVER_LIST_RESPONSE:
                        try {
                            String str = new String(msg.getBody(), "UTF-8");
                            String[] servers = str.split(",");
                            for (String ip : servers) {
                                System.out.println("service provider: " + ip);
                                String[] ipAndPort = ip.split(":");
                                // Already connected to this provider, skip it
                                if (channelTables.containsKey(ip)) {
                                    continue;
                                }
                                if (ipAndPort.length == 2) {
                                    Channel channel = bootstrap.connect(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])))
                                            .sync().channel();
                                    channelTables.put(ip, channel);
                                }
                            }
                        } catch (Exception e) {
                            System.out.println("connect to provider failed: " + e);
                        }
                        break;
                    default:
                        break;
                }
            }
        }
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Invocation inv = new Invocation();
        inv.setInterfaceName(interfaceClass.getCanonicalName());
        inv.setMethodName(method.getName());
        inv.setArguments(args);
        String[] types = new String[method.getParameterTypes().length];
        int index = 0;
        for (Class type : method.getParameterTypes()) {
            types[index] = type.getCanonicalName();
            index++;
        }
        inv.setParameterTypes(types);
        Command cmd = new Command(Command.INVOKE_REQUEST, JSON.toJSONBytes(inv));
        // Wait until a provider is available. This is only an example;
        // production code needs a timeout and a thread pool.
        while (channelTables.isEmpty()) {
            Thread.sleep(2);
        }
        // Note: the request is sent to every connected provider
        for (String key : channelTables.keySet()) {
            channelTables.get(key).writeAndFlush(cmd);
            System.out.println("calling provider: " + channelTables.get(key).toString());
        }
        // Wait for the provider to answer. Production code should use a Future;
        // a fixed sleep is used here for simplicity, so a slow response yields null.
        try {
            Thread.sleep(5);
        } catch (Exception e) {
        }
        // remove() rather than get() so handled responses do not accumulate
        Command response = responses.remove(cmd.getRequestId());
        if (response != null) {
            return JSON.parse(response.getBody());
        }
        return null;
    }

    public static void main(String[] args) throws Exception {
        ServiceConsumer sc = new ServiceConsumer(IService.class);
        sc.start();
        IService service = sc.getTarget();
        System.out.println("IService.hello(\"world\"): " + service.hello("world"));
    }
}
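The comment in invoke says production code should wait on a Future rather than sleep for a fixed time. One way to do that, assuming Java 8, is a table of CompletableFuture objects keyed by requestId: the calling thread registers a future before writing the request, and the I/O thread completes it when INVOKE_RESPONSE arrives. PendingCalls is a hypothetical name, and this is a sketch of the idea rather than a drop-in change to ServiceConsumer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PendingCalls<T> {
    private final ConcurrentHashMap<Long, CompletableFuture<T>> pending = new ConcurrentHashMap<>();

    // Caller thread: register interest before the request is written.
    CompletableFuture<T> expect(long requestId) {
        CompletableFuture<T> future = new CompletableFuture<>();
        pending.put(requestId, future);
        return future;
    }

    // I/O thread: hand over the response; returns false if nobody is waiting any more.
    boolean complete(long requestId, T response) {
        CompletableFuture<T> future = pending.remove(requestId);
        return future != null && future.complete(response);
    }

    // Caller thread: wait up to timeoutMillis; null on timeout, like the original's null return.
    T await(long requestId, CompletableFuture<T> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException("call " + requestId + " failed", e);
        } finally {
            // Drop the entry so late or missing responses do not leak memory.
            pending.remove(requestId);
        }
    }
}
```

With this in place the caller returns as soon as the response arrives instead of always waiting the full sleep, and a slow provider produces a bounded wait instead of a silently missed result.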
1.5 Application
1.5.1 Defining the service
1.5.2 Implementing the service
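The main methods of ServiceProvider and ServiceConsumer reference IService, ServiceImpl and a hello method that takes a String. A minimal sketch of what the two types might look like is given below. Only those three names come from the code above; the return type and the greeting text are assumptions, and in the project each type would be declared public in its own file so that both the server and client modules can see it.

```java
// Hypothetical contents: only the names IService, ServiceImpl and hello(String)
// appear in the original code; everything else here is assumed.
interface IService {
    String hello(String name);
}

class ServiceImpl implements IService {
    @Override
    public String hello(String name) {
        // Assumed behaviour, chosen only to make the call observable.
        return "hello, " + name;
    }
}
```

A String return value keeps the example compatible with the JSON round trip in the handlers, since fastjson can serialize and parse it without extra type information.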
Result of the call