一個消息調度框架構建

 

  • 基本框架

 

  1. MDU(消息分發單元):包含一個消息處理任務,包含自身的消息隊列,是一個消息調度的基本單位。
  2. PID (功能子模塊) :框架中用PID做爲模塊的劃分,每一個模塊具備本身的PID編號,根據功能和調度需求能夠安排多個PID到一個MDU中,PID是消息通訊的一個基本單位,每一個PID提供一個消息處理入口。
  3. MQ (消息隊列) :使用消息隊列做爲任務通訊的數據結構。

 

  • 消息處理流程

 

  1. 構建一個MDU模塊,註冊入框架中,初始MDU沒有註冊PID,未構建消息處理任務。
  2. 構建PID,註冊入對應的MDU中,若是是MDU中第一個PID,構建消息處理任務。消息處理任務從該MDU對應的消息隊列中取消息處理。
  3. 消息處理任務獲取消息後根據消息中攜帶的接收PID的信息分發到對應的PID模塊處理。

 

  • 完整的消息交互流程

 

 

  1. 任務A申請消息,消息內容必須包括髮送模塊PID編號、接收模塊PID編號、消息內容。
  2. 經過消息框架提供的消息發送接口直接發送消息,消息框架根據接收PID信息,將消息填入對應MDU的消息隊列中。
  3. MDU的消息處理任務B從消息隊列中獲取消息處理。
  4. MDU消息隊列會被多個任務併發寫入消息,被消息處理任務讀取消息處理,須要對消息隊列進行互斥和同步。詳見http://www.cnblogs.com/chencheng/p/2893421.html

 

  • MUD、PID規劃
  1. MDU做爲一個調度基本單元,若是一個MDU中只有一個PID會致使系統中任務多,任務切換的開銷大。
  2. 若是MDU中包含太多PID,因爲全部PID在一個消息隊列中串行運行,會影響PID的響應,影響系統性能。
  3. 功能緊耦合的PID放入一個MDU中。
  4. 耗時PID和實時要求高的PID不放入一個MDU中。
  • 實現

MDU:html

import myQueue
from myThread import myThread

class mdu:
    def __init__(self, mduID):
        self.mduId  = mduID
        self.msgQue = myQueue.myQueue(10)
        self.map = {}

    def getMduID(self):
        return self.mduId
    
    def registPid(self, pidID, pid):
        self.map[pidID] = pid
        if 1==len(self.map):
            self.run()

    def msgEnQueue(self,msg):
        self.msgQue.enQueue(msg)
            
    def msgProcess(self):
        while True:
            msg = self.msgQue.deQueue()
            recvPid = msg.getRecvPid()
            self.map[recvPid].msgProcess(msg);
            
    def run(self):
        t = myThread(self.msgProcess)
        t.start()
View Code

PID:數據結構

import message
import support
import mdu
import pdb

class pid:
    def __init__(self, pidID):
        self.pidID = pidID
        self.registMe()

    def registMe(self):
        support.registPid(self)

    def getPidID(self):
        return self.pidID
View Code

SUPPORT:架構

import mdu
import pdb

mduMap = {}

def registMdu(mdu):
    mduMap[mdu.getMduID()] = mdu

def getMdu(revPid):
    return mduMap[revPid&0xFFFF0000>>16]

def registPid(pid):
    mdu = getMdu(pid.getPidID())
    #pdb.set_trace()
    mdu.registPid(pid.getPidID(), pid)
    
def sendMsg(msg):
    mdu = getMdu(msg.getRecvPid())
    mdu.msgEnQueue(msg)
View Code

MQ:併發

from threading import Lock
from threading import Condition
import threading

class myQueue:
    def __init__(self, size):
        self.size = size
        self.list = list()
        self.lock = Lock()
        self.notFullCond = Condition(self.lock)
        self.notEmptyCond = Condition(self.lock)

    def isFull(self):
        if self.size == len(self.list):
            return True
        return False

    def isEmpty(self):
        if 0 == len(self.list):
            return True
        return False
    
    def enQueue(self, elem):
        self.lock.acquire()
        while self.isFull():
            print('queue is full, waiting...')
            self.notFullCond.wait()   
        print(threading.current_thread().getName() + ' product ' + str(elem))
        self.list.append(elem)
        self.notEmptyCond.notify()
        self.lock.release()

    def deQueue(self):
        self.lock.acquire()
        while self.isEmpty():
            print('queue is empty, waiting...')
            self.notEmptyCond.wait()

        elem = self.list[0]
        del(self.list[0])
        print(threading.current_thread().getName() + ' consume ' + str(elem))
        
        self.notFullCond.notify_all()
        self.lock.release()

        return elem
View Code

 轉載請註明原始出處:http://www.cnblogs.com/chencheng/p/3236158.htmlapp

相關文章
相關標籤/搜索