一、事務偏生產者/簽收偏消費者java
二、生產者提交事務只有兩個狀態true/falsespring
false:只須要執行send方法,消息就會進入隊列中apache
關閉事務,那第二個參數簽收的設置須要有效springboot
true:先執行send方法,在執行commit,消息才被真正的提交到隊列中session
消息須要批量發送,須要緩衝區處理tcp
未開啓事務生產者url
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/2 17:04 * @Version: 1.0 */ public class ActiveMQTest { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事務,第二個是簽收 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具體是隊列仍是主題),這裏是建立隊列 Queue queue=session.createQueue(QUEUE_NAME); //五、建立消息生產者,隊列模式 MessageProducer messageProducer = session.createProducer(queue); //六、經過messageProducer生產三條消息發送到MQ消息隊列中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息 //消息屬性 textMessage.setStringProperty("c01","vip"); //八、經過messageProducer發送給mq messageProducer.send(textMessage); //九、數據非持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); } messageProducer.close(); session.close(); connection.close(); System.out.println("消息發送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
已開啓事務生產者spa
import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @ProjectName: springbootActiveMQ * @Package: cn.**.test * @Author: huat * @Date: 2020/1/2 17:04 * @Version: 1.0 */ public class ActiveMQTest { //url路徑 private static final String ACTRIVE_URL="tcp://192.168.44.135:61616"; //隊列名稱 private static final String QUEUE_NAME="queue01"; public static void main(String[] args) { //一、建立鏈接工廠 //若是帳號密碼沒有修改的話,帳號密碼默認均爲admin ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ACTRIVE_URL); //若是帳號密碼修改的話 //第一個參數爲帳號,第二個爲密碼,第三個爲請求的url //ActiveMQConnectionFactory activeMQConnectionFactory1=new ActiveMQConnectionFactory("admin","admin",ACTRIVE_URL); try { //二、經過鏈接工廠獲取鏈接 Connection connection = activeMQConnectionFactory.createConnection(); connection.start(); //三、建立session會話 //裏面會有兩個參數,第一個爲事務,第二個是簽收 Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //四、建立目的地(具體是隊列仍是主題),這裏是建立隊列 Queue queue=session.createQueue(QUEUE_NAME); //五、建立消息生產者,隊列模式 MessageProducer messageProducer = session.createProducer(queue); //六、經過messageProducer生產三條消息發送到MQ消息隊列中 for (int i=0;i<3;i++){ //七、建立消息 TextMessage textMessage = session.createTextMessage("msg----->" + i);//建立一個文本消息 //消息屬性 textMessage.setStringProperty("c01","vip"); //八、經過messageProducer發送給mq messageProducer.send(textMessage); //九、數據非持久化 messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); } messageProducer.close(); session.commit(); session.close(); connection.close(); System.out.println("消息發送成功"); } catch (JMSException e) { e.printStackTrace(); } } }
1、非事務簽收code
一、自動簽收(默認)隊列
二、手動簽收
Session.CLIENT_ACKNOWLEDGE,須要客戶端調用acknowledge(message.acknowledge();)方法進行手動簽收
三、容許重複消息
Session.DUPS_OK_ACKNOWLEDGE
四、事務級
Session.SESSION_TRANSACTED
2、簽收
一、非事務手動簽收
須要客戶端調用acknowledge(message.acknowledge();)方法進行手動簽收,若是不簽收,消息重複消費
二、開啓事務,設置手動簽收
有commit語句的狀況下,不須要調用message.acknowledge();方法進行簽收,開啓事務會認爲你自動簽收
沒有commit語句的狀況下,即便調用message.acknowledge();方法進行簽收,也會出現重複消費
一、事務大於簽收,因此必定要在開啓事務的狀況下進行簽收,在事務性會話當中,當一個事務被成功提交則消息被自動簽收。若是事務回滾,則消息會被再次傳送。
二、非事務性會話當中,消息什麼時候被簽收取決於建立會話時的應答模式(message.acknowledge();)
一、事物
二、簽收
三、消息持久性
四、集羣