有限狀態機(FSM)的簡單理解和Python實現

最近在項目中,涉及到對行爲和狀態進行建模的需求,嘗試用有限狀態機(Finite-state machine, FSM)來實現。python

1. 概念介紹

1.1 運行機制

基於對有限狀態機的粗淺理解,大致的運行機制爲:ide

  • 系統所處的狀態是明確而且有限的,一定屬於狀態全集中的某一種;
  • 系統接受輸入,根據斷定條件,決定是維持當前狀態,仍是切換到某一個新的狀態;
  • 在維持或切換的過程當中,執行一些預設的操做。

能夠認爲有限狀態機是一個離散系統,每接受一次輸入,進行一次判斷和切換。code

1.2 所含要素

一個有限狀態機包含以下幾個要素:orm

  • 狀態:系統所處的狀態,在運行過程當中又能夠分爲當前狀態下一階段狀態blog

  • 事件:也能夠理解爲每一次運行的輸入;繼承

  • 條件:根據輸入事件執行的斷定條件,條件是基於狀態的,當前所處的每一種狀態,均可以有本身對應的一套斷定條件,來決定下一步進入哪種狀態;事件

  • 動做:肯定切換路徑後,執行的附加操做。ci

以一個共3種狀態的FSM爲例,共有3套斷定條件,根據當前所處的狀態來肯定使用哪種斷定條件,共有3*3=9種動做,決定每一種狀態切換過程當中須要執行的動做。input

1.3 分析方法

一般能夠用一個表格來對所處理的FSM進行分析,防止狀況的遺漏。it

在表格中分析清楚每一種狀態切換的斷定條件和執行動做,再用代碼實現,能夠最大程度地減輕思考的難度,減小錯誤的機率。

2. 代碼實現

以OOP的方式,作了一個基礎的Python實現。

FSM基類

class StateMachine:
    def __init__(self, cfg, states, events_handler, actions_handler):
        # config information for an instance
        self.cfg = cfg
        # define the states and the initial state
        self.states = [s.lower() for s in states]
        self.state = self.states[0]
        # process the inputs according to current state
        self.events = dict()
        # actions according to current transfer 
        self.actions = {state: dict() for state in self.states}
        # cached data for temporary use
        self.records = dict()
        # add events and actions
        for i, state in enumerate(self.states):
            self._add_event(state, events_handler[i])
            for j, n_state in enumerate(self.states):
                self._add_action(state, n_state, actions_handler[i][j])

    def _add_event(self, state, handler):
        self.events[state] = handler

    def _add_action(self, cur_state, next_state, handler):
        self.actions[cur_state][next_state] = handler

    def run(self, inputs):
        # decide the state-transfer according to the inputs
        new_state, outputs = self.events[self.state](inputs, self.states, self.records, self.cfg)
        # do the actions related with the transfer 
        self.actions[self.state][new_state](outputs, self.records, self.cfg)
        # do the state transfer
        self.state = new_state
        return new_state

    def reset(self):
        self.state = self.states[0]
        self.records = dict()
        return

# handlers for events and actions, event_X and action_XX are all specific functions
events_handlers = [event_A, event_B]
actions_handlers = [[action_AA, action_AB],
                    [action_BA, action_BB]]

# define an instance of StateMachine
state_machine = StateMachine(cfg, states, events_handlers, actions_handlers)

若是對於狀態機有具體的要求,能夠繼承這個基類進行派生。

好比,有對狀態機分層嵌套的需求。

class StateGeneral(StateMachine):
    def __init__(self, cfg, states):
        super(StateGeneral, self).__init__(cfg, states, events_handler, actions_handler)
        self.sub_state_machines = dict()

    def add_sub_fsm(self, name, fsm):
        self.sub_state_machines[name] = fsm

    def run(self, inputs):
        new_state, outputs = self.events[self.state](inputs, self.states, self.records, self.cfg)
        # operate the sub_state_machines in actions
        self.actions[self.state][new_state](outputs, self.records, self.cfg, \
        									self.sub_state_machines)
        self.state = new_state
        return new_state

    def reset(self):
        self.state = self.states[0]
        self.records = dict()
        for _, sub_fsm in self.sub_state_machines.items():
            sub_fsm.reset()
        return
相關文章
相關標籤/搜索