前面兩篇咱們學習了node
接下來咱們來學習微服務中的異步通訊 - 消息隊列。在這篇文章的學習中,默認你已經掌握了 Docker、docker-compose 的知識。若是你尚未掌握,能夠翻閱個人歷史文章。若是個人文章對你有幫助,歡迎關注、點贊、轉發,這樣我會更有動力作原創分享。python
註冊成功以後,寫入數據庫,寫完以後,再發郵件,最後再發短信。整體耗時:場景描述:在正常的用戶註冊流程中,用戶註冊完成都須要,發送郵件與短信,其中傳統狀況下,有串行與並行兩種方式。docker
註冊成功以後,寫入數據庫,並行發郵件和發短信。整體耗時:50 + 50 + 50 = 150 ms數據庫
註冊成功以後,寫入數據庫,將寫入成功的信息,發送至消息隊列,因爲消費者本身去消息隊列中取消息,這樣在寫入消息以後,便可成功返回。整體耗時:50 + 50 = 100 msbootstrap
50 + 5 = 55 msapi
三個數據一對比,誰優誰劣就很明顯了bash
在傳統模式下(上圖),若是庫存系統出現錯誤,則訂單建立失敗,並且二者過分耦合,對後面的新需求與維護也是至關大的挑戰。在原有的架構上進行升級,則有下圖:場景說明:用戶下單後,須要減去庫存系統中相應數量的庫存。服務器
當下,二者就解耦了,庫存系統出現錯誤,也能夠正常進行下單了。由於只是入門文章,應用場景就先介紹到這裏。更多使用場景請自行百度、Google。網絡
如圖,P 爲 producer 生產者,C 爲 consumer 消費者,紅色部分爲消息隊列。通俗地解釋一下消息隊列,你想象一個場景:你到報社訂閱了一份報紙,報社每日生產一份新報紙,便將新報紙發往郵局並告訴郵局你的地址,郵遞員將你的報紙送往你的郵箱,你即可以愉快地閱讀今天的時事新聞了。固然,可能一我的訂閱了好幾家報社,一家報社也能夠被多我的訂閱。在這個場景中,消息隊列就擔任了,郵箱、郵局、郵遞員的角色。架構
在網絡上搜索到一個比較全面的對比圖,我這邊就直接引用原圖,再也不造輪子了。原圖地址:(cloud.tencent.com/developer/a…
普遍來講,電商、金融等對事務性要求很高的,能夠考慮RabbitMQ和RocketMQ,對性能要求高的可考慮Kafka。因爲這裏只是入門,就只使用單節點了。這裏使用 Docker 來構建 kafka,其代碼以下:docker-compose.yaml
version: '3'
services:
#<!--定義zk層服務-->
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
#<!--定義Kafka層-->
kafka:
image: wurstmeister/kafka
depends_on: [ zookeeper ]
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 123.45.567.89
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
複製代碼
docker-compose up 啓動 kafaka。
咱們先來看看,nodejs 爲 producer ,python 爲 comsumer 的狀況:
再看看,python 爲 producer,nodejs 爲consumer 的狀況:
nodejs producer
var kafka = require('kafka-node'),
Producer = kafka.Producer,
KeyedMessage = kafka.KeyedMessage,
client = new kafka.KafkaClient({kafkaHost: '123.45.567.89:9092'}),
producer = new Producer(client),
payloads = [
{ topic: 'my_favorite_topic', messages: 'produce by nodejs', partition: 0 }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
複製代碼
nodejs consumer
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.KafkaClient({kafkaHost: '123.45.567.89:9092'}),
consumer = new Consumer(
client,
[
{topic: 'my_favorite_topic', partition: 0}
],
{
autoCommit: false
}
);
consumer.on('message', function (message) {
console.log(message);
});
複製代碼
python producer
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['123.45.567.89:9092'],
api_version=(0, 10, 1)
) # 此處ip能夠是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for _ in range(100):
producer.send('my_favorite_topic', b'produce by python')
producer.close()
複製代碼
python consumer
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic', bootstrap_servers=['123.45.567.89:9092'], api_version=(0, 10, 1)
for msg in consumer:
print(msg)
複製代碼
介紹過 Kafka 以後,RabbitMQ 就不在詳細介紹了,請看源碼了。
不知不覺,消息隊列也講完了,要真真切切地區實踐,才能真正地掌握。 我的的知識儲備老是有限的,若有錯誤的地方,還請大佬斧正。點擊閱讀原文,連接到個人知乎,我會在知乎上對文章錯誤的地方進行修改。
本篇文章首發於公衆號「zone7」,關注公衆號獲取最新推文,後臺回覆【小白微服務】獲取源碼。