這一節做爲上一節多線程的延續,先說一下java原生的阻塞隊列(Blocking Queue),以後再說一下JMS(Java Messaging Service,java消息服務)以及它的實現之一ActiveMQ消息隊列,因此都歸併到消息服務中討論。java
BlockingQueue也是java.util.concurrent下的接口,它解決了多線程中如何高效傳輸數據的問題,經過這些高效而且線程安全的類,咱們能夠搭建高質量的多線程程序。 主要用來控制線程同步的工具。 BlockingQueue是一個接口,裏面的方法以下:數據庫
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit);
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
複製代碼
public class Product implements Runnable{
BlockingQueue<String> queue;
public Product(BlockingQueue<String> queue) {
//建立對象時就傳入一個阻塞隊列
this.queue = queue;
}
@Override
public void run(){
try {
System.out.println(Thread.currentThread().getName()+"開始生產");
String temp = Thread.currentThread().getName()+":生產線程";
queue.put(temp);//向隊列中放數據,若是隊列是滿的話,會阻塞當前線程
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
消費者Consumer:apache
public class Consumer implements Runnable{
BlockingQueue<String> queue;
public Consumer(BlockingQueue<String> queue) {
//使用有參構造函數的目的是我在建立這個消費者對象的時候就能夠傳進來一個隊列
this.queue = queue;
}
@Override
public void run() {
Random random = new Random();
try {
while(true){
Thread.sleep(random.nextInt(10));
System.out.println(Thread.currentThread().getName()+ "準備消費...");
String temp = queue.take();//從隊列中取任務消費,若是隊列爲空,會阻塞當前線程
System.out.println(Thread.currentThread().getName() + " 獲取到工做任務==== " +temp);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
測試類TestQueue:數組
public class TestQueue {
public static void main(String[] args) {
//新建一個阻塞隊列,隊列長度是5
BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
//BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
Consumer consumer = new Consumer(queue);
Product product = new Product(queue);
for(int i = 0;i<3;i++){
new Thread(product,"product"+i).start();
}
//for (int i = 0;i<5;i++){
new Thread(consumer,"consumer").start();
//}
}
}
複製代碼
整套代碼的意思就是初始化一個消息隊列,裏面放String類型,隊列長度是5,使用生產者線程來模擬三個用戶發出請求,把用戶的請求數據暫時放在BlockingQueue隊列裏面,隨後消費者線程不斷的從隊列裏面取任務進行業務邏輯處理,直到隊列裏面消費的什麼都不剩了。由此能夠看出消息隊列有兩大特色:解耦和削峯填谷。生產者和消費者毛關係沒有,生產者往隊列裏放數據,消費者從隊列裏取數據,它們都跟隊列創建關係,解耦;生產者若是併發量很高也只不過是把數據先放到隊列裏,消費者能夠慢慢吃,實際中不會立馬拖垮服務端。 參考地址:http://blog.csdn.net/ghsau/article/details/8108292安全
JMS即Java消息服務(Java Message Service)用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。JMS是一種與廠商(或者說是平臺)無關的 API。相似於JDBC(Java Database Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。 許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ等等。 JMS 可讓你經過消息收發服務從一個 JMS 客戶機向另外一個 JMS客戶機發送消息。 消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成;消息主體則攜帶着應用程序的數據或有效負載。根據有效負載的類型來劃分,能夠將消息分爲幾種類型,它們分別攜帶:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。bash
JMS由如下元素組成: JMS提供者provider:面向消息中間件的,JMS規範的一個實現。提供者能夠是Java平臺的JMS實現,也能夠是非Java平臺的面向消息中間件的適配器。 JMS客戶:生產或消費基於消息的Java應用程序或對象(即生產者和消費者都統稱JMS客戶)。 JMS生產者:建立併發送消息的JMS客戶。 JMS消費者:接收消息的JMS客戶。 JMS消息:能夠在JMS客戶之間傳遞數據的對象 JMS隊列:一個容納被髮送的正在等待閱讀的消息的區域。一個消息若是被閱讀,它將被從隊列中移走。 JMS主題:一種支持發送消息給多個訂閱者的機制。微信
ActiveMQ是JMS規範的一種實現,下面說如何使用session
<transportConnectors>
<transportConnector name="openwire" uri="tcp://localhost:61616"/>
<transportConnector name="ssl" uri="ssl://localhost:61617"/>
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
<transportConnector uri="http://localhost:8081"/>
<transportConnector uri="udp://localhost:61618"/>
</transportConnectors>
複製代碼
測試代碼以下: 生產者Product:數據結構
public class Product {
private String username = ActiveMQConnectionFactory.DEFAULT_USER;
private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
private Connection connection = null;
private Session session = null;
private String subject = "myQueue";
private Destination destination = null;
private MessageProducer producer = null;
/**
* @Description 初始化方法
* @Author 劉俊重
* @Date 2017/12/20
*/
private void init() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
public void productMessage(String message) throws JMSException {
this.init();
TextMessage textMessage = session.createTextMessage(message);
connection.start();
System.out.println("生產者準備發送消息:"+textMessage);
producer.send(textMessage);
System.out.println("生產者已發送完畢消息。。。");
}
public void close() throws JMSException {
System.out.println("生產者開始關閉鏈接");
if(null!=producer){
producer.close();
}
if(null!=session){
session.close();
}
if(null!=connection){
connection.close();
}
}
}
複製代碼
消費者Consumer:多線程
public class Consumer implements MessageListener,ExceptionListener{
private String name = ActiveMQConnectionFactory.DEFAULT_USER;
private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
private ActiveMQConnectionFactory connectionFactory = null;
private Connection connection = null;
private Session session = null;
private String subject = "myQueue";
private Destination destination = null;
private MessageConsumer consumer = null;
public static Boolean isconnection=false;
/**
* @Description 鏈接ActiveMQ
* @Author 劉俊重
* @Date 2017/12/20
*/
private void init() throws JMSException {
connectionFactory = new ActiveMQConnectionFactory(name,password,url);
connection = connectionFactory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
public void consumerMessage() throws JMSException {
this.init();
connection.start();
//設置消息監聽和異常監聽
consumer.setMessageListener(this);
connection.setExceptionListener(this);
System.out.println("消費者開始監聽....");
isconnection = true;
//Message receive = consumer.receive();
}
public void close() throws JMSException {
if(null!=consumer){
consumer.close();
}
if(null!=session){
session.close();
}
if(null!=connection){
connection.close();
}
}
/**
* 異常處理函數
*/
@Override
public void onException(JMSException exception) {
//發生異常關閉鏈接
isconnection = false;
}
/**
* 消息處理函數
*/
@Override
public void onMessage(Message message) {
try {
if(message instanceof TextMessage){
TextMessage textMsg = (TextMessage) message;
String text = textMsg.getText();
System.out.println("消費者接收到的消息======="+text);
}else {
System.out.println("接收的消息不符合");
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
複製代碼
注意:消費者須要實現MessageListener和ExceptionListener來監聽收到消息和出錯時的處理。 生產者測試類TestProduct:
public class TestProduct {
public static void main(String[] args) throws JMSException {
for(int i=0;i<100;i++){
Product product = new Product();
product.productMessage("Hello World!"+i);
product.close();
}
}
}
複製代碼
TestProduct是用來模擬生成100條消息,寫入到ActiveMQ隊列中。 消費者測試類TestConsumer:
public class TestConsumer implements Runnable {
static Thread thread = null;
public static void main(String[] args) throws InterruptedException {
thread = new Thread(new TestConsumer());
thread.start();
while (true){
//時刻監聽消息隊列,若是線程死了,則新建一個線程
boolean alive = thread.isAlive();
System.out.println("當前線程狀態:"+alive);
if(!alive){
thread = new Thread(new TestConsumer());
thread.start();
System.out.println("線程重啓完成");
}
Thread.sleep(1000);
}
}
@Override
public void run() {
try {
Consumer consumer = new Consumer();
consumer.consumerMessage();
while (Consumer.isconnection) {
//System.out.println(123);
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
複製代碼
TestConsumer這裏用了多線程,保證時刻有個線程活着等着接收ActiveMQ的消息隊列並調用消費者處理。 總結:個人理解是線程間通訊使用queue,如BlockingQueue,進程間通訊使用JMS,如ActiveMQ。 另附上一篇將58架構師沈劍老師寫的消息隊列的文章,能夠做爲參考:http://dwz.cn/78yLxL 須要強調的是任何一項技術的引用都要爲解決業務問題服務,而不能是單純的炫技。舉個例子,就拿消息服務來講,好比用戶註冊某個網站,註冊完了以後我要調用郵件和短信服務給他發通知,我可能還要經過他填的信息,給他推薦一下可能認識的用戶,那麼這裏核心業務是註冊,其它的發通知和推薦用戶就能夠放在消息隊列裏處理,先響應註冊信息,隨後調用其它服務來處理髮通知和推薦用戶這兩個業務。可是網站前期可能用戶量比較少,不用消息隊列就能知足個人需求了,引用消息隊列反而會增長項目的複雜性,因此新技術的使用必定是須要解決業務的問題,而不是單純的炫技。 參考文檔: http://blog.csdn.net/fanzhigang0/article/details/43764121 http://blog.csdn.net/u010702229/article/details/18085263
附一下我的微信公衆號,歡迎跟我交流。