基於python yield機制的異步操做同步化編程模型

     又一個milestone即將結束,有了些許的時間總結研發過程當中的點滴心得,今天總結下如何在編寫python代碼時對異步操做進行同步化模擬,從而提升代碼的可讀性和可擴展性。 python

     遊戲引擎通常都採用分佈式框架,經過必定的策略來均衡服務器集羣的資源負載,從而保證服務器運算的高併發性和CPU高利用率,最終提升遊戲的性能和負載。因爲引擎的邏輯層調用是非搶佔式的,服務器之間都是經過異步調用來進行通信,致使遊戲邏輯沒法同步執行,因此在代碼層不得不人爲地添加不少回調函數,使一個本來完整的功能碎片化地分佈在各個回調函數中。 編程

異步邏輯 服務器

     以遊戲中的副本評分邏輯爲例,在副本結束時副本管理進程須要收集副本中每一個玩家的戰鬥信息,再結合管理進程內部的統計信息最終給出一個副本評分,發放相應獎勵。由於每一個玩家實體都隨機分佈在不一樣進程中,因此管理進程須要經過異步調用來獲取玩家身上的戰鬥信息。 併發

實現代碼以下所示: 框架

# -*- coding: gbk -*-
import random

# 玩家實體類
class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        # 玩家標識
        self.entityId = entityId

    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)

        # 向副本管理進程發送本身的id和戰鬥信息
        mailBox.onEvalFubenScore(self.entityId, score)

# 副本管理類
class FubenStub(object):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players

    def evalFubenScore(self):
        self.playerRelayCnt = 0
        self.totalScore = 0

        # 通知每一個註冊的玩家,副本已經結束,索取戰鬥信息
        for player in self.players:
            player.onFubenEnd(self)

    def onEvalFubenScore(self, entityId, score):
        # 收到其中一個玩家的戰鬥信息
        print "onEvalFubenScore player %d score %d"%(entityId, score)
        self.playerRelayCnt += 1
        self.totalScore += score

        # 當收集完全部玩家的信息後,打印評分
        if len(self.players) == self.playerRelayCnt:
            print 'The fuben totalScore is %d'%self.totalScore

if __name__ == '__main__':
    # 模擬建立玩家實體
    players = [Player(i) for i in xrange(3)]

    # 副本開始時,每一個玩家將本身的MailBox註冊到副本管理進程
    fs = FubenStub(players)

    # 副本進行中
    # ....

    # 副本結束,開始評分
    fs.evalFubenScore()
代碼簡化了副本評分邏輯的實現,其中Player類表示遊戲的玩家實體,在遊戲運行時無縫地在不一樣服務器中切換,FubenStub表示副本的管理進程,在副本剛開始的時候該副本內全部玩家會將本身的MailBox註冊到管理進程中,其中MailBox表示各個實體的遠程調用句柄。在副本結束時,FubenStub首先向各個玩家發送副本結束消息,同時請求玩家的戰鬥信息,玩家在獲得消息後,將本身的戰鬥信息發送給FubenStub;而後當FubenStub收集完全部玩家的信息後,最終打印副本評分。


同步邏輯 dom

    若是Player和FubenStub在同一進程中的話,那全部的操做均可以同步完成,在FubenStub向玩家發送副本結束消息的同時能夠立刻獲得該玩家的戰鬥信息,實現代碼以下所示: 異步

# -*- coding: gbk -*-

import random

class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        self.entityId = entityId

    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)
        return self.entityId, score

class FubenStub(object):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players

    def evalFubenScore(self):
        totalScore = 0
        for player in self.players:
            entityId, score = player.onFubenEnd(self)
            print "onEvalFubenScore player %d score %d"%(entityId, score)
            totalScore += score

        print 'The fuben totalScore is %d'%totalScore

if __name__ == '__main__':
    players = [Player(i) for i in xrange(3)]

    fs = FubenStub(players)
    fs.evalFubenScore()

     從以上兩份代碼能夠看到因爲異步操做,FubenStub中的評分邏輯人爲地分紅兩個功能點:1)向玩家發送副本結束消息;2)接受玩家的戰鬥信息;而且兩個功能點分佈在兩個不一樣的函數中。若是遊戲邏輯一旦複雜,勢必會形成功能點分散,出現過多onXXX異步回調函數,最終致使代碼的開發成本和維護成本提升,可讀性和可擴展性降低。 分佈式

     若是有一種方法,可讓函數在異步調用時暫時掛起,而且在回調函數獲得返回值後恢復執行,那麼就能夠用同步化的編程模式開發異步邏輯。 ide

