At work we deployed a project with uWSGI configured to run multiple processes, and the Python code used the kafka-python package as a producer that continuously sent data. Shortly after going live, almost every produced message failed with KafkaTimeoutError, and the Kafka server showed no sign of receiving any messages at all.
So I read the kafka-python source and found that send() does not transmit the message immediately; it puts the message into a local buffer. When creating a KafkaProducer instance, the buffer_memory option sets the size of this buffer (32 MB by default), and if the buffer fills up a KafkaTimeoutError is raised. My initial guess was one of two causes:
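The mechanism can be mimicked with a plain bounded queue (a toy analogue of my own, not kafka-python itself): once the buffer is full and nothing drains it, a blocking put times out, which is roughly what KafkaProducer does internally before raising KafkaTimeoutError.

```python
import queue

# Toy stand-in for the producer's bounded accumulator (buffer_memory).
buf = queue.Queue(maxsize=2)
buf.put(b"m1")
buf.put(b"m2")          # buffer is now full

try:
    # No background sender is draining the queue, so this times out,
    # analogous to send() raising KafkaTimeoutError when the buffer is full.
    buf.put(b"m3", timeout=0.1)
    raised = False
except queue.Full:
    raised = True

print(raised)  # → True
```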
1 The producer's messages were never sent out at all, or
2 sending was too slow relative to how fast messages were produced.
Since the Kafka server had received no messages whatsoever, I ruled out the second cause: the producer's messages were simply not being sent. A standalone script using the same configuration could deliver messages to the Kafka server just fine, so the problem was in my producer. Googling led me to this issue: https://github.com/dpkp/kafka-python/issues/721. The reporter's situation was much like mine, and a maintainer replied:
You cannot share producer instances across processes, only threads. I expect that is why the master process pattern is failing.
Second, producer.send() is async but is not guaranteed to deliver if you close the producer abruptly. In your final example I suspect that your producer instances are so short-lived that they are being reaped before flushing all pending messages. To guarantee delivery (or exception) call producer.send().get(timeout) or producer.flush() . otherwise you'll need to figure out how to get a producer instance per-uwsgi-thread and have it shared across requests (you would still want to flush before thread shutdown to guarantee no messages are dropped)
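The maintainer's point about send() being async can be illustrated without a broker. This toy producer (my own sketch, not kafka-python) enqueues on send() and delivers from a background thread, so delivery is only guaranteed after flush():

```python
import queue
import threading
import time

class ToyAsyncProducer:
    """Sketch of kafka-python's model: send() only enqueues;
    a daemon thread delivers in the background."""

    def __init__(self):
        self._q = queue.Queue()
        self.delivered = []
        threading.Thread(target=self._loop, daemon=True).start()

    def _loop(self):
        while True:
            msg = self._q.get()
            time.sleep(0.05)            # simulated network latency
            self.delivered.append(msg)
            self._q.task_done()

    def send(self, msg):
        self._q.put(msg)                # returns immediately, like producer.send()

    def flush(self):
        self._q.join()                  # block until every message is delivered

p = ToyAsyncProducer()
p.send("a")
p.send("b")
# At this point delivery is not guaranteed; exiting abruptly now could drop both.
p.flush()
print(p.delivered)  # → ['a', 'b']
```

If the process dies right after send(), the daemon thread is reaped with its queue still non-empty, which is exactly the "short-lived producer" failure the maintainer describes.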
大致上說明了兩點:緩存
1 多進程共享同一個生產者實例有問題服務器
2 send方法是異步的,當執行完send後當即關閉生產者實例的話可能會致使發送失敗。app
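Point 1 ultimately comes down to how fork() interacts with threads: the child inherits the parent's memory (including the producer's buffered messages) but not its background sender thread, so nothing ever drains the buffer in the child. A minimal POSIX-only demonstration with plain threading, no Kafka involved:

```python
import os
import threading
import time

result = {"thread_ran": False}

def background_sender():
    # Stands in for the producer's sender thread.
    time.sleep(0.2)
    result["thread_ran"] = True

threading.Thread(target=background_sender).start()

pid = os.fork()                          # POSIX only
if pid == 0:
    # Child: memory was copied, but the running thread was not,
    # so the flag never flips here.
    time.sleep(0.5)
    os._exit(0 if not result["thread_ran"] else 1)
else:
    time.sleep(0.5)
    _, status = os.waitpid(pid, 0)
    assert result["thread_ran"]          # parent's thread did run
    assert os.WEXITSTATUS(status) == 0   # child saw the flag still False
```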
I hadn't made the second mistake, so, feeling rather pleased with myself, I kept reading the comments:
Aha, thanks! After looking more closely at uWSGI options I discovered the lazy-apps option, which causes each worker to load the entire app itself. This seems to have resolved my issue.
The reporter said this solved his problem, so I looked up uWSGI's lazy-apps option and found this article: https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/TheArtOfGracefulReloading.html#preforking-vs-lazy-apps-vs-lazy, which says:
By default, uWSGI loads the entire application in the first process, and then fork()s itself multiple times after the application has been loaded.
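Enabling lazy-apps is a one-line change in an ini-style uWSGI config; a sketch (the module and process count are placeholders for your own setup):

```ini
[uwsgi]
module = app:app        ; placeholder for your WSGI entry point
processes = 4
; load the full application in each worker instead of loading it once
; in the master and then fork()ing:
lazy-apps = true
```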
Looking at my own code, I was indeed creating the producer instance before the app object was created, which meant the instance was shared between the parent process and its children. With the problem finally understood, there are two ways to fix it:
1 Use lazy-apps; that alone fixes it.
2 Without lazy-apps, solve it at the code level:
```python
# producer.py
import json
from kafka import KafkaProducer


class Single(object):
    """Singleton."""

    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            cls._instance = super().__new__(cls)
            if hasattr(cls, "initialize"):
                cls._instance.initialize(*args, **kwargs)
        return cls._instance


class MsgQueue(Single):
    """
    Made a singleton because kafka-python misbehaves under uWSGI's multiprocess
    mode: each process should own its own KafkaProducer instance. So no producer
    is created when the app object is initialized; it is created lazily at
    request time instead.
    See: https://github.com/dpkp/kafka-python/issues/721
    """
    app = None

    def initialize(self):
        self.producer = KafkaProducer(
            bootstrap_servers=self.app.config["MQ_URI"],
            api_version=self.app.config["KAFKA_API_VERSION"])

    @classmethod
    def init_app(cls, app):
        cls.app = app

    def send(self, topic, data):
        data = json.dumps(data, ensure_ascii=True)
        self.producer.send(topic, data.encode())


# app.py
from producer import MsgQueue
...
MsgQueue.init_app(app)


# in the business-logic code that uses the producer
from producer import MsgQueue
...
MsgQueue().send(topic, msg)
```
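One caveat with the Single class above: if the instance does get created before the fork after all, every child still inherits it. A defensive variant (my own sketch, with a FakeProducer standing in for KafkaProducer so it runs without a broker) rebuilds the producer whenever the current pid differs from the one that created it:

```python
import os

class FakeProducer:
    """Stand-in for KafkaProducer so the sketch runs without a broker."""
    def __init__(self):
        self.created_in_pid = os.getpid()

class PerProcessMsgQueue:
    _instance = None
    _pid = None

    def __new__(cls):
        pid = os.getpid()
        if cls._instance is None or cls._pid != pid:
            # First use in this process (or we were forked):
            # build a fresh producer instead of reusing the inherited one.
            inst = super().__new__(cls)
            inst.producer = FakeProducer()   # real code: KafkaProducer(...)
            cls._instance, cls._pid = inst, pid
        return cls._instance

a = PerProcessMsgQueue()
b = PerProcessMsgQueue()
print(a is b)  # → True: still a singleton within one process
```

A forked child calling PerProcessMsgQueue() would see the pid mismatch and build its own producer, which is the per-process behavior the kafka-python maintainer asks for.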