其主要的函數有app註冊、註銷、查找、並定義了RYUAPP基類,定義了RYUAPP的基本屬性。包含name, threads, events, event_handlers和observers等成員,以及對應的許多基本函數。如:start(), stop()等。web
controller文件夾中許多很是重要的文件,如events.py, ofp_handler.py, controller.py等。其中controller.py中定義了OpenFlowController基類。用於定義OpenFlow的控制器,用於處理交換機和控制器的鏈接等事件,同時還能夠產生事件和路由事件。其事件系統的定義,能夠查看events.py和ofp_events.py。編程
在ofp_handler.py中定義了基本的handler(應該怎麼稱呼呢?句柄?處理函數?),完成了基本的如:握手,錯誤信息處理和keep alive 等功能。更多的如packet_in_handler應該在app中定義。api
lib中定義了咱們須要使用到的基本的數據結構,如dpid, mac和ip等數據結構。在lib/packet目錄下,還定義了許多網絡協議,如ICMP, DHCP, MPLS和IGMP等協議內容。而每個數據包的類中都有parser和serialize兩個函數。用於解析和序列化數據包。cookie
lib目錄下,還有ovs, netconf目錄,對應的目錄下有一些定義好的數據類型,再也不贅述。網絡
Ryu 事件處理、進程與線程:
1) Applications:該類繼承自ryu.base.app_manager.RyuApp,用戶邏輯被描述爲一個APP。
2) Event : 繼承自ryu.controller.event.EventBase , 應用程序之間的通訊由transmitting and receiving events 完成。
3) Event Queue:每個application 都有一個隊列用於接收事件。
4) Threads:Ryu 使用第三方庫eventlets 運行多線程。由於線程是非搶佔式的,所以,當執行耗時的處理程序時要很是當心。
5) Event loops: 建立一個application 時,會自動生成一個線程,該線程運行一個事件循環。當隊列事件不爲空時,這個事件循環會加載該事件而且調用相應的事件處理函數(註冊以後)。
6) Additional threads:可使用hub.spawn()添加其它線程,用來處理特殊的應用
7) Eventlets:這是一個第三方庫,裏面的庫函數被封裝到hub 模塊中被開發人員加載使用。【提供線程和事件隊列的實現】
8) Event handlers:使用ryu.controller.handler.set_ev_cls 修飾一個事件處理函數。當該類型的事件觸發後,事件處理函數就會被應用程序的事件循環調用。
負責底層數據通訊的模塊是ofp\_handler模塊。ofp\_handler啓動以後,start函數實例化了一個controller.OpenFlowController實例。OpenFlowController實例化以後,當即調用\__call\__()函數,call函數啓動了server\_loop去建立server socket,其handler爲domain\_connection\_factory函數。每當收到一個switch鏈接,domain\_connection\_factory就會實例化一個datapath對象。這個對象用於描述交換機的全部行爲。其中定義了接收循環和發送循環。
def serve(self): send_thr = hub.spawn(self._send_loop) # send hello message immediately hello = self.ofproto_parser.OFPHello(self) self.send_msg(hello) try: self._recv_loop() finally: hub.kill(send_thr) hub.joinall([send_thr])
@_deactivate def _recv_loop(self): buf = bytearray() #初始化一個字節數組 required_len = ofproto_common.OFP_HEADER_SIZE # ofproto_common模塊定義了OpenFlow經常使用的公共屬性 # 如報頭長度=8 count = 0 while self.is_active: ret = self.socket.recv(required_len) if len(ret) == 0: self.is_active = False break buf += ret while len(buf) >= required_len: # ofproto_parser是在Datapath實例的父類ProtocolDesc的屬性。 # 用於尋找對應協議版本的解析文件,如ofproto_v1_0_parser.py # header函數是解析報頭的函數。定義在ofproto_parser.py。 (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) required_len = msg_len if len(buf) < required_len: break # ofproto_parser.msg的定義並無在對應的ofproto_parser中 # msg函數的位置和header函數位置同樣,都在ofproto_parser.py中。 # msg返回的是解析完成的消息。 # msg函數返回了msg_parser函數的返回值 # ofproto_parser.py中的_MSG_PARSERS記錄了不一樣版本對應的msg_parser。其註冊手法是經過@ofproto_parser.register_msg_parser(ofproto.OFP_VERSION)裝飾器。 # 在對應版本的ofproto_parser,如ofproto_v1_0_parser.py中,都有定義一個同名的_MSG_PARSERS字典,這個字典用於記錄報文類型和解析函數的關係。此處命名不恰當,引入混淆。 # parser函數經過@register_parser來將函數註冊/記錄到_MSG_PARSERS字典中。 msg = ofproto_parser.msg(self, version, msg_type, msg_len, xid, buf) # LOG.debug('queue msg %s cls %s', msg, msg.__class__) if msg: # Ryu定義的Event system很簡單,在報文名前加上前綴「Event」,便是事件的名稱。 # 同時此類繫帶msg信息。 # 使用send_event_to_obserevrs()函數將事件分發給監聽事件的handler,完成事件的分發。 ev = ofp_event.ofp_msg_to_ev(msg) self.ofp_brick.send_event_to_observers(ev, self.state) dispatchers = lambda x: x.callers[ev.__class__].dispatchers # handler的註冊是經過使用controller.handler.py文件下定義的set_ev_handler做爲裝飾器去註冊。 # self.ofp_brick在初始化時,由註冊在服務列表中查找名爲"ofp_event"的模塊賦值。 # ofp_handler模塊的名字爲"ofp_event",因此對應的模塊是ofp_handler handlers = [handler for handler in self.ofp_brick.get_handlers(ev) if self.state in dispatchers(handler)] for handler in handlers: handler(ev) buf = buf[required_len:] required_len = ofproto_common.OFP_HEADER_SIZE # We need to schedule other greenlets. Otherwise, ryu # can't accept new switches or handle the existing # switches. The limit is arbitrary. We need the better # approach in the future. count += 1 if count > 2048: count = 0 hub.sleep(0)
_versions = { ofproto_v1_0.OFP_VERSION: (ofproto_v1_0, ofproto_v1_0_parser), ofproto_v1_2.OFP_VERSION: (ofproto_v1_2, ofproto_v1_2_parser), ofproto_v1_3.OFP_VERSION: (ofproto_v1_3, ofproto_v1_3_parser), ofproto_v1_4.OFP_VERSION: (ofproto_v1_4, ofproto_v1_4_parser), }
@create_list_of_base_attributes def __init__(self, datapath): super(MsgBase, self).__init__() self.datapath = datapath self.version = None self.msg_type = None self.msg_len = None self.xid = None self.buf = None
@classmethod def parser(cls, datapath, version, msg_type, msg_len, xid, buf): msg_ = cls(datapath) msg_.set_headers(version, msg_type, msg_len, xid) msg_.set_buf(buf) return msg_
serialize函數分爲3部分,self.\_serialize\_pre(), self.\_serialize\_body()和self.\_serialize\_header()。本質上完成了header的序列化。關於body的序列化,將在對應的派生類中獲得重寫。
# enum ofp_port OFPP_MAX = 0xff00 OFPP_IN_PORT = 0xfff8 # Send the packet out the input port. This # virtual port must be explicitly used # in order to send back out of the input # port. OFPP_TABLE = 0xfff9 # Perform actions in flow table. # NB: This can only be the destination # port for packet-out messages. OFPP_NORMAL = 0xfffa # Process with normal L2/L3 switching. OFPP_FLOOD = 0xfffb # All physical ports except input port and # those disabled by STP. OFPP_ALL = 0xfffc # All physical ports except input port. OFPP_CONTROLLER = 0xfffd # Send to controller. OFPP_LOCAL = 0xfffe # Local openflow "port". OFPP_NONE = 0xffff # Not associated with a physical port. # enum ofp_type OFPT_HELLO = 0 # Symmetric message OFPT_ERROR = 1 # Symmetric message OFPT_ECHO_REQUEST = 2 # Symmetric message OFPT_ECHO_REPLY = 3 # Symmetric message OFPT_VENDOR = 4 # Symmetric message OFPT_FEATURES_REQUEST = 5 # Controller/switch message OFPT_FEATURES_REPLY = 6 # Controller/switch message OFPT_GET_CONFIG_REQUEST = 7 # Controller/switch message OFPT_GET_CONFIG_REPLY = 8 # Controller/switch message OFPT_SET_CONFIG = 9 # Controller/switch message OFPT_PACKET_IN = 10 # Async message OFPT_FLOW_REMOVED = 11 # Async message OFPT_PORT_STATUS = 12 # Async message OFPT_PACKET_OUT = 13 # Controller/switch message OFPT_FLOW_MOD = 14 # Controller/switch message OFPT_PORT_MOD = 15 # Controller/switch message OFPT_STATS_REQUEST = 16 # Controller/switch message OFPT_STATS_REPLY = 17 # Controller/switch message OFPT_BARRIER_REQUEST = 18 # Controller/switch message OFPT_BARRIER_REPLY = 19 # Controller/switch message OFPT_QUEUE_GET_CONFIG_REQUEST = 20 # Controller/switch message OFPT_QUEUE_GET_CONFIG_REPLY = 21 # Controller/switch message OFP_HEADER_PACK_STR = '!BBHI' OFP_HEADER_SIZE = 8 OFP_MSG_SIZE_MAX = 65535 assert calcsize(OFP_HEADER_PACK_STR) == OFP_HEADER_SIZE
OFP\_HEADER\_PACK\_STR = '!BBHI'的意思是將header按照8|8|16|32的長度封裝成對應的數值。在Python中分別對應的是1個字節的integer|一個字節的integer|2個字節的integer|4個字節的integer。
本模塊用於定義報文的解析等動態內容。模塊中定義了與OpenFlow協議對應的Common\_struct及message type對應的類。每個message對應的類都是有MsgBase派生的,其繼承了父類的parser函數和serialize函數。若報文無消息體,如Hello報文,則無需重寫parser和serialize函數。
def _set_msg_type(msg_type):
'''Annotate corresponding OFP message type'''
def _set_cls_msg_type(cls): cls.cls_msg_type = msg_type return cls return _set_cls_msg_type def _register_parser(cls): '''class decorator to register msg parser''' assert cls.cls_msg_type is not None assert cls.cls_msg_type not in _MSG_PARSERS _MSG_PARSERS[cls.cls_msg_type] = cls.parser return cls @ofproto_parser.register_msg_parser(ofproto.OFP_VERSION) def msg_parser(datapath, version, msg_type, msg_len, xid, buf): parser = _MSG_PARSERS.get(msg_type) return parser(datapath, version, msg_type, msg_len, xid, buf) def _set_msg_reply(msg_reply): '''Annotate OFP reply message class''' def _set_cls_msg_reply(cls): cls.cls_msg_reply = msg_reply return cls return _set_cls_msg_reply
class OFPPacketIn(MsgBase): def __init__(self, datapath, buffer_id=None, total_len=None, in_port=None, reason=None, data=None): super(OFPPacketIn, self).__init__(datapath) self.buffer_id = buffer_id self.total_len = total_len self.in_port = in_port self.reason = reason self.data = data @classmethod def parser(cls, datapath, version, msg_type, msg_len, xid, buf): # 解析頭部,獲取msg msg = super(OFPPacketIn, cls).parser(datapath, version, msg_type, msg_len, xid, buf) # 解析body,獲取packet_in相關字段。 (msg.buffer_id, msg.total_len, msg.in_port, msg.reason) = struct.unpack_from( ofproto.OFP_PACKET_IN_PACK_STR, msg.buf, ofproto.OFP_HEADER_SIZE) # 將ofproto.OFP_PACKET_IN_SIZE長度以外的buf內容,賦值給data msg.data = msg.buf[ofproto.OFP_PACKET_IN_SIZE:] if msg.total_len < len(msg.data): # discard padding for 8-byte alignment of OFP packet msg.data = msg.data[:msg.total_len] return msg @_set_msg_type(ofproto.OFPT_FLOW_MOD) class OFPFlowMod(MsgBase): def __init__(self, datapath, match, cookie, command, idle_timeout=0, hard_timeout=0, priority=ofproto.OFP_DEFAULT_PRIORITY, buffer_id=0xffffffff, out_port=ofproto.OFPP_NONE, flags=0, actions=None): if actions is None: actions = [] super(OFPFlowMod, self).__init__(datapath) self.match = match self.cookie = cookie self.command = command self.idle_timeout = idle_timeout self.hard_timeout = hard_timeout self.priority = priority self.buffer_id = buffer_id self.out_port = out_port self.flags = flags self.actions = actions def _serialize_body(self): offset = ofproto.OFP_HEADER_SIZE self.match.serialize(self.buf, offset) # 封裝的起點是offset offset += ofproto.OFP_MATCH_SIZE # 按照ofproto.OFP_FLOW_MOD_PACK_STR0的格式,將對應的字段封裝到self.buf中 msg_pack_into(ofproto.OFP_FLOW_MOD_PACK_STR0, self.buf, offset, self.cookie, self.command, self.idle_timeout, self.hard_timeout, self.priority, self.buffer_id, self.out_port, self.flags) offset = ofproto.OFP_FLOW_MOD_SIZE if self.actions is not None: for a in self.actions: # 序列化action a.serialize(self.buf, offset) offset += a.len
在1.3等高版本OpenFlow中,使用到了oxm\_field的概念。oxm全稱爲OpenFlow Extensible Match。當OpenFlow逐漸發展成熟,flow的match域愈來愈多。然而不少通訊場景下使用到的匹配字段不多,甚至只有一個。OXM是一種TLV格式,使用OXM能夠在下發流表時僅攜帶使用到的match域內容,而放棄剩餘的大量的match域。當使用的match域較少時,統計機率上會減小報文傳輸的字節數。
該文件定義了nicira extensible match的相關內容。
def _ofp_msg_name_to_ev_name(msg_name): return 'Event' + msg_name
從main函數入手,講述RYU的ryuapp基類細節、app_manager類如何load apps,註冊並運行application,Event的產生以及分發,還有最重要的應用ofp_handler。
def main(args=None, prog=None):
try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version) log.init_log() #初始化打印log if CONF.pid_file: import os with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid())) app_lists = CONF.app_lists + CONF.app # keep old behaivor, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() #在AppManager類中獲取實例 app_mgr.load_apps(app_lists) #加載App contexts = app_mgr.create_contexts() #建立運行環境,"dpset"/"wsgi" services = [] services.extend(app_mgr.instantiate_apps(**contexts))
#ryu.controller.dpset.DPSet / rest_firewall.RestFirewallAPI / ryu.controller.ofp_handler.OFPHandler webapp = wsgi.start_service(app_mgr) #webapp啓動 if webapp: thr = hub.spawn(webapp) services.append(thr) try: hub.joinall(services) #調用t.wait(),執行等待,wait()方法使當前線程暫停執行並釋放對象鎖標誌
finally: app_mgr.close()
首先從CONF文件中讀取出app list。若是ryu-manager 命令任何參數,則默認應用爲ofp_handler應用。緊接着實例化一個AppManager對象,調用load_apps函數將應用加載。調用create_contexts函數建立對應的contexts, 而後調用instantiate_apps函數將app_list和context中的app均實例化。啓動wsgi架構,提供web應用。最後將全部的應用做爲任務,做爲coroutine的task去執行,joinall使得程序必須等待全部的task都執行完成才能夠退出程序。最後調用close函數,關閉程序,釋放資源。如下的部分將以主函數中出現的調用順序爲依據,展開講解。
def start(self): super(OFPHandler, self).start() return hub.spawn(OpenFlowController())
Python修飾器的函數式編程 Python Decorator能夠看做是一種聲明,一種修飾。如下舉例參考自Coolshell。舉例以下:
@decorator def foo(): pass
實際上等同於foo = decorator(foo), 並且它還被執行了。舉個例子:
def keyword(fn): print "you %s me!" % fn.__name__.upper() @keyword def evol(): pass
運行以後,就會輸出 you EVOL me
@decorator_a @decorator_b def foo(): pass
foo = decorator_a(decorator_b(foo))
@decorator(arg1, arg2) def foo(): pass
foo = decorator(arg1,arg2)(foo)
class式的 Decorator
class myDecorator(object): def __init__(self, fn): print "inside myDecorator.__init__()" self.fn = fn def __call__(self): self.fn() print "inside myDecorator.__call__()" @myDecorator def aFunction(): print "inside aFunction()" print "Finished decorating aFunction()" aFunction()
inside myDecorator.__init__()
Finished decorating aFunction()
inside aFunction()
inside myDecorator.__call__()
@decorator使用時,__init__被調用,當function被調用是,執行__call__函數,而不執行function,因此在__call__函數中須要寫出self.fn = fn,更多內容能夠直接訪問Python Decorator Library。
class StreamServer(object): def __init__(self, listen_info, handle=None, backlog=None, spawn='default', **ssl_args): assert backlog is None assert spawn == 'default' if ':' in listen_info[0]: self.server = eventlet.listen(listen_info, family=socket.AF_INET6) else: self.server = eventlet.listen(listen_info) if ssl_args: def wrap_and_handle(sock, addr): ssl_args.setdefault('server_side', True) handle(ssl.wrap_socket(sock, **ssl_args), addr) self.handle = wrap_and_handle else: self.handle = handle def serve_forever(self): while True: sock, addr = self.server.accept() spawn(self.handle, sock, addr)
Datapath類在RYU中極爲重要,每當一個datapath實體與控制器創建鏈接時,就會實例化一個Datapath的對象。 該類中不只定義了許多的成員變量用於描述一個datapath,還管理控制器與該datapath通訊的數據收發。其中_recv_loop函數完成數據的接收與解析,事件的產生與分發。
@_deactivate def _recv_loop(self): buf = bytearray() required_len = ofproto_common.OFP_HEADER_SIZE count = 0 while self.is_active: ret = self.socket.recv(required_len) if len(ret) == 0: self.is_active = False break buf += ret while len(buf) >= required_len: (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) required_len = msg_len if len(buf) < required_len: break msg = ofproto_parser.msg(self, version, msg_type, msg_len, xid, buf) # 解析報文 # LOG.debug('queue msg %s cls %s', msg, msg.__class__) if msg: ev = ofp_event.ofp_msg_to_ev(msg) # 產生事件 self.ofp_brick.send_event_to_observers(ev, self.state) # 事件分發 dispatchers = lambda x: x.callers[ev.__class__].dispatchers handlers = [handler for handler in self.ofp_brick.get_handlers(ev) if self.state in dispatchers(handler)] for handler in handlers: handler(ev) buf = buf[required_len:] required_len = ofproto_common.OFP_HEADER_SIZE # We need to schedule other greenlets. Otherwise, ryu # can't accept new switches or handle the existing # switches. The limit is arbitrary. We need the better # approach in the future. count += 1 if count > 2048: count = 0 hub.sleep(0)
@_deactivate修飾符做用在於在Datapath斷開鏈接以後,將其狀態is_active置爲False。self.ofp_brick.send_event_to_observers(ev, self.state) 語句完成了事件的分發。self.brick的初始化語句能夠在self.__init__函數中找到:
self.ofp_brick = ryu.base.app_manager.lookup_service_brick('ofp_event')
def register_service(service): """ Register the ryu application specified by 'service' as a provider of events defined in the calling module. If an application being loaded consumes events (in the sense of set_ev_cls) provided by the 'service' application, the latter application will be automatically loaded. This mechanism is used to e.g. automatically start ofp_handler if there are applications consuming OFP events. """ frm = inspect.stack()[1] m = inspect.getmodule(frm[0]) m._SERVICE_NAME = service
其中inspect.stack()[1]返回了調用此函數的caller, inspect.getmodule(frm[0])返回了該caller的模塊,當前例子下,module=ofp_event。
咱們能夠經過ryu-manager --verbose來查看到輸出信息,從而印證這一點。
:~/ryu/ryu/app$ ryu-manager --verbose
loading app ryu.controller.ofp_handler
instantiating app ryu.controller.ofp_handler of OFPHandler
BRICK ofp_event
CONSUMES EventOFPEchoRequest
CONSUMES EventOFPPortDescStatsReply
CONSUMES EventOFPSwitchFeatures
因此當運行ofp_handler應用時,就會註冊ofp_event service,爲後續的應用提供服務。分發事件以後,還要處理自身訂閱的事件,因此首先找到符合當前state的caller,而後調用handler。_caller類能夠在handler.py文件中找到,包含dispatchers和ev_source兩個成員變量。前者用於描述caller須要的state,後者是event產生者的模塊名稱。
@_deactivate def _send_loop(self): try: while self.is_active: buf = self.send_q.get() self.socket.sendall(buf) finally: q = self.send_q # first, clear self.send_q to prevent new references. self.send_q = None # there might be threads currently blocking in send_q.put(). # unblock them by draining the queue. try: while q.get(block=False): pass except hub.QueueEmpty: pass
serve函數完成了發送循環的啓動和接受循環的啓動。啓動一個coroutine去執行self._send_loop(), 而後立刻主動發送hello報文到datapath(能夠理解爲交換網橋:Bridge),最後執行self._recv_loop()。
def serve(self): send_thr = hub.spawn(self._send_loop) # send hello message immediately hello = self.ofproto_parser.OFPHello(self) self.send_msg(hello) try: self._recv_loop() finally: hub.kill(send_thr) hub.joinall([send_thr])
而serve函數又在datapath_connection_factory函數中被調用。固然向外提供完整功能的API就是這個。因此在OpenFlowController類中能夠看到在初始化server實例的時候,handler賦值爲datapath_connection_factory。其中使用到的contextlib module具體內容不做介紹,讀者可自行學習。
def datapath_connection_factory(socket, address): LOG.debug('connected socket:%s address:%s', socket, address) with contextlib.closing(Datapath(socket, address)) as datapath: try: datapath.serve() except: # Something went wrong. # Especially malicious switch can send malformed packet, # the parser raise exception. # Can we do anything more graceful? if datapath.id is None: dpid_str = "%s" % datapath.id else: dpid_str = dpid_to_str(datapath.id) LOG.error("Error in the datapath %s from %s", dpid_str, address) raise
app_manager.py文件中import了instpect和itertools module,從而使得開發更方便簡潔。inspect模塊提供了一些有用的方法,用於類型檢測,獲取內容,檢測是否可迭代等功能。itertools則是一個關於迭代器的模塊,能夠提供豐富的迭代器類型,在數據處理上尤爲有用。
這是一個極其難理解的概念。博主的理解是,_CONTEXT內存儲着name:class的key value pairs。爲何須要存儲這個內容?實際上這個_CONTEXT攜帶的信息是全部本APP須要依賴的APP。須要在啓動本應用以前去啓動,以知足依賴的,好比一個simple_switch.py的應用,若是沒有OFPHandler應用做爲數據收發和解析的基礎的話,是沒法運行的。具體文檔以下:
_CONTEXTS = {} """ A dictionary to specify contexts which this Ryu application wants to use. Its key is a name of context and its value is an ordinary class which implements the context. The class is instantiated by app_manager and the instance is shared among RyuApp subclasses which has _CONTEXTS member with the same key. A RyuApp subclass can obtain a reference to the instance via its __init__'s kwargs as the following. Example:: _CONTEXTS = { 'network': network.Network } def __init__(self, *args, *kwargs): self.network = kwargs['network'] """
def __init__(self, *_args, **_kwargs): super(RyuApp, self).__init__() self.name = self.__class__.__name__ self.event_handlers = {} # ev_cls -> handlers:list self.observers = {} # ev_cls -> observer-name -> states:set self.threads = [] self.events = hub.Queue(128) if hasattr(self.__class__, 'LOGGER_NAME'): self.logger = logging.getLogger(self.__class__.LOGGER_NAME) else: self.logger = logging.getLogger(self.name) self.CONF = cfg.CONF # prevent accidental creation of instances of this class outside RyuApp class _EventThreadStop(event.EventBase): pass self._event_stop = _EventThreadStop() self.is_active = True
_event_loop函數用於啓動事件處理循環,經過調用self.get_handlers(ev, state)函數來找到事件對應的handler,而後處理事件。
def get_handlers(self, ev, state=None): """Returns a list of handlers for the specific event. :param ev: The event to handle. :param state: The current state. ("dispatcher") If None is given, returns all handlers for the event. Otherwise, returns only handlers that are interested in the specified state. The default is None. """ ev_cls = ev.__class__ handlers = self.event_handlers.get(ev_cls, []) if state is None: return handlers def _event_loop(self): while self.is_active or not self.events.empty(): ev, state = self.events.get() if ev == self._event_stop: continue handlers = self.get_handlers(ev, state) for handler in handlers: handler(ev)
應用中能夠經過@set_ev_cls修飾符去監聽某些事件。當產生event時,經過event去get observer,獲得對應的觀察者,而後再使用self.send_event函數去發送事件。在這裏,實際上就是直接往self.event隊列中put event。
def _send_event(self, ev, state): self.events.put((ev, state)) def send_event(self, name, ev, state=None): """ Send the specified event to the RyuApp instance specified by name. """ if name in SERVICE_BRICKS: if isinstance(ev, EventRequestBase): ev.src = self.name LOG.debug("EVENT %s->%s %s" % (self.name, name, ev.__class__.__name__)) SERVICE_BRICKS[name]._send_event(ev, state) else: LOG.debug("EVENT LOST %s->%s %s" % (self.name, name, ev.__class__.__name__)) def send_event_to_observers(self, ev, state=None): """ Send the specified event to all observers of this RyuApp. """ for observer in self.get_observers(ev, state): self.send_event(observer, ev, state)
@staticmethod def run_apps(app_lists): """Run a set of Ryu applications A convenient method to load and instantiate apps. This blocks until all relevant apps stop. """ app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = app_mgr.instantiate_apps(**contexts) webapp = wsgi.start_service(app_mgr) if webapp: services.append(hub.spawn(webapp)) try: hub.joinall(services) finally: app_mgr.close()
def load_apps(self, app_lists): app_lists = [app for app in itertools.chain.from_iterable(app.split(',') for app in app_lists)] while len(app_lists) > 0: app_cls_name = app_lists.pop(0) context_modules = map(lambda x: x.__module__, self.contexts_cls.values()) if app_cls_name in context_modules: continue LOG.info('loading app %s', app_cls_name) cls = self.load_app(app_cls_name) if cls is None: continue self.applications_cls[app_cls_name] = cls services = [] for key, context_cls in cls.context_iteritems(): v = self.contexts_cls.setdefault(key, context_cls) assert v == context_cls context_modules.append(context_cls.__module__) if issubclass(context_cls, RyuApp): services.extend(get_dependent_services(context_cls)) # we can't load an app that will be initiataed for # contexts. for i in get_dependent_services(cls): if i not in context_modules: services.append(i) if services: app_lists.extend([s for s in set(services) if s not in app_lists])
context實例化函數將context中name:service class鍵值對的內容實例化成對應的對象,以便加入到services 列表中,從而獲得加載。首先從列表中取出對應數據,而後判斷是否時RyuApp的子類,是則實例化,不然直接賦值service class。load_app函數在讀取的時候還會再次判斷是不是RyuApp子類。
def create_contexts(self): for key, cls in self.contexts_cls.items(): if issubclass(cls, RyuApp): # hack for dpset context = self._instantiate(None, cls) else: context = cls() LOG.info('creating context %s', key) assert key not in self.contexts self.contexts[key] = context return self.contexts
此函數調用了self._instantiate函數,在_instantiate函數中又調用了register_app()函數,此函數將app添加到SERVICE_BRICKS字典之中,而後繼續調用了ryu.controller.handler 中的 register_instance函數,最終完成了應用的註冊。此後繼續調用self._update_bricks函數完成了服務鏈表的更新,最後啓動了全部的應用。
def instantiate_apps(self, *args, **kwargs): for app_name, cls in self.applications_cls.items(): self._instantiate(app_name, cls, *args, **kwargs) self._update_bricks() self.report_bricks() threads = [] for app in self.applications.values(): t = app.start() if t is not None: threads.append(t) return threads def _instantiate(self, app_name, cls, *args, **kwargs): # for now, only single instance of a given module # Do we need to support multiple instances? # Yes, maybe for slicing. #LOG.info('instantiating app %s of %s', app_name, cls.__name__) if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None: ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS) if app_name is not None: assert app_name not in self.applications app = cls(*args, **kwargs) register_app(app) assert app.name not in self.applications self.applications[app.name] = app return app
此函數完成了更新service_bricks的功能。首先從獲取到service實例,而後再獲取到service中的方法,若方法有callers屬性,即便用了@set_ev_cls的裝飾符,擁有了calls屬性。(caller類中的ev_source和dispatcher成員變量描述了產生該event的source module, dispatcher描述了event須要在什麼狀態下才能夠被分發。如:HANDSHAKE_DISPATCHER,CONFIG_DISPATCHER等。)最後調用register_observer函數註冊了observer。
def _update_bricks(self): for i in SERVICE_BRICKS.values(): for _k, m in inspect.getmembers(i, inspect.ismethod): if not hasattr(m, 'callers'): continue for ev_cls, c in m.callers.iteritems(): if not c.ev_source: continue brick = _lookup_service_brick_by_mod_name(c.ev_source) if brick: brick.register_observer(ev_cls, i.name, c.dispatchers) # allow RyuApp and Event class are in different module for brick in SERVICE_BRICKS.itervalues(): if ev_cls in brick._EVENTS: brick.register_observer(ev_cls, i.name, c.dispatchers)
以上的部分介紹了App的註冊,observer的註冊,handler的查找和使用,可是,始終沒有提到handler在何處註冊。實際上,handler的註冊在register_instance部分完成了。爲何他的位置在handler文件,而不在app_manager文件呢?我的認爲多是爲了給其餘非Ryu APP的模塊使用吧。
def register_instance(i): for _k, m in inspect.getmembers(i, inspect.ismethod): # LOG.debug('instance %s k %s m %s', i, _k, m) if _has_caller(m): for ev_cls, c in m.callers.iteritems(): i.register_handler(ev_cls, m)
ryu.base import app_manager:該文件中定義了RyuApp基類,開發APP須要繼承該基類;
保存爲L2Switch.py 運行: ryu-manager L2Switch.py
from ryu.base import app_manager class L2Switch(app_manager.RyuApp): def __init__(self, *args, **kwargs): super(L2Switch, self).__init__(*args, **kwargs)
from ryu.base import app_manager from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER from ryu.controller.handler import set_ev_cls class L2Switch(app_manager.RyuApp): def __init__(self, *args, **kwargs): super(L2Switch, self).__init__(*args, **kwargs) @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) def packet_in_handler(self, ev): msg = ev.msg datapath = msg.datapath ofp = datapath.ofproto ofp_parser = datapath.ofproto_parser actions = [ofp_parser.OFPActionOutput(ofp.OFPP_FLOOD)] out = ofp_parser.OFPPacketOut( datapath=datapath, buffer_id=msg.buffer_id, in_port=msg.in_port, actions=actions) datapath.send_msg(out)
import struct import logging from ryu.base import app_manager from ryu.controller import mac_to_port from ryu.controller import ofp_event from ryu.controller.handler import MAIN_DISPATCHER from ryu.controller.handler import set_ev_cls from ryu.ofproto import ofproto_v1_0 from ryu.lib.mac import haddr_to_bin from ryu.lib.packet import packet from ryu.lib.packet import ethernet class L2Switch(app_manager.RyuApp): OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION]#define the version of OpenFlow def __init__(self, *args, **kwargs): super(L2Switch, self).__init__(*args, **kwargs) self.mac_to_port = {} def add_flow(self, datapath, in_port, dst, actions): ofproto = datapath.ofproto match = datapath.ofproto_parser.OFPMatch( in_port = in_port, dl_dst = haddr_to_bin(dst)) mod = datapath.ofproto_parser.OFPFlowMod( datapath = datapath, match = match, cookie = 0, command = ofproto.OFPFC_ADD, idle_timeout = 10,hard_timeout = 30, priority = ofproto.OFP_DEFAULT_PRIORITY, flags =ofproto.OFPFF_SEND_FLOW_REM, actions = actions) datapath.send_msg(mod) @set_ev_cls(ofp_event.EventOFPPacketIn, MAIN_DISPATCHER) def packet_in_handler(self, ev): msg = ev.msg datapath = msg.datapath ofproto = datapath.ofproto pkt = packet.Packet(msg.data) eth = pkt.get_protocol(ethernet.ethernet) dst = eth.dst src = eth.src dpid = datapath.id #get the dpid self.mac_to_port.setdefault(dpid, {}) self.logger.info("packet in %s %s %s %s", dpid, src, dst , msg.in_port) #To learn a mac address to avoid FLOOD next time. self.mac_to_port[dpid][src] = msg.in_port out_port = ofproto.OFPP_FLOOD #Look up the out_port if dst in self.mac_to_port[dpid]: out_port = self.mac_to_port[dpid][dst] ofp_parser = datapath.ofproto_parser actions = [ofp_parser.OFPActionOutput(out_port)] if out_port != ofproto.OFPP_FLOOD: self.add_flow(datapath, msg.in_port, dst, actions) #We always send the packet_out to handle the first packet. packet_out = ofp_parser.OFPPacketOut(datapath = datapath, buffer_id = msg.buffer_id, in_port = msg.in_port, actions = actions) datapath.send_msg(packet_out) #To show the message of ports' status. @set_ev_cls(ofp_event.EventOFPPortStatus, MAIN_DISPATCHER) def _port_status_handler(self, ev): msg = ev.msg reason = msg.reason port_no = msg.desc.port_no ofproto = msg.datapath.ofproto if reason == ofproto.OFPPR_ADD: self.logger.info("port added %s", port_no) elif reason == ofproto.OFPPR_DELETE: self.logger.info("port deleted %s", port_no) elif reason == ofproto.OFPPR_MODIFY: self.logger.info("port modified %s", port_no) else: self.logger.info("Illeagal port state %s %s", port_no, reason)
第一,在RYU控制器開啓simple-switch.py的APP,輸入命令:ryu-manager simple-switch.py:
第二,在另一個終端上創建mininet模擬拓撲,輸入命令:mn --topo single,3 --mac --switch ovsk --controller remote
此時,在交換機的轉發流表是空的,所以此時主機之間是不能夠通訊的,在使用h1去ping h2的時候,就會自動創建流表