Ceilometer Source Code Analysis: Polling (Ocata)

1. A Brief Introduction to Ceilometer

To keep it short: ceilometer collects the values of various OpenStack resources at a point in time, such as the size of a cloud disk. Below is the current architecture diagram from the official documentation.

Besides ceilometer itself, the diagram shows three other components:

  • Panko stores events. Building per-second, event-based billing for cloudkitty on top of it is part of my work; a first version is done, and I may cover it in a separate post.
  • Gnocchi stores ceilometer's metering data. Earlier releases stored it in MongoDB, but query performance dropped sharply as the data accumulated, so OpenStack later introduced the gnocchi project. Gnocchi supports storage backends such as redis, file, and ceph. I was responsible for this piece as well; it is already in place, and may also become an article of its own.
  • Aodh handles alarming.

Note that ceilometer has two main agents:

  • polling, which periodically calls the relevant OpenStack APIs to collect metering data;
  • notification, which listens for OpenStack event messages and converts them into metering data.

The metering data from both agents is passed through the configured pipeline to the REST API exposed by gnocchi, which then handles aggregation and storage.
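For reference, the stock pipeline.yaml has roughly the following shape (a simplified sketch; how the published samples finally reach gnocchi, via the collector dispatcher or a native gnocchi publisher, depends on the release and deployment):

    ---
    sources:
        - name: meter_source
          meters:
              - "*"
          sinks:
              - meter_sink
    sinks:
        - name: meter_sink
          transformers:
          publishers:
              - notifier://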

Next, the official architecture diagram for data collection and processing/dispatch:

And the data processing flow:

The official architecture diagrams describe this very well, so I won't repeat them here.

2. Source Code Analysis

  Honestly, OpenStack is probably the largest Python project out there, a real behemoth. The first time I touched it I was completely lost; it gets easier with exposure, though plenty of places are still confusing. When reading an OpenStack project, some files matter a lot, such as setup.cfg, where the project's entry points are declared. This article focuses on how the polling side is implemented; the rest of the code follows similar patterns.
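Entry points are loaded at runtime through stevedore. As a quick way to see what pollsters an installed ceilometer registers, a small snippet like the following can enumerate one namespace (the namespace name comes from ceilometer's setup.cfg; this is just an inspection aid, not ceilometer code):

    from stevedore import extension

    # Enumerate the plugins registered under the compute polling namespace.
    mgr = extension.ExtensionManager(namespace='ceilometer.poll.compute',
                                     invoke_on_load=False)
    print(sorted(mgr.names()))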

  • Where the ceilometer polling agent starts
    # ceilometer/cmd/polling.py
    def main():
        conf = cfg.ConfigOpts()
        conf.register_cli_opts(CLI_OPTS)
        service.prepare_service(conf=conf)
        sm = cotyledon.ServiceManager()
        sm.add(create_polling_service, args=(conf,))
        oslo_config_glue.setup(sm, conf)
        sm.run()

    # The first few lines read the configuration; then a polling service is added via the cotyledon library and run. A quick look shows cotyledon is a small library for running process-based services (a small cotyledon sketch follows below).

    def create_polling_service(worker_id, conf):
        return manager.AgentManager(worker_id,
                                    conf,
                                    conf.polling_namespaces,
                                    conf.pollster_list)
    # create_polling_service returns a polling AgentManager. The default for
    # polling_namespaces is choices=['compute', 'central', 'ipmi'].
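    The agent binary `ceilometer-polling` maps to the main() above. As for cotyledon, here is a minimal, standalone sketch (class and argument names are mine, not ceilometer's) showing how a ServiceManager runs a Service subclass in a worker process, which is the same pattern main() uses with create_polling_service:

    import time

    import cotyledon


    class HelloService(cotyledon.Service):
        """Toy service that prints a greeting every few seconds."""

        def __init__(self, worker_id, greeting):
            super(HelloService, self).__init__(worker_id)
            self.greeting = greeting

        def run(self):
            while True:
                print(self.greeting)
                time.sleep(5)


    sm = cotyledon.ServiceManager()
    sm.add(HelloService, workers=1, args=('hello from the worker process',))
    sm.run()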
     
      
  • polling AgentManager # ceilometer/agent/manager.py
class AgentManager(service_base.PipelineBasedService):

    def __init__(self, worker_id, conf, namespaces=None, pollster_list=None, ):

        namespaces = namespaces or ['compute', 'central']
        pollster_list = pollster_list or []
        group_prefix = conf.polling.partitioning_group_prefix

        # features of using coordination and pollster-list are exclusive, and
        # cannot be used at one moment to avoid both samples duplication and
        # samples being lost
        if pollster_list and conf.coordination.backend_url:
            raise PollsterListForbidden()

        super(AgentManager, self).__init__(worker_id, conf)

        def _match(pollster):
            """Find out if pollster name matches to one of the list."""
            return any(fnmatch.fnmatch(pollster.name, pattern) for
                       pattern in pollster_list)

        if type(namespaces) is not list:
            namespaces = [namespaces]

        # we'll have default ['compute', 'central'] here if no namespaces will
        # be passed
        extensions = (self._extensions('poll', namespace, self.conf).extensions
                      for namespace in namespaces)
        # get the extensions from pollster builder
        extensions_fb = (self._extensions_from_builder('poll', namespace)
                         for namespace in namespaces)
        if pollster_list:
            extensions = (moves.filter(_match, exts)
                          for exts in extensions)
            extensions_fb = (moves.filter(_match, exts)
                             for exts in extensions_fb)

        self.extensions = list(itertools.chain(*list(extensions))) + list(
            itertools.chain(*list(extensions_fb)))

        if self.extensions == []:
            raise EmptyPollstersList()

        discoveries = (self._extensions('discover', namespace,
                                        self.conf).extensions
                       for namespace in namespaces)
        self.discoveries = list(itertools.chain(*list(discoveries)))
        self.polling_periodics = None

        self.partition_coordinator = coordination.PartitionCoordinator(
            self.conf)
        self.heartbeat_timer = utils.create_periodic(
            target=self.partition_coordinator.heartbeat,
            spacing=self.conf.coordination.heartbeat,
            run_immediately=True)

        # Compose coordination group prefix.
        # We'll use namespaces as the basement for this partitioning.
        namespace_prefix = '-'.join(sorted(namespaces))
        self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
                             if group_prefix else namespace_prefix)

        self.notifier = oslo_messaging.Notifier(
            messaging.get_transport(self.conf),
            driver=self.conf.publisher_notifier.telemetry_driver,
            publisher_id="ceilometer.polling")

        self._keystone = None
        self._keystone_last_exception = None

    def run(self):
        super(AgentManager, self).run()
        self.polling_manager = pipeline.setup_polling(self.conf)
        self.join_partitioning_groups()
        self.start_polling_tasks()
        self.init_pipeline_refresh()

1. In the constructor, an ExtensionManager loads the entry points declared in setup.cfg for each meter, in both the discover and poll namespaces:
   discover extensions call the OpenStack APIs to fetch resources,
   and poll extensions turn the discovered resources into samples (the value of a meter at a point in time).

2. If multiple agents are running, a periodic timer is also created for coordination heartbeats.

3. An oslo_messaging.Notifier defines where the collected data is sent on the message queue (see the sketch after this list).

4. Finally, the run() method starts the polling agent.
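For point 3, here is a rough sketch of how such a Notifier is constructed and used to emit samples (the driver, topic, and payload shown are illustrative assumptions, not ceilometer's exact settings or sample schema):

    from oslo_config import cfg
    import oslo_messaging

    conf = cfg.ConfigOpts()
    transport = oslo_messaging.get_notification_transport(conf)
    notifier = oslo_messaging.Notifier(transport,
                                       driver='messagingv2',   # assumed driver
                                       publisher_id='ceilometer.polling',
                                       topics=['metering'])    # assumed topic
    # Emit a (fake) batch of samples with the 'sample' notification priority.
    notifier.sample({}, event_type='telemetry.polling',
                    payload={'samples': [{'counter_name': 'cpu', 'volume': 1}]})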

# setup.cfg
ceilometer.discover.compute =
    local_instances = ceilometer.compute.discovery:InstanceDiscovery

ceilometer.poll.compute =
    disk.read.requests = ceilometer.compute.pollsters.disk:ReadRequestsPollster
    disk.write.requests = ceilometer.compute.pollsters.disk:WriteRequestsPollster
    disk.read.bytes = ceilometer.compute.pollsters.disk:ReadBytesPollster
    disk.write.bytes = ceilometer.compute.pollsters.disk:WriteBytesPollster
    disk.read.requests.rate = ceilometer.compute.pollsters.disk:ReadRequestsRatePollster
    ......
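To show what sits behind such an entry point, the following is a hypothetical pollster sketched against the plugin_base.PollsterBase interface (the meter name 'foo.size', the module path in the trailing comment, and all field values are made up; a real pollster derives them from the discovered resource):

    from ceilometer.agent import plugin_base
    from ceilometer import sample


    class FooSizePollster(plugin_base.PollsterBase):
        """Hypothetical pollster emitting one 'foo.size' gauge per resource."""

        @property
        def default_discovery(self):
            # Reuse the compute discovery registered above (assumption).
            return 'local_instances'

        def get_samples(self, manager, cache, resources):
            for resource in resources:
                yield sample.Sample(
                    name='foo.size',
                    type=sample.TYPE_GAUGE,
                    unit='GB',
                    volume=1,  # placeholder value
                    user_id=None,
                    project_id=None,
                    resource_id=getattr(resource, 'id', 'unknown'),
                    timestamp=None,       # set later by the polling task
                    resource_metadata={},
                )

    # It would then be registered in setup.cfg as, e.g.:
    # ceilometer.poll.compute =
    #     foo.size = mymodule.pollsters:FooSizePollster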
  • Setting up polling, i.e. the interval at which resource meters are collected
    def setup_polling(conf):
        """Setup polling manager according to yaml config file."""
        cfg_file = conf.polling.cfg_file
        return PollingManager(conf, cfg_file)


    class PollingManager(ConfigManagerBase):
        """Polling Manager

        Polling manager sets up polling according to config file.
        """

        def __init__(self, conf, cfg_file):
            """Setup the polling according to config.

            The configuration is supported as follows:

            {"sources": [{"name": source_1,
                          "interval": interval_time,
                          "meters" : ["meter_1", "meter_2"],
                          "resources": ["resource_uri1", "resource_uri2"],
                         },
                         {"name": source_2,
                          "interval": interval_time,
                          "meters" : ["meter_3"],
                         },
                        ]}

            The interval determines the cadence of sample polling

            Valid meter format is '*', '!meter_name', or 'meter_name'.
            '*' is wildcard symbol means any meters; '!meter_name' means
            "meter_name" will be excluded; 'meter_name' means 'meter_name'
            will be included.

            Valid meters definition is all "included meter names", all
            "excluded meter names", wildcard and "excluded meter names", or
            only wildcard.

            The resources is list of URI indicating the resources from where
            the meters should be polled. It's optional and it's up to the
            specific pollster to decide how to use it.

            """
            super(PollingManager, self).__init__(conf)
            try:
                cfg = self.load_config(cfg_file)
            except (TypeError, IOError):
                LOG.warning(_LW('Unable to locate polling configuration, '
                                'falling back to pipeline configuration.'))
                cfg = self.load_config(conf.pipeline_cfg_file)
            self.sources = []
            if 'sources' not in cfg:
                raise PollingException("sources required", cfg)
            for s in cfg.get('sources'):
                self.sources.append(PollingSource(s))

    # The configuration is initialized from etc/ceilometer/polling.yaml:
    ---
    sources:
        - name: all_pollsters
          interval: 600
          meters:
              - "*"

     

  • Joining each discovery into a partitioning group according to its group id (an example coordination configuration follows the code)
    def join_partitioning_groups(self):
        self.groups = set([self.construct_group_id(d.obj.group_id)
                          for d in self.discoveries])
        # let each set of statically-defined resources have its own group
        static_resource_groups = set([
            self.construct_group_id(utils.hash_of_set(p.resources))
            for p in self.polling_manager.sources
            if p.resources
        ])
        self.groups.update(static_resource_groups)

        if not self.groups and self.partition_coordinator.is_active():
            self.partition_coordinator.stop()
            self.heartbeat_timer.stop()

        if self.groups and not self.partition_coordinator.is_active():
            self.partition_coordinator.start()
            utils.spawn_thread(self.heartbeat_timer.start)

        for group in self.groups:
            self.partition_coordinator.join_group(group)
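    Partitioning only matters when a coordination backend is configured; without one, the coordinator stays inactive and a single agent polls everything. An illustrative ceilometer.conf snippet for running several polling agents with tooz-based workload partitioning (the redis URL is an example) might be:

    [coordination]
    backend_url = redis://controller:6379
    heartbeat = 1.0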

     

  • Starting the polling tasks
    def start_polling_tasks(self):
        # allow time for coordination if necessary
        delay_start = self.partition_coordinator.is_active()

        # set shuffle time before polling task if necessary
        delay_polling_time = random.randint(
            0, self.conf.shuffle_time_before_polling_task)

        data = self.setup_polling_tasks()

        # Don't start useless threads if no task will run
        if not data:
            return

        # One thread per polling tasks is enough
        self.polling_periodics = periodics.PeriodicWorker.create(
            [], executor_factory=lambda:
            futures.ThreadPoolExecutor(max_workers=len(data)))

        for interval, polling_task in data.items():
            delay_time = (interval + delay_polling_time if delay_start
                          else delay_polling_time)

            @periodics.periodic(spacing=interval, run_immediately=False)
            def task(running_task):
                self.interval_task(running_task)

            utils.spawn_thread(utils.delayed, delay_time,
                               self.polling_periodics.add, task, polling_task)

        utils.spawn_thread(self.polling_periodics.start, allow_empty=True)


    # Polling tasks are generated from polling.yaml and the extensions loaded
    # from the entry points: one task per distinct interval.
    def setup_polling_tasks(self):
        polling_tasks = {}
        for source in self.polling_manager.sources:
            polling_task = None
            for pollster in self.extensions:
                if source.support_meter(pollster.name):
                    polling_task = polling_tasks.get(source.get_interval())
                    if not polling_task:
                        polling_task = self.create_polling_task()
                        polling_tasks[source.get_interval()] = polling_task
                    polling_task.add(pollster, source)
        return polling_tasks

    Afterwards, each task is executed periodically via futurist periodics at the interval defined in polling.yaml, as the small sketch below illustrates.
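    Here is a minimal, standalone sketch of that futurist.periodics pattern (names are illustrative, not ceilometer code): a decorated callback runs every `spacing` seconds on a small thread pool.

    from concurrent import futures

    from futurist import periodics


    @periodics.periodic(spacing=5, run_immediately=True)
    def tick():
        print('polling cycle')


    worker = periodics.PeriodicWorker.create(
        [], executor_factory=lambda: futures.ThreadPoolExecutor(max_workers=1))
    worker.add(tick)
    worker.start()  # blocks; ceilometer starts this in a spawned thread instead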

    def interval_task(self, task):
        # NOTE(sileht): remove the previous keystone client
        # and exception to get a new one in this polling cycle.
        self._keystone = None
        self._keystone_last_exception = None

        task.poll_and_notify()

    def poll_and_notify(self):
        """Polling sample and notify."""
        cache = {}
        discovery_cache = {}
        poll_history = {}
        for source_name in self.pollster_matches:
            for pollster in self.pollster_matches[source_name]:
                key = Resources.key(source_name, pollster)
                candidate_res = list(
                    self.resources[key].get(discovery_cache))
                if not candidate_res and pollster.obj.default_discovery:
                    candidate_res = self.manager.discover(
                        [pollster.obj.default_discovery], discovery_cache)

                # Remove duplicated resources and black resources. Using
                # set() requires well defined __hash__ for each resource.
                # Since __eq__ is defined, 'not in' is safe here.
                polling_resources = []
                black_res = self.resources[key].blacklist
                history = poll_history.get(pollster.name, [])
                for x in candidate_res:
                    if x not in history:
                        history.append(x)
                        if x not in black_res:
                            polling_resources.append(x)
                poll_history[pollster.name] = history

                # If no resources, skip for this pollster
                if not polling_resources:
                    p_context = 'new ' if history else ''
                    LOG.info(_LI("Skip pollster %(name)s, no %(p_context)s"
                                 "resources found this cycle"),
                             {'name': pollster.name, 'p_context': p_context})
                    continue

                LOG.info(_LI("Polling pollster %(poll)s in the context of "
                             "%(src)s"),
                         dict(poll=pollster.name, src=source_name))
                try:
                    polling_timestamp = timeutils.utcnow().isoformat()
                    samples = pollster.obj.get_samples(
                        manager=self.manager,
                        cache=cache,
                        resources=polling_resources
                    )
                    sample_batch = []

                    for sample in samples:
                        # Note(yuywz): Unify the timestamp of polled samples
                        sample.set_timestamp(polling_timestamp)
                        sample_dict = (
                            publisher_utils.meter_message_from_counter(
                                sample, self._telemetry_secret
                            ))
                        if self._batch:
                            sample_batch.append(sample_dict)
                        else:
                            self._send_notification([sample_dict])

                    if sample_batch:
                        self._send_notification(sample_batch)

                except plugin_base.PollsterPermanentError as err:
                    LOG.error(_LE(
                        'Prevent pollster %(name)s from '
                        'polling %(res_list)s on source %(source)s anymore!')
                        % ({'name': pollster.name, 'source': source_name,
                            'res_list': err.fail_res_list}))
                    self.resources[key].blacklist.extend(err.fail_res_list)
                except Exception as err:
                    LOG.error(_LE(
                        'Continue after error from %(name)s: %(error)s')
                        % ({'name': pollster.name, 'error': err}),
                        exc_info=True)
    # poll_and_notify loops over the discovery extensions' discover() methods
    # to fetch resources, then calls get_samples() on each polling extension to
    # turn those resources into Sample objects. The samples are published to
    # the message queue, where the ceilometer notification agent picks them up,
    # performs further transformation, and forwards the data to gnocchi.

     

That is the basic flow of the polling agent; the notification agent's processing will be covered next.
