本文首發於泊浮目的簡書專欄: https://www.jianshu.com/nb/21...
不管是事件和消息驅動,都是解耦的有力手段之一。ZStack做爲一個大型軟件項目,也使用了這些方案對整個架構進行了解耦。前端
EventFacade是一個頗有意思的組件,由於它幾乎是自舉的。這就意味着有興趣的朋友能夠copy and paste,而後稍做修改就能夠在本身的項目裏工做起來了。java
在ZStack的repo中,一樣提供了相應的case:node
package org.zstack.test.core.cloudbus; /** * Created with IntelliJ IDEA. * User: frank * Time: 12:38 AM * To change this template use File | Settings | File Templates. */ public class TestCanonicalEvent { CLogger logger = Utils.getLogger(TestCanonicalEvent.class); ComponentLoader loader; EventFacade evtf; boolean success; @Before public void setUp() throws Exception { BeanConstructor con = new BeanConstructor(); loader = con.build(); evtf = loader.getComponent(EventFacade.class); ((EventFacadeImpl) evtf).start(); } @Test public void test() throws InterruptedException { String path = "/test/event"; evtf.on(path, new EventRunnable() { @Override public void run() { success = true; } }); evtf.fire(path, null); TimeUnit.SECONDS.sleep(1); Assert.assertTrue(success); } }
使用方法很是簡單,先註冊一個路徑用於接收事件,而後沿着該路徑發送一個事件,該事件註冊的函數則會被調用。git
package org.zstack.core.cloudbus; import java.util.Map; /** * Created with IntelliJ IDEA. * User: frank * Time: 11:29 PM * To change this template use File | Settings | File Templates. */ public interface EventFacade { void on(String path, AutoOffEventCallback cb); void on(String path, EventCallback cb); void on(String path, EventRunnable runnable); void off(AbstractEventFacadeCallback cb); void onLocal(String path, AutoOffEventCallback cb); void onLocal(String path, EventCallback cb); void onLocal(String path, EventRunnable runnable); void fire(String path, Object data); boolean isFromThisManagementNode(Map tokens); String META_DATA_MANAGEMENT_NODE_ID = "metadata::managementNodeId"; String META_DATA_PATH = "metadata::path"; String WEBHOOK_TYPE = "CanonicalEvent"; }
@Override public void on(String path, AutoOffEventCallback cb) { global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb)); } @Override public void on(String path, final EventCallback cb) { global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb)); } @Override public void on(String path, EventRunnable cb) { global.put(cb.uniqueIdentity, new CallbackWrapper(path, cb)); }
on方法僅僅是將一個屬於EventRunnable
的uuid做爲key,並將Callback做爲value放入global這個map中。爲何要這麼作呢?由於在Map的key是不可重複的,存path確定是不妥的。github
EventFacadeImpl的方法簽名以及成員變量:web
public class EventFacadeImpl implements EventFacade, CloudBusEventListener, Component, GlobalApiMessageInterceptor { @Autowired private CloudBus bus; private final Map<String, CallbackWrapper> global = Collections.synchronizedMap(new HashMap<>()); private final Map<String, CallbackWrapper> local = new ConcurrentHashMap<>(); private EventSubscriberReceipt unsubscriber;
相對的fire
方法:spring
@Override public void fire(String path, Object data) { assert path != null; CanonicalEvent evt = new CanonicalEvent(); evt.setPath(path); evt.setManagementNodeId(Platform.getManagementServerId()); if (data != null) { /* if (!TypeUtils.isPrimitiveOrWrapper(data.getClass()) && !data.getClass().isAnnotationPresent(NeedJsonSchema.class)) { throw new CloudRuntimeException(String.format("data[%s] passed to canonical event is not annotated by @NeedJsonSchema", data.getClass().getName())); } */ evt.setContent(data); } //從local這個map中找到對應的event並調用 fireLocal(evt); //將事件發送給對應的webhook callWebhooks(evt); //經過cloudBus發送事件,關於cloudBus的源碼以後會講到 bus.publish(evt); }
在上面的分析中並無看到global的event是如何被觸發的,若是想徹底瞭解其中的過程,還得從CloudBus提及,咱們稍後就會提到它。可是已經能夠猜到爲什麼要區分on和onLocal了。一個是經過消息總線觸發,一個是在當前JVM進程內觸發——這意味着一個支持ManagerNode集羣事件,一個只支持單個MN事件。這也是來自於ZStack
的業務場景——有些事情須要MN一塊兒作,有些事情一個MN作了其餘MN就不用作了。介於篇幅,有興趣的讀者能夠自行翻看代碼,這裏再也不詳舉。json
WebHook是ZStack向前端主動通訊的手段之一。在註冊了相應EventPath後,該path被調用後則會向相應的URL發送content。從case——CanonicalEventWebhookCase
和WebhookCase
能夠看到它的正確使用姿式。segmentfault
class CanonicalEventWebhookCase extends SubCase { EnvSpec envSpec @Override void clean() { envSpec.delete() } @Override void setup() { INCLUDE_CORE_SERVICES = false spring { include("webhook.xml") } } String WEBHOOK_PATH = "/canonical-event-webhook" void testErrorToCreateWebhookifOpaqueFieldMissing() { expect(AssertionError.class) { createWebhook { name = "webhook1" url = "http://127.0.0.1:8989$WEBHOOK_PATH" type = EventFacade.WEBHOOK_TYPE } } } void testCanonicalEventWithVariableInPath() { String path = "/test/{uuid}/event" int count = 0 WebhookInventory hook1 = createWebhook { name = "webhook1" url = "http://127.0.0.1:8989$WEBHOOK_PATH" type = EventFacade.WEBHOOK_TYPE opaque = path } // this webhook will not be called because path unmatching WebhookInventory hook2 = createWebhook { name = "webhook1" url = "http://127.0.0.1:8989$WEBHOOK_PATH" type = EventFacade.WEBHOOK_TYPE opaque = "/this-path-does-not-match" } CanonicalEvent evt envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> evt = json(e.getBody(), CanonicalEvent.class) count ++ return [:] } String content = "hello world" String eventPath = "/test/${Platform.uuid}/event" bean(EventFacade.class).fire(eventPath, content) retryInSecs { assert count == 1 assert evt != null assert evt.path == eventPath assert evt.content == content assert evt.managementNodeId == Platform.getManagementServerId() } } void testCanonicalEventUseWebhook() { String path = "/test/event" WebhookInventory hook1 = createWebhook { name = "webhook1" url = "http://127.0.0.1:8989$WEBHOOK_PATH" type = EventFacade.WEBHOOK_TYPE opaque = path } WebhookInventory hook2 = createWebhook { name = "webhook2" url = "http://127.0.0.1:8989$WEBHOOK_PATH" type = EventFacade.WEBHOOK_TYPE opaque = path } def testFireTwoEvents = { List<CanonicalEvent> evts = [] envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class) evts.add(evt) return [:] } String content = "hello world" bean(EventFacade.class).fire(path, content) retryInSecs { assert evts.size() == 2 CanonicalEvent evt1 = evts[0] CanonicalEvent evt2 = evts[1] assert evt1.path == path assert evt1.content == content assert evt1.managementNodeId == Platform.getManagementServerId() assert evt2.path == path assert evt2.content == content assert evt2.managementNodeId == Platform.getManagementServerId() } } def testOneEventsGetAfterDeleteOneHook = { deleteWebhook { uuid = hook1.uuid } List<CanonicalEvent> evts = [] envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class) evts.add(evt) return [:] } String content = "hello world" bean(EventFacade.class).fire(path, content) retryInSecs { assert evts.size() == 1 } } def testNoEventGetAfterDeleteAllHooks = { deleteWebhook { uuid = hook2.uuid } List<CanonicalEvent> evts = [] envSpec.simulator(WEBHOOK_PATH) { HttpEntity<String> e -> CanonicalEvent evt = json(e.getBody(), CanonicalEvent.class) evts.add(evt) return [:] } String content = "hello world" bean(EventFacade.class).fire(path, content) retryInSecs { assert evts.size() == 0 } } testFireTwoEvents() testOneEventsGetAfterDeleteOneHook() testNoEventGetAfterDeleteAllHooks() } @Override void environment() { envSpec = env { // nothing } } @Override void test() { envSpec.create { testCanonicalEventUseWebhook() testCanonicalEventWithVariableInPath() testErrorToCreateWebhookifOpaqueFieldMissing() } } }
class WebhookCase extends SubCase { EnvSpec envSpec @Override void clean() { envSpec.delete() } @Override void setup() { INCLUDE_CORE_SERVICES = false spring { include("webhook.xml") } } @Override void environment() { envSpec = env { // nothing } } void testWebhooksCRUD() { WebhookInventory hook = null def testCreateWebhook = { def params = null hook = createWebhook { name = "webhook" type = "custom-type" url = "http://127.0.0.1:8080/webhook" description = "desc" opaque = "test data" params = delegate } assert dbIsExists(hook.uuid, WebhookVO.class) assert hook.name == params.name assert hook.type == params.type assert hook.url == params.url assert hook.description == params.description assert hook.opaque == params.opaque } def testQueryWebhook = { List<WebhookInventory> invs = queryWebhook { conditions = ["name=${hook.name}"] } assert invs.size() == 1 assert invs[0].uuid == hook.uuid } def testDeleteWebhook = { deleteWebhook { uuid = hook.uuid } assert !dbIsExists(hook.uuid, WebhookVO.class) } testCreateWebhook() testQueryWebhook() testDeleteWebhook() } void testInvalidUrl() { expect(AssertionError.class) { createWebhook { name = "webhook" type = "custom-type" url = "this is not a url" description = "desc" opaque = "test data" } } } @Override void test() { envSpec.create { testWebhooksCRUD() testInvalidUrl() } } }
CloudBus能夠說是ZStack中最重要的組件了,ZStack各個模塊的通訊所有是由Message來完成的,而CloudBus就是它們的通訊媒介,接下來咱們來看它的源碼。api
本節適合對AMQP有必定了解同窗,若是不瞭解能夠先看個人博客 MQ學習小記
package org.zstack.test.core.cloudbus; import junit.framework.Assert; import org.junit.Before; import org.junit.Test; import org.zstack.core.cloudbus.CloudBusIN; import org.zstack.core.componentloader.ComponentLoader; import org.zstack.header.AbstractService; import org.zstack.header.Service; import org.zstack.header.message.Message; import org.zstack.header.message.MessageReply; import org.zstack.header.message.NeedReplyMessage; import org.zstack.test.BeanConstructor; import org.zstack.utils.Utils; import org.zstack.utils.logging.CLogger; import java.util.concurrent.TimeUnit; public class TestCloudBusCall { CLogger logger = Utils.getLogger(TestCloudBusCall.class); ComponentLoader loader; CloudBusIN bus; Service serv; public static class HelloWorldMsg extends NeedReplyMessage { private String greet; public String getGreet() { return greet; } public void setGreet(String greet) { this.greet = greet; } } public static class HelloWorldReply extends MessageReply { private String greet; public String getGreet() { return greet; } public void setGreet(String greet) { this.greet = greet; } } class FakeService extends AbstractService { @Override public boolean start() { bus.registerService(this); bus.activeService(this); return true; } @Override public boolean stop() { bus.deActiveService(this); bus.unregisterService(this); return true; } @Override public void handleMessage(Message msg) { if (msg.getClass() == HelloWorldMsg.class) { HelloWorldMsg hmsg = (HelloWorldMsg) msg; HelloWorldReply r = new HelloWorldReply(); r.setGreet(hmsg.getGreet()); bus.reply(msg, r); } } @Override public String getId() { return this.getClass().getCanonicalName(); } } @Before public void setUp() throws Exception { BeanConstructor con = new BeanConstructor(); loader = con.build(); bus = loader.getComponent(CloudBusIN.class); serv = new FakeService(); serv.start(); } @Test public void test() throws InterruptedException, ClassNotFoundException { HelloWorldMsg msg = new HelloWorldMsg(); msg.setGreet("Hello"); msg.setServiceId(FakeService.class.getCanonicalName()); msg.setTimeout(TimeUnit.SECONDS.toMillis(10)); HelloWorldReply r = (HelloWorldReply) bus.call(msg); serv.stop(); Assert.assertEquals("Hello", r.getGreet()); } } 咱們註冊了一個Service,並覆寫HandleMessage方法,在Case中,咱們成功收到了消息並經過了斷言。 再看一個:
package org.zstack.test.core.cloudbus;
import junit.framework.Assert;
import org.junit.Before;
import org.junit.Test;
import org.zstack.core.cloudbus.CloudBusCallBack;
import org.zstack.core.cloudbus.CloudBusIN;
import org.zstack.core.componentloader.ComponentLoader;
import org.zstack.header.AbstractService;
import org.zstack.header.Service;
import org.zstack.header.message.Message;
import org.zstack.header.message.MessageReply;
import org.zstack.header.message.NeedReplyMessage;
import org.zstack.test.BeanConstructor;
import org.zstack.utils.Utils;
import org.zstack.utils.logging.CLogger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class TestCloudBusSendCallback {
CLogger logger = Utils.getLogger(TestCloudBusSendCallback.class); ComponentLoader loader; CloudBusIN bus; CountDownLatch latch = new CountDownLatch(1); boolean isSuccess = false; Service serv; public static class HelloWorldMsg extends NeedReplyMessage { private String greet; public String getGreet() { return greet; } public void setGreet(String greet) { this.greet = greet; } } public static class HelloWorldReply extends MessageReply { private String greet; public String getGreet() { return greet; } public void setGreet(String greet) { this.greet = greet; } } class FakeService extends AbstractService { @Override public boolean start() { bus.registerService(this); bus.activeService(this); return true; } @Override public boolean stop() { bus.deActiveService(this); bus.unregisterService(this); return true; } @Override public void handleMessage(Message msg) { if (msg.getClass() == HelloWorldMsg.class) { HelloWorldMsg hmsg = (HelloWorldMsg) msg; HelloWorldReply r = new HelloWorldReply(); r.setGreet(hmsg.getGreet()); bus.reply(msg, r); } } @Override public String getId() { return this.getClass().getCanonicalName(); } } @Before public void setUp() throws Exception { BeanConstructor con = new BeanConstructor(); loader = con.build(); bus = loader.getComponent(CloudBusIN.class); serv = new FakeService(); serv.start(); } @Test public void test() throws InterruptedException, ClassNotFoundException { HelloWorldMsg msg = new HelloWorldMsg(); msg.setGreet("Hello"); msg.setServiceId(FakeService.class.getCanonicalName()); msg.setTimeout(TimeUnit.SECONDS.toMillis(10)); bus.send(msg, new CloudBusCallBack(null) { @Override public void run(MessageReply reply) { if (reply instanceof HelloWorldReply) { HelloWorldReply hr = (HelloWorldReply) reply; if ("Hello".equals(hr.getGreet())) { isSuccess = true; } } latch.countDown(); } }); latch.await(15, TimeUnit.SECONDS); serv.stop(); Assert.assertEquals(true, isSuccess); }
}
一樣也是註冊了一個Service,而後使用了CallBack,若是運行一下發現斷言是能夠經過的——意味着CallBack有被調用。 綜上,使用CloudBus很簡單——只須要註冊你的Service,使用CloudBus指定Service發送,Service就能收到,若是你須要註冊你的CallBack,也能很簡單完成。
這麼好用的東西,內部實現恐怕不會太簡單。咱們先從接口開始看:
package org.zstack.core.cloudbus; import org.zstack.header.Component; import org.zstack.header.Service; import org.zstack.header.errorcode.ErrorCode; import org.zstack.header.exception.CloudConfigureFailException; import org.zstack.header.message.*; import java.util.List; public interface CloudBus extends Component { void send(Message msg); <T extends Message> void send(List<T> msgs); void send(NeedReplyMessage msg, CloudBusCallBack callback); @Deprecated void send(List<? extends NeedReplyMessage> msgs, CloudBusListCallBack callBack); @Deprecated void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusListCallBack callBack); @Deprecated void send(List<? extends NeedReplyMessage> msgs, int parallelLevel, CloudBusSteppingCallback callback); void route(List<Message> msgs); void route(Message msg); void reply(Message request, MessageReply reply); void publish(List<Event> events); void publish(Event event); MessageReply call(NeedReplyMessage msg); <T extends NeedReplyMessage> List<MessageReply> call(List<T> msg); void registerService(Service serv) throws CloudConfigureFailException; void unregisterService(Service serv); EventSubscriberReceipt subscribeEvent(CloudBusEventListener listener, Event...events); void dealWithUnknownMessage(Message msg); void replyErrorByMessageType(Message msg, Exception e); void replyErrorByMessageType(Message msg, String err); void replyErrorByMessageType(Message msg, ErrorCode err); void logExceptionWithMessageDump(Message msg, Throwable e); String makeLocalServiceId(String serviceId); void makeLocalServiceId(Message msg, String serviceId); String makeServiceIdByManagementNodeId(String serviceId, String managementNodeId); void makeServiceIdByManagementNodeId(Message msg, String serviceId, String managementNodeId); String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid); void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid); void installBeforeDeliveryMessageInterceptor(BeforeDeliveryMessageInterceptor interceptor, Class<? extends Message>...classes); void installBeforeSendMessageInterceptor(BeforeSendMessageInterceptor interceptor, Class<? extends Message>...classes); void installBeforePublishEventInterceptor(BeforePublishEventInterceptor interceptor, Class<? extends Event>...classes); }
接口的命名語義較爲清晰,在這裏很少作解釋。開始咱們的源碼閱讀之旅。
init是在bean處於加載器,Spring提供的一個鉤子。在xml中咱們能夠看到聲明:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:zstack="http://zstack.org/schema/zstack" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://zstack.org/schema/zstack http://zstack.org/schema/zstack/plugin.xsd" default-init-method="init" default-destroy-method="destroy"> <bean id="TimeoutManager" class="org.zstack.core.timeout.ApiTimeoutManagerImpl" /> <bean id="CloudBus" class = "org.zstack.core.cloudbus.CloudBusImpl2" depends-on="ThreadFacade,ThreadAspectj"> <zstack:plugin> <zstack:extension interface="org.zstack.header.managementnode.ManagementNodeChangeListener" order="9999"/> </zstack:plugin> </bean> <bean id="EventFacade" class = "org.zstack.core.cloudbus.EventFacadeImpl"> <zstack:plugin> <zstack:extension interface="org.zstack.header.Component" /> <zstack:extension interface="org.zstack.header.apimediator.GlobalApiMessageInterceptor" /> </zstack:plugin> </bean> <bean id="ResourceDestinationMaker" class="org.zstack.core.cloudbus.ResourceDestinationMakerImpl" /> <bean id="MessageIntegrityChecker" class="org.zstack.core.cloudbus.MessageIntegrityChecker"> <zstack:plugin> <zstack:extension interface="org.zstack.header.Component" /> </zstack:plugin> </bean> </beans>
init方法:
void init() { trackerClose = CloudBusGlobalProperty.CLOSE_TRACKER; serverIps = CloudBusGlobalProperty.SERVER_IPS; tracker = new MessageTracker(); ConnectionFactory connFactory = new ConnectionFactory(); List<Address> addresses = CollectionUtils.transformToList(serverIps, new Function<Address, String>() { @Override public Address call(String arg) { return Address.parseAddress(arg); } }); connFactory.setAutomaticRecoveryEnabled(true); connFactory.setRequestedHeartbeat(CloudBusGlobalProperty.RABBITMQ_HEART_BEAT_TIMEOUT); connFactory.setNetworkRecoveryInterval((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_NETWORK_RECOVER_INTERVAL)); connFactory.setConnectionTimeout((int) TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.RABBITMQ_CONNECTION_TIMEOUT)); logger.info(String.format("use RabbitMQ server IPs: %s", serverIps)); try { if (CloudBusGlobalProperty.RABBITMQ_USERNAME != null) { connFactory.setUsername(CloudBusGlobalProperty.RABBITMQ_USERNAME); logger.info(String.format("use RabbitMQ username: %s", CloudBusGlobalProperty.RABBITMQ_USERNAME)); } if (CloudBusGlobalProperty.RABBITMQ_PASSWORD != null) { connFactory.setPassword(CloudBusGlobalProperty.RABBITMQ_PASSWORD); logger.info("use RabbitMQ password: ******"); } if (CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST != null) { connFactory.setVirtualHost(CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST); logger.info(String.format("use RabbitMQ virtual host: %s", CloudBusGlobalProperty.RABBITMQ_VIRTUAL_HOST)); } conn = connFactory.newConnection(addresses.toArray(new Address[]{})); logger.debug(String.format("rabbitmq connection is established on %s", conn.getAddress())); ((Recoverable)conn).addRecoveryListener(new RecoveryListener() { @Override public void handleRecovery(Recoverable recoverable) { logger.info(String.format("rabbitmq connection is recovering on %s", conn.getAddress().toString())); } }); channelPool = new ChannelPool(CloudBusGlobalProperty.CHANNEL_POOL_SIZE, conn); createExchanges(); outboundQueue = new BusQueue(makeMessageQueueName(SERVICE_ID), BusExchange.P2P); Channel chan = channelPool.acquire(); chan.queueDeclare(outboundQueue.getName(), false, false, true, queueArguments()); chan.basicConsume(outboundQueue.getName(), true, consumer); chan.queueBind(outboundQueue.getName(), outboundQueue.getBusExchange().toString(), outboundQueue.getBindingKey()); channelPool.returnChannel(chan); maid.construct(); noRouteEndPoint.construct(); tracker.construct(); tracker.trackService(SERVICE_ID); } catch (Exception e) { throw new CloudRuntimeException(e); } }
簡單來講,該函數嘗試獲取配置文件中與RabbitMQ中相關的配置,並初始化Connection,並以此爲基礎建立了channel poll。而後將一個channel和一個messageQueue綁定在了一塊兒。同時構造了EventMaid和noRouteEndPoint和tracker,後兩者都是Message的消費者,看名字就能夠看出來,一個用於訂閱/發佈模型(綁定此交換器的隊列都會收到消息),一個用於track。
start則是ZStack定義的一個鉤子,當ManagerNode起來的時候,start會被調用到。
@Override public boolean start() { populateExtension(); prepareStatistics(); for (Service serv : services) { assert serv.getId() != null : String.format("service id can not be null[%s]", serv.getClass().getName()); registerService(serv); } jmxf.registerBean("CloudBus", this); return true; }
一個個看:
private void populateExtension() { services = pluginRgty.getExtensionList(Service.class); for (ReplyMessagePreSendingExtensionPoint extp : pluginRgty.getExtensionList(ReplyMessagePreSendingExtensionPoint.class)) { List<Class> clazzs = extp.getReplyMessageClassForPreSendingExtensionPoint(); if (clazzs == null || clazzs.isEmpty()) { continue; } for (Class clz : clazzs) { if (!(APIEvent.class.isAssignableFrom(clz)) && !(MessageReply.class.isAssignableFrom(clz))) { throw new CloudRuntimeException(String.format("ReplyMessagePreSendingExtensionPoint can only marshal APIEvent or MessageReply. %s claimed by %s is neither APIEvent nor MessageReply", clz.getName(), extp.getClass().getName())); } List<ReplyMessagePreSendingExtensionPoint> exts = replyMessageMarshaller.get(clz); if (exts == null) { exts = new ArrayList<ReplyMessagePreSendingExtensionPoint>(); replyMessageMarshaller.put(clz, exts); } exts.add(extp); } } }
首先收集了全部繼承於Service的類,而後加載會改變msg reply的extensionPoint。
private void prepareStatistics() { List<Class> needReplyMsgs = BeanUtils.scanClassByType("org.zstack", NeedReplyMessage.class); needReplyMsgs = CollectionUtils.transformToList(needReplyMsgs, new Function<Class, Class>() { @Override public Class call(Class arg) { return !APIMessage.class.isAssignableFrom(arg) || APISyncCallMessage.class.isAssignableFrom(arg) ? arg : null; } }); for (Class clz : needReplyMsgs) { MessageStatistic stat = new MessageStatistic(); stat.setMessageClassName(clz.getName()); statistics.put(stat.getMessageClassName(), stat); } }
爲須要回覆的msg設置統計信息。
以後就是把全部的Service收集起來,方便Msg的分發。
@Override public String makeLocalServiceId(String serviceId) { return serviceId + "." + Platform.getManagementServerId(); } @Override public void makeLocalServiceId(Message msg, String serviceId) { msg.setServiceId(makeLocalServiceId(serviceId)); }
如ZStack的伸縮性祕密武器:無狀態服務中所說通常,每一個管理節點都會註冊一堆服務隊列。所以咱們要按照其格式組裝,這樣消息才能被服務所接收。
@Override public String makeTargetServiceIdByResourceUuid(String serviceId, String resourceUuid) { DebugUtils.Assert(serviceId!=null, "serviceId cannot be null"); DebugUtils.Assert(resourceUuid!=null, "resourceUuid cannot be null"); //獲得資源所在的MN UUID String mgmtUuid = destMaker.makeDestination(resourceUuid); return serviceId + "." + mgmtUuid; } @Override public void makeTargetServiceIdByResourceUuid(Message msg, String serviceId, String resourceUuid) { String targetService = makeTargetServiceIdByResourceUuid(serviceId, resourceUuid); msg.setServiceId(targetService); }
在ZStack中,ManagerNode頗有多是集羣部署的,每一個MN管控不一樣的資源。那麼就須要一致性哈希環來肯定資源所在哪一個MN。
@Override public void send(final NeedReplyMessage msg, final CloudBusCallBack callback) { //給msg一個超時時間 evaluateMessageTimeout(msg); //new繼承於Envelope的匿名內部類 Envelope e = new Envelope() { //用來判斷這個msg是否已經發出去了 AtomicBoolean called = new AtomicBoolean(false); final Envelope self = this; //計算超時,往線程池提交一個任務 TimeoutTaskReceipt timeoutTaskReceipt = thdf.submitTimeoutTask(new Runnable() { @Override public void run() { self.timeout(); } }, TimeUnit.MILLISECONDS, msg.getTimeout()); @Override //msg 發送成功時候調用這個方法 public void ack(MessageReply reply) { //計算該msg耗時 count(msg); //根據msg的惟一UUID移除在這個map中的記錄 envelopes.remove(msg.getId()); //若是更新失敗,說明這個消息已經被髮送過了。返回 if (!called.compareAndSet(false, true)) { return; } //取消一個計算超時的任務 timeoutTaskReceipt.cancel(); //調用註冊的callback callback.run(reply); } //消息超時時調用的邏輯 @Override public void timeout() { // 根據msg的惟一UUID移除在這個map中的記錄 envelopes.remove(msg.getId()); // 如何已經被調用過則返回 if (!called.compareAndSet(false, true)) { return; } //內部構造一個超時reply返回給callback callback.run(createTimeoutReply(msg)); } //用於getWaitingReplyMessageStatistic @Override List<Message> getRequests() { List<Message> requests = new ArrayList<Message>(); requests.add(msg); return requests; } }; //往envelopes這個map裏放入msg的惟一UUID和剛剛構造的envelope envelopes.put(msg.getId(), e); //發送消息 send(msg, false); }
private void send(Message msg, Boolean noNeedReply) { //msg的serviceID不容許爲空,否則不能 if (msg.getServiceId() == null) { throw new IllegalArgumentException(String.format("service id cannot be null: %s", msg.getClass().getName())); } //爲msg構建基本屬性 basicProperty(msg); //設置msg header屬性 msg.putHeaderEntry(CORRELATION_ID, msg.getId()); //消息的回覆隊列設置 msg.putHeaderEntry(REPLY_TO, outboundQueue.getBindingKey()); if (msg instanceof APIMessage) { // API always need reply msg.putHeaderEntry(NO_NEED_REPLY_MSG, Boolean.FALSE.toString()); } else if (msg instanceof NeedReplyMessage) { // for NeedReplyMessage sent without requiring receiver to reply, // mark it, then it will not be tracked and replied msg.putHeaderEntry(NO_NEED_REPLY_MSG, noNeedReply.toString()); } buildRequestMessageMetaData(msg); wire.send(msg); }
該函數是一段公用邏輯。全部的消息都是從這裏進去而後由rabbitMQ發出去的。因此在這裏須要多說幾句。
protected void basicProperty(Message msg) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); msg.setAMQPProperties(builder.deliveryMode(1).expiration(String.valueOf(TimeUnit.SECONDS.toMillis(CloudBusGlobalProperty.MESSAGE_TTL))).build()); }
這個函數設置了msg基礎屬性——持久化策略(否)和超時。
那麼再看buildRequestMessageMetaData
方法
private void buildRequestMessageMetaData(Message msg) { if (msg instanceof APIMessage || (msg instanceof NeedReplyMessage && !Boolean.valueOf((String)msg.getHeaderEntry(NO_NEED_REPLY_MSG)))) { RequestMessageMetaData metaData; if (msg instanceof LockResourceMessage) { LockResourceMessage lmsg = (LockResourceMessage) msg; LockMessageMetaData lmetaData = new LockMessageMetaData(); lmetaData.unlockKey = lmsg.getUnlockKey(); lmetaData.reason = lmsg.getReason(); lmetaData.senderManagementUuid = Platform.getManagementServerId(); metaData = lmetaData; } else { metaData = new RequestMessageMetaData(); } metaData.needApiEvent = msg instanceof APIMessage && !(msg instanceof APISyncCallMessage); metaData.msgId = msg.getId(); metaData.replyTo = msg.getHeaderEntry(REPLY_TO); metaData.timeout = msg instanceof NeedReplyMessage ? ((NeedReplyMessage) msg).getTimeout() : null; metaData.serviceId = msg.getServiceId(); metaData.messageName = msg.getClass().getName(); metaData.className = metaData.getClass().getName(); msg.getAMQPHeaders().put(MESSAGE_META_DATA, JSONObjectUtil.toJsonString(metaData)); } }
方法buildRequestMessageMetaData
將消息所需的metaData從msg裏取了出來並放入了msg的真正Header中。
而後是wire.send:
public void send(Message msg) { // for unit test finding invocation chain MessageCommandRecorder.record(msg.getClass()); List<BeforeSendMessageInterceptor> interceptors = beforeSendMessageInterceptors.get(msg.getClass()); if (interceptors != null) { for (BeforeSendMessageInterceptor interceptor : interceptors) { interceptor.intercept(msg); /* if (logger.isTraceEnabled()) { logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass())); } */ } } for (BeforeSendMessageInterceptor interceptor : beforeSendMessageInterceptorsForAll) { interceptor.intercept(msg); /* if (logger.isTraceEnabled()) { logger.trace(String.format("called %s for message[%s]", interceptor.getClass(), msg.getClass())); } */ } send(msg, true); }
邏輯一目瞭然:
send(msg, true);
:
public void send(final Message msg, boolean makeQueueName) { /* StopWatch watch = new StopWatch(); watch.start(); */ String serviceId = msg.getServiceId(); if (makeQueueName) { //獲取真正的隊列名 serviceId = makeMessageQueueName(serviceId); } // build json schema buildSchema(msg); //當前的thread Context中獲取必要信息。每一個api調用所攜帶的uuid就是這樣傳遞下去的 evalThreadContextToMessage(msg); if (logger.isTraceEnabled() && logMessage(msg)) { logger.trace(String.format("[msg send]: %s", wire.dumpMessage(msg))); } //從channel poll 中取出一個channel Channel chan = channelPool.acquire(); try { //接下來單獨解釋 new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send(); /* watch.stop(); logger.debug(String.mediaType("sending %s cost %sms", msg.getClass().getName(), watch.getTime())); */ } catch (IOException e) { throw new CloudRuntimeException(e); } finally { //返回給channel poll channelPool.returnChannel(chan); } }
單獨分析 new RecoverableSend(chan, msg, serviceId, outboundQueue.getBusExchange()).send();
:
private class RecoverableSend { Channel chan; byte[] data; String serviceId; Message msg; BusExchange exchange; RecoverableSend(Channel chan, Message msg, String serviceId, BusExchange exchange) throws IOException { data = compressMessageIfNeeded(msg); this.chan = chan; this.serviceId = serviceId; this.msg = msg; this.exchange = exchange; } void send() throws IOException { try { chan.basicPublish(exchange.toString(), serviceId, true, msg.getAMQPProperties(), data); } catch (ShutdownSignalException e) { if (!(conn instanceof AutorecoveringConnection) || serverIps.size() <= 1 || !Platform.IS_RUNNING) { // the connection is not recoverable throw e; } logger.warn(String.format("failed to send a message because %s; as the connection is recoverable," + "we are doing recoverable send right now", e.getMessage())); if (!recoverSend()) { throw e; } } } private byte[] compressMessageIfNeeded(Message msg) throws IOException { if (!CloudBusGlobalProperty.COMPRESS_NON_API_MESSAGE || msg instanceof APIEvent || msg instanceof APIMessage) { return gson.toJson(msg, Message.class).getBytes(); } msg.getAMQPHeaders().put(AMQP_PROPERTY_HEADER__COMPRESSED, "true"); return Compresser.deflate(gson.toJson(msg, Message.class).getBytes()); } private boolean recoverSend() throws IOException { int interval = conn.getHeartbeat() / 2; interval = interval > 0 ? interval : 1; int count = 0; // as the connection is lost, there is no need to wait heart beat missing 8 times // so we use reflection to fast the process RecoveryAwareAMQConnection delegate = FieldUtils.getFieldValue("delegate", conn); DebugUtils.Assert(delegate != null, "cannot get RecoveryAwareAMQConnection"); Field _missedHeartbeats = FieldUtils.getField("_missedHeartbeats", RecoveryAwareAMQConnection.class); DebugUtils.Assert(_missedHeartbeats!=null, "cannot find _missedHeartbeats"); _missedHeartbeats.setAccessible(true); try { _missedHeartbeats.set(delegate, 100); } catch (IllegalAccessException e) { throw new CloudRuntimeException(e); } while (count < CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES) { try { TimeUnit.SECONDS.sleep(interval); } catch (InterruptedException e1) { logger.warn(e1.getMessage()); } try { chan.basicPublish(exchange.toString(), serviceId, true, msg.getAMQPProperties(), data); return true; } catch (ShutdownSignalException e) { logger.warn(String.format("recoverable send fails %s times, will continue to retry %s times; %s", count, CloudBusGlobalProperty.RABBITMQ_RECOVERABLE_SEND_TIMES-count, e.getMessage())); count ++; } } return false; } }
最核心的代碼便是:
chan.basicPublish(exchange.toString(), serviceId, true, msg.getAMQPProperties(), data);
根據交換器、綁定器的key和msg的基本屬性還有已經序列化的msg在RabbitMQ中發送消息。
咱們能夠看一下該方法簽名:
/** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param mandatory true if the 'mandatory' flag is to be set * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
當mandatory標誌位設置爲true時,若是exchange根據自身類型和消息routeKey沒法找到一個符合條件的queue,那麼會調用basic.return方法將消息返還給生產者;當mandatory設爲false時,出現上述情形broker會直接將消息扔掉。
還有一個附有immediate的方法:
/** * Publish a message * @see com.rabbitmq.client.AMQP.Basic.Publish * @param exchange the exchange to publish the message to * @param routingKey the routing key * @param mandatory true if the 'mandatory' flag is to be set * @param immediate true if the 'immediate' flag is to be * set. Note that the RabbitMQ server does not support this flag. * @param props other properties for the message - routing headers etc * @param body the message body * @throws java.io.IOException if an error is encountered */ void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
當immediate標誌位設置爲true時,若是exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那麼這條消息不會放入隊列中。當與消息routeKey關聯的全部queue(一個或多個)都沒有消費者時,該消息會經過basic.return方法返還給生產者。
@Override public void reply(Message request, MessageReply reply) { if (Boolean.valueOf((String) request.getHeaderEntry(NO_NEED_REPLY_MSG))) { if (logger.isTraceEnabled()) { logger.trace(String.format("%s in message%s is set, drop reply%s", NO_NEED_REPLY_MSG, wire.dumpMessage(request), wire.dumpMessage(reply))); } return; } AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); reply.setAMQPProperties(builder.deliveryMode(1).build()); reply.getHeaders().put(IS_MESSAGE_REPLY, Boolean.TRUE.toString()); reply.putHeaderEntry(CORRELATION_ID, request.getId()); reply.setServiceId((String) request.getHeaderEntry(REPLY_TO)); buildResponseMessageMetaData(reply); if (request instanceof NeedReplyMessage) { callReplyPreSendingExtensions(reply, (NeedReplyMessage) request); } wire.send(reply, false); }
其餘屬性以前都有提到。 reply.setServiceId((String) request.getHeaderEntry(REPLY_TO));
則是將reply統一通過outboundQueue
這個隊列,同時根據correlationId
返回給原發送者。
callReplyPreSendingExtensions
則會根據需求改變reply結果。以後就是wire.send,以前已經分析過了。
@Override public void publish(Event event) { if (event instanceof APIEvent) { APIEvent aevt = (APIEvent) event; DebugUtils.Assert(aevt.getApiId() != null, String.format("apiId of %s cannot be null", aevt.getClass().getName())); } //和前面的msgProperty同樣 eventProperty(event); //構建metaData buildResponseMessageMetaData(event); //前面分析過了 callReplyPreSendingExtensions(event, null); //調用beforeEventPublishInterceptors。爲了拋出異常的時候方便track,聲明瞭這樣的一個變量。 BeforePublishEventInterceptor c = null; try { List<BeforePublishEventInterceptor> is = beforeEventPublishInterceptors.get(event.getClass()); if (is != null) { for (BeforePublishEventInterceptor i : is) { c = i; i.beforePublishEvent(event); } } for (BeforePublishEventInterceptor i : beforeEventPublishInterceptorsForAll) { c = i; i.beforePublishEvent(event); } } catch (StopRoutingException e) { if (logger.isTraceEnabled()) { logger.trace(String.format("BeforePublishEventInterceptor[%s] stop publishing event: %s", c == null ? "null" : c.getClass().getName(), JSONObjectUtil.toJsonString(event))); } return; } wire.publish(event); }
接下來看wire.publish方法
public void publish(Event evt) { /* StopWatch watch = new StopWatch(); watch.start(); */ buildSchema(evt); evalThreadContextToMessage(evt); if (logger.isTraceEnabled() && logMessage(evt)) { logger.trace(String.format("[event publish]: %s", wire.dumpMessage(evt))); } Channel chan = channelPool.acquire(); try { new RecoverableSend(chan, evt, evt.getType().toString(), BusExchange.BROADCAST).send(); /* watch.stop(); logger.debug(String.mediaType("sending %s cost %sms", evt.getClass().getName(), watch.getTime())); */ } catch (IOException e) { throw new CloudRuntimeException(e); } finally { channelPool.returnChannel(chan); } }
大部分方法和send
無異。可是在Event的類中定義了兩種Type:
package org.zstack.header.message; import org.zstack.header.rest.APINoSee; public abstract class Event extends Message { /** * @ignore */ @APINoSee private String avoidKey; public String getAvoidKey() { return avoidKey; } public void setAvoidKey(String avoidKey) { this.avoidKey = avoidKey; } public abstract Type getType(); public abstract String getSubCategory(); public static final String BINDING_KEY_PERFIX = "key.event."; public static enum Category { LOCAL, API, } public static class Type { private final String _name; public Type(Category ctg, String subCtg) { _name = BINDING_KEY_PERFIX + ctg.toString() + "." + subCtg; } @Override public String toString() { return _name; } @Override public int hashCode() { return _name.hashCode(); } @Override public boolean equals(Object t) { if (!(t instanceof Type)) { return false; } Type type = (Type) t; return _name.equals(type.toString()); } } }
即Local和API。從名字上很好看出來,一個用來回復APIMsg的,一個用來發布本地消息。不過要了解這裏面的細節,就得看EventMaid
了。
private class EventMaid extends AbstractConsumer { Map<String, List<EventListenerWrapper>> listeners = new ConcurrentHashMap<String, List<EventListenerWrapper>>(); Channel eventChan; String queueName = makeEventQueueName(String.format("eventMaid.%s", Platform.getUuid())); public void construct() { try { eventChan = conn.createChannel(); eventChan.queueDeclare(queueName, false, false, true, queueArguments()); eventChan.basicConsume(queueName, true, this); } catch (IOException e) { throw new CloudRuntimeException(e); } } public void destruct() { try { eventChan.close(); } catch (IOException e) { throw new CloudRuntimeException(e); } } public void listen(Event evt, EventListenerWrapper l) { String type = evt.getType().toString(); try { synchronized (listeners) { List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { lst = new CopyOnWriteArrayList<EventListenerWrapper>(); listeners.put(type, lst); eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type); logger.debug(String.format("[listening event]: %s", type)); } if (!lst.contains(l)) { lst.add(l); } } } catch (IOException e) { throw new CloudRuntimeException(e); } } public void unlisten(Event evt, EventListenerWrapper l) { String type = evt.getType().toString(); try { synchronized (listeners) { List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { return; } lst.remove(l); if (lst.isEmpty()) { listeners.remove(type); eventChan.queueUnbind(queueName, BusExchange.BROADCAST.toString(), type); logger.debug(String.format("[unlistening event]: %s", type)); } } } catch (IOException e) { throw new CloudRuntimeException(e); } } @SyncThread(level = 10) @MessageSafe private void dispatch(Event evt, EventListenerWrapper l) { setThreadLoggingContext(evt); l.callEventListener(evt); } private void handle(Event evt) { String type = evt.getType().toString(); List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { return; } if (logger.isTraceEnabled()) { logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt))); } for (EventListenerWrapper l : lst) { dispatch(evt, l); } } @Override public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { Event evt = null; try { evt = (Event) wire.toMessage(bytes, basicProperties); handle(evt); } catch (final Throwable t) { final Event fevt = evt; throwableSafe(new Runnable() { @Override public void run() { if (fevt != null) { logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t); } else { logger.warn(String.format("unhandled throwable"), t); } } }); } } }
這段代碼得先從handleDelivery
開始看:
@Override public void handleDelivery(String s, com.rabbitmq.client.Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException { Event evt = null; try { evt = (Event) wire.toMessage(bytes, basicProperties); handle(evt); } catch (final Throwable t) { final Event fevt = evt; throwableSafe(new Runnable() { @Override public void run() { if (fevt != null) { logger.warn(String.format("unhandled throwable when handling event[%s], dump: %s", fevt.getClass().getName(), wire.dumpMessage(fevt)), t); } else { logger.warn(String.format("unhandled throwable"), t); } } }); } }
能夠看到,這裏是重載了Consumer
接口的handleDelivery,咱們看一下它的方法註釋:
/** * Called when a <code><b>basic.deliver</b></code> is received for this consumer. * @param consumerTag the <i>consumer tag</i> associated with the consumer * @param envelope packaging data for the message * @param properties content header data for the message * @param body the message body (opaque, client-specific byte array) * @throws IOException if the consumer encounters an I/O error while processing the message * @see Envelope */ void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException;
這樣保證EventMaid的對象可以接收到Msg。在try代碼塊中,從byte轉換出了Event,而後走向了handle邏輯。
private void handle(Event evt) { //前面提過,有兩種Type,API和Local String type = evt.getType().toString(); //因此只會取出兩種List List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { return; } if (logger.isTraceEnabled()) { logger.trace(String.format("[event received]: %s", wire.dumpMessage(evt))); } for (EventListenerWrapper l : lst) { //跳到下一個邏輯 dispatch(evt, l); } }
@SyncThread(level = 10) @MessageSafe private void dispatch(Event evt, EventListenerWrapper l) { setThreadLoggingContext(evt); //跳至下一段邏輯 l.callEventListener(evt); }
@Override public EventSubscriberReceipt subscribeEvent(final CloudBusEventListener listener, final Event... events) { final EventListenerWrapper wrapper = new EventListenerWrapper() { @Override public void callEventListener(Event e) { //走到各自的handle邏輯,若是返回true則unlisten if (listener.handleEvent(e)) { maid.unlisten(e, this); } } }; // 一個event對應一個ListenWrapper for (Event e : events) { maid.listen(e, wrapper); } return new EventSubscriberReceipt() { @Override public void unsubscribe(Event e) { maid.unlisten(e, wrapper); } @Override public void unsubscribeAll() { for (Event e : events) { maid.unlisten(e, wrapper); } } }; }
再看listen:
public void listen(Event evt, EventListenerWrapper l) { String type = evt.getType().toString(); try { synchronized (listeners) { List<EventListenerWrapper> lst = listeners.get(type); if (lst == null) { lst = new CopyOnWriteArrayList<EventListenerWrapper>(); listeners.put(type, lst); eventChan.queueBind(queueName, BusExchange.BROADCAST.toString(), type); logger.debug(String.format("[listening event]: %s", type)); } if (!lst.contains(l)) { lst.add(l); } } } catch (IOException e) { throw new CloudRuntimeException(e); } }
首先加鎖了listeners這個put,並根據type取出相應的list。同時將這個list轉換爲CopyOnWriteArrayList
,這樣這個list的引用就不會泄露出去了。而後綁定一個channel做爲通道。另外,若是EventListenerWrapper List中不存在提交的EventListenerWrapper,則添加進去。
相信講了這麼多,有一部分讀者可能已經繞暈了。這邊寫一個關於EventMaid
的邏輯調用小結:
CloudBusEventListener
接口的對象發送事件,由他們本身選擇是否處理這些事件。CloudBus和EventFascade就是這樣協同工做的。
在本文,咱們一塊兒瀏覽的ZStack中提供消息驅動特性組件的源碼——顯然,這兩個組件的API很是好用,簡潔明瞭。但在具體邏輯中有幾個能夠改進的點: