需求:使用 Python 程序向 ActiveMQ 的主題推送數據。默認推送的數據類型是 BytesMessage,Java 程序那邊接收較爲麻煩,因此改成推送 TextMessage 類型的數據。
解決方法:想要推送 TextMessage,需要在創建連接時指定 auto_content_length=False。這樣 stomp.py 發送時不會自動帶上 content-length 頭,ActiveMQ 便會把消息當作 TextMessage 而不是 BytesMessage。
示例代碼如下:
# coding=utf-8
import stomp
def send_to_topic(msg):
    """Publish ``msg`` to the ActiveMQ topic over STOMP.

    The connection is created with ``auto_content_length=False`` so that the
    message is sent as described in the accompanying article, i.e. so the
    Java side receives a TextMessage rather than a BytesMessage.

    Args:
        msg: The message body to publish.

    Returns:
        1 if the message was sent and the connection closed without error,
        0 if any step raised an exception (the error is logged).
    """
    # Local import so this function does not depend on a file-level import.
    import logging

    conn = None
    try:
        conn = stomp.Connection10([("10.10.19.200", 61613)], auto_content_length=False)
        conn.start()
        conn.connect()
        conn.send('/topic/HIATMP.HISENSE.ILLEGAL.AIREVIEW', msg)
        conn.disconnect()
        return 1
    except Exception as e:
        # Report the failure instead of swallowing it silently.
        logging.error("send message with activemq failed, error is:%s", e)
        if conn is not None:
            # Best-effort cleanup so a failed connect/send does not leave the
            # connection open; a second failure here is deliberately ignored
            # because the original error has already been logged.
            try:
                conn.disconnect()
            except Exception:
                pass
        return 0
if __name__ == "__main__":
    # Send one sample record and print the status code (1 = sent, 0 = failed).
    result = send_to_topic("ILLEGAL01,2.1,0001,5b9171c2815342c5bce90f601f14d182,1,02,魯BJ0A92,2019-04-26 15:51:45,12080,601078111050,寧夏路與福州南路路口,370202000000,1,1,370202000000011125,3,01,,0,http://10.10.19.250/1.png,http://10.10.19.250/2.png,http://10.10.19.250/3.png,,717/1846/136/36/1,2,2019-04-26 15:51:53,,,,,,,,,1,不按道行駛,,,,,,,,,1,10")
    # Function-call form works on both Python 2 and Python 3.
    print(result)
另外,附上 ActiveMQ 關於主題的生產者、消費者代碼:
1)Python 版本(STOMP 協議)
生產者:
# coding=utf-8
import logging

import stomp


def send_to_topic(msg):
    """Publish ``msg`` to the ActiveMQ topic over STOMP.

    The connection is created with ``auto_content_length=False`` so that the
    message is sent as described in the accompanying article, i.e. so the
    Java side receives a TextMessage rather than a BytesMessage.

    Args:
        msg: The message body to publish.

    Returns:
        1 if the message was sent and the connection closed without error,
        0 if any step raised an exception (the error is logged).
    """
    conn = None
    try:
        conn = stomp.Connection10([("10.10.19.200", 61613)], auto_content_length=False)
        conn.start()
        conn.connect()
        conn.send('/topic/HIATMP.HISENSE.ILLEGAL.AIREVIEW', msg)
        conn.disconnect()
        return 1
    except Exception as e:
        # Report the failure instead of swallowing it silently.
        logging.error("send message with activemq failed, error is:%s", e)
        if conn is not None:
            # Best-effort cleanup so a failed connect/send does not leave the
            # connection open; a second failure here is deliberately ignored
            # because the original error has already been logged.
            try:
                conn.disconnect()
            except Exception:
                pass
        return 0


if __name__ == "__main__":
    # Send one sample record and print the status code (1 = sent, 0 = failed).
    result = send_to_topic("ILLEGAL01,2.1,0001,5b9171c2815342c5bce90f601f14d182,1,02,魯BJ0A92,2019-04-26 15:51:45,12080,601078111050,寧夏路與福州南路路口,370202000000,1,1,370202000000011125,3,01,,0,http://10.10.19.250/1.png,http://10.10.19.250/2.png,http://10.10.19.250/3.png,,717/1846/136/36/1,2,2019-04-26 15:51:53,,,,,,,,,1,不按道行駛,,,,,,,,,1,10")
    # Function-call form works on both Python 2 and Python 3.
    print(result)
消費者:
# coding=utf-8
import time

import stomp


class SampleListener(object):
    """Listener that prints the destination and body of each message."""

    def on_message(self, headers, message):
        """Print the ``destination`` header and the message body."""
        print('headers: %s' % headers['destination'])
        print('message: %s\n' % message)


