1、activemq簡介與安裝html
1. ActiveMQ 簡介java
ActiveMQ是Apache所提供的一個開源的消息系統,徹底採用Java來實現,所以,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服務)規範。JMS是一組Java應用程序接口,它提供消息的建立、發送、讀取等一系列服務。JMS提供了一組公共應用程序接口和響應的語法,相似於Java數據庫的統一訪問接口JDBC,它是一種與廠商無關的API,使得Java程序可以與不一樣廠商的消息組件很好地進行通訊。數據庫
消息隊列中間件是分佈式系統中的重要組件,主要解決異步消息,應用解耦,流量削鋒等問題,從而實現高性能,高可用,可伸縮和最終一致性的架構apache
使用較多的消息隊列有ActiveMQ,RabbitMQ,Kafka,MetaMQ等markdown
2.activemq下載與安裝session
2.目錄結構架構
啓動activemq步驟: bin -->win64(或者win32) -->activemq.bat 點擊運行便可app
3.啓動完畢,http://localhost:8161/admin/index.jsp 用戶名:admin 密碼 admin異步
能夠啓動多個消費者 來區分 隊列和 主題模式 jsp
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Yang
* 描述: 生產者
*/
public class Producer {
/**
* 用戶名
*/
private static final String userName = ActiveMQConnection.DEFAULT_USER;
/**
* 密碼
*/
private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* url
*/
private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;
public void send(String message) {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);
final Connection connection = connectionFactory.createConnection();
connection.start();
// true 爲開啓會話 須要提交
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//建立隊列
final Queue queue = session.createQueue("test");
final MessageProducer producer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage(message);
//發送
producer.send(textMessage);
// 開啓會話 須要提交
// session.commit();
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Producer producer = new Producer();
producer.send("hello world");
}
}
複製代碼
2.消費者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Yang
* 描述: 消費者
*/
public class Consumer {
/**
* 用戶名
*/
private static final String userName = ActiveMQConnection.DEFAULT_USER;
/**
* 密碼
*/
private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* url
*/
private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;
public void receive() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);
final Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//建立隊列
final Queue queue = session.createQueue("test");
final MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(n -> {
try {
TextMessage msg = (TextMessage) n;
final String text = msg.getText();
if (text.equalsIgnoreCase("hello world")) {
System.out.println(" 接受信息: " + msg.getText());
} else {//默認爲6次
System.out.println(" 測試重發次數 " );
int i = 1 / 0;
}
} catch (JMSException e) {
// e.printStackTrace();
}
});
} catch (JMSException e) {
// e.printStackTrace();
}
}
public static void main(String[] args) {
Consumer consumer = new Consumer();
consumer.receive();
}
複製代碼
1.producer 生產者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Yang
* 描述: 生產者
*/
public class TopicProducer {
/**
* 用戶名
*/
private static final String userName = ActiveMQConnection.DEFAULT_USER;
/**
* 密碼
*/
private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* url
*/
private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;
public void send(String message) {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);
final Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//建立隊列
final Topic topic = session.createTopic("topic-test");
final MessageProducer producer = session.createProducer(topic);
TextMessage textMessage = session.createTextMessage(message);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// producer.setTimeToLive(10);
// 發送
producer.send(textMessage);
// producer.send(textMessage, DeliveryMode.PERSISTENT, 1, 60 * 60 * 24);
// session.commit();
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
TopicProducer producer = new TopicProducer();
producer.send("hello world");
}
複製代碼
2.consumer 消費者
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author Yang
* 描述: 消費者
*/
public class TopicConsumer {
/**
* 用戶名
*/
private static final String userName = ActiveMQConnection.DEFAULT_USER;
/**
* 密碼
*/
private static final String passWord = ActiveMQConnection.DEFAULT_PASSWORD;
/**
* url
*/
private static final String brokerUrl = ActiveMQConnection.DEFAULT_BROKER_URL;
public void receive() {
try {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(userName, passWord, brokerUrl);
final Connection connection = connectionFactory.createConnection();
//設置客戶端id
// connection.setClientID("client-1");
connection.start();
Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
//建立隊列
final Topic topic = session.createTopic("topic-test");
final MessageConsumer messageConsumer = session.createConsumer(topic);//普通訂閱
// MessageConsumer consumer = session.createDurableSubscriber(topic,"bb"); //持久訂閱
messageConsumer.setMessageListener(n -> {
try {
TextMessage msg = (TextMessage) n;
final String text = msg.getText();
if (text.equalsIgnoreCase("hello world")) {
System.out.println(" 接受信息: " + msg.getText());
} else {
System.out.println(" 測試重發次數 ");
int i = 1 / 0;
}
} catch (JMSException e) {
// e.printStackTrace();
}
});
} catch (JMSException e) {
// e.printStackTrace();
}
}
public static void main(String[] args) {
TopicConsumer consumer = new TopicConsumer();
consumer.receive();
}
}
複製代碼