事件驅動的簡明講解(python實現)

關鍵詞:編程範式,事件驅動,回調函數,觀察者模式
做者:碼匠信龍php

舉個簡單的例子:
有些人喜歡的某個公衆號,而後去關注這個公衆號,哪天這個公衆號發佈了篇新的文章,沒多久訂閱者就會在微信裏收到這個公衆號推送的新消息,若是感興趣就打開來閱讀。python

公衆號例子

事件驅動模型能夠理解爲上面的例子,是設計模式中觀察者模式的一種典型應用。除了訂閱公衆號外,如你關注某人的微博,關注某人的簡書,當被關注者發了個新狀態或者新文章,你會收到他們新的消息,這些均可以理解爲事件驅動模型。數據庫

實際上,世間萬物各類屬性的變化,咱們均可以抽象爲事件,最直觀的是圖形界面應用裏,如常見的點擊、雙擊、拖動操做,又或者是遊戲裏的英雄升級了,怪物死亡了等等,均可以視爲一個事件發生了。而發送事件的事物稱爲事件源,對這個事件感興趣的事物爲監聽者,事件發生後監聽者會收到這個消息,而後作相應的反應。編程

例如上面公衆號例子能夠翻譯爲,監聽器(訂閱者)監聽了(關注了)事件源(公衆號),當事件源的發送事件時(公衆號發佈文章),全部監聽該事件的監聽器(訂閱者)都會接收到消息並做出響應(閱讀文章)。設計模式

1.公衆號爲事件源
2.訂閱者爲事件監聽器
3.訂閱者關注公衆號,至關於監聽器監聽了事件源
4.公衆號發佈文章這個動做爲發送事件
5.訂閱者收到事件後,作出閱讀文章的響應動做服務器

公衆號例子按事件驅動能夠理解成下圖微信

公衆號例子翻譯

因此事件驅動模式能夠進一步抽象理解爲由事件源,事件對象,以及事件監聽器三元素構成,能完成監聽器監聽事件源、事件源發送事件,監聽器收到事件後調用響應函數的動做。網絡

事件驅動主要包含如下元素和操做函數:
元素
1.事件源
2.事件監聽器
3.事件對象app

操做函數
4.監聽動做
5.發送事件
6.調用監聽器響應函數編程語言

瞭解清楚了事件驅動的工做原理後,讀者能夠試着用本身熟悉的編程語言實現,編程主要實現下面的內容,筆者後續給python實現

用戶根據實際業務邏輯定義
事件源 EventSources
監聽器 Listeners

事件管理者 EventManager
成員
1.響應函數隊列 Handlers
2.事件對象 Event
3.事件對象列表 EventQueue
操做函數
4.監聽動做 AddEventListener
5.發送事件 SendEvent
6.調用響應函數 EventProcess

在實際的軟件開發過程當中,你會常常看到事件驅動的影子,幾乎全部的GUI界面都採用事件驅動編程模型,不少服務器網絡模型的消息處理也會採用,甚至複雜點的數據庫業務處理也會用這種模型,由於這種模型解耦事件發送者和接收者之間的聯繫,事件可動態增長減小接收者,業務邏輯越複雜,越能體現它的優點。下面,筆者用python實現EventManager事件管理類,大概就百來行代碼左右。

# encoding: UTF-8
# 系統模塊
from Queue import Queue, Empty
from threading import *
########################################################################
class EventManager:
    #----------------------------------------------------------------------
    def __init__(self):
        """初始化事件管理器"""
        # 事件對象列表
        self.__eventQueue = Queue()
        # 事件管理器開關
        self.__active = False
        # 事件處理線程
        self.__thread = Thread(target = self.__Run)

        # 這裏的__handlers是一個字典,用來保存對應的事件的響應函數
        # 其中每一個鍵對應的值是一個列表,列表中保存了對該事件監聽的響應函數,一對多
        self.__handlers = {}

    #----------------------------------------------------------------------
    def __Run(self):
        """引擎運行"""
        while self.__active == True:
            try:
                # 獲取事件的阻塞時間設爲1秒
                event = self.__eventQueue.get(block = True, timeout = 1)  
                self.__EventProcess(event)
            except Empty:
                pass

    #----------------------------------------------------------------------
    def __EventProcess(self, event):
        """處理事件"""
        # 檢查是否存在對該事件進行監聽的處理函數
        if event.type_ in self.__handlers:
            # 若存在,則按順序將事件傳遞給處理函數執行
            for handler in self.__handlers[event.type_]:
                handler(event)

    #----------------------------------------------------------------------
    def Start(self):
        """啓動"""
        # 將事件管理器設爲啓動
        self.__active = True
        # 啓動事件處理線程
        self.__thread.start()

    #----------------------------------------------------------------------
    def Stop(self):
        """中止"""
        # 將事件管理器設爲中止
        self.__active = False
        # 等待事件處理線程退出
        self.__thread.join()

    #----------------------------------------------------------------------
    def AddEventListener(self, type_, handler):
        """綁定事件和監聽器處理函數"""
        # 嘗試獲取該事件類型對應的處理函數列表,若無則建立
        try:
            handlerList = self.__handlers[type_]
        except KeyError:
            handlerList = []
            
        self.__handlers[type_] = handlerList
        # 若要註冊的處理器不在該事件的處理器列表中,則註冊該事件
        if handler not in handlerList:
            handlerList.append(handler)
            
    #----------------------------------------------------------------------
    def RemoveEventListener(self, type_, handler):
        """移除監聽器的處理函數"""
        #讀者本身試着實現
        
    #----------------------------------------------------------------------
    def SendEvent(self, event):
        """發送事件,向事件隊列中存入事件"""
        self.__eventQueue.put(event)

########################################################################
"""事件對象"""
class Event:
    def __init__(self, type_=None):
        self.type_ = type_      # 事件類型
        self.dict = {}          # 字典用於保存具體的事件數據

測試代碼

#-------------------------------------------------------------------
# encoding: UTF-8
import sys
from datetime import datetime
from threading import *
from EventManager import *

#事件名稱  新文章
EVENT_ARTICAL = "Event_Artical"

#事件源 公衆號
class PublicAccounts:
    def __init__(self,eventManager):
        self.__eventManager = eventManager

    def WriteNewArtical(self):
        #事件對象,寫了新文章
        event = Event(type_=EVENT_ARTICAL)
        event.dict["artical"] = u'如何寫出更優雅的代碼\n'
        #發送事件
        self.__eventManager.SendEvent(event)
        print u'公衆號發送新文章\n'

#監聽器 訂閱者
class Listener:
    def __init__(self,username):
        self.__username = username

    #監聽器的處理函數 讀文章
    def ReadArtical(self,event):
        print(u'%s 收到新文章' % self.__username)
        print(u'正在閱讀新文章內容:%s'  % event.dict["artical"])

"""測試函數"""
#--------------------------------------------------------------------
def test():
    listner1 = Listener("thinkroom") #訂閱者1
    listner2 = Listener("steve")#訂閱者2

    eventManager = EventManager()
    
    #綁定事件和監聽器響應函數(新文章)
    eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
    eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
    eventManager.Start()

    publicAcc = PublicAccounts(eventManager)
    timer = Timer(2, publicAcc.WriteNewArtical)
    timer.start()
    
if __name__ == '__main__':
    test()
相關文章
相關標籤/搜索