from ryu.base import app_manager class Hub(app_manager.RyuApp): pass
ryu-manager simple_switch_13.py --verbose
cmd目錄定義了RYU的命令系統
from ryu.cmd.ryu_base import main main()
def main(): try: base_conf(project='ryu', version='ryu %s' % version) except cfg.RequiredOptError as e: base_conf.print_help() raise SystemExit(1) subcmd_name = base_conf.subcommand try: subcmd_mod_name = subcommands[subcmd_name] except KeyError: base_conf.print_help() raise SystemExit('Unknown subcommand %s' % subcmd_name) subcmd_mod = utils.import_module(subcmd_mod_name) subcmd = SubCommand(name=subcmd_name, entry=subcmd_mod.main) subcmd.run(base_conf.subcommand_args)
from ryu.cmd.manager import main main()
def main(args=None, prog=None): _parse_user_flags() try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version) log.init_log() logger = logging.getLogger(__name__) if CONF.enable_debugger: msg = 'debugging is available (--enable-debugger option is turned on)' logger.info(msg) else: hub.patch(thread=True) if CONF.pid_file: with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid())) app_lists = CONF.app_lists + CONF.app # keep old behavior, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = [] services.extend(app_mgr.instantiate_apps(**contexts)) webapp = wsgi.start_service(app_mgr) if webapp: thr = hub.spawn(webapp) services.append(thr) try: hub.joinall(services) except KeyboardInterrupt: logger.debug("Keyboard Interrupt received. " "Closing RYU application manager...") finally: app_mgr.close()
ryu run ./simple_switch_13.py --verbose
subcommands = { 'run': 'ryu.cmd.manager', 'of-config-cli': 'ryu.cmd.of_config_cli', 'rpc-cli': 'ryu.cmd.rpc_cli', }
def main(args=None, prog=None): _parse_user_flags() try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version) log.init_log() logger = logging.getLogger(__name__) if CONF.enable_debugger: msg = 'debugging is available (--enable-debugger option is turned on)' logger.info(msg) else: hub.patch(thread=True) if CONF.pid_file: with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid())) app_lists = CONF.app_lists + CONF.app # keep old behavior, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = [] services.extend(app_mgr.instantiate_apps(**contexts)) webapp = wsgi.start_service(app_mgr) if webapp: thr = hub.spawn(webapp) services.append(thr) try: hub.joinall(services) except KeyboardInterrupt: logger.debug("Keyboard Interrupt received. " "Closing RYU application manager...") finally: app_mgr.close()
def main(args=None, prog=None): _parse_user_flags() try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version) if CONF.enable_debugger:else: hub.patch(thread=True) if CONF.pid_file: with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid())) app_lists = CONF.app_lists + CONF.appif not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = [] services.extend(app_mgr.instantiate_apps(**contexts)) webapp = wsgi.start_service(app_mgr) if webapp: thr = hub.spawn(webapp) services.append(thr) try: hub.joinall(services)finally: app_mgr.close()
Parses user-flags file and loads it to register user defined options.
idx = list(sys.argv).index('--user-flags')
try: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version, default_config_files=['/usr/local/etc/ryu/ryu.conf']) except cfg.ConfigFilesNotFoundError: CONF(args=args, prog=prog, project='ryu', version='ryu-manager %s' % version)
"""Config options which may be set on the command line or in config files. ConfigOpts is a configuration option manager with APIs for registering option schemas, grouping options, parsing option values and retrieving the values of options. It has built-in support for :oslo.config:option:`config_file` and :oslo.config:option:`config_dir` options. """
if CONF.enable_debugger: msg = 'debugging is available (--enable-debugger option is turned on)' logger.info(msg) else: hub.patch(thread=True)
cfg.BoolOpt('enable-debugger', default=False, help='don\'t overwrite Python standard threading library' '(use only for debugging)'),
if CONF.pid_file: with open(CONF.pid_file, 'w') as pid_file: pid_file.write(str(os.getpid()))
app_lists = CONF.app_lists + CONF.app # keep old behavior, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = [] services.extend(app_mgr.instantiate_apps(**contexts))
webapp = wsgi.start_service(app_mgr) if webapp: thr = hub.spawn(webapp) 若是開啓,則產生一個協程去運行他 services.append(thr)
try: hub.joinall(services) 等待全部線程/協程結束 except KeyboardInterrupt: 按鍵觸發,常按Ctrl+C的應該看見過 logger.debug("Keyboard Interrupt received. " "Closing RYU application manager...") finally: app_mgr.close() 最後進行資源回收
app_lists = CONF.app_lists + CONF.app # keep old behavior, run ofp if no application is specified. if not app_lists: app_lists = ['ryu.controller.ofp_handler'] app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = [] services.extend(app_mgr.instantiate_apps(**contexts))
app_lists = CONF.app_lists + CONF.app print("------Start------") print("------CONF-----") print(CONF) print("------CONF.app_lists-----") print(CONF.app_lists) print("------CONF.app-----") print(CONF.app) print("------End------")
ryu-manager ./simple_switch_13.py ofctl_rest.py --verbose
------Start------ ------CONF----- <oslo_config.cfg.ConfigOpts object at 0x7f89af107ac8> ------CONF.app_lists----- [] ------CONF.app----- ['./simple_switch_13.py', 'ofctl_rest.py'] ------End------
cfg.ListOpt('app-lists', default=[], 默認爲空 help='application module name to run'),
cfg.MultiStrOpt('app', positional=True, default=[], help='application module name to run'),
['./simple_switch_13.py', 'ofctl_rest.py']
if not app_lists: app_lists = ['ryu.controller.ofp_handler']
@staticmethod def get_instance(): if not AppManager._instance: AppManager._instance = AppManager() return AppManager._instance
def load_apps(self, app_lists): app_lists = [app for app in itertools.chain.from_iterable(app.split(',') for app in app_lists)] while len(app_lists) > 0: app_cls_name = app_lists.pop(0) context_modules = [x.__module__ for x in self.contexts_cls.values()] if app_cls_name in context_modules: continue LOG.info('loading app %s', app_cls_name) cls = self.load_app(app_cls_name) if cls is None: continue self.applications_cls[app_cls_name] = cls services = [] for key, context_cls in cls.context_iteritems(): v = self.contexts_cls.setdefault(key, context_cls) assert v == context_cls context_modules.append(context_cls.__module__) if issubclass(context_cls, RyuApp): services.extend(get_dependent_services(context_cls)) # we can't load an app that will be initiataed for # contexts. for i in get_dependent_services(cls): if i not in context_modules: services.append(i) if services: app_lists.extend([s for s in set(services) if s not in app_lists])
def create_contexts(self): for key, cls in self.contexts_cls.items(): print("------start------") print(key) print("-----------------") print(cls) print("------ end ------") if issubclass(cls, RyuApp): # hack for dpset context = self._instantiate(None, cls) else: context = cls() LOG.info('creating context %s', key) assert key not in self.contexts self.contexts[key] = context return self.contexts
for key, cls in self.contexts_cls.items(): print("------start------") print(key) print("-----------------") print(cls) print("------ end ------")
------start------ dpset ----------------- <class 'ryu.controller.dpset.DPSet'> ------ end ------ instantiating app None of DPSet creating context dpset ------start------ wsgi ----------------- <class 'ryu.app.wsgi.WSGIApplication'> ------ end ------ creating context wsgi
services = []
services.extend(app_mgr.instantiate_apps(**contexts))
services = [] services.extend(app_mgr.instantiate_apps(**contexts)) print("---------start-----------") for sv in services: print(sv) print("----------end------------")
---------start----------- <eventlet.greenthread.GreenThread object at 0x7fd6db2add58> ----------end------------
app_mgr = AppManager.get_instance() app_mgr.load_apps(app_lists) contexts = app_mgr.create_contexts() services = [] services.extend(app_mgr.instantiate_apps(**contexts))
def __init__(self): self.applications_cls = {} self.applications = {} self.contexts_cls = {} self.contexts = {} self.close_sem = hub.Semaphore()
def load_apps(self, app_lists): app_lists = [app for app in itertools.chain.from_iterable(app.split(',') for app in app_lists)] while len(app_lists) > 0: app_cls_name = app_lists.pop(0) context_modules = [x.__module__ for x in self.contexts_cls.values()] if app_cls_name in context_modules: continue LOG.info('loading app %s', app_cls_name) cls = self.load_app(app_cls_name) 調用上面加載一個app函數 if cls is None: continue self.applications_cls[app_cls_name] = cls services = [] for key, context_cls in cls.context_iteritems(): v = self.contexts_cls.setdefault(key, context_cls) assert v == context_cls context_modules.append(context_cls.__module__) if issubclass(context_cls, RyuApp): services.extend(get_dependent_services(context_cls)) # we can't load an app that will be initiataed for # contexts. for i in get_dependent_services(cls): if i not in context_modules: services.append(i) if services: app_lists.extend([s for s in set(services) if s not in app_lists])
app_lists = [app for app in itertools.chain.from_iterable(app.split(',') for app in app_lists)]
app_lists = [app for app in itertools.chain.from_iterable(app.split(',') for app in app_lists)] print("----------------") print(app_lists) print("----------------")
while len(app_lists) > 0: app_cls_name = app_lists.pop(0) 提取出一個app類名 context_modules = [x.__module__ for x in self.contexts_cls.values()] if app_cls_name in context_modules: continue LOG.info('loading app %s', app_cls_name) cls = self.load_app(app_cls_name) if cls is None: continue self.applications_cls[app_cls_name] = cls services = [] for key, context_cls in cls.context_iteritems(): v = self.contexts_cls.setdefault(key, context_cls) assert v == context_cls context_modules.append(context_cls.__module__) if issubclass(context_cls, RyuApp): services.extend(get_dependent_services(context_cls)) # we can't load an app that will be initiataed for # contexts. for i in get_dependent_services(cls): if i not in context_modules: services.append(i) if services: app_lists.extend([s for s in set(services) if s not in app_lists])
while len(app_lists) > 0: app_cls_name = app_lists.pop(0) context_modules = [x.__module__ for x in self.contexts_cls.values()] 加載依賴環境 print("----------------") print(context_modules) print("----------------")
---------------- [] ---------------- loading app ./simple_switch_13.py ---------------- [] ---------------- loading app ofctl_rest.py ---------------- ['ryu.controller.dpset', 'ryu.app.wsgi'] ---------------- loading app ryu.controller.ofp_handler
def load_app(self, name):
mod = utils.import_module(name) 模塊導入 clses = inspect.getmembers(mod, lambda cls: (inspect.isclass(cls) and issubclass(cls, RyuApp) and mod.__name__ == cls.__module__)) 獲取多個符合的類 if clses: return clses[0][1] 返回第一個類信息 return None
print("---------------") print(clses) print("---------------") print(name) print(clses[0][1]) print("---------------")
loading app ./simple_switch_13.py --------------- [('SimpleSwitch13', <class 'simple_switch_13.SimpleSwitch13'>)] --------------- ./simple_switch_13.py <class 'simple_switch_13.SimpleSwitch13'> --------------- loading app ofctl_rest.py --------------- [('RestStatsApi', <class 'ofctl_rest.RestStatsApi'>)] --------------- ofctl_rest.py <class 'ofctl_rest.RestStatsApi'> --------------- loading app ryu.controller.ofp_handler --------------- [('OFPHandler', <class 'ryu.controller.ofp_handler.OFPHandler'>)] --------------- ryu.controller.ofp_handler <class 'ryu.controller.ofp_handler.OFPHandler'> ---------------
services = [] for key, context_cls in cls.context_iteritems(): v = self.contexts_cls.setdefault(key, context_cls) assert v == context_cls context_modules.append(context_cls.__module__) 若是是依賴的類,放入context_modules中 if issubclass(context_cls, RyuApp): 若是是應用類子類,加入services中 services.extend(get_dependent_services(context_cls))
for i in get_dependent_services(cls): if i not in context_modules: services.append(i)
# we can't load an app that will be initiataed for # contexts.
if services: app_lists.extend([s for s in set(services) if s not in app_lists])
def _instantiate(self, app_name, cls, *args, **kwargs): # for now, only single instance of a given module # Do we need to support multiple instances? # Yes, maybe for slicing. LOG.info('instantiating app %s of %s', app_name, cls.__name__) if hasattr(cls, 'OFP_VERSIONS') and cls.OFP_VERSIONS is not None: ofproto_protocol.set_app_supported_versions(cls.OFP_VERSIONS) if app_name is not None: assert app_name not in self.applications app = cls(*args, **kwargs) register_app(app) assert app.name not in self.applications self.applications[app.name] = app return app
def create_contexts(self): for key, cls in self.contexts_cls.items(): if issubclass(cls, RyuApp): # hack for dpset context = self._instantiate(None, cls) 進行實例化app依賴類 else: context = cls() 實例化依賴類 LOG.info('creating context %s', key) assert key not in self.contexts self.contexts[key] = context return self.contexts 返回實例化對象列表
------key----cls-------- dict_items([
('dpset', <class 'ryu.controller.dpset.DPSet'>),
('wsgi', <class 'ryu.app.wsgi.WSGIApplication'>)]) ----------end----------- -----subclass---- <class 'ryu.controller.dpset.DPSet'> ------------
def create_contexts(self): print("------key----cls--------") print(self.contexts_cls.items()) print("----------end-----------") for key, cls in self.contexts_cls.items(): if issubclass(cls, RyuApp): # hack for dpset print("-----subclass----") print(cls) print("------------") context = self._instantiate(None, cls) else: context = cls() LOG.info('creating context %s', key) assert key not in self.contexts self.contexts[key] = context return self.contexts
contexts = app_mgr.create_contexts() print("--------contexts----------") print(contexts) print("---------end--------------")
def instantiate_apps(self, *args, **kwargs): print("---------init self.applications_cls.items() ----------") print(self.applications_cls.items()) print('--------end------------------')
---------init self.applications_cls.items() ---------- dict_items([('./simple_switch_13.py', <class 'simple_switch_13.SimpleSwitch13'>), ('ofctl_rest.py', <class 'ofctl_rest.RestStatsApi'>), ('ryu.controller.ofp_handler', <class 'ryu.controller.ofp_handler.OFPHandler'>)]) --------end------------------ instantiating app ./simple_switch_13.py of SimpleSwitch13 實例化對象 instantiating app ofctl_rest.py of RestStatsApi instantiating app ryu.controller.ofp_handler of OFPHandler
self.applications[app.name] = app 將全部實例化對象加入全局字典
print("---------init self.applications_cls.items() ----------") print(self.applications_cls.items()) print('--------end------------------') for app_name, cls in self.applications_cls.items(): self._instantiate(app_name, cls, *args, **kwargs) print("---------finally self.applications ----------") print(self.applications) print('--------end------------------')
loading app ./simple_switch_13.py loading app ofctl_rest.py loading app ryu.controller.ofp_handler instantiating app None of DPSet creating context dpset creating context wsgi ---------init self.applications_cls.items() ---------- dict_items([
('./simple_switch_13.py', <class 'simple_switch_13.SimpleSwitch13'>),
('ofctl_rest.py', <class 'ofctl_rest.RestStatsApi'>),
('ryu.controller.ofp_handler', <class 'ryu.controller.ofp_handler.OFPHandler'>)
]) --------end------------------ instantiating app ./simple_switch_13.py of SimpleSwitch13 instantiating app ofctl_rest.py of RestStatsApi instantiating app ryu.controller.ofp_handler of OFPHandler ---------finally self.applications ---------- {
'dpset': <ryu.controller.dpset.DPSet object at 0x7f223b239c50>,
'SimpleSwitch13': <simple_switch_13.SimpleSwitch13 object at 0x7f223b20aac8>,
'RestStatsApi': <ofctl_rest.RestStatsApi object at 0x7f223b20ac18>,
'ofp_event': <ryu.controller.ofp_handler.OFPHandler object at 0x7f223b19afd0>} --------end------------------
self._update_bricks() 更新服務鏈 self.report_bricks() 報告服務鏈
BRICK dpset
CONSUMES EventOFPStateChange
CONSUMES EventOFPPortStatus
CONSUMES EventOFPSwitchFeatures
BRICK SimpleSwitch13
CONSUMES EventOFPPacketIn -----待解決 CONSUMES EventOFPSwitchFeatures
BRICK RestStatsApi
CONSUMES EventOFPSwitchFeatures
CONSUMES EventOFPQueueGetConfigReply
CONSUMES EventOFPRoleReply
CONSUMES EventOFPStatsReply
CONSUMES EventOFPDescStatsReply
CONSUMES EventOFPFlowStatsReply
CONSUMES EventOFPAggregateStatsReply
CONSUMES EventOFPTableStatsReply
CONSUMES EventOFPTableFeaturesStatsReply
CONSUMES EventOFPPortStatsReply
CONSUMES EventOFPQueueStatsReply
CONSUMES EventOFPQueueDescStatsReply
CONSUMES EventOFPMeterStatsReply
CONSUMES EventOFPMeterFeaturesStatsReply
CONSUMES EventOFPMeterConfigStatsReply
CONSUMES EventOFPGroupStatsReply
CONSUMES EventOFPGroupFeaturesStatsReply
CONSUMES EventOFPGroupDescStatsReply
CONSUMES EventOFPPortDescStatsReply
BRICK ofp_event PROVIDES EventOFPStateChange TO {'dpset': {'main', 'dead'}} PROVIDES EventOFPPortStatus TO {'dpset': {'main'}} PROVIDES EventOFPSwitchFeatures TO {'dpset': {'config'}, 'SimpleSwitch13': {'config'}, 'RestStatsApi': {'main'}} PROVIDES EventOFPPacketIn TO {'SimpleSwitch13': {'main'}} PROVIDES EventOFPQueueGetConfigReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPRoleReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPDescStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPFlowStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPAggregateStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPTableStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPTableFeaturesStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPPortStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPQueueStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPQueueDescStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPMeterStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPMeterFeaturesStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPMeterConfigStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPGroupStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPGroupFeaturesStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPGroupDescStatsReply TO {'RestStatsApi': {'main'}} PROVIDES EventOFPPortDescStatsReply TO {'RestStatsApi': {'main'}} CONSUMES EventOFPEchoReply CONSUMES EventOFPEchoRequest CONSUMES EventOFPErrorMsg CONSUMES EventOFPHello CONSUMES EventOFPPortDescStatsReply CONSUMES EventOFPPortStatus CONSUMES EventOFPSwitchFeatures
def _update_bricks(self): for i in SERVICE_BRICKS.values(): for _k, m in inspect.getmembers(i, inspect.ismethod): if not hasattr(m, 'callers'): continue for ev_cls, c in m.callers.items(): if not c.ev_source: continue brick = _lookup_service_brick_by_mod_name(c.ev_source) if brick: brick.register_observer(ev_cls, i.name, c.dispatchers) #註冊事件,傳入事件名,和在什麼狀態觸發 # allow RyuApp and Event class are in different module for brick in SERVICE_BRICKS.values(): if ev_cls in brick._EVENTS: brick.register_observer(ev_cls, i.name, c.dispatchers)
@staticmethod def _report_brick(name, app): LOG.debug("BRICK %s", name) for ev_cls, list_ in app.observers.items(): 顯示信息 LOG.debug(" PROVIDES %s TO %s", ev_cls.__name__, list_) for ev_cls in app.event_handlers.keys(): LOG.debug(" CONSUMES %s", ev_cls.__name__) @staticmethod def report_bricks(): for brick, i in SERVICE_BRICKS.items(): AppManager._report_brick(brick, i)
threads = [] for app in self.applications.values(): t = app.start() if t is not None: app.set_main_thread(t) threads.append(t) return threads
for app in self.applications.values(): print("-------app--------") print(app) print("-------end--------") t = app.start() if t is not None: app.set_main_thread(t) threads.append(t)
-------app-------- <ryu.controller.dpset.DPSet object at 0x7f681bbf4be0> -------end-------- -------app-------- <simple_switch_13.SimpleSwitch13 object at 0x7f681bbc5a58> -------end-------- -------app-------- <ofctl_rest.RestStatsApi object at 0x7f681bbc5ba8> -------end-------- -------app-------- <ryu.controller.ofp_handler.OFPHandler object at 0x7f681bb54f60> -------end--------
def start(self): """ Hook that is called after startup initialization is done. """ self.threads.append(hub.spawn(self._event_loop))
def _event_loop(self): while self.is_active or not self.events.empty(): 循環處理事件 ev, state = self.events.get() self._events_sem.release() if ev == self._event_stop: continue handlers = self.get_handlers(ev, state) for handler in handlers: try: handler(ev) except hub.TaskExit: # Normal exit. # Propagate upwards, so we leave the event loop. raise except: LOG.exception('%s: Exception occurred during handler processing. ' 'Backtrace from offending handler ' '[%s] servicing event [%s] follows.', self.name, handler.__name__, ev.__class__.__name__)