最近公司有使用阿里雲消息隊列的需求,爲了更加方便使用,本人用了幾天時間將消息隊列封裝成api調用方式以方便內部系統的調用,如今已經完成,特此記錄其中過程和使用到的相關技術,與君共勉。html
如今阿里雲提供了兩種消息服務:mns服務和ons服務,其中我認爲mns是簡化版的ons,並且mns的消息消費須要自定義輪詢策略的,相比之下,ons的發佈與訂閱模式功能更增強大(好比相對於mns,ons提供了消息追蹤、日誌、監控等功能),其api使用起來更加方便,並且聽聞阿里內部之後再也不對mns進行新的開發,只作維護,ons服務則會逐步替代mns服務成爲阿里消息服務的主打產品,因此,若是有使用消息隊列的需求,建議不要再使用mns,使用ons是最好的選擇。java
參考文檔:https://m.aliyun.com/doc/product/29530.htmlgit
涉及到的技術:Spring,反射、動態代理、Jackson序列化和反序列化github
在看下面的文章以前,須要先看上面的文檔以瞭解相關概念(Topic、Consumer、Producer、Tag等)以及文檔中提供的簡單的發送和接收代碼實現。spring
在一個簡單的cs架構中,假設server會監聽一個Topic的Producer發送的消息,那麼它首先應該提供client一個api,client只須要簡單的調用該api,就能夠經過producer來生產消息api
因爲api是server制定的,因此server固然也知道如何消費這些消息緩存
在這個過程當中,server實際充當着消費者的角色,client實際充當着生產者的角色,可是生產者生產消息的規則則由消費者制定以知足消費者消費需求。服務器
咱們要建立一個單獨的jar包,起名爲queue-core爲生產者和消費者提供依賴和發佈訂閱的具體實現。架構
@Topic(name="kdyzm",producerId="kdyzm_producer") public interface UserQueueResource { @Tag("test1") public void handleUserInfo(@Body @Key("userInfoHandler") UserModel user); @Tag("test2") public void handleUserInfo1(@Body @Key("userInfoHandler1") UserModel user); }
因爲Topic和producer之間是N:1的關係,因此這裏直接將producerId做爲Topic的一個屬性;Tag是一個很關鍵的過濾條件,消費者經過它進行消息的分類作不一樣的業務處理,因此,這裏使用Tag做爲路由條件。ide
因爲消費者只提供了接口給生產者使用,接口是沒有辦法直接使用的,由於沒有辦法實例化,這裏使用動態代理生成對象,在消費者提供的api中,添加以下config,以方便生產者直接導入config便可使用,這裏使用了基於java的spring config,請知悉。
@Configuration public class QueueConfig { @Autowired @Bean public UserQueueResource userQueueResource() { return QueueResourceFactory.createProxyQueueResource(UserQueueResource.class); } }
以上1中全部的註解(Topic、Tag、Body 、Key)以及2中使用到的QueueResourceFactory類都要在queue-core中定義,其中註解的定義只是定義了規則,真正的實現其實是在QueueResourceFactory中
import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.SendResult; import com.wy.queue.core.api.MQConnection; import com.wy.queue.core.utils.JacksonSerializer; import com.wy.queue.core.utils.MQUtils; import com.wy.queue.core.utils.QueueCoreSpringUtils; public class QueueResourceFactory implements InvocationHandler { private static final Logger logger=LoggerFactory.getLogger(QueueResourceFactory.class); private String topicName; private String producerId; private JacksonSerializer serializer=new JacksonSerializer(); private static final String PREFIX="PID_"; public QueueResourceFactory(String topicName,String producerId) { this.topicName = topicName; this.producerId=producerId; } public static <T> T createProxyQueueResource(Class<T> clazz) { String topicName = MQUtils.getTopicName(clazz); String producerId = MQUtils.getProducerId(clazz); T target = (T) Proxy.newProxyInstance(QueueResourceFactory.class.getClassLoader(), new Class<?>[] { clazz }, new QueueResourceFactory(topicName,producerId)); return target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if(args.length == 0 || args.length>1){ throw new RuntimeException("only accept one param at queueResource interface."); } String tagName=MQUtils.getTagName(method); ProducerFactory producerFactory = QueueCoreSpringUtils.getBean(ProducerFactory.class); MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class); Producer producer = producerFactory.createProducer(PREFIX+connectionInfo.getPrefix()+"_"+producerId); //發送消息 Message msg = new Message( // // 在控制檯建立的 Topic,即該消息所屬的 Topic 名稱 connectionInfo.getPrefix()+"_"+topicName, // Message Tag, // 可理解爲 Gmail 中的標籤,對消息進行再歸類,方便 Consumer 指定過濾條件在 MQ 服務器過濾 tagName, // Message Body // 任何二進制形式的數據, MQ 不作任何干預, // 須要 Producer 與 Consumer 協商好一致的序列化和反序列化方式 serializer.serialize(args[0]).getBytes()); SendResult sendResult = producer.send(msg); logger.info("Send Message success. Message ID is: " + sendResult.getMessageId()); return null; } }
這裏特地將自定義包和第三方使用的包名都貼過來了,以便於區分。
這裏到底作了哪些事情呢?
發送消息的過程就是動態代理建立一個代理對象,該對象調用方法的時候會被攔截,首先解析全部的註解,好比topicName、producerId、tag等關鍵信息從註解中取出來,而後調用阿里sdk發送消息,過程很簡單,可是注意,這裏發送消息的時候是分環境的,通常來說如今企業中會區分QA、staging、product三種環境,其中QA和staging是測試環境,對於消息隊列來說,也是會有三種環境的,可是QA和staging環境每每爲了下降成本使用同一個阿里帳號,因此建立的topic和productId會放到同一個區域下,這樣同名的TopicName是不容許存在的,因此加上了環境前綴加以區分,好比QA_TopicName,PID_Staging_ProducerId等等;另外,queue-core提供了MQConnection接口,以獲取配置信息,生產者服務只須要實現該接口便可。
@Autowired private UserQueueResource userQueueResource; @Override public void sendMessage() { UserModel userModel=new UserModel(); userModel.setName("kdyzm"); userModel.setAge(25); userQueueResource.handleUserInfo(userModel); }
只須要數行代碼便可將消息發送到指定的Topic,相對於原生的發送代碼,精簡了太多。
相對於消息發送,消息的消費要複雜一些。
因爲Topic和Consumer之間是N:N的關係,因此將ConsumerId放到消費者具體實現的方法上
@Controller @QueueResource public class UserQueueResourceImpl implements UserQueueResource { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Override @ConsumerAnnotation("kdyzm_consumer") public void handleUserInfo(UserModel user) { logger.info("收到消息1:{}", new Gson().toJson(user)); } @Override @ConsumerAnnotation("kdyzm_consumer1") public void handleUserInfo1(UserModel user) { logger.info("收到消息2:{}", new Gson().toJson(user)); } }
這裏又有兩個新的註解@QueueResource和@ConsumerAnnotation,這兩個註解後續會討論如何使用。有人會問我爲何要使用ConsumerAnnotation這個名字而不使用Consumer這個名字,由於Consumer這個名字和aliyun提供的sdk中的名字衝突了。。。。
在這裏, 消費者提供api 接口給生產者以方便生產者發送消息,消費者則實現該接口以消費生產者發送的消息,如何實現api接口就實現了監聽,這點是比較關鍵的邏輯。
第一步:使用sping 容器的監聽方法獲取全部加上QueueResource註解的Bean
第二步:分發處理Bean
如何處理這些Bean呢,每一個Bean實際上都是一個對象,有了對象,好比上面例子中的UserQueueResourceImpl 對象,咱們能夠拿到該對象實現的接口字節碼對象,進而能夠拿到該接口UserQueueRerousce上的註解以及方法上和方法中的註解,固然UserQueueResourceImpl實現方法上的註解也能拿獲得,這裏我將獲取到的信息以consumerId爲key,其他相關信息封裝爲Value緩存到了一個Map對象中,核心代碼以下:
Class<?> clazz = resourceImpl.getClass(); Class<?> clazzIf = clazz.getInterfaces()[0]; Method[] methods = clazz.getMethods(); String topicName = MQUtils.getTopicName(clazzIf); for (Method m : methods) { ConsumerAnnotation consumerAnno = m.getAnnotation(ConsumerAnnotation.class); if (null == consumerAnno) { // logger.error("method={} need Consumer annotation.", m.getName()); continue; } String consuerId = consumerAnno.value(); if (StringUtils.isEmpty(consuerId)) { logger.error("method={} ConsumerId can't be null", m.getName()); continue; } Class<?>[] parameterTypes = m.getParameterTypes(); Method resourceIfMethod = null; try { resourceIfMethod = clazzIf.getMethod(m.getName(), parameterTypes); } catch (NoSuchMethodException | SecurityException e) { logger.error("can't find method={} at super interface={} .", m.getName(), clazzIf.getCanonicalName(), e); continue; } String tagName = MQUtils.getTagName(resourceIfMethod); consumersMap.put(consuerId, new MethodInfo(topicName, tagName, m)); }
第三步:經過反射實現消費的動做
首先,先肯定好反射動做執行的時機,那就是監聽到了新的消息
其次,如何執行反射動做?不贅述,有反射相關基礎的童鞋都知道怎麼作,核心代碼以下所示:
MQConnection connectionInfo = QueueCoreSpringUtils.getBean(MQConnection.class); String topicPrefix=connectionInfo.getPrefix()+"_"; String consumerIdPrefix=PREFIX+connectionInfo.getPrefix()+"_"; for(String consumerId:consumersMap.keySet()){ MethodInfo methodInfo=consumersMap.get(consumerId); Properties connectionProperties=convertToProperties(connectionInfo); // 您在控制檯建立的 Consumer ID connectionProperties.put(PropertyKeyConst.ConsumerId, consumerIdPrefix+consumerId); Consumer consumer = ONSFactory.createConsumer(connectionProperties); consumer.subscribe(topicPrefix+methodInfo.getTopicName(), methodInfo.getTagName(), new MessageListener() { //訂閱多個Tag public Action consume(Message message, ConsumeContext context) { try { String messageBody=new String(message.getBody(),"UTF-8"); logger.info("receive message from topic={},tag={},consumerId={},message={}",topicPrefix+methodInfo.getTopicName(),methodInfo.getTagName(),consumerIdPrefix+consumerId,messageBody); Method method=methodInfo.getMethod(); Class<?> parameType = method.getParameterTypes()[0]; Object arg = jacksonSerializer.deserialize(messageBody, parameType); Object[] args={arg}; method.invoke(resourceImpl, args); } catch (Exception e) { logger.error("",e); } return Action.CommitMessage; } }); consumer.start(); logger.info("consumer={} has started.",consumerIdPrefix+consumerId); }