下面介紹ryu/ryu目錄下的主要目錄內容。html
base中有一個很是重要的文件:app_manager.py,其做用是RYU應用的管理中心。用於加載RYU應用程序,接受從APP發送過來的信息,同時也完成消息的路由。python
其主要的函數有app註冊、註銷、查找、並定義了RYUAPP基類,定義了RYUAPP的基本屬性。包含name, threads, events, event_handlers和observers等成員,以及對應的許多基本函數。如:start(), stop()等。web
這個文件中還定義了AppManager基類,用於管理APP。定義了加載APP等函數。不過若是僅僅是開發APP的話,這個類能夠沒必要關心。shell
controller文件夾中許多很是重要的文件,如events.py, ofp_handler.py, controller.py等。其中controller.py中定義了OpenFlowController基類。用於定義OpenFlow的控制器,用於處理交換機和控制器的鏈接等事件,同時還能夠產生事件和路由事件。其事件系統的定義,能夠查看events.py和ofp_events.py。編程
在ofp_handler.py中定義了基本的handler(應該怎麼稱呼呢?句柄?處理函數?),完成了基本的如:握手,錯誤信息處理和keep alive 等功能。更多的如packet_in_handler應該在app中定義。api
在dpset.py文件中,定義了交換機端的一些消息,如端口狀態信息等,用於描述和操做交換機。如添加端口,刪除端口等操做。數組
lib中定義了咱們須要使用到的基本的數據結構,如dpid, mac和ip等數據結構。在lib/packet目錄下,還定義了許多網絡協議,如ICMP, DHCP, MPLS和IGMP等協議內容。而每個數據包的類中都有parser和serialize兩個函數。用於解析和序列化數據包。cookie
lib目錄下,還有ovs, netconf目錄,對應的目錄下有一些定義好的數據類型,再也不贅述。網絡
在這個目錄下,基本分爲兩類文件,一類是協議的數據結構定義,另外一類是協議解析,也即數據包處理函數文件。如ofproto_v1_0.py是1.0版本的OpenFlow協議數據結構的定義,而ofproto_v1_0_parser.py則定義了1.0版本的協議編碼和解碼。數據結構
包含了switches.py等文件,基本定義了一套交換機的數據結構。event.py定義了交換上的事件。dumper.py定義了獲取網絡拓撲的內容。最後api.py向上提供了一套調用topology目錄中定義函數的接口。
這個文件夾主要存放的是開源社區貢獻者的代碼。
定義了RYU的命令系統,爲controller的執行建立環境,接收和處理相關命令
完成了BGP和vrrp的實現。
tests目錄下存放了單元測試以及整合測試的代碼。
RYU SDN 架構:
組件功能:
Ryu 事件處理、進程與線程:
1) Applications:該類繼承自ryu.base.app_manager.RyuApp,用戶邏輯被描述爲一個APP。
2) Event : 繼承自ryu.controller.event.EventBase , 應用程序之間的通訊由transmitting and receiving events 完成。
3) Event Queue:每個application 都有一個隊列用於接收事件。
4) Threads:Ryu 使用第三方庫eventlets 運行多線程。由於線程是非搶佔式的,所以,當執行耗時的處理程序時要很是當心。
5) Event loops: 建立一個application 時,會自動生成一個線程,該線程運行一個事件循環。當隊列事件不爲空時,這個事件循環會加載該事件而且調用相應的事件處理函數(註冊以後)。
6) Additional threads:可使用hub.spawn()添加其它線程,用來處理特殊的應用
7) Eventlets:這是一個第三方庫,裏面的庫函數被封裝到hub 模塊中被開發人員加載使用。【提供線程和事件隊列的實現】
8) Event handlers:使用ryu.controller.handler.set_ev_cls 修飾一個事件處理函數。當該類型的事件觸發後,事件處理函數就會被應用程序的事件循環調用。
負責底層數據通訊的模塊是ofp\_handler模塊。ofp\_handler啓動以後,start函數實例化了一個controller.OpenFlowController實例。OpenFlowController實例化以後,當即調用\__call\__()函數,call函數啓動了server\_loop去建立server socket,其handler爲domain\_connection\_factory函數。每當收到一個switch鏈接,domain\_connection\_factory就會實例化一個datapath對象。這個對象用於描述交換機的全部行爲。其中定義了接收循環和發送循環。
datapath.serve函數是socket通訊收發邏輯的入口。該函數啓動了一個綠色線程去處理髮送循環,而後本線程負責接收循環的處理。self.\_send\_loop是發送主循環。其主要邏輯爲:不斷獲取發送隊列是否有數據,如有,則發送;底層調用的是socket.send\_all()函數。
1
2
3
4
5
6
7
8
9
10
11
12
|
def
serve(
self
):
send_thr
=
hub.spawn(
self
._send_loop)
# send hello message immediately
hello
=
self
.ofproto_parser.OFPHello(
self
)
self
.send_msg(hello)
try
:
self
._recv_loop()
finally
:
hub.kill(send_thr)
hub.joinall([send_thr])
|
接收函數\_reck\_loop中實現了數據的接收和解析。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
@_deactivate
def
_recv_loop(
self
):
buf
=
bytearray()
#初始化一個字節數組
required_len
=
ofproto_common.OFP_HEADER_SIZE
# ofproto_common模塊定義了OpenFlow經常使用的公共屬性
# 如報頭長度=8
count
=
0
while
self
.is_active:
ret
=
self
.socket.recv(required_len)
if
len
(ret)
=
=
0
:
self
.is_active
=
False
break
buf
+
=
ret
while
len
(buf) >
=
required_len:
# ofproto_parser是在Datapath實例的父類ProtocolDesc的屬性。
# 用於尋找對應協議版本的解析文件,如ofproto_v1_0_parser.py
# header函數是解析報頭的函數。定義在ofproto_parser.py。
(version, msg_type, msg_len, xid)
=
ofproto_parser.header(buf)
required_len
=
msg_len
if
len
(buf) < required_len:
break
# ofproto_parser.msg的定義並無在對應的ofproto_parser中
# msg函數的位置和header函數位置同樣,都在ofproto_parser.py中。
# msg返回的是解析完成的消息。
# msg函數返回了msg_parser函數的返回值
# ofproto_parser.py中的_MSG_PARSERS記錄了不一樣版本對應的msg_parser。其註冊手法是經過@ofproto_parser.register_msg_parser(ofproto.OFP_VERSION)裝飾器。
# 在對應版本的ofproto_parser,如ofproto_v1_0_parser.py中,都有定義一個同名的_MSG_PARSERS字典,這個字典用於記錄報文類型和解析函數的關係。此處命名不恰當,引入混淆。
# parser函數經過@register_parser來將函數註冊/記錄到_MSG_PARSERS字典中。
msg
=
ofproto_parser.msg(
self
,
version, msg_type, msg_len, xid, buf)
# LOG.debug('queue msg %s cls %s', msg, msg.__class__)
if
msg:
# Ryu定義的Event system很簡單,在報文名前加上前綴「Event」,便是事件的名稱。
# 同時此類繫帶msg信息。
# 使用send_event_to_obserevrs()函數將事件分發給監聽事件的handler,完成事件的分發。
ev
=
ofp_event.ofp_msg_to_ev(msg)
self
.ofp_brick.send_event_to_observers(ev,
self
.state)
dispatchers
=
lambda
x: x.callers[ev.__class__].dispatchers
# handler的註冊是經過使用controller.handler.py文件下定義的set_ev_handler做爲裝飾器去註冊。
# self.ofp_brick在初始化時,由註冊在服務列表中查找名爲"ofp_event"的模塊賦值。
# ofp_handler模塊的名字爲"ofp_event",因此對應的模塊是ofp_handler
handlers
=
[handler
for
handler
in
self
.ofp_brick.get_handlers(ev)
if
self
.state
in
dispatchers(handler)]
for
handler
in
handlers:
handler(ev)
buf
=
buf[required_len:]
required_len
=
ofproto_common.OFP_HEADER_SIZE
# We need to schedule other greenlets. Otherwise, ryu
# can't accept new switches or handle the existing
# switches. The limit is arbitrary. We need the better
# approach in the future.
count
+
=
1
if
count >
2048
:
count
=
0
hub.sleep(
0
)
|
OpenFlow協議解析部分代碼大部分在ofproto目錄下,少部分在controller目錄下。首先介紹ofproto目錄下的源碼內容,再介紹controller目錄下的ofp_event文件。
首先,__init__.py並不爲空。該文件定義了兩個功能相似的函數get_ofp_module()和get_ofp_modules(),前者用於取得協議版本對應的協議定義文件和協議解析模塊,後者則取出整個字典。對應的字典在ofproto_protocol模塊中定義。
在ofproto\_protocol定義了\_versions字典,具體以下:
1
2
3
4
5
6
|
_versions
=
{
ofproto_v1_0.OFP_VERSION: (ofproto_v1_0, ofproto_v1_0_parser),
ofproto_v1_2.OFP_VERSION: (ofproto_v1_2, ofproto_v1_2_parser),
ofproto_v1_3.OFP_VERSION: (ofproto_v1_3, ofproto_v1_3_parser),
ofproto_v1_4.OFP_VERSION: (ofproto_v1_4, ofproto_v1_4_parser),
}
|
除此以外,該文件還定義了Datapath的父類ProtocolDesc,此類基本上只完成了與協議版本相關的內容。該類最重要的兩個成員是self.ofproto和self.ofproto\_parser,其值指明所本次通訊所使用的OpenFlow協議的版本以及對應的解析模塊。
ofproto\_common文件比較簡單,主要定義了OpenFlow須要使用的公共屬性,如監聽端口,報頭長度,報頭封裝格式等內容。
ofproto\_parser文件定義了全部版本都須要的解析相關的公共屬性。如定義了最重要的基類MsgBase(StringifyMixin)。
StringifyMixin類的定義在lib.stringify文件,有興趣的讀者可自行查看。MsgBase基類定義了最基礎的屬性信息,具體以下所示:
1
2
3
4
5
6
7
8
9
|
@create_list_of_base_attributes
def
__init__(
self
, datapath):
super
(MsgBase,
self
).__init__()
self
.datapath
=
datapath
self
.version
=
None
self
.msg_type
=
None
self
.msg_len
=
None
self
.xid
=
None
self
.buf
=
None
|
此外,該類還定義了基礎的parser函數和serialize函數。基礎的parser函數基本什麼都沒有作,僅返回一個賦值後的消息體。
1
2
3
4
5
6
|
@classmethod
def
parser(
cls
, datapath, version, msg_type, msg_len, xid, buf):
msg_
=
cls
(datapath)
msg_.set_headers(version, msg_type, msg_len, xid)
msg_.set_buf(buf)
return
msg_
|
serialize函數分爲3部分,self.\_serialize\_pre(), self.\_serialize\_body()和self.\_serialize\_header()。本質上完成了header的序列化。關於body的序列化,將在對應的派生類中獲得重寫。
以1.0版本爲例介紹ofproto\_v1\_x.py文件的做用。因爲Ryu支持多版本的OpenFlow,因此在ofproto目錄下,定義了從1.0到1.5版本的全部代碼實現。因此其文件命名爲ofproto\_v1_x.py,x從[1,2,3,4,5]中得到,分別對應相應的協議版本。
此類文件最重要的一個目的是定義了全部須要的靜態內容,包括某字段的全部選項以及消息封裝的格式以及長度。與OpenFlow消息內容相關的有協議的類型,動做的類型,port的類型等。此外對應每個報文,都須要定義其封裝的格式,以及封裝的長度。Ryu採用了Python的Struct庫去完成數據的解封裝工做,關於Struct的介紹將在後續內容介紹。具體定義內容舉例以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
|
# enum ofp_port
OFPP_MAX
=
0xff00
OFPP_IN_PORT
=
0xfff8
# Send the packet out the input port. This
# virtual port must be explicitly used
# in order to send back out of the input
# port.
OFPP_TABLE
=
0xfff9
# Perform actions in flow table.
# NB: This can only be the destination
# port for packet-out messages.
OFPP_NORMAL
=
0xfffa
# Process with normal L2/L3 switching.
OFPP_FLOOD
=
0xfffb
# All physical ports except input port and
# those disabled by STP.
OFPP_ALL
=
0xfffc
# All physical ports except input port.
OFPP_CONTROLLER
=
0xfffd
# Send to controller.
OFPP_LOCAL
=
0xfffe
# Local openflow "port".
OFPP_NONE
=
0xffff
# Not associated with a physical port.
# enum ofp_type
OFPT_HELLO
=
0
# Symmetric message
OFPT_ERROR
=
1
# Symmetric message
OFPT_ECHO_REQUEST
=
2
# Symmetric message
OFPT_ECHO_REPLY
=
3
# Symmetric message
OFPT_VENDOR
=
4
# Symmetric message
OFPT_FEATURES_REQUEST
=
5
# Controller/switch message
OFPT_FEATURES_REPLY
=
6
# Controller/switch message
OFPT_GET_CONFIG_REQUEST
=
7
# Controller/switch message
OFPT_GET_CONFIG_REPLY
=
8
# Controller/switch message
OFPT_SET_CONFIG
=
9
# Controller/switch message
OFPT_PACKET_IN
=
10
# Async message
OFPT_FLOW_REMOVED
=
11
# Async message
OFPT_PORT_STATUS
=
12
# Async message
OFPT_PACKET_OUT
=
13
# Controller/switch message
OFPT_FLOW_MOD
=
14
# Controller/switch message
OFPT_PORT_MOD
=
15
# Controller/switch message
OFPT_STATS_REQUEST
=
16
# Controller/switch message
OFPT_STATS_REPLY
=
17
# Controller/switch message
OFPT_BARRIER_REQUEST
=
18
# Controller/switch message
OFPT_BARRIER_REPLY
=
19
# Controller/switch message
OFPT_QUEUE_GET_CONFIG_REQUEST
=
20
# Controller/switch message
OFPT_QUEUE_GET_CONFIG_REPLY
=
21
# Controller/switch message
OFP_HEADER_PACK_STR
=
'!BBHI'
OFP_HEADER_SIZE
=
8
OFP_MSG_SIZE_MAX
=
65535
assert
calcsize(OFP_HEADER_PACK_STR)
=
=
OFP_HEADER_SIZE
|
OFP\_HEADER\_PACK\_STR = '!BBHI'的意思是將header按照8|8|16|32的長度封裝成對應的數值。在Python中分別對應的是1個字節的integer|一個字節的integer|2個字節的integer|4個字節的integer。
calcsize函數用於計算對應的format的長度。
本模塊用於定義報文的解析等動態內容。模塊中定義了與OpenFlow協議對應的Common\_struct及message type對應的類。每個message對應的類都是有MsgBase派生的,其繼承了父類的parser函數和serialize函數。若報文無消息體,如Hello報文,則無需重寫parser和serialize函數。
本模塊定義了幾個重要的全局函數:\_set\_msg\_type,\_register\_parser,msg\_parser和\_set\_msg\_reply。其做用介紹以下:
def _set_msg_type(msg_type):
'''Annotate corresponding OFP message type'''
def _set_cls_msg_type(cls): cls.cls_msg_type = msg_type return cls return _set_cls_msg_type def _register_parser(cls): '''class decorator to register msg parser''' assert cls.cls_msg_type is not None assert cls.cls_msg_type not in _MSG_PARSERS _MSG_PARSERS[cls.cls_msg_type] = cls.parser return cls @ofproto_parser.register_msg_parser(ofproto.OFP_VERSION) def msg_parser(datapath, version, msg_type, msg_len, xid, buf): parser = _MSG_PARSERS.get(msg_type) return parser(datapath, version, msg_type, msg_len, xid, buf) def _set_msg_reply(msg_reply): '''Annotate OFP reply message class''' def _set_cls_msg_reply(cls): cls.cls_msg_reply = msg_reply return cls return _set_cls_msg_reply
報文若是有消息體,則須要重寫parser函數或者serialize函數,具體根據報文內容而不同。此處,分別以Packet\_in和Flow\_mod做爲parser的案例和serialize的案例,示例以下:
@_register_parser
@_set_msg_type(ofproto.OFPT_PACKET_IN)
class OFPPacketIn(MsgBase): def __init__(self, datapath, buffer_id=None, total_len=None, in_port=None, reason=None, data=None): super(OFPPacketIn, self).__init__(datapath) self.buffer_id = buffer_id self.total_len = total_len self.in_port = in_port self.reason = reason self.data = data @classmethod def parser(cls, datapath, version, msg_type, msg_len, xid, buf): # 解析頭部,獲取msg msg = super(OFPPacketIn, cls).parser(datapath, version, msg_type, msg_len, xid, buf) # 解析body,獲取packet_in相關字段。 (msg.buffer_id, msg.total_len, msg.in_port, msg.reason) = struct.unpack_from( ofproto.OFP_PACKET_IN_PACK_STR, msg.buf, ofproto.OFP_HEADER_SIZE) # 將ofproto.OFP_PACKET_IN_SIZE長度以外的buf內容,賦值給data msg.data = msg.buf[ofproto.OFP_PACKET_IN_SIZE:] if msg.total_len < len(msg.data): # discard padding for 8-byte alignment of OFP packet msg.data = msg.data[:msg.total_len] return msg @_set_msg_type(ofproto.OFPT_FLOW_MOD) class OFPFlowMod(MsgBase): def __init__(self, datapath, match, cookie, command, idle_timeout=0, hard_timeout=0, priority=ofproto.OFP_DEFAULT_PRIORITY, buffer_id=0xffffffff, out_port=ofproto.OFPP_NONE, flags=0, actions=None): if actions is None: actions = [] super(OFPFlowMod, self).__init__(datapath) self.match = match self.cookie = cookie self.command = command self.idle_timeout = idle_timeout self.hard_timeout = hard_timeout self.priority = priority self.buffer_id = buffer_id self.out_port = out_port self.flags = flags self.actions = actions def _serialize_body(self): offset = ofproto.OFP_HEADER_SIZE self.match.serialize(self.buf, offset) # 封裝的起點是offset offset += ofproto.OFP_MATCH_SIZE # 按照ofproto.OFP_FLOW_MOD_PACK_STR0的格式,將對應的字段封裝到self.buf中 msg_pack_into(ofproto.OFP_FLOW_MOD_PACK_STR0, self.buf, offset, self.cookie, self.command, self.idle_timeout, self.hard_timeout, self.priority, self.buffer_id, self.out_port, self.flags) offset = ofproto.OFP_FLOW_MOD_SIZE if self.actions is not None: for a in self.actions: # 序列化action a.serialize(self.buf, offset) offset += a.len
此模塊代碼量大,包括OpenFlow協議對應版本內容的徹底描述。分類上可分爲解析和序列化封裝兩個重點內容。讀者在閱讀源碼時可根據需求閱讀片斷便可。
這兩個模塊很是簡單,ether定義了經常使用的以太網的協議類型及其對應的代碼;inet定義了IP協議族中不一樣協議的端口號,如TCP=6。
在1.3等高版本OpenFlow中,使用到了oxm\_field的概念。oxm全稱爲OpenFlow Extensible Match。當OpenFlow逐漸發展成熟,flow的match域愈來愈多。然而不少通訊場景下使用到的匹配字段不多,甚至只有一個。OXM是一種TLV格式,使用OXM能夠在下發流表時僅攜帶使用到的match域內容,而放棄剩餘的大量的match域。當使用的match域較少時,統計機率上會減小報文傳輸的字節數。
該文件定義了nicira extensible match的相關內容。
這個模塊的位置並再也不ofproto,而位於controller目錄下。controller模塊下的event定義了基礎的事件基類。ofp\_event模塊完成了OpenFlow報文到event的生成過程。模塊中定義了EventOFPMsgBase(event.EventBase)類和\_ofp\_msg\_name\_to\_ev\_name(msg\_name)等函數的定義。相關函數都很是的簡單,可從函數名瞭解到其功能。示例代碼以下:
1
2
|
def
_ofp_msg_name_to_ev_name(msg_name):
return
'Event'
+
msg_name
|
Python的struct庫是一個簡單的,高效的數據封裝\解封裝的庫。該庫主要包含5個函數,介紹以下:
更詳細的封裝語法,請查看struct對應的連接。此處僅對經常使用語法進行介紹:
從main函數入手,講述RYU的ryuapp基類細節、app_manager類如何load apps,註冊並運行application,Event的產生以及分發,還有最重要的應用ofp_handler。
RYU的main函數在ryu/cmd/manager.py文件中,部份內容以下:
def main(args=None, prog=None):
try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version) log.init_log() #初始化打印log if CONF.pid_file: import os with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid())) app_lists = CONF.app_lists + CONF.app # keep old behaivor, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() #在AppManager類中獲取實例 app_mgr.load_apps(app_lists) #加載App contexts = app_mgr.create_contexts() #建立運行環境,"dpset"/"wsgi" services = [] services.extend(app_mgr.instantiate_apps(**contexts))
#啓動App線程,App實例化
#ryu.controller.dpset.DPSet / rest_firewall.RestFirewallAPI / ryu.controller.ofp_handler.OFPHandler webapp = wsgi.start_service(app_mgr) #webapp啓動 if webapp: thr = hub.spawn(webapp) services.append(thr) try: hub.joinall(services) #調用t.wait(),執行等待,wait()方法使當前線程暫停執行並釋放對象鎖標誌
#循環join,直到有異常或者外部中斷推遲
finally: app_mgr.close()
首先從CONF文件中讀取出app list。若是ryu-manager 命令任何參數,則默認應用爲ofp_handler應用。緊接着實例化一個AppManager對象,調用load_apps函數將應用加載。調用create_contexts函數建立對應的contexts, 而後調用instantiate_apps函數將app_list和context中的app均實例化。啓動wsgi架構,提供web應用。最後將全部的應用做爲任務,做爲coroutine的task去執行,joinall使得程序必須等待全部的task都執行完成才能夠退出程序。最後調用close函數,關閉程序,釋放資源。如下的部分將以主函數中出現的調用順序爲依據,展開講解。
上文說到,若是沒有捕獲Application輸入,那麼默認啓動的應用是OFPHandler應用。該應用主要用於處理OpenFlow消息。在start函數初始化運行了一個OpenFlowController實例。OpenFlowController類將在後續介紹。
def start(self): super(OFPHandler, self).start() return hub.spawn(OpenFlowController())
OFPHandler應用完成了基本的消息處理,如hello_handler:用於處理hello報文,協議版本的協商。其處理並不複雜,可是值得注意的一點是裝飾器:Decorator的使用。
Decorator
Python修飾器的函數式編程 Python Decorator能夠看做是一種聲明,一種修飾。如下舉例參考自Coolshell。舉例以下:
1
2
3
|
@decorator
def
foo():
pass
|
實際上等同於foo = decorator(foo), 並且它還被執行了。舉個例子:
1
2
3
4
5
6
|
def
keyword(fn):
print
"you %s me!"
%
fn.__name__.upper()
@keyword
def
evol():
pass
|
運行以後,就會輸出 you EVOL me
多個decorator:
1
2
3
4
|
@decorator_a
@decorator_b
def
foo():
pass
|
這至關於:
1
|
foo
=
decorator_a(decorator_b(foo))
|
而帶參數的decorator:
1
2
3
|
@decorator
(arg1, arg2)
def
foo():
pass
|
至關於
1
|
foo
=
decorator(arg1,arg2)(foo)
|
decorator(arg1,arg2)將生成一個decorator。
class式的 Decorator
class myDecorator(object): def __init__(self, fn): print "inside myDecorator.__init__()" self.fn = fn def __call__(self): self.fn() print "inside myDecorator.__call__()" @myDecorator def aFunction(): print "inside aFunction()" print "Finished decorating aFunction()" aFunction()
#結果:
>>>
inside myDecorator.__init__()
Finished decorating aFunction()
inside aFunction()
inside myDecorator.__call__()
>>>
@decorator使用時,__init__被調用,當function被調用是,執行__call__函數,而不執行function,因此在__call__函數中須要寫出self.fn = fn,更多內容能夠直接訪問Python Decorator Library。
前一部分提到OFPHandle的start函數會將OpenFlowController啓動。本小節介紹OpenFlowController類。該類的定義在ryu/cmd/controller.py文件中。OpenFlowController.__call__()函數啓動了server_loop()函數,該函數實例化了hub.py中的StreamServer類,並將handler函數初始化爲datapath_connection_factory函數,並調用serve_forever(),不斷進行socket的監聽。StreamServer定義以下:
class StreamServer(object): def __init__(self, listen_info, handle=None, backlog=None, spawn='default', **ssl_args): assert backlog is None assert spawn == 'default' if ':' in listen_info[0]: self.server = eventlet.listen(listen_info, family=socket.AF_INET6) else: self.server = eventlet.listen(listen_info) if ssl_args: def wrap_and_handle(sock, addr): ssl_args.setdefault('server_side', True) handle(ssl.wrap_socket(sock, **ssl_args), addr) self.handle = wrap_and_handle else: self.handle = handle def serve_forever(self): while True: sock, addr = self.server.accept() spawn(self.handle, sock, addr)
Datapath類在RYU中極爲重要,每當一個datapath實體與控制器創建鏈接時,就會實例化一個Datapath的對象。 該類中不只定義了許多的成員變量用於描述一個datapath,還管理控制器與該datapath通訊的數據收發。其中_recv_loop函數完成數據的接收與解析,事件的產生與分發。
@_deactivate def _recv_loop(self): buf = bytearray() required_len = ofproto_common.OFP_HEADER_SIZE count = 0 while self.is_active: ret = self.socket.recv(required_len) if len(ret) == 0: self.is_active = False break buf += ret while len(buf) >= required_len: (version, msg_type, msg_len, xid) = ofproto_parser.header(buf) required_len = msg_len if len(buf) < required_len: break msg = ofproto_parser.msg(self, version, msg_type, msg_len, xid, buf) # 解析報文 # LOG.debug('queue msg %s cls %s', msg, msg.__class__) if msg: ev = ofp_event.ofp_msg_to_ev(msg) # 產生事件 self.ofp_brick.send_event_to_observers(ev, self.state) # 事件分發 dispatchers = lambda x: x.callers[ev.__class__].dispatchers handlers = [handler for handler in self.ofp_brick.get_handlers(ev) if self.state in dispatchers(handler)] for handler in handlers: handler(ev) buf = buf[required_len:] required_len = ofproto_common.OFP_HEADER_SIZE # We need to schedule other greenlets. Otherwise, ryu # can't accept new switches or handle the existing # switches. The limit is arbitrary. We need the better # approach in the future. count += 1 if count > 2048: count = 0 hub.sleep(0)
@_deactivate修飾符做用在於在Datapath斷開鏈接以後,將其狀態is_active置爲False。self.ofp_brick.send_event_to_observers(ev, self.state) 語句完成了事件的分發。self.brick的初始化語句能夠在self.__init__函數中找到:
1
|
self
.ofp_brick
=
ryu.base.app_manager.lookup_service_brick(
'ofp_event'
)
|
由上可知,self.ofp_brick其實是由service_brick(中文能夠成爲:服務鏈表?)中的「ofp_event」服務賦值的。在每個app中,使用@set_ev_cls(ev_cls,dispatchers)時,就會將實例化ofp_event模塊,執行文件中最後一句:
1
|
handler.register_service(
'ryu.controller.ofp_handler'
)
|
register_service函數實體以下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def
register_service(service):
"""
Register the ryu application specified by 'service' as
a provider of events defined in the calling module.
If an application being loaded consumes events (in the sense of
set_ev_cls) provided by the 'service' application, the latter
application will be automatically loaded.
This mechanism is used to e.g. automatically start ofp_handler if
there are applications consuming OFP events.
"""
frm
=
inspect.stack()[
1
]
m
=
inspect.getmodule(frm[
0
])
m._SERVICE_NAME
=
service
|
其中inspect.stack()[1]返回了調用此函數的caller, inspect.getmodule(frm[0])返回了該caller的模塊,當前例子下,module=ofp_event。
咱們能夠經過ryu-manager --verbose來查看到輸出信息,從而印證這一點。
:~/ryu/ryu/app$ ryu-manager --verbose
loading app ryu.controller.ofp_handler
instantiating app ryu.controller.ofp_handler of OFPHandler
BRICK ofp_event
CONSUMES EventOFPErrorMsg
CONSUMES EventOFPEchoRequest
CONSUMES EventOFPPortDescStatsReply
CONSUMES EventOFPHello
CONSUMES EventOFPSwitchFeatures
因此當運行ofp_handler應用時,就會註冊ofp_event service,爲後續的應用提供服務。分發事件以後,還要處理自身訂閱的事件,因此首先找到符合當前state的caller,而後調用handler。_caller類能夠在handler.py文件中找到,包含dispatchers和ev_source兩個成員變量。前者用於描述caller須要的state,後者是event產生者的模塊名稱。
對應的發送循環由_send_loop完成。self.send_p是一個深度爲16的發送queue。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@_deactivate
def
_send_loop(
self
):
try
:
while
self
.is_active:
buf
=
self
.send_q.get()
self
.socket.sendall(buf)
finally
:
q
=
self
.send_q
# first, clear self.send_q to prevent new references.
self
.send_q
=
None
# there might be threads currently blocking in send_q.put().
# unblock them by draining the queue.
try
:
while
q.get(block
=
False
):
pass
except
hub.QueueEmpty:
pass
|
serve函數完成了發送循環的啓動和接受循環的啓動。啓動一個coroutine去執行self._send_loop(), 而後立刻主動發送hello報文到datapath(能夠理解爲交換網橋:Bridge),最後執行self._recv_loop()。
1
2
3
4
5
6
7
8
9
10
11
12
|
def
serve(
self
):
send_thr
=
hub.spawn(
self
._send_loop)
# send hello message immediately
hello
=
self
.ofproto_parser.OFPHello(
self
)
self
.send_msg(hello)
try
:
self
._recv_loop()
finally
:
hub.kill(send_thr)
hub.joinall([send_thr])
|
而serve函數又在datapath_connection_factory函數中被調用。固然向外提供完整功能的API就是這個。因此在OpenFlowController類中能夠看到在初始化server實例的時候,handler賦值爲datapath_connection_factory。其中使用到的contextlib module具體內容不做介紹,讀者可自行學習。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
def
datapath_connection_factory(socket, address):
LOG.debug(
'connected socket:%s address:%s'
, socket, address)
with contextlib.closing(Datapath(socket, address)) as datapath:
try
:
datapath.serve()
except
:
# Something went wrong.
# Especially malicious switch can send malformed packet,
# the parser raise exception.
# Can we do anything more graceful?
if
datapath.
id
is
None
:
dpid_str
=
"%s"
%
datapath.
id
else
:
dpid_str
=
dpid_to_str(datapath.
id
)
LOG.error(
"Error in the datapath %s from %s"
, dpid_str, address)
raise
|
到此爲止,OFPHandler應用的功能實現介紹完畢。RYU啓動時,須要啓動OFPHandler,才能完成數據的收發和解析。更多的上層應用邏輯都是在此基礎之上進行的。若要開發APP則須要繼承RyuApp類,並完成observer監聽事件,以及註冊handler去完成事件處理。
RyuApp類是RYU封裝好的APP基類,用戶只須要繼承該類,就能夠方便地開發應用。而註冊對應的observer和handler都使用@derocator的形式,使得開發很是的簡單高效,這也是Python的優勢之一吧。RyuApp類的定義在ryu/base/app_manager.py文件中。該文件實現了兩個類RyuApp和AppManager。前者用於定義APP基類,爲應用開發提供基本的模板,後者用於Application的管理,加載應用,運行應用,消息路由等功能。
app_manager.py文件中import了instpect和itertools module,從而使得開發更方便簡潔。inspect模塊提供了一些有用的方法,用於類型檢測,獲取內容,檢測是否可迭代等功能。itertools則是一個關於迭代器的模塊,能夠提供豐富的迭代器類型,在數據處理上尤爲有用。
這是一個極其難理解的概念。博主的理解是,_CONTEXT內存儲着name:class的key value pairs。爲何須要存儲這個內容?實際上這個_CONTEXT攜帶的信息是全部本APP須要依賴的APP。須要在啓動本應用以前去啓動,以知足依賴的,好比一個simple_switch.py的應用,若是沒有OFPHandler應用做爲數據收發和解析的基礎的話,是沒法運行的。具體文檔以下:
_CONTEXTS = {} """ A dictionary to specify contexts which this Ryu application wants to use. Its key is a name of context and its value is an ordinary class which implements the context. The class is instantiated by app_manager and the instance is shared among RyuApp subclasses which has _CONTEXTS member with the same key. A RyuApp subclass can obtain a reference to the instance via its __init__'s kwargs as the following. Example:: _CONTEXTS = { 'network': network.Network } def __init__(self, *args, *kwargs): self.network = kwargs['network'] """
用於記錄本應用會產生的event。可是當且僅當定義該event的語句在其餘模塊時纔會被使用到。
__init__函數中初始化了許多重要的成員變量,如self.event_handler用於記錄向外提供的事件處理句柄,而self.observer則恰好相反,用於通知app_manager本應用監聽何種類型的事件。self.event是事件隊列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
def
__init__(
self
,
*
_args,
*
*
_kwargs):
super
(RyuApp,
self
).__init__()
self
.name
=
self
.__class__.__name__
self
.event_handlers
=
{}
# ev_cls -> handlers:list
self
.observers
=
{}
# ev_cls -> observer-name -> states:set
self
.threads
=
[]
self
.events
=
hub.Queue(
128
)
if
hasattr
(
self
.__class__,
'LOGGER_NAME'
):
self
.logger
=
logging.getLogger(
self
.__class__.LOGGER_NAME)
else
:
self
.logger
=
logging.getLogger(
self
.name)
self
.CONF
=
cfg.CONF
# prevent accidental creation of instances of this class outside RyuApp
class
_EventThreadStop(event.EventBase):
pass
self
._event_stop
=
_EventThreadStop()
self
.is_active
=
True
|
start函數將啓動coroutine去處理_event_loop,並將其加入threads字典中。
_event_loop函數用於啓動事件處理循環,經過調用self.get_handlers(ev, state)函數來找到事件對應的handler,而後處理事件。
def get_handlers(self, ev, state=None): """Returns a list of handlers for the specific event. :param ev: The event to handle. :param state: The current state. ("dispatcher") If None is given, returns all handlers for the event. Otherwise, returns only handlers that are interested in the specified state. The default is None. """ ev_cls = ev.__class__ handlers = self.event_handlers.get(ev_cls, []) if state is None: return handlers def _event_loop(self): while self.is_active or not self.events.empty(): ev, state = self.events.get() if ev == self._event_stop: continue handlers = self.get_handlers(ev, state) for handler in handlers: handler(ev)
應用中能夠經過@set_ev_cls修飾符去監聽某些事件。當產生event時,經過event去get observer,獲得對應的觀察者,而後再使用self.send_event函數去發送事件。在這裏,實際上就是直接往self.event隊列中put event。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
def
_send_event(
self
, ev, state):
self
.events.put((ev, state))
def
send_event(
self
, name, ev, state
=
None
):
"""
Send the specified event to the RyuApp instance specified by name.
"""
if
name
in
SERVICE_BRICKS:
if
isinstance
(ev, EventRequestBase):
ev.src
=
self
.name
LOG.debug(
"EVENT %s->%s %s"
%
(
self
.name, name, ev.__class__.__name__))
SERVICE_BRICKS[name]._send_event(ev, state)
else
:
LOG.debug(
"EVENT LOST %s->%s %s"
%
(
self
.name, name, ev.__class__.__name__))
def
send_event_to_observers(
self
, ev, state
=
None
):
"""
Send the specified event to all observers of this RyuApp.
"""
for
observer
in
self
.get_observers(ev, state):
self
.send_event(observer, ev, state)
|
其餘函數如註冊handler函數:register_handler,註冊監聽函數:register_observer等都是很是簡單直白的代碼,再也不贅述。
AppManager類是RYU應用的調度中心。用於管理應用的添加刪除,消息路由等等功能。
首先從啓動函數開始介紹,咱們能夠看到run_apps函數中的代碼和前文提到的main函數語句基本同樣。首先獲取一個對象,而後加載對應的apps,而後獲取contexts,context中其實包含的是本應用所須要的依賴應用。因此在調用instantiate_apps函數時,將app_lists內的application和contexts中的services都實例化,而後啓動協程去運行這些服務。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|