死磕java底層(二)—消息服務

這一節做爲上一節多線程的延續,先說一下java原生的阻塞隊列(Blocking Queue),以後再說一下JMS(Java Messaging Service,java消息服務)以及它的實現之一ActiveMQ消息隊列,因此都歸併到消息服務中討論。java

1.阻塞隊列(Blocking Queue)

BlockingQueue也是java.util.concurrent下的接口,它解決了多線程中如何高效傳輸數據的問題,經過這些高效而且線程安全的類,咱們能夠搭建高質量的多線程程序。 主要用來控制線程同步的工具。 BlockingQueue是一個接口,裏面的方法以下:數據庫

public interface BlockingQueue<E> extends Queue<E> {
    boolean add(E e);
    boolean offer(E e);
    void put(E e) throws InterruptedException;
    boolean offer(E e, long timeout, TimeUnit unit);
    E take() throws InterruptedException;
    E poll(long timeout, TimeUnit unit)
    int remainingCapacity();
    boolean remove(Object o);
    public boolean contains(Object o);
    int drainTo(Collection<? super E> c);
    int drainTo(Collection<? super E> c, int maxElements);
}
複製代碼
  • 插入:
  1. add(anObject):把anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納,則返回true,不然拋出異常,很差
  2. offer(anObject):表示若是可能的話,將anObject加到BlockingQueue裏,即若是BlockingQueue能夠容納,則返回true,不然返回false.
  3. put(anObject):把anObject加到BlockingQueue裏,若是BlockQueue沒有空間,則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續, 有阻塞, 放不進去就等待
  • 讀取:
  1. poll(time):取走BlockingQueue裏排在首位的對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null; 取不到返回null
  2. take():取走BlockingQueue裏排在首位的對象,若BlockingQueue爲空,阻斷進入等待狀態直到Blocking有新的對象被加入爲止; 阻塞, 取不到就一直等
  • 其餘
  1. int remainingCapacity();返回隊列剩餘的容量,在隊列插入和獲取的時候使用,數據可能不許。
  2. boolean remove(Object o); 從隊列移除元素,若是存在,即移除一個或者更多,隊列改變了返回true
  3. public boolean contains(Object o); 查看隊列是否存在這個元素,存在返回true
  4. int drainTo(Collection<? super E> c); 移除此隊列中全部可用的元素,並將它們添加到給定 collection 中。(即取出放到集合中)
  5. int drainTo(Collection<? super E> c, int maxElements); 和上面方法的區別在於,指定了移動的數量; (取出指定個數放到集合) 主要的方法是:put、take一對阻塞存取;add、poll一對非阻塞存取。 上面說了BlockingQueue是一個接口,它有四個具體的實現類,經常使用的有兩個:
  6. ArrayBlockingQueue:一個由數組支持的有界阻塞隊列,規定大小的BlockingQueue,其構造函數必須帶一個int參數來指明其大小.其所含的對象是以FIFO(先入先出)順序排序的。
  7. LinkedBlockingQueue:大小不定的BlockingQueue,其構造函數中能夠指定容量,也能夠不指定,不指定的話,默認最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊列滿的時候會阻塞直到有隊列成員被消費,take方法在隊列空的時候會阻塞,直到有隊列成員被放進來。 LinkedBlockingQueue和ArrayBlockingQueue區別: LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背後所用的數據結構不同,致使LinkedBlockingQueue的數據吞吐量要大於ArrayBlockingQueue,但在線程數量很大時其性能的可預見性低於ArrayBlockingQueue. 下面是是用BlockingQueue實現的生產者和消費者的示例: 生產者Product:
public class Product implements Runnable{