yield 關鍵字 函數

     yield  Python中的一個關鍵字,凡是函數體中出現了 yield 關鍵字, Python將改變整個函數的上下文,調用該函數再也不返回值, 而是一個生成器對象。只有調用這個生成器的迭代函數next才能開始執行生成器對象,當生成器對象執行到包含 yield 表達式時, 函數將暫時掛起,等待下一次next調用來恢復執行,具體機制以下:

         1)調用生成器對象的next方法,啓動函數執行;

         2)當生成器對象執行到包含 yield 表達式時, 函數掛起;

         3)下一次 next 函數調用又會驅動該生成器對象繼續執行此後的語句, 直到碰見下一個 yield 再次掛起;

         4)若是某次 next 調用驅動了生成器繼續執行, 而此後函數正常結束,生成器會拋出 StopIteration 異常;

以下代碼所示:


def f():
    print "Before first yield"
    yield 1
    print "Before second yield"
    yield 2
    print "After second yield"

g = f()
print "Before first next"
g.next()
print "Before second next"
g.next()
print "Before third yield"
g.next()

執行結果爲:

Before first next

Before first yield

Before second next

Before second yield

Before third yield

After second yield

StopIteration

     哈,有了讓函數暫時掛起的機制,最後就剩下如何傳遞異步調用的返回值問題了。其實生成器的next函數已經實現了將參數從生成器對象內部向外傳遞的機制,而且python還提供了一個send函數將參數從外向生成器對象內部傳遞的機制,具體機制以下:

         1 調用next 函數驅動生成器時, next會同時等待生成器中下一個 yield 掛起,並將該yield後面的參數返回給next

         2)往生成器中傳遞參數,須要將next函數替換成send,此時send的功能與next相同(驅動生成器執行,等待返回值),同時send將後面的參數傳遞給生成器內部以前掛起的yield

以下代碼所示:

def f():
    msg = yield 'first yield msg'
    print "generator inner receive:", msg
    msg = yield 'second yield msg'
    print "generator inner receive:", msg

g = f()
msg = g.next()
print "generator outer receive:", msg
msg = g.send('first send msg')
print "generator outer receive:", msg
g.send('second send msg')

執行結果爲:

generator outer receive: first yield msg

generator inner receive: first send msg

generator outer receive: second yield msg

generator inner receive: second send msg

StopIteration

同步化實現

     好了,萬事俱備只欠東風,下面就是簡單對yield機制進行工程上封裝以方便以後開發。下面的代碼提供了一個叫IFakeSyncCall的interface,全部包含異步操做的邏輯類均可以繼承這個接口:

class IFakeSyncCall(object):
    def __init__(self):
        super(IFakeSyncCall, self).__init__()
        self.generators = {}

    @staticmethod
    def FAKE_SYNCALL():
        def fwrap(method):
            def fakeSyncCall(instance, *args, **kwargs):
                instance.generators[method.__name__] = method(instance, *args, **kwargs)
                func, args = instance.generators[method.__name__].next()
                func(*args)
            return fakeSyncCall
        return fwrap

    def onFakeSyncCall(self, identify, result):
        try:
            func, args  = self.generators[identify].send(result)
            func(*args)
        except StopIteration:
            self.generators.pop(identify)

    其中interface中屬性generators用來保存類中已經開始執行的生成器對象;函數FAKE_SYNCALL是一個decorator,裝飾類中包含有yield的函數,改變函數的調用上下文,在fakeSyncCall內部封裝了對生成器對象的next調用;函數onFakeSyncCall封裝了全部onXXX函數的邏輯,其餘實體經過調用這個函數傳遞異步回調的返回值。

下面就是通過同步化改進後的異步副本評分邏輯代碼:
# -*- coding: gbk -*-
import random

class Player(object):
    def __init__(self, entityId):
        super(Player, self).__init__()
        self.entityId = entityId

    def onFubenEnd(self, mailBox):
        score = random.randint(1, 10)
        print "onFubenEnd player %d score %d"%(self.entityId, score)
        mailBox.onFakeSyncCall('evalFubenScore', (self.entityId, score))

class FubenStub(IFakeSyncCall):
    def __init__(self, players):
        super(FubenStub, self).__init__()
        self.players = players

    @IFakeSyncCall.FAKE_SYNCALL()
    def evalFubenScore(self):
        totalScore = 0
        for player in self.players:
            entityId, score = yield (player.onFubenEnd, (self,))
            print "onEvalFubenScore player %d score %d"%(entityId, score)
            totalScore += score

        print 'the totalScore is %d'%totalScore

if __name__ == '__main__':
    players = [Player(i) for i in xrange(3)]

    fs = FubenStub(players)
    fs.evalFubenScore()

比較evalFubenScore函數,基本已經和本來的同步邏輯代碼相差無幾。

      利用yield機制實現同步化編程模型的另一個優勢是能夠保證全部異步調用的邏輯串行化,從而保證數據的一致性和有效性,特別是在各類異步初始化流程中能夠摒棄傳統的timer sleep機制,從源頭上扼殺一些隱藏很深的因爲數據不一致性所致使的bug

相關文章
相關標籤/搜索