def receive_from_topic():
    """Subscribe to the topic and print messages until interrupted.

    Blocks the calling thread; press Ctrl-C to stop. The connection is
    disconnected on the way out.
    """
    conn = stomp.Connection10([("10.10.19.200", 61613)])
    conn.set_listener("", SampleListener())
    conn.start()
    conn.connect()
    conn.subscribe("/topic/HIATMP.HISENSE.ILLEGAL.AIREVIEW")
    try:
        # The listener is invoked by the stomp library; this thread only has
        # to stay alive. Sleeping avoids spinning a CPU core the way a bare
        # ``while True: pass`` does.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        conn.disconnect()


if __name__ == '__main__':
    receive_from_topic()
2)Java 版本(TCP 協議)
生產者:
package ActiveMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Random; import java.util.UUID; public class TopicProducer { public static void main(String[] args) { //鏈接信息設置 String username = "admin"; String password = "admin"; String brokerURL = "failover://tcp://10.10.19.200:61616"; //鏈接工廠 ConnectionFactory connectionFactory = null; //鏈接 Connection connection = null; //會話 接受或者發送消息的線程 Session session = null; //消息的主題 Topic topic = null; //消息生產者 MessageProducer messageProducer = null; //實例化鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL); try { //經過鏈接工廠獲取鏈接 connection = connectionFactory.createConnection(); //啓動鏈接 connection.start(); //建立session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //建立名爲TopicTest的主題 // topic = session.createTopic("HIATMP.HISENSE.ILLEGAL"); topic = session.createTopic("HIATMP.HISENSE.ILLEGAL.AIREVIEW"); //建立主題生產者 messageProducer = session.createProducer(topic); messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//不將數據持久化 //發送主題 TextMessage message = null; for (int i = 0; i < 1; i--) { //建立要發送的文本信息 SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");//設置日期格式 String dateTime = df.format(new Date());// new Date()爲獲取當前系統時間 // message = session.createTextMessage("illegal" + dateTime); String uuid = UUID.randomUUID().toString().replaceAll("-",""); String message_fmt = String.format("ILLEGAL01,2.1,0001,%s,1,02,魯BJ0A92,2019-04-26 15:51:45,12080,601078111050,寧夏路與福州南路路口,370202000000,1,1,370202000000011125,3,01,,0,http://10.10.19.250/1.png,http://10.10.19.250/2.png,http://10.10.19.250/3.png,,717/1846/136/36/1,2,2019-04-26 
15:51:53,,,,,,,,,1,不按道行駛,,,,,,,,,1,10", uuid); message = session.createTextMessage(message_fmt); //經過主題生產者發出消息 messageProducer.send(message); System.out.println("發送成功:" + message.getText()); session.commit(); // 提交到mq Thread.sleep( 200 * 1 ); } session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (null != connection) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消費者:
package ActiveMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.BytesMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.transport.stomp.Stomp; public class TopicConsumer { public static void main(String[] args) { // Stomp.Headers.Send.PERSISTENT; //鏈接信息設置 String username = "admin"; String password = "admin"; String brokerURL = "failover://tcp://10.10.19.200:61616"; // String brokerURL = "failover://stomp://0.0.0.0:61613"; //鏈接工廠 ConnectionFactory connectionFactory = null; //鏈接 Connection connection = null; //會話 接受或者發送消息的線程 Session session = null; //主題的目的地 Topic topic = null; //主題消費者 MessageConsumer messageConsumer = null; //實例化鏈接工廠 connectionFactory = new ActiveMQConnectionFactory(username, password, brokerURL); try { //經過鏈接工廠獲取鏈接 connection = connectionFactory.createConnection(); //啓動鏈接 connection.start(); //建立session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //建立一個鏈接TopicTest的主題 topic = session.createTopic("HIATMP.HISENSE.ILLEGAL.AIREVIEW"); //建立主題消費者 messageConsumer = session.createConsumer(topic); messageConsumer.setMessageListener(new MyMessageListener()); } catch (JMSException e) { e.printStackTrace(); } } } class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { System.out.println(message); // 接收 BytesMessage // BytesMessage bytesMessage = (BytesMessage) message; TextMessage bytesMessage = (TextMessage) message; try { // 接收 BytesMessage // byte []bt = new byte[(int) bytesMessage.getBodyLength()]; // bytesMessage.readBytes(bt); // String str = new String(bt); // System.out.println("接收訂閱主題:" + str); System.out.println("接收訂閱主題:" + bytesMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }
end~