Java回調機制在RPC框架中的應用示例

完整源碼:java

 

應用場景描述:git

服務提供者在項目啓動時,建立並啓動一個TCP服務器,而後將本身提供的全部服務註冊到註冊中心。

 

這裏有3個角色:服務器

1.服務提供者
2.TCP服務器
3.註冊中心

 

引出回調:tcp

服務提供者,要求TCP服務器啓動成功以後,回調一下"註冊服務"的邏輯。

 

開始擼碼:ide

首先定義一個回調基類,這是一個抽象類,裏面只有一個抽象的回調函數,是未來回調時要實現的方法函數

 1 /**
 2  * 回調基類
 3  *
 4  * @author syj
 5  */
 6 public abstract class BaseCallBack {
 7     /**
 8      * 回調執行邏輯
 9      *
10      * @throws Exception
11      */
12     public abstract void run() throws Exception;
13 }

 

誰來調用這個回調函數呢,前面說了,TCP服務器啓動以後要調用這個回調函數,因此回調的操做要在TCP服務器中完成。測試

TCP服務器是一個具體的服務實現,咱們可能會有多種服務器的實現,因此先定義一個抽象的服務類:this

 1 /**
 2  * 服務抽象
 3  *
 4  * @author syj
 5  */
 6 public abstract class Server {
 7 
 8     // 服務啓動後回調
 9     private BaseCallBack startedCallBack;
10 
11     // 服務中止後回調
12     private BaseCallBack stopedCallBack;
13 
14     /**
15      * 設置服務啓動後的回調邏輯
16      *
17      * @param startedCallBack
18      */
19     public void setStartedCallBack(BaseCallBack startedCallBack) {
20         this.startedCallBack = startedCallBack;
21     }
22 
23     /**
24      * 設置服務中止後的回調邏輯
25      *
26      * @param stopedCallBack
27      */
28     public void setStopedCallBack(BaseCallBack stopedCallBack) {
29         this.stopedCallBack = stopedCallBack;
30     }
31 
32     /**
33      * 服務啓動後回調
34      */
35     public void onStarted() {
36         if (startedCallBack != null) {
37             try {
38                 startedCallBack.run();
39             } catch (Exception e) {
40                 e.printStackTrace();
41             }
42         }
43     }
44 
45     /**
46      * 服務中止後回調
47      */
48     public void onStoped() {
49         if (stopedCallBack != null) {
50             try {
51                 stopedCallBack.run();
52             } catch (Exception e) {
53                 e.printStackTrace();
54             }
55         }
56     }
57 
58     /**
59      * 啓動服務
60      *
61      * @param provider
62      * @throws Exception
63      */
64     public abstract void start(Provider provider) throws Exception;
65 
66     /**
67      * 中止服務
68      *
69      * @throws Exception
70      */
71     public abstract void stop() throws Exception;
72 
73 }

這個服務器抽象類,主要有啓動服務和中止服務的方法,還持有兩個回調對象,一個是服務器啓動後的回調對象,一個是服務器中止後的回調對象。並有兩個方法分別去調用這兩個回調對象的run方法。spa

下面定義一個TCP服務器類,它是上面服務器抽象類的一個具體實現類:code

 1 /**
 2  * 服務實現類
 3  *
 4  * @author syj
 5  */
 6 public class TcpServerImpl extends Server {
 7 
 8 
 9     /**
10      * 啓動服務
11      *
12      * @param provider
13      */
14     @Override
15     public void start(Provider provider) {
16         System.out.println(">>>> start! " + provider.getTcpSrvAddr() + ":" + provider.getTcpSrvPort());
17         // 啓動後回調
18         onStarted();
19 
20     }
21 
22     /**
23      * 中止服務
24      *
25      */
26     @Override
27     public void stop() {
28         System.out.println(">>>> stop!");
29         // 中止後回調
30         onStoped();
31     }
32 }

 

該類主要有兩個功能,一個是啓動TCP服務,一個是中止TCP服務,這兩個方法的最後都須要觸發回調邏輯的執行;

