自實現RPC調用

服務提供者apache

服務接口:socket

public interface HelloService {
    public String sayHello(String name);
}

服務實現類:ide

public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String name) {
        return "hello:" + name;
    }
}

服務註冊:this

public class ProviderReflect {
    private static final ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingDeque<Runnable>(1000), new MyThreadFactory("providerReflact"));

    /**
     * 服務發佈
     *
     * @param service
     * @param port
     */
    public static void provider(final Object service, int port) {
        ServerSocket serverSocket = null;

        try {
            serverSocket = new ServerSocket(port);
            while (true) {
                Socket socket = serverSocket.accept();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        ObjectOutputStream oos = null;
                        ObjectInputStream ois = null;
                        try {
                            ois = new ObjectInputStream(socket.getInputStream());

                            // 請求調用的方法名
                            String methodName = ois.readUTF();
                            // 請求的參數
                            Object[] args = (Object[]) ois.readObject();

                            oos = new ObjectOutputStream(socket.getOutputStream());
                            Object result = MethodUtils.invokeExactMethod(service, methodName, args);
                            System.out.println(result);
                            // 寫結果
                            oos.writeObject(result);
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (InvocationTargetException e) {
                            e.printStackTrace();
                        } catch (NoSuchMethodException e) {
                            e.printStackTrace();
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        } finally {
                            if (oos != null) {
                                try {
                                    oos.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                            if (ois != null) {
                                try {
                                    ois.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                    }
                });
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    static class MyThreadFactory implements ThreadFactory {
        /**
         * 線程池中的線程名稱前綴
         */
        private String namePrefix;

        /**
         * 原子的整型
         */
        private AtomicInteger atomicInteger = new AtomicInteger(1);

        public MyThreadFactory(String whatFeaturOfGroup) {
            this.namePrefix = "From MyThreadFactory:" + whatFeaturOfGroup + "-worker-";
        }

        @Override
        public Thread newThread(Runnable r) {
            String name = namePrefix + atomicInteger.getAndIncrement();
            Thread thread = new Thread(r, name);
            System.out.println(thread.getName());
            return thread;
        }
    }

    /**
     * 發佈服務
     *
     * @param args
     */
    public static void main(String[] args) {
        HelloService helloService = new HelloServiceImpl();
        ProviderReflect.provider(helloService, 8082);
    }
}

pom.xml:atom

       <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.4</version>
        </dependency>

 

服務消費者spa

消費接口:線程

public interface HelloService {
    public String sayHello(String name);
}

接口的代理:代理

public class ConsumerProxy {
    public static <T> T consume(final Class<?> serviceClass, final String host, final int port) {
        return (T) Proxy.newProxyInstance(serviceClass.getClassLoader(), new Class<?>[]{serviceClass}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = null;
                ObjectOutputStream output = null;
                ObjectInputStream input = null;
                try {
                    socket = new Socket(host, port);
                    output = new ObjectOutputStream(socket.getOutputStream());
//                    output.writeUTF(serviceClass.getName());
                    output.writeUTF(method.getName());
//                    output.writeObject(method.getParameterTypes());
                    output.writeObject(args);
                    input = new ObjectInputStream(socket.getInputStream());
                    return input.readObject();
                } finally {
                    if (socket != null) {
                        socket.close();
                    }
                    if (output != null) {
                        output.close();
                    }
                    if (input != null) {
                        input.close();
                    }
                }
            }
        });
    }

    public static void main(String[] args) {
        HelloService helloService = ConsumerProxy.consume(HelloService.class, "127.0.0.1", 8082);
        String response = helloService.sayHello("yangyongjie");
        System.out.println(response);
    }
}

輸出:code

hello:yangyongjieserver

相關文章
相關標籤/搜索