rpc的簡單實現

前文: java

1.以前公司用hessian&ws來作遠程調用,因爲客戶機的ip常常變換,沒法實時跟着做變更。 服務器

2.消息中間件服務load小,沒有充分發揮用處。 網絡

處理方案,利用MOM被客戶機訂閱的機制獲取客戶的id,在MOM的層面作RPC。這樣作解決了上面的問題還能作到以下好處: 框架

1.很好的作到服務註冊,監控等功能。 異步

2.自然的支持分佈式 socket

3.自然支持異步 分佈式

下面貼出最簡要的代碼: ide


  • 基礎類
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class RpcExample {
    
        /**
         * 暴露服務
         * 
         * @param service 服務實現
         * @param port 服務端口
         * @throws Exception
         */
        public static void service(final Object service, int port) throws Exception {
            ServerSocket server = new ServerSocket(port);
            for(;;) {
                try {
                    final Socket socket = server.accept();
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                try {
                                    ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                                    try {
                                        String methodName = input.readUTF();
                                        Class<?>[] parameterTypes = (Class<?>[])input.readObject();
                                        Object[] arguments = (Object[])input.readObject();
                                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                                        try {
                                            Method method = service.getClass().getMethod(methodName, parameterTypes);
                                            Object result = method.invoke(service, arguments);
                                            output.writeObject(result);
                                        } catch (Throwable t) {
                                            output.writeObject(t);
                                        } finally {
                                            output.close();
                                        }
                                    } finally {
                                        input.close();
                                    }
                                } finally {
                                    socket.close();
                                }
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 引用服務
         * 
         * @param <T> 接口泛型
         * @param interfaceClass 接口類型
         * @param host 服務器主機名
         * @param port 服務器端口
         * @return 遠程服務
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T reference(final Class<T> interfaceClass, final String host, final int port) throws Exception {  return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] {interfaceClass}, new InvocationHandler() {
                public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable {
                    Socket socket = new Socket(host, port);
                    try {
                        ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream());
                        try {
                            output.writeUTF(method.getName());
                            output.writeObject(method.getParameterTypes());
                            output.writeObject(arguments);
                            ObjectInputStream input = new ObjectInputStream(socket.getInputStream());
                            try {
                                Object result = input.readObject();
                                if (result instanceof Throwable) {
                                    throw (Throwable) result;
                                }
                                return result;
                            } finally {
                                input.close();
                            }
                        } finally {
                            output.close();
                        }
                    } finally {
                        socket.close();
                    }
                }
            });
        }
    }

RpcExample解釋: spa

1.例子中是利用socket來實現網絡傳輸,在產品中是利用ActiveMQ來實現,只須要將invoke中的代碼用queue來實現。host和port就能夠不用傳入了。 .net

2.固然你能夠不用ActiveMQ(用這個的理由上面已經說了),甚至socket也不本身寫,你能夠用現成的框架作,例如mina,netty

3.想實現簡單的服務治理和監控 只要在run和invoke中加入相應的代碼。


  • 生產者(服務提供者)
    public class RpcProvider {
        //IService是接口,ServiceImpl是相應的實現類
        public static void main(String[] args) throws Exception {
            IService service = new ServiceImpl();
            RpcExample.service(service, 8161);
        }
    }


  • 消費者

    public class RpcConsumer {
        public static void main(String[] args) throws Exception {
            IService service = RpcExample.reference(IService.class, "127.0.0.1", 8161);
            String print = service.getString();
            System.out.println(print);
        }
    }


生產者,消費者代碼解釋:

1.接口和實現就不貼出來了,裏面就一個方法getString(),

2.接口(IService)必須作成jar,生產者和消費者都須要引用,若是print方法中包含其餘的傳輸對象,必須序列化和作成相應的jar。

3.固然,若是要作成跨語言的就不要作成java對象了,你能夠用各類傳輸協議和格式來作。

4.相應的序列化手段也能夠作成插入式的形式讓使用者定製本身的序列化方式。

相關文章
相關標籤/搜索