uwsgi with multiple processes + kafka-python: producer messages never get sent

At work I deployed a project with uwsgi, configured to run multiple processes, and the Python code used the kafka-python module as a producer, continuously generating data. Shortly after going live, almost every produce call failed with a KafkaTimeoutError, and the Kafka server never received a single message.

So I read the kafka-python source and found that send() does not transmit the message immediately; it only appends it to a local buffer. When creating a KafkaProducer instance, the buffer_memory option sets the size of this buffer (32 MB by default), and once the buffer fills up, send() raises KafkaTimeoutError. So my initial diagnosis was one of two causes:

  1. the producer messages were never actually being sent out, or

  2. messages were being sent far more slowly than they were being produced.
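The buffering behaviour described above can be pictured with a bounded stdlib queue: if nothing is draining the buffer, a timed put eventually gives up, much like send() surfacing KafkaTimeoutError once buffer_memory is exhausted (this is only an analogy, not kafka-python's actual internals):

```python
import queue

# A tiny bounded buffer standing in for the producer's local accumulator.
buffer = queue.Queue(maxsize=2)

buffer.put(b"msg-1")
buffer.put(b"msg-2")

# No background sender thread is draining the buffer, so the next
# timed put fails -- analogous to send() raising KafkaTimeoutError
# when the local buffer_memory is full and nothing is being flushed.
try:
    buffer.put(b"msg-3", timeout=0.1)
    print("queued")
except queue.Full:
    print("buffer full, timed out")
```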

Since the Kafka server was not receiving any messages at all, I ruled out the second cause: the producer messages simply were not going out. I then wrote a standalone script using the same configuration and found that the Kafka server could receive its messages, which confirmed the problem was in my producer. Googling led me to this issue: https://github.com/dpkp/kafka-python/issues/721. The reporter's situation was much like mine, and the maintainer replied:

You cannot share producer instances across processes, only threads. I expect that is why the master process pattern is failing.

Second, producer.send() is async but is not guaranteed to deliver if you close the producer abruptly. In your final example I suspect that your producer instances are so short-lived that they are being reaped before flushing all pending messages. To guarantee delivery (or exception) call producer.send().get(timeout) or producer.flush() . otherwise you'll need to figure out how to get a producer instance per-uwsgi-thread and have it shared across requests (you would still want to flush before thread shutdown to guarantee no messages are dropped)

 

大致上說明了兩點:緩存

  1. sharing a single producer instance across multiple processes does not work;

  2. send() is asynchronous, so closing the producer instance immediately after send() returns can cause delivery to fail.
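The second hazard is the standard async-producer pattern: send() returns at once while a background thread does the actual I/O, so you must wait on the returned future (send().get(timeout)) or call flush() before shutting down. The same shape can be sketched with stdlib futures (names here are illustrative, not kafka-python API):

```python
from concurrent.futures import ThreadPoolExecutor
import time

results = []

def deliver(msg):
    # Stands in for the background network I/O the producer thread performs.
    time.sleep(0.05)
    results.append(msg)
    return msg

executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(deliver, b"hello")  # like producer.send(): returns immediately

# like producer.send().get(timeout): block until delivery (or exception)
assert future.result(timeout=1) == b"hello"

# like producer.flush() before a clean close: wait for all pending work
executor.shutdown(wait=True)
assert results == [b"hello"]
```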

I hadn't made the second mistake, so, feeling rather pleased with myself, I read on through the comments:

Aha, thanks! After looking more closely at uWSGI options I discovered the lazy-apps option, which causes each worker to load the entire app itself. This seems to have resolved my issue.

The reporter said this solved his problem, so I looked up lazy-apps in uwsgi and found this article: https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/TheArtOfGracefulReloading.html#preforking-vs-lazy-apps-vs-lazy, which says:

By default, uWSGI loads the entire application in the first process, and then fork()s itself multiple times after the application has been loaded.
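For reference, switching to per-worker loading via lazy-apps is a one-line uwsgi setting; a rough sketch (module name and worker count are placeholders):

```ini
[uwsgi]
module = app:app        ; placeholder: your WSGI entry point
processes = 4
; each worker loads the app itself, so the KafkaProducer is
; created after fork() instead of being inherited from the master
lazy-apps = true
```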

Looking at my own code, I was indeed creating the producer instance before the app was created, which meant the instance was shared between the parent process and its forked children. With the problem finally clear, there are two fixes:

  1. Use lazy-apps; that alone fixes the problem.

  2. Don't use lazy-apps, and fix it at the code level instead:

# producer.py
import json
from kafka import KafkaProducer


class Single(object):
    """單例模式"""
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            cls._instance = super().__new__(cls)
            if hasattr(cls, "initialize"):
                cls._instance.initialize(*args, **kwargs)
        return cls._instance


class MsgQueue(Single):
    """
    這個整成單例模式是由於:uwsgi配合kafka-python在多進程下會有問題,這裏但願每一個進程單獨享有一個kafka producer實例,
    也就是說當初始化app對象後,並不會生成producer實例,而是在運行時再生成,
    具體參考:https://github.com/dpkp/kafka-python/issues/721
    """
    app = None

    def initialize(self):
        self.producer = KafkaProducer(bootstrap_servers=self.app.config["MQ_URI"],
                                      api_version=self.app.config["KAFKA_API_VERSION"])

    @classmethod
    def init_app(cls, app):
        cls.app = app

    def send(self, topic, data):
        """
        :param topic: Kafka topic to publish to
        :param data: JSON-serializable payload
        """
        data = json.dumps(data, ensure_ascii=True)
        self.producer.send(topic, data.encode())

# app.py
from producer import MsgQueue
...
MsgQueue.init_app(app)

# any business-logic module that uses the producer
from producer import MsgQueue
...
MsgQueue().send(topic, msg)
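The effect of the Single base class can be checked without a Kafka broker by swapping in a stub producer; initialize runs exactly once per process, and every later call returns the same instance (FakeProducer and StubQueue below are stand-ins, not part of the original code):

```python
class Single(object):
    """Same singleton pattern as in producer.py."""
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            cls._instance = super().__new__(cls)
            if hasattr(cls, "initialize"):
                cls._instance.initialize(*args, **kwargs)
        return cls._instance


class FakeProducer:
    """Stand-in for KafkaProducer so the pattern is testable offline."""
    def __init__(self):
        self.sent = []

    def send(self, topic, data):
        self.sent.append((topic, data))


class StubQueue(Single):
    init_count = 0

    def initialize(self):
        # Runs once, on first instantiation -- under uwsgi that happens
        # after fork(), so each worker builds its own producer.
        type(self).init_count += 1
        self.producer = FakeProducer()


a = StubQueue()
b = StubQueue()
assert a is b                     # one shared instance per process
assert StubQueue.init_count == 1  # initialize ran only once
a.producer.send("events", b"{}")
assert b.producer.sent == [("events", b"{}")]
```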