Apache ActiveMQ是根據JMS協議編寫,是最流行和強大的開源apache的消息中間件。它有速度快,支持多個語言客服端和多協議,徹底支持JMS1.1和JMS1.4的協議。Apache ActiveMQ的使用的Apache2協議,下載後經過指南能很快使用。
java
地址:Apache ActiveMQ
apache
a)下載解壓:服務器
b)運行:
session
bin\win64\activemq.bat start
c)訪問tcp
(ps:默認的帳戶/密碼:admin/admin)
測試
a) 鏈接
spa
javax.jms.ConnectionFactory:工廠類,建立鏈接。
code
javax.jms.Connection:鏈接服務器中間件
javax.jms.Session:每個鏈接一個操做隊列
b)目的
javax.jms.Queue:一對一鏈表
javax.jms.Topic:一對多主題
b)消息
javax.jms.Message:消息引用,是java規範
javax.jms.TextMessage: text消息
javax.jms.MapMessage:map消息
javax.jms.BytesMessage:byte消息
javax.jms.StreamMessage:stream消息
c)發送者
javax.jms.MessageProducer:消息發送者
d)接受者
javax.jms.MessageConsumer:消息接受者
javax.jms.MessageListener:接受者監聽處理
/** * @see Apache ActiveMQ的測試 * @author ssHss * @throws Exception * */ @Test public void send() throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 獲取工程類 Connection connection = factory.createConnection();// 獲取鏈接 // 鏈接 connection.start(); // 設置session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 目的地: // Queue Queue = new ActiveMQQueue("testQueue");// 隊列(一對一) Topic topic = new ActiveMQTopic("testTopic");// 話題(一對多) // 消息 TextMessage textMassage = session.createTextMessage("hello JMS");// text消息:能夠傳字符串 // textMassage.wait(10000);// 延遲 BytesMessage bytesMessage = session.createBytesMessage();// 傳byte bytesMessage.writeByte((byte) 'a'); MapMessage mapMessage = session.createMapMessage();// map類型 mapMessage.setBoolean("flage", false); ObjectMessage objectMessage = session.createObjectMessage(); // 傳送object類型 objectMessage.setObject(new String("hello JMS")); StreamMessage streamMessage = session.createStreamMessage(); // stream類型 streamMessage.writeString("Hello JMS"); // 發送者 MessageProducer producer = session.createProducer(topic); // 發送 producer.send(textMassage); producer.close(); session.close(); } @Test public void comsumer() throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 獲取工程類 Connection connection = factory.createConnection();// 獲取鏈接 // 鏈接 connection.start(); // 設置session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 目的地: Queue queue = new ActiveMQQueue("testQueue");// 隊列(一對一) // Topic topic = new ActiveMQTopic("testTopic");// 話題(一對多) // 接收者 MessageConsumer consumer = session.createConsumer(queue); // 監聽 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message;// 這裏須要將消息轉換爲咱們須要的類型 try { System.out.println("consumer Message:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); consumer.close(); } @Test public void comsumers() throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 獲取工程類 Connection connection = factory.createConnection();// 獲取鏈接 // 鏈接 connection.start(); // 設置session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 目的地: // Queue queue = new ActiveMQQueue("testQueue");// 隊列(一對一) Topic topic = new ActiveMQTopic("testTopic");// 話題(一對多) // 接收者(建立多個接受這) MessageConsumer consumer = session.createConsumer(topic); // 監聽 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message;// 這裏須要將消息轉換爲咱們須要的類型 try { System.out.println("consumer Message:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); MessageConsumer consumer2 = session.createConsumer(topic); // 監聽 consumer2.setMessageListener(new MessageListener() { public void onMessage(Message message) { if (message instanceof TextMessage) { TextMessage textMessage = (TextMessage) message;// 這裏須要將消息轉換爲咱們須要的類型 try { System.out.println("consumer Message:" + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); consumer2.close(); }