秒殺場景html
AMPQ消息路由必要三部分:交換器、隊列、綁定。java
Java核心組件:ConnectionFactory、Connection、Channel、Delivery、DeliverCallback、CancelCallbacklinux
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
若是在同一條信道上訂閱了另外一個隊列,那就不能再聲明隊列,必須先取消訂閱。c++
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
queue:須要指定隊列名稱,若是不指定,MQ會隨機分配一個並在queue.declare命令中返回,git
durable:隊列將在服務器重啓後存在。github
exclusive:爲true時,隊列變成私有的。服務器
autoDelete: 爲true時,當最後一個消費者取消訂閱時,隊列自動移除。架構
Java代碼Channel:併發
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
Java代碼Channel:tcp
void basicAck(long deliveryTag, boolean multiple) throws IOException;
上面的手動確認,第二個參數爲true,批量確認;若是爲false,會一次確認一條。當有耗時任務時,能夠利用手動確認延遲確認消息,防止消息大量涌入應用致使過載。
也能夠在訂閱隊列時就將basicConsume方法的autoAck參數設置爲true,開啓自動確認。確認成功後rabbitmq會從隊列中刪除消息。
Java代碼Channel:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
第二個參數requeue設置爲true,消息會從新排隊併發送給下一個消費者;爲false則會丟棄該條消息。能夠利用此性質丟棄錯誤格式的消息。
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
exchange:第一個參數是交換的名稱。空字符串表示默認或無名交換,消息經過routingKey路由到指定隊列。
Java中Channel申明交換器:
Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
BuiltinExchangeType對應有四種枚舉類型:
DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
channel.queueBind(String queue, String exchange, String routingKey)
RabbitMQ中消息傳遞模型的核心思想是生產者永遠不會將任何消息直接發送到隊列。實際上,生產者甚至不知道消息是否會被傳遞到哪一個隊列。
不指定隊列名時,經過服務器隨機生成隊列名稱:
String queueName = channel.queueDeclare().getQueue();
不傳參數時,queueDeclare生成一個非持久的,獨佔的自動刪除隊列
在linux服務器上能夠經過命令查看全部交換器:
rabbitmqctl list_exchanges
廣播方式會將消息投遞給全部附加在此交換器的隊列。
direct類型在綁定時設定一個routing_key,消息的routing_key匹配時, 纔會被交換器投遞到綁定的隊列中去.
按規則投遞,經過通配符#和*組合
*(星號)能夠替代一個單詞。
#(hash)能夠替換零個或多個單詞。
將隊列和交換器的durable屬性設置爲true
申明隊列時:
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
申明交換器時:
channel.exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;
投遞消息時將投遞模式(delivery mode)設置爲2,Java代碼中MessageProperties.PERSISTENT_TEXT_PLAIN來設置:
channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
在MessageProperties源碼中以下所示:
/** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */ public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, 0, null, null, null, null, null, null, null, null, null);
使用事務會使rabbitmq的性能大大下降,爲了不這個問題,rabbitmq支持:發送方確認模式。經過這個模式來保證消息的投遞。當生產者P投遞消息後會等待消費者C發送確認,P收到確認後能夠調用回調函數處理相關業務。在生產者P等待確認的同時也能夠繼續發送下一條消息。
rabbitmq支持建立虛擬主機,默認的虛擬主機爲「/」,默認用戶guest;當在rabbitmq集羣中建立虛擬主機時,整個集羣都會建立。
rabbitmq的日誌文件在/var/log/rabbitmq/下的rabbit@[localhost].log
配置文件在rpm安裝/usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example,複製一份:
cp /usr/share/doc/rabbitmq-server-3.5.3/rabbitmq.config.example /etc/rabbitmq.config
參考官網配置:https://www.rabbitmq.com/configure.html#configuration-file
應用程序和rabbitmq服務器之間創建一條tcp鏈接,tcp鏈接打開後,應用程序就能夠和rabbitmq建立多條AMQP信道,信道是創建在tcp鏈接上的虛擬鏈接。
優勢
單機吞吐量:萬級
topic數量都吞吐量的影響:
時效性:ms級
可用性:高,基於主從架構實現高可用性
消息可靠性:有較低的機率丟失數據
功能支持:MQ領域的功能極其完備
缺點:
官方社區如今對ActiveMQ 5.x維護愈來愈少,較少在大規模吞吐的場景中使用。
號稱大數據的殺手鐗,談到大數據領域內的消息傳輸,則繞不開Kafka,這款爲大數據而生的消息中間件,以其百萬級TPS的吞吐量名聲大噪,迅速成爲大數據領域的寵兒,在數據採集、傳輸、存儲的過程當中發揮着舉足輕重的做用。
Apache Kafka它最初由LinkedIn公司基於獨特的設計實現爲一個分佈式的提交日誌系統( a distributed commit log),以後成爲Apache項目的一部分。
目前已經被LinkedIn,Uber, Twitter, Netflix等大公司所採納。
優勢
性能卓越,單機寫入TPS約在百萬條/秒,最大的優勢,就是吞吐量高。
時效性:ms級
可用性:很是高,kafka是分佈式的,一個數據多個副本,少數機器宕機,不會丟失數據,不會致使不可用
消費者採用Pull方式獲取消息, 消息有序, 經過控制可以保證全部消息被消費且僅被消費一次;
有優秀的第三方Kafka Web管理界面Kafka-Manager;
在日誌領域比較成熟,被多家公司和多個開源項目使用;
功能支持:功能較爲簡單,主要支持簡單的MQ功能,在大數據領域的實時計算以及日誌採集被大規模使用
缺點:
Kafka單機超過64個隊列/分區,Load會發生明顯的飆高現象,隊列越多,load越高,發送消息響應時間變長
使用短輪詢方式,實時性取決於輪詢間隔時間;
消費失敗不支持重試;
支持消息順序,可是一臺代理宕機後,就會產生消息亂序;
社區更新較慢;
RabbitMQ 2007年發佈,是一個在AMQP(高級消息隊列協議)基礎上完成的,可複用的企業消息系統,是當前最主流的消息中間件之一。
RabbitMQ優勢:
因爲erlang語言的特性,mq 性能較好,高併發;
吞吐量到萬級,MQ功能比較完備
健壯、穩定、易用、跨平臺、支持多種語言、文檔齊全;
開源提供的管理界面很是棒,用起來很好用
社區活躍度高;
RabbitMQ缺點:
erlang開發,很難去看懂源碼,基本職能依賴於開源社區的快速維護和修復bug,不利於作二次開發和維護。
RabbitMQ確實吞吐量會低一些,這是由於他作的實現機制比較重。
須要學習比較複雜的接口和協議,學習和維護成本較高。
RocketMQ出自 阿里公司的開源產品,用 Java 語言實現,在設計時參考了 Kafka,並作出了本身的一些改進。
RocketMQ在阿里集團被普遍應用在訂單,交易,充值,流計算,消息推送,日誌流式處理,binglog分發等場景。
RocketMQ優勢:
單機吞吐量:十萬級
可用性:很是高,分佈式架構
消息可靠性:通過參數優化配置,消息能夠作到0丟失
功能支持:MQ功能較爲完善,仍是分佈式的,擴展性好
支持10億級別的消息堆積,不會由於堆積致使性能降低
源碼是java,咱們能夠本身閱讀源碼,定製本身公司的MQ,能夠掌控
RocketMQ缺點:
支持的客戶端語言很少,目前是java及c++,其中c++不成熟;
社區活躍度通常
沒有在 mq 核心中去實現JMS等接口,有些系統要遷移須要修改大量代碼
http://youzhixueyuan.com/comparison-of-kafka-rocketmq-rabbitmq.html