好久沒 po 文章了,可是看到 .NET 裏關於 ActiveMQ 發送複雜類型的文章確實太少了,因此貼出來和大家分享。
發佈:
// Message publisher
/// <summary>
/// Publishes text and map messages to a topic through an NMS connection.
/// Call <see cref="Init"/> before sending and <see cref="Close"/> when done.
/// </summary>
public class Publisher
{
    private IConnection _connection;
    private ISession _session;
    private IMessageProducer _producer;

    /// <summary>
    /// Creates and starts the connection, opens a session and creates a producer
    /// for the given topic. Failures are logged, not rethrown; on failure any
    /// partially created resources are released so nothing is leaked.
    /// </summary>
    /// <param name="brokerUrl">Broker address.</param>
    /// <param name="queueDestination">Name of the destination (resolved as a topic).</param>
    public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
    {
        try
        {
            IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
            _connection = connectionFactory.CreateConnection();
            _connection.Start();
            _session = _connection.CreateSession();
            IDestination destination = _session.GetTopic(queueDestination);
            _producer = _session.CreateProducer(destination);
        }
        catch (Exception e)
        {
            Log.Error($"activemq初始化異常:{DescribeException(e)}");

            // Init has always reported failures through the log only, so the
            // cleanup of a half-open connection must not throw either.
            try
            {
                Close();
            }
            catch (Exception closeError)
            {
                Log.Error($"activemq初始化異常:{DescribeException(closeError)}");
            }
        }
    }

    /// <summary>
    /// Closes the producer, the session and the connection. Safe to call when
    /// <see cref="Init"/> failed or was never called, and safe to call twice.
    /// </summary>
    public void Close()
    {
        try
        {
            _producer?.Close();
            _session?.Close();
        }
        finally
        {
            // Always attempt to close the connection, even if closing the
            // producer or the session threw.
            try
            {
                _connection?.Close();
            }
            finally
            {
                _producer = null;
                _session = null;
                _connection = null;
            }
        }
    }

    /// <summary>
    /// Sends a plain text message.
    /// </summary>
    /// <param name="text">Text to send.</param>
    public void SendText(string text)
    {
        ITextMessage message = _producer.CreateTextMessage(text);
        _producer.Send(message);
    }

    /// <summary>
    /// Sends each object in the list as a separate map message.
    /// </summary>
    /// <param name="mapMessages">Objects to send.</param>
    /// <returns>
    /// <c>true</c> only when every message was sent; <c>false</c> when the list
    /// is null or empty, or when at least one send failed (each failure is logged
    /// and the remaining messages are still attempted).
    /// </returns>
    public bool SendObject(List<MapMessage> mapMessages)
    {
        // The original guard tested Count < 0, which can never be true.
        if (mapMessages == null || mapMessages.Count == 0)
        {
            return false;
        }

        bool allSent = true;
        foreach (var mapMessage in mapMessages)
        {
            var message = _producer.CreateMapMessage();
            ActiveCommon.SetMapMessage<MapMessage>(message, mapMessage);
            try
            {
                _producer.Send(message);
            }
            catch (Exception e)
            {
                Log.Error($"activemq發送美好異常:{DescribeException(e)}");

                // Sticky failure flag: a later successful send must not hide
                // an earlier failed one.
                allSent = false;
            }
        }

        return allSent;
    }

    /// <summary>
    /// Returns the text of the XML message the producer creates for the object.
    /// </summary>
    /// <param name="m">Object to serialize.</param>
    /// <returns>XML text of the created message.</returns>
    public string GetXmlStr(object m)
    {
        return _producer.CreateXmlMessage(m).Text;
    }

    // Formats an exception for logging: the inner exception when there is one
    // (as before), otherwise the exception itself instead of a null dereference.
    private static string DescribeException(Exception e)
    {
        return (e.InnerException ?? e).ToString();
    }
}
訂閱:
// Message subscriber
/// <summary>
/// Subscribes to a topic through an NMS connection and logs every received message.
/// </summary>
class Subsriber
{
    private IConnection _connection;
    private ISession _session;
    private IMessageConsumer _consumer;

