JMS服務器ActiveMQ

JMS服務器ActiveMQ

1、概述

    1.1 簡述

       Apache  ActiveMQ是根據JMS協議編寫,是最流行和強大的開源apache的消息中間件。它有速度快,支持多個語言客服端和多協議,徹底支持JMS1.1和JMS1.4的協議。Apache ActiveMQ的使用的Apache2協議,下載後經過指南能很快使用。
java

    1.2 下載

        地址:Apache ActiveMQ
apache

    1.3 運行

        a)下載解壓:服務器

    b)運行:
session

bin\win64\activemq.bat start

    c)訪問tcp

    (ps:默認的帳戶/密碼:admin/admin)
測試

2、JMS操做

    2.1 結構圖

    2.2 依賴類

       a) 鏈接
spa

                javax.jms.ConnectionFactory:工廠類,建立鏈接。
code

                javax.jms.Connection:鏈接服務器中間件

                javax.jms.Session:每個鏈接一個操做隊列

        b)目的

                javax.jms.Queue:一對一鏈表

                javax.jms.Topic:一對多主題

        b)消息

                javax.jms.Message:消息引用,是java規範

                javax.jms.TextMessage: text消息

                javax.jms.MapMessage:map消息

                javax.jms.BytesMessage:byte消息

                javax.jms.StreamMessage:stream消息

        c)發送者

                javax.jms.MessageProducer:消息發送者

        d)接受者

                javax.jms.MessageConsumer:消息接受者

                javax.jms.MessageListener:接受者監聽處理

    2.3 發送/接受

/**
	 * @see Apache ActiveMQ的測試
	 * @author ssHss
	 * @throws Exception
	 *
	 */

	@Test
	public void send() throws Exception {
		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 獲取工程類
		Connection connection = factory.createConnection();// 獲取鏈接
		// 鏈接
		connection.start();
		// 設置session
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// 目的地:
		// Queue Queue = new ActiveMQQueue("testQueue");// 隊列(一對一)
		Topic topic = new ActiveMQTopic("testTopic");// 話題(一對多)

		// 消息
		TextMessage textMassage = session.createTextMessage("hello JMS");// text消息:能夠傳字符串
		// textMassage.wait(10000);// 延遲

		BytesMessage bytesMessage = session.createBytesMessage();// 傳byte
		bytesMessage.writeByte((byte) 'a');

		MapMessage mapMessage = session.createMapMessage();// map類型
		mapMessage.setBoolean("flage", false);

		ObjectMessage objectMessage = session.createObjectMessage(); // 傳送object類型
		objectMessage.setObject(new String("hello JMS"));

		StreamMessage streamMessage = session.createStreamMessage(); // stream類型
		streamMessage.writeString("Hello JMS");

		// 發送者
		MessageProducer producer = session.createProducer(topic);
		// 發送
		producer.send(textMassage);
		producer.close();
		session.close();
	}

	@Test
	public void comsumer() throws Exception {
		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 獲取工程類
		Connection connection = factory.createConnection();// 獲取鏈接
		// 鏈接
		connection.start();
		// 設置session
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// 目的地:
		Queue queue = new ActiveMQQueue("testQueue");// 隊列(一對一)
		// Topic topic = new ActiveMQTopic("testTopic");// 話題(一對多)

		// 接收者
		MessageConsumer consumer = session.createConsumer(queue);
		// 監聽
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				if (message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;// 這裏須要將消息轉換爲咱們須要的類型
					try {
						System.out.println("consumer Message:" + textMessage.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}
		});
		consumer.close();
	}

	@Test
	public void comsumers() throws Exception {
		ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");// 獲取工程類
		Connection connection = factory.createConnection();// 獲取鏈接
		// 鏈接
		connection.start();
		// 設置session
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		// 目的地:
		// Queue queue = new ActiveMQQueue("testQueue");// 隊列(一對一)
		Topic topic = new ActiveMQTopic("testTopic");// 話題(一對多)

		// 接收者(建立多個接受這)
		MessageConsumer consumer = session.createConsumer(topic);
		// 監聽
		consumer.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				if (message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;// 這裏須要將消息轉換爲咱們須要的類型
					try {
						System.out.println("consumer Message:" + textMessage.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}
		});

		MessageConsumer consumer2 = session.createConsumer(topic);
		// 監聽
		consumer2.setMessageListener(new MessageListener() {
			public void onMessage(Message message) {
				if (message instanceof TextMessage) {
					TextMessage textMessage = (TextMessage) message;// 這裏須要將消息轉換爲咱們須要的類型
					try {
						System.out.println("consumer Message:" + textMessage.getText());
					} catch (JMSException e) {
						e.printStackTrace();
					}
				}
			}
		});
		
		consumer2.close();
	}
相關文章
相關標籤/搜索