    BlockingQueue<String> queue;
    public Product(BlockingQueue<String> queue) {
        //建立對象時就傳入一個阻塞隊列
        this.queue = queue;
    }
    @Override
    public void run(){
        try {
            System.out.println(Thread.currentThread().getName()+"開始生產");
            String temp =  Thread.currentThread().getName()+":生產線程";
            queue.put(temp);//向隊列中放數據,若是隊列是滿的話,會阻塞當前線程
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
複製代碼

消費者Consumer:apache

public class Consumer implements Runnable{
    BlockingQueue<String> queue;
    public Consumer(BlockingQueue<String> queue) {
        //使用有參構造函數的目的是我在建立這個消費者對象的時候就能夠傳進來一個隊列
        this.queue = queue;
    }
    @Override
    public void run() {
        Random random = new Random();
        try {
            while(true){
                Thread.sleep(random.nextInt(10));
                System.out.println(Thread.currentThread().getName()+ "準備消費...");
                String temp = queue.take();//從隊列中取任務消費,若是隊列爲空,會阻塞當前線程
                System.out.println(Thread.currentThread().getName() + " 獲取到工做任務==== " +temp);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

測試類TestQueue:數組

public class TestQueue {
    public static void main(String[] args) {
        //新建一個阻塞隊列,隊列長度是5
        BlockingQueue<String> queue = new LinkedBlockingDeque<String>(5);
        //BlockingQueue<String> queue = new ArrayBlockingQueue<String>(5);
        Consumer consumer = new Consumer(queue);
        Product product = new Product(queue);

        for(int i = 0;i<3;i++){
            new Thread(product,"product"+i).start();
        }

        //for (int i = 0;i<5;i++){
            new Thread(consumer,"consumer").start();
        //}
    }
}
複製代碼

整套代碼的意思就是初始化一個消息隊列,裏面放String類型,隊列長度是5,使用生產者線程來模擬三個用戶發出請求,把用戶的請求數據暫時放在BlockingQueue隊列裏面,隨後消費者線程不斷的從隊列裏面取任務進行業務邏輯處理,直到隊列裏面消費的什麼都不剩了。由此能夠看出消息隊列有兩大特色:解耦和削峯填谷。生產者和消費者毛關係沒有,生產者往隊列裏放數據,消費者從隊列裏取數據,它們都跟隊列創建關係,解耦;生產者若是併發量很高也只不過是把數據先放到隊列裏,消費者能夠慢慢吃,實際中不會立馬拖垮服務端。 參考地址:http://blog.csdn.net/ghsau/article/details/8108292安全

2.Java消息服務

2.1JMS簡介

JMS即Java消息服務(Java Message Service)用於在兩個應用程序之間,或分佈式系統中發送消息,進行異步通訊。JMS是一種與廠商(或者說是平臺)無關的 API。相似於JDBC(Java Database Connectivity):這裏,JDBC 是能夠用來訪問許多不一樣關係數據庫的 API,而 JMS 則提供一樣與廠商無關的訪問方法,以訪問消息收發服務。 許多廠商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ等等。 JMS 可讓你經過消息收發服務從一個 JMS 客戶機向另外一個 JMS客戶機發送消息。 消息是 JMS 中的一種類型對象,由兩部分組成:報頭和消息主體。報頭由路由信息以及有關該消息的元數據組成;消息主體則攜帶着應用程序的數據或有效負載。根據有效負載的類型來劃分,能夠將消息分爲幾種類型,它們分別攜帶:簡單文本(TextMessage)、可序列化的對象 (ObjectMessage)、屬性集合 (MapMessage)、字節流 (BytesMessage)、原始值流 (StreamMessage),還有無有效負載的消息 (Message)。bash

2.2JMS的組成

JMS由如下元素組成: JMS提供者provider:面向消息中間件的,JMS規範的一個實現。提供者能夠是Java平臺的JMS實現,也能夠是非Java平臺的面向消息中間件的適配器。 JMS客戶:生產或消費基於消息的Java應用程序或對象(即生產者和消費者都統稱JMS客戶)。 JMS生產者:建立併發送消息的JMS客戶。 JMS消費者:接收消息的JMS客戶。 JMS消息:能夠在JMS客戶之間傳遞數據的對象 JMS隊列:一個容納被髮送的正在等待閱讀的消息的區域。一個消息若是被閱讀,它將被從隊列中移走。 JMS主題:一種支持發送消息給多個訂閱者的機制。微信

2.3Java消息服務模型

  • 點對點模型 在點對點隊列模型下,一個生產者向一個特定的隊列發佈消息,一個消費者從該隊列中讀取消息。這裏,生產者知道消費者的隊列,並直接將消息發送到消費者的隊列。 這種模式有以下特色: 只有一個消費者將得到消息; 生產者不須要消費者在消費該消息期間處於運行狀態,消費者也一樣不須要在生產者在發送消息時處於運行狀態; 每個成功處理的消息都由消費者者簽收。
  • 發佈者/訂閱者模型 發佈者/訂閱者模型支持向一個特定的消息主題發佈消息。在這種模型下,發佈者和訂閱者彼此不知道對方,相似於匿名公告板。 這種模式有以下特色: 多個消費者能夠得到消息; 在發佈者和訂閱者之間存在時間依賴性。發佈者須要創建一個訂閱(subscription),以便消費者可以訂閱。訂閱者必須保持持續的活動狀態以接收消息,除非訂閱者創建了持久的訂閱。

2.4消息隊列(ActiveMQ)

ActiveMQ是JMS規範的一種實現,下面說如何使用session

  • 下載ActiveMQ 去官方網站下載:http://activemq.apache.org/
  • 運行ActiveMQ 解壓apache-activemq-5.5.1-bin.zip(相似於Tomcat,解壓便可用),我在網上搜的有的人修改了配置文件activeMQ.xml中鏈接的地址和協議,我在測試時沒有修改也能夠測試成功,若是你測試不成功能夠修改以下:
<transportConnectors>
   <transportConnector name="openwire" uri="tcp://localhost:61616"/>
   <transportConnector name="ssl"     uri="ssl://localhost:61617"/>
   <transportConnector name="stomp"   uri="stomp://localhost:61613"/>
   <transportConnector uri="http://localhost:8081"/>
   <transportConnector uri="udp://localhost:61618"/>
</transportConnectors>
複製代碼

測試代碼以下: 生產者Product:數據結構

public class Product {

    private String username = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;

    private Connection connection = null;
    private Session session = null;
    private String subject = "myQueue";
    private Destination destination = null;
    private MessageProducer producer = null;
    /**
     * @Description 初始化方法
     * @Author 劉俊重
     * @Date 2017/12/20
     */
    private void init() throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username,password,url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }

    public void productMessage(String message) throws JMSException {
        this.init();
        TextMessage textMessage = session.createTextMessage(message);
        connection.start();
        System.out.println("生產者準備發送消息:"+textMessage);
        producer.send(textMessage);
        System.out.println("生產者已發送完畢消息。。。");
    }

    public void close() throws JMSException {
        System.out.println("生產者開始關閉鏈接");
        if(null!=producer){
            producer.close();
        }
        if(null!=session){
            session.close();
        }
        if(null!=connection){
            connection.close();
        }
    }
}
複製代碼

消費者Consumer:多線程

public class Consumer implements MessageListener,ExceptionListener{
    private String name = ActiveMQConnectionFactory.DEFAULT_USER;
    private String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
    private String url = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
    private ActiveMQConnectionFactory connectionFactory = null;
    private Connection connection = null;
    private Session session = null;
    private String subject = "myQueue";
    private Destination destination = null;
    private MessageConsumer consumer = null;

    public static Boolean isconnection=false;
    /**
     * @Description 鏈接ActiveMQ
     * @Author 劉俊重
     * @Date 2017/12/20
     */
    private void init() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(name,password,url);
        connection = connectionFactory.createConnection();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(subject);
        consumer = session.createConsumer(destination);
    }

    public void consumerMessage() throws JMSException {
        this.init();
        connection.start();

        //設置消息監聽和異常監聽
        consumer.setMessageListener(this);
        connection.setExceptionListener(this);
        System.out.println("消費者開始監聽....");
        isconnection = true;
        //Message receive = consumer.receive();
    }

    public void close() throws JMSException {
        if(null!=consumer){
            consumer.close();
        }
        if(null!=session){
            session.close();
        }
        if(null!=connection){
            connection.close();
        }
    }
    /**
     * 異常處理函數
     */
    @Override
    public void onException(JMSException exception) {
        //發生異常關閉鏈接
        isconnection = false;
    }

    /**
     * 消息處理函數
     */
    @Override
    public void onMessage(Message message) {
        try {
            if(message instanceof TextMessage){
                TextMessage textMsg = (TextMessage) message;
                String text = textMsg.getText();
                System.out.println("消費者接收到的消息======="+text);
            }else {
                System.out.println("接收的消息不符合");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

複製代碼

注意:消費者須要實現MessageListener和ExceptionListener來監聽收到消息和出錯時的處理。 生產者測試類TestProduct:

public class TestProduct {
    public static void main(String[] args) throws JMSException {
        for(int i=0;i<100;i++){
            Product product = new Product();
            product.productMessage("Hello World!"+i);
            product.close();
        }
    }
}
複製代碼

TestProduct是用來模擬生成100條消息,寫入到ActiveMQ隊列中。 消費者測試類TestConsumer:

public class TestConsumer implements Runnable {
    static Thread thread = null;
    public static void main(String[] args) throws InterruptedException {
        thread = new Thread(new TestConsumer());
        thread.start();
        while (true){
            //時刻監聽消息隊列,若是線程死了,則新建一個線程
            boolean alive = thread.isAlive();
            System.out.println("當前線程狀態:"+alive);
            if(!alive){
                thread = new Thread(new TestConsumer());
                thread.start();
                System.out.println("線程重啓完成");
            }
            Thread.sleep(1000);
        }
    }
    @Override
    public void run() {
        try {
            Consumer consumer = new Consumer();
            consumer.consumerMessage();
            while (Consumer.isconnection) {
                //System.out.println(123);
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

TestConsumer這裏用了多線程,保證時刻有個線程活着等着接收ActiveMQ的消息隊列並調用消費者處理。 總結:個人理解是線程間通訊使用queue,如BlockingQueue,進程間通訊使用JMS,如ActiveMQ。 另附上一篇將58架構師沈劍老師寫的消息隊列的文章,能夠做爲參考:http://dwz.cn/78yLxL 須要強調的是任何一項技術的引用都要爲解決業務問題服務,而不能是單純的炫技。舉個例子,就拿消息服務來講,好比用戶註冊某個網站,註冊完了以後我要調用郵件和短信服務給他發通知,我可能還要經過他填的信息,給他推薦一下可能認識的用戶,那麼這裏核心業務是註冊,其它的發通知和推薦用戶就能夠放在消息隊列裏處理,先響應註冊信息,隨後調用其它服務來處理髮通知和推薦用戶這兩個業務。可是網站前期可能用戶量比較少,不用消息隊列就能知足個人需求了,引用消息隊列反而會增長項目的複雜性,因此新技術的使用必定是須要解決業務的問題,而不是單純的炫技。 參考文檔: http://blog.csdn.net/fanzhigang0/article/details/43764121 http://blog.csdn.net/u010702229/article/details/18085263

附一下我的微信公衆號,歡迎跟我交流。

Java開發日記
相關文章
相關標籤/搜索