    /// <summary>
    /// Creates and starts the connection, opens a session, creates a consumer for
    /// the given topic and attaches the message listener. Failures are logged, not
    /// rethrown; on failure any partially created resources are released.
    /// </summary>
    /// <param name="brokerUrl">Broker address.</param>
    /// <param name="queueDestination">Name of the destination (resolved as a topic).</param>
    public void Init(string brokerUrl = "tcp://localhost:61616", string queueDestination = "nms.msg.topic")
    {
        try
        {
            IConnectionFactory connectionFactory = new ConnectionFactory(brokerUrl);
            _connection = connectionFactory.CreateConnection();
            _connection.Start();
            _session = _connection.CreateSession();
            IDestination destination = _session.GetTopic(queueDestination);
            _consumer = _session.CreateConsumer(destination);
            _consumer.Listener += _consumer_Listener;
        }
        catch (Exception e)
        {
            Log.Error($"activemq初始化異常:{DescribeException(e)}");

            // Init has always reported failures through the log only, so the
            // cleanup of a half-open connection must not throw either.
            try
            {
                Close();
            }
            catch (Exception closeError)
            {
                Log.Error($"activemq初始化異常:{DescribeException(closeError)}");
            }
        }
    }

    /// <summary>
    /// Detaches the listener and closes the consumer, the session and the
    /// connection. Safe to call when <see cref="Init"/> failed or was never called.
    /// </summary>
    public void Close()
    {
        try
        {
            if (_consumer != null)
            {
                _consumer.Listener -= _consumer_Listener;
                _consumer.Close();
            }

            _session?.Close();
        }
        finally
        {
            // Always attempt to close the connection, even if closing the
            // consumer or the session threw.
            try
            {
                _connection?.Close();
            }
            finally
            {
                _consumer = null;
                _session = null;
                _connection = null;
            }
        }
    }

    // Logs a received message. Map messages are converted to a MapMessage and
    // logged as XML; text messages are logged as-is. The topic can carry both
    // kinds, so an unconditional cast to IMapMessage would throw on text messages.
    private void _consumer_Listener(IMessage message)
    {
        var mapMessage = message as IMapMessage;
        if (mapMessage != null)
        {
            var model = ActiveCommon.GetMapMessageByIMapMessage(mapMessage);
            Log.Infor($"訂閱接收:{_session.CreateXmlMessage(model).Text}");
            return;
        }

        var textMessage = message as ITextMessage;
        if (textMessage != null)
        {
            Log.Infor($"訂閱接收:{textMessage.Text}");
        }
    }

    // Formats an exception for logging: the inner exception when there is one
    // (as before), otherwise the exception itself instead of a null dereference.
    private static string DescribeException(Exception e)
    {
        return (e.InnerException ?? e).ToString();
    }
}
複雜類型處理:
/// <summary>
/// Reflection helpers that copy an object's public properties into the body of
/// a map message and back.
/// </summary>
public class ActiveCommon
{
    /// <summary>
    /// Writes every readable public property of <paramref name="messages"/> into
    /// the message body, keyed by property name. String properties are stored as
    /// strings; every other property is converted to a 32-bit integer.
    /// Does nothing when either argument is null.
    /// </summary>
    /// <param name="mapMessage">Message whose body is filled.</param>
    /// <param name="messages">Object whose properties are copied.</param>
    public static void SetMapMessage<T>(IMapMessage mapMessage, T messages)
    {
        if (mapMessage == null || object.Equals(messages, null))
        {
            return;
        }

        foreach (var propertyInfo in messages.GetType().GetProperties())
        {
            // Write-only properties and indexers cannot be read with GetValue(obj, null).
            if (!propertyInfo.CanRead || propertyInfo.GetIndexParameters().Length > 0)
            {
                continue;
            }

            object value = propertyInfo.GetValue(messages, null);
            if (propertyInfo.PropertyType == typeof(string))
            {
                mapMessage.Body.SetString(propertyInfo.Name, Convert.ToString(value));
            }
            else
            {
                // SetInt stores a 32-bit value; converting through Int16 first
                // overflowed for anything above 32767.
                mapMessage.Body.SetInt(propertyInfo.Name, Convert.ToInt32(value));
            }
        }
    }

