上一篇文章JAVA NIO 編程入門(二)學習了NIO的彙集和分散,以及選擇器的使用,並經過一個小demo進行實戰,演示瞭如何進行分散和彙集,以及其主要使用場景,本文將是NIO編程入門最後一篇,進行一個RPC簡單小demo框架的實現,對前面的知識進行總結性的實戰,因爲只是演示性質的demo,因此RPC功能並無考慮很完善,也不涉及到性能等問題考慮。編程
RPC 英文全稱 Remote Procedure Calls,翻譯過來就是遠程過程調用,是分佈式系統中不一樣節點間流行的通訊方式。舉例:假設有A服務和B服務分別位於不一樣的服務器,A服務想調用B服務像調用本地方法同樣,這個時候就須要藉助RPC方式進行調用。json
RPC由三個主要部分組成,服務提供者,服務消費者,服務註冊中心,服務註冊中心提供服務提供者註冊服務。客戶端和服務端的交互協議採用json的形式,方便演示,同時考慮到複雜性,本次RPC不利用匯集和分散進行協議設計。bash
@Data
public class RpcRquest {
/**請求id*/
private String requestId;
/**請求接口名*/
private String interfaceName;
/**服務版本**/
private String serviceVersion;
/**方法名*/
private String methodName;
/**參數類型*/
private Class<?>[] parameterTypes;
/**參數*/
private Object[] parameters;
}
複製代碼
@Data
public class RpcResponse {
/**請求流水號*/
private String requestId;
/**異常*/
private Exception exception;
/**返回結果**/
private Object result;
}
複製代碼
public class RpcRegister {
/**存儲註冊的服務提供實現類*/
private HashMap<String, Object> registMap = new HashMap<>();
private static RpcRegister register=new RpcRegister();
public static RpcRegister buildRegist(){
return register;
}
public RpcRegister regist(String interfaceName,Object obj){
registMap.put(interfaceName,obj);
return this;
}
public Object findServier(String interfaceName){
return registMap.get(interfaceName);
}
}
複製代碼
這裏利用一個map存提供服務的實例,後續再在服務端只須要經過接口就能夠查找到對應的實現類。服務器
public class ProviderServer implements Runnable {
/**
* 服務提供端口
*/
private int port;
public ProviderServer(int port) {
this.port = port;
}
@Override
public void run() {
try {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int readyChannels = selector.selectNow();
if (readyChannels == 0) continue;
Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = (SelectionKey) keyIterator.next();
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel1.accept();
ByteBuffer buf1 = ByteBuffer.allocate(2048);
socketChannel.read(buf1);
buf1.flip();
String reciveStr = new String(buf1.array());
if (buf1.hasRemaining()) {
System.out.println(">>>服務端收到數據:" + reciveStr);
//判斷接受的內容是否有結束符,若是有,說明是一個請求結束。
if (reciveStr.contains(RpcConstant.PROTOCOL_END)) {
RpcRquest req = JSONObject.parseObject(reciveStr.replace(RpcConstant.PROTOCOL_END, ""), RpcRquest.class);
RpcResponse res = new RpcResponse();
res.setRequestId(req.getRequestId());
System.out.println(req.toString());
Class<?> remoteInterface = Class.forName(req.getInterfaceName());
Method method = remoteInterface.getMethod(req.getMethodName(), req.getParameterTypes());
if (null != method) {
Object obj = method.invoke(RpcRegister.buildRegist().findServier(req.getInterfaceName()), req.getParameters());
res.setException(null);
res.setResult(obj);
}
buf1.clear();
buf1.put(JSONObject.toJSON(res).toString().getBytes());
buf1.flip();
socketChannel.write(buf1);
}
}
socketChannel.close();
} else if (key.isConnectable()) {
} else if (key.isReadable()) {
} else if (key.isWritable()) {
}
keyIterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (IllegalAccessException e) {
e.printStackTrace();
} catch (InvocationTargetException e) {
e.printStackTrace();
} catch (NoSuchMethodException e) {
e.printStackTrace();
}
}
}
複製代碼
這部分源碼在《JAVA NIO 編程入門(二)》的基礎上增長了反射的部份內容,主要根據接口調用協議,生成客戶端須要調用的方法,進行調用,而後將結果返回。框架
public class RpcInitFactory {
/**
* 客戶端鏈接遠程ip地址
**/
private String ip;
/***遠程端口*/
private int port;
public RpcInitFactory(String ip, int port) {
this.ip = ip;
this.port = port;
}
}
複製代碼
@Data
public class CommonClient {
private RpcInitFactory factory;
public CommonClient(RpcInitFactory factory) {
this.factory = factory;
}
public <T> T invoke(RpcRquest req) {
RpcResponse response = null;
req.setRequestId(UUID.randomUUID().toString());
try {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress(factory.getIp(), factory.getPort()));
ByteBuffer buf1 = ByteBuffer.allocate(2048);
buf1.put(JSONObject.toJSON(req).toString().getBytes());
buf1.put(RpcConstant.PROTOCOL_END.getBytes());
buf1.flip();
if (buf1.hasRemaining())
socketChannel.write(buf1);
buf1.clear();
ByteBuffer body = ByteBuffer.allocate(2048);
socketChannel.read(body);
body.flip();
if (body.hasRemaining()) {
response = JSONObject.parseObject(new String(body.array()), RpcResponse.class);
}
body.clear();
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
return (T) response;
}
複製代碼
public interface Idemo {
/***加法**/
public Integer add(Integer i,Integer j);
}
複製代碼
public class DemoRemoteImpl implements Idemo {
private CommonClient client;
public DemoRemoteImpl(CommonClient client) {
this.client = client;
}
@Override
public Integer add(Integer i, Integer j) {
//構造rpc請求實體類
RpcRquest rpcRquest=new RpcRquest();
//設置版本號
rpcRquest.setServiceVersion("123");
//設置調用的接口名稱
rpcRquest.setInterfaceName(Idemo.class.getName());
//設置調用方法名稱
rpcRquest.setMethodName("add");
//設置參數
rpcRquest.setParameters(new Integer[] {i,j});
//設置參數類型
rpcRquest.setParameterTypes(new Class[] {Integer.class,Integer.class});
//進行遠程調用
RpcResponse response= client.invoke(rpcRquest);
if (null!=response){
return Integer.parseInt(response.getResult().toString());
}
return null;
}
}
複製代碼
public class DemoImp implements Idemo{
@Override
public Integer add(Integer i, Integer j) {
return i+j;
}
}
複製代碼
測試dom
public static void main(String[] args) {
ProviderServer server = new ProviderServer(8090);
RpcRegister.buildRegist().regist(Idemo.class.getName(), new DemoImp());
new Thread(server).start();
}
複製代碼
public static void main(String[] args) {
RpcInitFactory initFactory= new RpcInitFactory("127.0.0.1",8090);
Idemo demo = new DemoRemoteImpl(new CommonClient(initFactory));
System.out.println(demo.add(2, 1));
}
複製代碼
到這裏RPC的小demo功能實現完畢,實際上的RPC框架要比這個複雜的多,真正的RPC框架要考慮性能,高可用,半包,粘包等問題,這裏只是給出了一個RPC框架的實現原理,便於理解RPC框架的實現,並不能真正用於生產環境。socket