http://git.oschina.net/rushmore/zbusjava
http://my.oschina.net/sbz/blognode
zbus的角色是中間消息服務(Broker),默認分佈式運行(固然也能夠嵌入式單進程運做)python
嵌入式直接 new MqServer 啓動jquery
MqServerConfig config = new MqServerConfig();
config.serverPort = 15555;
config.storePath = "./store";
final MqServer server = new MqServer(config);
server.start();linux
啓動後zbus能夠經過瀏覽器直接訪問zbus啓動服務器15555端口的監控服務git
消息隊列是zbus的最基礎服務,MQ參與角色分爲三大類算法
Producer ==> Broker ==> Consumerspring
邏輯上解耦分離 1. 生產者只須要知道Broker的存在,負責生產消息到Broker,不須要關心消費者的行爲 2. 消費者也只須要知道Broker的存在,負責消費處理Broker上某個MQ隊列的消息,不須要關心生產者的行爲編程
不一樣的Broker實如今細節上會有些不一樣,可是在MQ邏輯解耦上基本保持一致,下面細節所有是以zbus特定定義展開windows
zbus與客戶端(生產者與消費者)之間通信的消息(org.zbus.net.http.Message)爲了擴展性採用了【擴展HTTP】消息格式。 zbus的消息邏輯組織是以MQ標識來分組消息,MQ標識在zbus中就是MQ名字,Message對象中能夠直接指定。 物理上zbus把同一個下MQ標識下的消息按照FIFO隊列的模式在磁盤中存儲,隊列長度受限於磁盤大小,與內存無關。
編程模型上,分兩個視圖,生產者與消費者兩個視圖展開
生產者與消費者在編程模型上都須要首先產生一個Broker,Broker是對zbus自己的抽象,爲了達到編程模型的一致,Broker能夠是 單機版本的SingleBroker,也能夠是高可用版本的HaBroker,甚至能夠是不通過網絡的本地化JvmBroker,這些類型的Broker都是不一樣的實現,編程模型上不關心,具體根據實際運行環境而定,爲了更加方便配置,ZbusBroker實現了上述幾種不一樣的Broker實現的代理包裝,根據具體Broker地址來決定最終的版本。
例如
Broker broker = new ZbusBroker("127.0.0.1:15555"); //SingleBroker
Broker broker = new ZbusBroker("127.0.0.1:16666;127.0.0.1:16667"); //HaBroker
Broker broker = new ZbusBroker("jvm"); //JvmBroker
Broker內部核心實現了: 1. 鏈接池管理 2. 同步異步API
因此Broker在JAVA中能夠理解爲相似JDBC鏈接池同樣的重對象,應該共享使用,大部分場景應該是Application生命週期。 而依賴Broker對象而存在的Producer與Consumer通常能夠當作是輕量級對象(Consumer由於擁有連接須要關閉)
生產消息
//Producer是輕量級對象能夠隨意建立不用釋放
Producer producer = new Producer(broker, "MyMQ");
producer.createMQ();//肯定爲建立消息隊列須要顯示調用
Message msg = new Message();
msg.setBody("hello world"); //消息體底層是byte[]
msg = producer.sendSync(msg);
消費消息
Consumer consumer = new Consumer(broker, "MyMQ");
consumer.start(new ConsumerHandler() {
@Override
public void handle(Message msg, Consumer consumer) throws IOException {
//消息回調處理
System.out.println(msg);
}
});
//可控的範圍內須要關閉consumer(內部擁有了物理鏈接)
生產者能夠異步發送消息,直接調用producer.sendAsync(),具體請參考examples中相關示例
消費者可使用更底層的API控制怎麼取消息,直接調用consumer.take()從zbus上取回消息
從上面的API來看,使用很是簡單,鏈接池管理,同步異步處理、高可用等相關主題所有留給了Broker抽象自己
消息嚴格順序問題 參考文章
MQ消息隊列用於解耦應用之間的依賴關係,通常認爲MQ是從更普遍的分佈式RPC中演變而來的:在RPC場景下,若是某個遠程方法調用耗時過長,調用方不但願blocking等待,除了異步處理以外,更加常見的改造方式是採用消息隊列解耦調用方與服務方。
RPC的場景更加常見,RPC須要解決異構環境跨語言的調用問題,有很是多的解決方案,綜合看都是折中方案,zbus也屬其一。
RPC從數據通信角度來看能夠簡單理解爲:
分佈式調用方A --->命令序列化(method+params) ---> 網絡傳輸 ---> 分佈式式服務方B 命令反序列化(method+params)
^ |
| v
|<---結果反序列化(result/error)<----- 網絡傳輸 <----結果序列化(result/error) <---調用本地方法
網絡傳輸能夠是純網絡傳遞消息也能夠是其餘服務器中轉,好比消息隊列
異構環境下RPC方案須要解決的問題包括如下核心問題
1. 跨語言,多語言平臺下的消息通信格式選擇問題
2. 服務端伺服問題,高性能處理模型
3. 分佈式負載均衡問題
WebService採用HTTP協議負載,SOAP跨語言描述對象解決問題1
Windows WCF採用抽象統一WebService和私有序列化高效傳輸解決問題1
在服務端處理模型與分佈式負載均衡方面並很少體現,這裏不討論WebService,WCF或者某些私有的RPC方案的優劣之分,工程優化過程當中出現了諸如Thrift,dubbo等等RPC框架,折射出來是的對已有的RPC方案中折中的不滿。
針對問題1,zbus的RPC採用的是JSON數據根式封裝跨語言平臺協議,特色是簡單明瞭,協議應用普遍(zbus設計上能夠替換JSON)
針對問題二、問題3,zbus默認採用兩套模式,MQ-RPC與DirectRPC, MQ-RPC基於MQ消息隊列集中接入模式,DirectRPC則經過交叉直連模式
zbus的RPC方案除了解決上面三個問題以外,還有兩個重要的工程目標:
4. 極其輕量、方便二次開發
5. RPC業務自己與zbus解耦(無侵入,方便直接替換掉zbus)
zbus的RPC設計很是簡單,模型上對請求和應答作了基本的抽象
public static class Request{
private String module = ""; //模塊標識
private String method; //遠程方法
private Object[] params; //參數列表
private String[] paramTypes;
private String encoding = "UTF-8";
}
public static class Response {
private Object result;
private Throwable error;
private String stackTrace; //異常時候必定保證stackTrace設定,判斷的邏輯以此爲依據
private String encoding = "UTF-8";
}
很是直觀的抽象設計,就是對method+params 與 結果result/error 的JAVA表達而已。
RpcCodec的一個JSON協議實現---JsonRpcCodec完成將上述對象序列化成JSON格式放入到HTTP消息體中在網絡上傳輸
RpcInvoker API核心
public class RpcInvoker{
private MessageInvoker messageInvoker;
private RpcCodec codec; //RPC對象序列化協議
public Response invokeSync(Request request){
.....
}
}
完成將上述請求序列化併發送至網絡,等待結果返回,序列化回result/error。
//調用示例
RpcInvoker rpc = new RpcInvoker(...); //構造出RpcInvoker
//利用RpcInvoker 調用方法echo(String msg), 給定參數值 "test"
//1) 調用動態底層API
Request request = new Request();
request.setMethod("echo");
request.setParams(new Object[]{"test"});
Response response = rpc.invokeSync(request);
//2)強類型返回結果
String echoString = rpc.invokeSync(String.class, "echo", "test");
RpcInvoker同時適配MQ-RPC與DirectRPC,只須要給RpcInvoker指定不一樣的底層消息MessageInvoker,好比
點對點DirectRPC
//1) MessageClient是一種MessageInvoker,物理鏈接點對點
MessageInvoker client = new MessageClient("127.0.0.1:15555", ....);
RpcInvoker rpc = new RpcInvoker(client); //構造出RpcInvoker
//2) Broker也是一種MessageInvoker, 由於Broker管理了鏈接池,這樣構造的RpcInvoker具備鏈接池能力
MessageInvoker broker = new ZbusBroker("127.0.0.1:15555");
RpcInvoker rpc = new RpcInvoker(broker); //構造出RpcInvoker
//1)與2)本質上都是點對點的直連模式
高可用DirectRPC
//1) 接入到Trackserver的ZbusBroker,具有高可用選擇能力
MessageInvoker messageInvoker = new ZbusBroker("127.0.0.1:16666;127.0.0.1:16667");
//指定高可用服務器上的選擇標識,註冊爲相同標識的服務提供方之間高可用
HaInvoker haInvoker = new HaInvoker(messageInvoker, "HaDirectRpc");
RpcInvoker rpc = new RpcInvoker(haInvoker); //構造出RpcInvoker
MQ-RPC
//step 1 生成一個到zbus服務器的MessageInvoker
Broker broker = new ZbusBroker();
//step 2 相似Java IoStream封裝,在點對點基礎上能夠適配出MQ能力的MessageInvoker
MessageInvoker mqInvoker = new MqInvoker(broker, "MyRpc"); //使用某個隊列實現的RPC,調用適配
RpcInvoker rpc = new RpcInvoker(mqInvoker); //構造出RpcInvoker
以上三種RPC結構優缺點以下:
爲了解決問題5,使得zbus在RPC業務解耦,zbus增長了動態代理類
RpcFactory API完成業務interface通過zbus的RPC動態代理類實現
public class RpcFactory {
private final MessageInvoker messageInvoker; //底層支持的消息Invoker,完成動態代理
public <T> T getService(Class<T> api) throws Exception{
....
}
}
經過RpcFactory則完成了業務代碼與zbus的解耦(經過spring等IOC容器更加完全的把zbus徹底隔離掉)
MessageInvoker invoker = new ... //DirectRPC或者MqRPC 選擇, 同上
//RpcFactory根據底層invoker來決定消息流
RpcFactory factory = new RpcFactory(invoker);
//動態生成出InterfaceExample的實現類,RPC調用方不須要真正的實現類,客戶端與服務端都通interface解耦
InterfaceExample hello = factory.getService(InterfaceExample.class);
Spring的配置徹底是上述代碼的XML翻譯,在此不作例子,具體參考examples下spring配置示例。
RPC數據流圖中分佈式服務提供方須要的兩件事情是
對於問題1.如何拿到數據包,分兩大類處理方案:DirectRPC與MQ-RPC
DirectRPC則須要啓動網絡偵聽服務,被動處理請求RPC包;MQ-RPC則是使用Consumer從zbus的MQ隊列中主動取RPC請求包。
DirectRPC的服務zbus採用JAVA NIO服務器完成,對應org.zbus.rpc.direct.Service服務器完成NIO網絡伺服;MQ-RPC對應org.zbus.rpc.mq.Service,多Consumer線程從zbus的某個MQ隊列中併發取RPC請求包。
對於問題2,無論哪一種模式的RPC都採用相同的處理方式--RpcProcessor
public class RpcProcessor implements MessageProcessor{
private RpcCodec codec = new JsonRpcCodec(); //序列反序列化Request/Response
private Map<String, MethodInstance> methods = new HashMap<String, MethodInstance>(); //業務方法映射表
public void addModule(String module, Object... services){
.....
}
public Message process(Message msg){
.....
}
}
RpcProcessor本質上是經過反射將業務邏輯對象中的方法組織成 method==>(Method對象,Instance)映射
RpcProcessor.addModule(module, BizObject...)完成這個映射的管理
process的過程以下:
1. 處理RPC的請求包,RpcCodec反序列化出Request對象
2. 根據Request對象找到合適的Method並嘗試調用
3. 調用結果組裝成合適的Response對象
4. RpcCodec反序列化Response對象返回RPC響應包
啓動RPC服務在zbus中變得很是簡單,分兩步完成
//1)構造RpcProcessor--準備好服務映射表
RpcProcessor processor = new RpcProcessor();
processor.addModule(new InterfaceExampleImpl()); //動態增長業務對象,提供真正的業務邏輯
//2)MQ-RPC或者DirectRPC的Service--容器運行上面的RpcProcessor
ServiceConfig config = new ServiceConfig();
config.setMessageProcessor(processor);
//更多的配置
Service svc = new Service(config);
svc.start();
Spring的配置徹底是上述代碼的XML翻譯,在此不作例子,具體參考examples下spring配置示例。
ZBUS = MQ+RPC
跨平臺多語言+集中式節點控制,使得zbus適合完成服務總線適配工做。
爲何要採用總線架構適配已有服務? 1. 集中式接入控制 2. 標準化 3. 擴展引入zbus的多語言跨平臺能力
總線架構的一個核心需求是提供便捷的服務適配能力,zbus經過MQ和RPC來完成,對
新服務接入參考zbus實現RPC部分
舊服務MQ代理模式適配數據流描述:
zbus標準RPC客戶端 <----> zbus(某個MQ隊列)------->consumer線程消費消息----RPC消息包解包---->舊協議組裝調用舊服務
^ |
| v
--------------consumer.route命令返回<-----組裝RPC消息包<----舊服務返回結果
代理模式通常在調用舊服務的時候採用異步模式,防止同步阻塞的場景發生 標準化RPC則採用zbus的JSON協議方式序列化消息與zbus消息交換,固然也能夠私有的方式。
下面的子項目是多個語言平臺實現MQ代理的案例
接入zbus只須要遵循公開協議便可,目前已經支持的接入平臺包括
zbus協議能夠簡單描述爲擴展HTTP協議,協議總體格式是HTTP格式,由於HTTP協議的普遍應用,相對方便解釋與理解。但同時爲了下降HTTP協議頭部負載與業務數據獨立於zbus控制數據,zbus採用了HTTP擴展協議:
所以zbus協議描述就是HTTP擴展的KeyValue描述
命令控制 cmd
zbus接收到消息Message作何種動做,由cmd KV擴展決定,支持的賦值(Protocol.java 中定義)
public static final String Produce = "produce"; //生產消息命令
public static final String Consume = "consume"; //消費消息命令
public static final String Route = "route"; //路由回發送者命令
public static final String QueryMQ = "query_mq"; //查詢消息隊列信息
public static final String CreateMQ = "create_mq"; //建立消息隊列
public static final String RemoveMQ = "remove_mq"; //刪除消息隊列
public static final String AddKey = "add_key"; //增長一個Key,用於斷定某條消息是否重複,zbus簡單的KV服務
public static final String RemoveKey = "remove_key";//刪除一個Key
//下面的命令是監控中使用到,test測試,data返回監控數據,jquery監控使用到的jquery.js
public static final String Auth = "auth";
public static final String Test = "test";
public static final String Data = "data";
public static final String Jquery = "jquery";
每一個命令可能用到參數Key說明(Message.java)
public static final String MQ = "mq"; //消息隊列標識
public static final String SENDER = "sender"; //消息發送者標識
public static final String RECVER = "recver"; //消息接收者標識
public static final String ID = "id"; //消息ID
public static final String RAWID = "rawid"; //原始消息ID(消費消息時交換中用到)
public static final String SERVER = "server"; //消息通過的broker地址
public static final String TOPIC = "topic"; //消息發佈訂閱主題, 使用,分隔
public static final String ACK = "ack"; //消息ACK
public static final String ENCODING = "encoding"; //消息body二進制編碼
public static final String KEY = "key"; //消息的KEY
public static final String KEY_GROUP = "key_group"; //消息的KEY分組
public static final String MASTER_MQ = "master_mq"; //消息隊列主從複製的主隊列標識
public static final String MASTER_TOKEN = "master_token"; //主隊列訪問控制碼
具體每一個命令對應使用到的參數,請參考MqAdaptor中對應每一個命令的Handler
public class MqAdaptor extends IoAdaptor implements Closeable {
public MqAdaptor(MqServer mqServer){
....
registerHandler(Protocol.Produce, produceHandler);
registerHandler(Protocol.Consume, consumeHandler);
registerHandler(Protocol.Route, routeHandler);
registerHandler(Protocol.CreateMQ, createMqHandler);
registerHandler(Protocol.QueryMQ, queryMqHandler);
registerHandler(Protocol.RemoveMQ, removeMqHandler);
registerHandler(Protocol.AddKey, addKeyHandler);
registerHandler(Protocol.RemoveKey, removeKeyHandler);
registerHandler("", homeHandler);
registerHandler(Protocol.Data, dataHandler);
registerHandler(Protocol.Jquery, jqueryHandler);
registerHandler(Protocol.Test, testHandler);
registerHandler(Message.HEARTBEAT, heartbeatHandler);
}
}
zbus底層通訊基礎並無採用netty這樣的NIO框架,而是基於JAVA NIO作了一個簡單的封裝,儘管沒有使用到netty的大量開箱即用的功能,可是zbus也在通訊基礎上獲取了些咱們認爲更加劇要的東西: 1. 徹底自主個性化的網絡事件模型 2. 輕量級通訊底層
zbus的網絡通信部分核心在org.zbus.net.core包中,org.zbus.net.http 提供了一個輕量級的HTTP擴展實現。
zbus的NIO通訊模型的封裝很是簡單:
1. 網絡事件模型是由SelectorThread來完成,核心就是run方法中的多路複用檢測網絡IO事件
2. 在各個事件處理中(READ/WRITE/CONNECT/ACCEPT)中核心產生了Session處理
3. 事件處理公開機制靠IoAdaptor完成
4. 最外面由SelectorGroup完成多個SelectorThread的負載均衡與簡單管理,提升總體性能
上面的描述也是解讀代碼的前後順序
zbus在net.core包設計的基礎之上,爲了方便使用方構建客戶端與服務器端程序,提供了Client、Server的基本封裝,同步異步處理Sync方便消息的同步異步轉換。
Client本質上就一個IoAdaptor應用案例,專門從鏈接客戶端角度處理網絡各項事件。 Server則提供了一個簡單機制,運行可被個性化的IoAdaptor實例。
Server端示例(簡潔性的體現)
//藉助HTTP協議實現中的MessageAdaptor完成HTTP服務器,只須要簡單的
public static void main(String[] args) throws Exception {
//1) SelectorGroup管理
final SelectorGroup group = new SelectorGroup();
final Server server = new Server(group);
//2)構建一個MessageAdaptor
MessageAdaptor ioAdaptor = new MessageAdaptor();
ioAdaptor.uri("/hello", new MessageProcessor() {
public Message process(Message request) {
Message resp = new Message();
resp.setStatus(200);
resp.setBody("hello");
return resp;
}
});
//3)在8080端口上啓動這個IoAdaptor服務
server.start(8080, ioAdaptor);
}
運行則直接能夠通通瀏覽器訪問 http://localhost:8080/hello
這個示例並非簡單的hello world,SelectorGroup使之具有高性能服務框架,在i7 CPU的box上能上10w+的QPS性能
具體請詳細參考examples下面的net示例
zbus高可用採用相似zookeeper的跟蹤機制,但並無使用zookeeper。
zbus高可用由兩大節點羣組成: 1. ZbusServer節點羣 2. TrackServer節點羣
ZbusServer節點羣由單機版本的zbus組成,各個節點之間無狀態關聯,TrackServer節點羣中各個節點也無任何狀態關聯,ZbusServer把節點狀態(諸如MQ信息)上報給全部的TrackServer。
這裏面信息的一致性zbus是作了妥協的,能夠理解爲zookeeper一種簡化,典型配置是TrackServer全網配置兩臺,全部的ZbusServer都向這兩臺TrackServer上報各自的節點變化信息,包括某個節點MQ信息即時變化推送,定時(默認2s)重複更新。實用角度簡化設計的一種折中。
客戶端(生產者與消費者)仍然是直接連接到某個ZbusServer,可是選擇節點由訂閱TrackServer而給出的總體ZbusServer的節點拓撲信息決定,客戶端同時作了容錯處理,運行中全部的TrackServer失敗不影響已有拓撲信息的實用(本地緩存)
上述客戶端的複雜性由HaBroker封裝(最終由ZbusBroker統一類型選擇),API層面不受高可用選擇影響與單點zbus場景保持一致,同時高可用節點選擇算法也將陸續開放,方便二次開發個性化。
高可用HA環境的搭建 由於zbus HA方案中的節點無任何狀態聯繫,所以HA環境搭建很是簡單,各個節點啓動無順序依賴,通常順序爲:
整個HA的Broker環境就創建好了,例子能夠參考zbus-dist/ha 目錄下的配置啓動,注意若是不按照先TrackServer啓動的順序,先啓動ZbusServer會臨時報沒法找到某個TrackServer錯誤,直到TrackServer啓動正常,錯誤不影響使用。
HA最佳實踐指導:
上述HaBroker與SingleBroker都統一使用ZbusBroker,只是BrokerAddress的地址配置差別。
性能測試程序在test/performance目錄下,根據實際的機器測試給出。
一個參考數據,測試環境
MacBook Pro (Retina, 15-inch, Mid 2015)
Processor 2.5 GHz Intel Core i7
Memory 16 GB 1600 MHz DDR3
消息大小 "hello world"
測試代碼: org.zbus.performance.ProducerPerf.java
2016-03-16 14:50:08 INFO Perf:73 - QPS: 42844.4874, Failed/Total=0/2660020(0.0000)
2016-03-16 14:50:09 INFO Perf:73 - QPS: 42845.4515, Failed/Total=0/2670011(0.0000)
2016-03-16 14:50:09 INFO Perf:73 - QPS: 42842.2988, Failed/Total=0/2680012(0.0000)
2016-03-16 14:50:09 INFO Perf:73 - QPS: 42841.8991, Failed/Total=0/2690011(0.0000)
2016-03-16 14:50:09 INFO Perf:73 - QPS: 42838.7834, Failed/Total=0/2700020(0.0000)
2016-03-16 14:50:10 INFO Perf:73 - QPS: 42840.4312, Failed/Total=0/2710016(0.0000)
2016-03-16 14:50:10 INFO Perf:73 - QPS: 42840.0428, Failed/Total=0/2720007(0.0000)
測試代碼: org.zbus.performance.ConsumerPerf.java
2016-03-16 14:57:13 INFO ConsumerPerf:47 - Consumed:150000, QPS: 43290.0433
2016-03-16 14:57:13 INFO ConsumerPerf:47 - Consumed:160000, QPS: 43859.6491
2016-03-16 14:57:14 INFO ConsumerPerf:47 - Consumed:170000, QPS: 43859.6491
2016-03-16 14:57:14 INFO ConsumerPerf:47 - Consumed:180000, QPS: 43290.0433
2016-03-16 14:57:14 INFO ConsumerPerf:47 - Consumed:190000, QPS: 43859.6491
2016-03-16 14:57:14 INFO ConsumerPerf:47 - Consumed:200000, QPS: 44247.7876
2016-03-16 14:57:15 INFO ConsumerPerf:47 - Consumed:210000, QPS: 43668.1223
2016-03-16 14:57:15 INFO ConsumerPerf:47 - Consumed:220000, QPS: 44843.0493
2016-03-16 14:57:15 INFO ConsumerPerf:47 - Consumed:230000, QPS: 44843.0493
測試代碼,服務器: org.zbus.examples.net.server.MyServer
壓力程序: ab -k -c 20 -n 1000000 http://localhost:8080/hello
Server Hostname: localhost
Server Port: 8080
Document Path: /hello
Document Length: 5 bytes
Concurrency Level: 20
Time taken for tests: 15.073 seconds
Complete requests: 1000000
Failed requests: 0
Keep-Alive requests: 1000000
Total transferred: 67000000 bytes
HTML transferred: 5000000 bytes
Requests per second: 66344.44 [#/sec] (mean)
Time per request: 0.301 [ms] (mean)
Time per request: 0.015 [ms] (mean, across all concurrent requests)
Transfer rate: 4340.90 [Kbytes/sec] received