    /// <summary>
    /// Builds a new <c>MapMessage</c> from the message body, reading each
    /// property from the key equal to the property name.
    /// </summary>
    /// <param name="mapMessage">Received message.</param>
    /// <returns>The populated object, or null when <paramref name="mapMessage"/> is null.</returns>
    public static MapMessage GetMapMessageByIMapMessage(IMapMessage mapMessage)
    {
        if (mapMessage == null)
        {
            return null;
        }

        var result = new MapMessage();
        CopyBodyToProperties(mapMessage, result, false);
        return result;
    }

    /// <summary>
    /// Fills the supplied object from the message body, reading each property
    /// from the upper-cased property name and falling back to the exact name
    /// when the upper-cased key is absent.
    /// </summary>
    /// <param name="mapMessage">Received message.</param>
    /// <param name="MapMessage">Object to populate.</param>
    /// <returns>
    /// The populated <paramref name="MapMessage"/>, or <c>default(T)</c> when
    /// either argument is null.
    /// </returns>
    public static T GetMapMessageByIMapMessage<T>(IMapMessage mapMessage, T MapMessage)
    {
        if (mapMessage == null || object.Equals(MapMessage, null))
        {
            return default(T);
        }

        CopyBodyToProperties(mapMessage, MapMessage, true);
        return MapMessage;
    }

    // Assigns every writable, non-indexer public property of target from the body.
    private static void CopyBodyToProperties(IMapMessage mapMessage, object target, bool upperCaseKeyFirst)
    {
        foreach (var propertyInfo in target.GetType().GetProperties())
        {
            if (!propertyInfo.CanWrite || propertyInfo.GetIndexParameters().Length > 0)
            {
                continue;
            }

            string exactKey = propertyInfo.Name;
            // Invariant casing: keys are identifiers, not culture-dependent text.
            string upperKey = exactKey.ToUpperInvariant();
            object raw = upperCaseKeyFirst
                ? ReadBodyValue(mapMessage, upperKey, exactKey)
                : ReadBodyValue(mapMessage, exactKey, upperKey);

            propertyInfo.SetValue(target, CoerceValue(raw, propertyInfo.PropertyType), null);
        }
    }

    // Reads preferredKey; uses fallbackKey only when preferredKey is absent and
    // fallbackKey is present, so a lookup that worked before is unchanged.
    private static object ReadBodyValue(IMapMessage mapMessage, string preferredKey, string fallbackKey)
    {
        var body = mapMessage.Body;
        if (!body.Contains(preferredKey) && body.Contains(fallbackKey))
        {
            return body[fallbackKey];
        }

        return body[preferredKey];
    }

    // Converts a body value to the property's type. Non-string properties are
    // written as 32-bit integers, so short/long/enum/nullable properties need
    // an explicit conversion before PropertyInfo.SetValue accepts the value.
    private static object CoerceValue(object value, Type targetType)
    {
        if (value == null)
        {
            return null;
        }

        Type effectiveType = Nullable.GetUnderlyingType(targetType) ?? targetType;
        if (effectiveType.IsInstanceOfType(value))
        {
            return value;
        }

        if (effectiveType.IsEnum)
        {
            return Enum.ToObject(effectiveType, value);
        }

        return Convert.ChangeType(value, effectiveType, System.Globalization.CultureInfo.InvariantCulture);
    }
}
重點是跨站點和跨服務器傳輸的時候,需要通過 Message 的 Body 去設置傳輸參數。