關於具體回調邏輯的定義寫在哪裏呢?天然是服務的提供者最清楚,因此寫在服務提供者類中最合適:

 1 import java.util.TreeSet;
 2 
 3 /**
 4  * 服務操做
 5  *
 6  * @author syj
 7  */
 8 public class Provider {
 9 
10     // 模擬要註冊的服務列表
11     public static TreeSet<String> serviceKeys = new TreeSet<String>() {{
12         add("userService");
13         add("productService");
14         add("orderService");
15     }};
16 
17     // 模擬本機http服務使用的ip和端口
18     public static String localAddress = "127.0.0.1:8081";
19 
20     // TCP服務器地址
21     private String tcpSrvAddr;
22 
23     // TCP服務器端口
24     private int tcpSrvPort;
25 
26     public String getTcpSrvAddr() {
27         return tcpSrvAddr;
28     }
29 
30     public int getTcpSrvPort() {
31         return tcpSrvPort;
32     }
33 
34     private Server server;
35     private Registry registry;
36 
37     public Provider() {
38     }
39 
40     /**
41      * 初始化配置
42      *
43      * @param tcpSrvAddr
44      * @param tcpSrvPort
45      */
46     public void initConfig(String tcpSrvAddr, int tcpSrvPort) {
47         this.tcpSrvAddr = tcpSrvAddr;
48         this.tcpSrvPort = tcpSrvPort;
49     }
50 
51     /**
52      * 啓動服務
53      */
54     public void start() {
55         try {
56             registry = Registry.class.newInstance();
57             server = TcpServerImpl.class.newInstance();
58             // 設置服務啓動後回調邏輯
59             server.setStartedCallBack(new BaseCallBack() {
60                 @Override
61                 public void run() {
62                     System.out.println(">>>> setStartedCallBack:" + serviceKeys + ":" + localAddress);
63                     // 註冊服務
64                     registry.start();
65                     registry.registry(serviceKeys, localAddress);
66                 }
67             });
68 
69             // 設置服務中止後回調邏輯
70             server.setStopedCallBack(new BaseCallBack() {
71                 @Override
72                 public void run() {
73                     System.out.println(">>>> setStopedCallBack:" + tcpSrvAddr + ":" + tcpSrvPort);
74                     registry.remove(serviceKeys, localAddress);
75                 }
76             });
77 
78             // 啓動服務
79             server.start(this);
80         } catch (Exception e) {
81             e.printStackTrace();
82         }
83     }
84 
85     /**
86      * 中止服務
87      */
88     public void stop() {
89         try {
90             server.stop();
91         } catch (Exception e) {
92             e.printStackTrace();
93         }
94     }
95 }

因爲服務提供者須要啓動TCP服務器,因此它依賴一個Server對象,在他的start方法中,啓動TCP服務以前,先給這個TCP服務設置兩個回調回調具體邏輯,就是前面說的一個是TCP服務器啓動以後要執行的邏輯,一個是TCP服務器中止以後要執行的邏輯。

TCP服務器啓動成功以後,要將服務提供者的全部服務註冊到註冊中心,TCP服務器中止以後要從註冊中心移除本身的全部服務。

下面是註冊中心類,它只負責服務的註冊和移除,別的事無論:

 1 import java.util.HashMap;
 2 import java.util.Map;
 3 import java.util.Set;
 4 import java.util.TreeSet;
 5 
 6 /**
 7  * 服務註冊中心
 8  *
 9  * @author syj
10  */
11 public class Registry {
12 
13     private Map<String, TreeSet<String>> registryData;
14 
15 
16     public void start() {
17         registryData = new HashMap<String, TreeSet<String>>();
18         System.out.println(">>>> 註冊中心建立成功");
19     }
20 
21     public void stop() {
22         registryData.clear();
23     }
24 
25 
26     public boolean registry(Set<String> keys, String value) {
27         if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) {
28             return false;
29         }
30         for (String key : keys) {
31             TreeSet<String> values = registryData.get(key);
32             if (values == null) {
33                 values = new TreeSet<>();
34                 registryData.put(key, values);
35             }
36             values.add(value);
37         }
38         System.out.println(">>>> 服務註冊成功");
39         return true;
40     }
41 
42     public boolean remove(Set<String> keys, String value) {
43         if (keys==null || keys.size()==0 || value==null || value.trim().length()==0) {
44             return false;
45         }
46         for (String key : keys) {
47             TreeSet<String> values = registryData.get(key);
48             if (values != null) {
49                 values.remove(value);
50             }
51         }
52         System.out.println(">>>> 服務移除成功");
53         return true;
54     }
55 }

 

寫個測試類測試一下:

 1 /**
 2  * 測試類
 3  *
 4  * @author syj
 5  */
 6 public class App {
 7     // TCP 服務器IP
 8     public static String tcpSrvAddr = "192.168.11.23";
 9     // TCP 服務端口
10     public static int tcpSrvPort = 9090;
11 
12     public static void main(String[] args) {
13         Provider provider = new Provider();
14         provider.initConfig(tcpSrvAddr, tcpSrvPort);
15         provider.start();
16         provider.stop();
17     }
18 }

 

輸出結果:

>>>> start! 192.168.11.23:9090
>>>> setStartedCallBack:[orderService, productService, userService]:127.0.0.1:8081
>>>> 註冊中心建立成功
>>>> 服務註冊成功
>>>> stop!
>>>> setStopedCallBack:192.168.11.23:9090
>>>> 服務移除成功

 

總結:

A類調用B類的某個方法,B類的該方法中又調用了A類的某個方法,這就是回調。這僅是一個形象的描述,具體的回調機制在實際應用時會很靈活。好比這個例子中,Provider類的start方法調用啦Server類的start方法,而Server類的start方法中又調用了onStarted方法,雖然這個onStarted方法不是Provider中的方法,但其執行的回調邏輯是在Provider中經過setStartedCallBack()來設置的。
相關文章
相關標籤/搜索