以前公司作即時通信用到openfire,在後面的文章可能會介紹,這裏介紹一下有接觸到的消息隊列,在消息隊列的服務器有幾個主流的:RabbitMq,ActiveMq,ZeroMq。至於在選擇上,就不作介紹了。在網絡上面也比較多的文章點擊打開連接 java
介紹一下有使用到RabbitMq,因爲使用的時間很少,只是作到一些基本的使用。介紹一下RabbitMQ,用erlang語言開發,這個語言沒接觸過,上網搜了一下好像說在大併發方面有很好的效果。因此用這個語言開發的服務器應該在併發方面有比較好的體現。 服務器
接着分點介紹一下:1.概念介紹(複製): 網絡
1. 接收消息,轉發消息到綁定的隊列。四種類型:direct, topic, headers and fanout 併發
direct:轉發消息到routigKey指定的隊列 負載均衡
topic:按規則轉發消息(最靈活) 學習
headers:(這個尚未接觸到) fetch
fanout:轉發消息到全部綁定隊列 編碼
2. 若是沒有隊列綁定在交換機上,則發送到該交換機上的消息會丟失。 spa
3. 一個交換機能夠綁定多個隊列,一個隊列能夠被多個交換機綁定。 .net
4. topic類型交換器經過模式匹配分析消息的routing-key屬性。它將routing-key和binding-key的字符串切分紅單詞。這些單詞之間用點隔開。它一樣也會識別兩個通配符:#匹配0個或者多個單詞,*匹配一個單詞。例如,binding key:*.stock.#匹配routing key:usd.stcok和eur.stock.db,可是不匹配stock.nana。
還有一些其餘的交換器類型,如header、failover、system等,如今在當前的RabbitMQ版本中均未實現。
5. 由於交換器是命名實體,聲明一個已經存在的交換器,可是試圖賦予不一樣類型是會致使錯誤。客戶端須要刪除這個已經存在的交換器,而後從新聲明而且賦予新的類型。
6. 交換器的屬性:
- 持久性:若是啓用,交換器將會在server重啓前都有效。
- 自動刪除:若是啓用,那麼交換器將會在其綁定的隊列都被刪除掉以後自動刪除掉自身。
- 惰性:若是沒有聲明交換器,那麼在執行到使用的時候會致使異常,並不會主動聲明。
1. 隊列是RabbitMQ內部對象,存儲消息。相同屬性的queue能夠重複定義。
2. 臨時隊列。channel.queueDeclare(),有時不須要指定隊列的名字,並但願斷開鏈接時刪除隊列。
3. 隊列的屬性:
- 持久性:若是啓用,隊列將會在server重啓前都有效。
- 自動刪除:若是啓用,那麼隊列將會在全部的消費者中止使用以後自動刪除掉自身。
- 惰性:若是沒有聲明隊列,那麼在執行到使用的時候會致使異常,並不會主動聲明。
- 排他性:若是啓用,隊列只能被聲明它的消費者使用。
這些性質能夠用來建立例如排他和自刪除的transient或者私有隊列。這種隊列將會在全部連接到它的客戶端斷開鏈接以後被自動刪除掉。它們只是短暫地鏈接到server,可是能夠用於實現例如RPC或者在AMQ上的對等通訊。4. RPC的使用是這樣的:RPC客戶端聲明一個回覆隊列,惟一命名(例如用UUID),而且是自刪除和排他的。而後它發送請求給一些交換器,在消息的reply-to字段中包含了以前聲明的回覆隊列的名字。RPC服務器將會回答這些請求,使用消息的reply-to做爲routing key(默認綁定器會綁定全部的隊列到默認交換器,名稱爲「amp.交換器類型名」)發送到默認交換器。注意這僅僅是慣例而已,能夠根據和RPC服務器的約定,它能夠解釋消息的任何屬性(甚至數據體)來決定回覆給誰。
1. 消息在隊列中保存,以輪詢的方式將消息發送給監聽消息隊列的消費者,能夠動態的增長消費者以提升消息的處理能力。
2. 爲了實現負載均衡,能夠在消費者端通知RabbitMQ,一個消息處理完以後纔會接受下一個消息。
channel.basic_qos(prefetch_count=1)
注意:要防止若是全部的消費者都在處理中,則隊列中的消息會累積的狀況。
3. 消息有14個屬性,最經常使用的幾種:
deliveryMode:持久化屬性
contentType:編碼
replyTo:指定一個回調隊列
correlationId:消息id
實例代碼:
4. 消息生產者能夠選擇是否在消息被髮送到交換器而且還未投遞到隊列(沒有綁定器存在)和/或沒有消費者可以當即處理的時候獲得通知。經過設置消息的mandatory和/或immediate屬性爲真,這些投遞保障機制的能力獲得了強化。
5. 此外,一個生產者能夠設置消息的persistent屬性爲真。這樣一來,server將會嘗試將這些消息存儲在一個穩定的位置,直到server崩潰。固然,這些消息確定不會被投遞到非持久的隊列中。
1. 消息ACK,通知RabbitMQ消息已被處理,能夠從內存刪除。若是消費者因宕機或連接失敗等緣由沒有發送ACK(不一樣於ActiveMQ,在RabbitMQ裏,消息沒有過時的概念),則RabbitMQ會將消息從新發送給其餘監聽在隊列的下一個消費者。
channel.basicConsume(queuename, noAck=false, consumer);
2. 消息和隊列的持久化。定義隊列時能夠指定隊列的持久化屬性(問:持久化隊列如何刪除?)
channel.queueDeclare(queuename, durable=true, false, false, null);
發送消息時能夠指定消息持久化屬性:
channel.basicPublish(exchangeName, routingKey,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
這樣,即便RabbitMQ服務器重啓,也不會丟失隊列和消息。
3. publisher confirms
4. master/slave機制,配合Mirrored Queue,這種狀況下,publisher會正常發送消息和接收消息的confirm,但對於subscriber來講,須要接收Consumer Cancellation Notifications來獲得主節點失敗的通知,而後re-consume from the queue,此時要求client有處理重複消息的能力。注意:若是queue在一個新加入的節點上增長了一個slave,此時slave上沒有此前queue的信息(目前尚未同步機制)。
(經過命令行或管理插件能夠查看哪一個slave是同步的:
rabbitmqctl list_queues name slave_pids synchronised_slave_pids)
當一個slave從新加入mirrored-queue時,若是queue是durable的,則會被清空。
2.最簡單的使用java代碼:
生產者:經過定義隊列的名稱,以及一些屬性(是否持久化等),發送消息便可。