Ceilometer 源码学习 - Polling Agent

简介

Ceilometer是Openstack中用于数据采集的基础设施,包括多个组件:Central Agent,Compute Agent,Notification Agent,Collector等。其中Central Agent和Compute Agent分别运行在Controller和Compute机器上,通过定期调用其他服务的api来完成数据采集。由于二者的区别只是所负责的数据来源,这里我们统称为Polling Agent。

需求导向

Polling Agent的功能很简单:

周期性地向其他服务主动拉取需要的数据,并将数据发送到消息队列。

其结构图如下: 图1 Central Agent结构图

站在设计者的角度,要完成上述功能,需要处理的有如下几个基本问题:

  1. 怎么执行拉取;
  2. 向哪些服务拉取数据;
  3. 对于某个服务收集哪些数据以及如何收集。

下面分别针对上述问题依次介绍Ceilometer的实现方式:

  1. 常驻进程:自然的我们需要一个常驻进程来完成上述调度任务,基本操作包括:
    • 记录全局状态;
    • 周期性的触发;
    • 负责消息的发送。
  2. 插件形式:Ceilometer中用定义插件的方式定义多个收集器(Pollster),程序从配置文件中获得需要加载的收集器列表,用插件的形式是一个很好的选择,因为:
    • python对插件的良好支持:stevedore
    • 简化核心逻辑;
    • 方便扩展。
  3. 共同基类:数据来源多种多样,针对不同的数据来源获取数据方式各有不同,但他们需要完成同样的的动作,Ceilometer中设计Pollster的共同基类,定义了如下接口,是每个Pollster都是要实现的:
    • 默认获取数据来源的方式:default_discovery;
    • 拉取数据:get_samples。

流程简介

正是由于上面所说的实现方式使得Polling Agent的核心逻辑变得非常简单,不需要关注具体的数据收集过程,而将自己解放成一个调度管理者,下面将简单介绍其实现逻辑。在此之前为了方便说明,先介绍其中涉及到的角色或组件:

基本流程如下:

代码细节

接下来从代码层面详细介绍上述逻辑实现:

1. 入口

console_scripts =
    ceilometer-polling = ceilometer.cmd.eventlet.polling:main
    ...

2. ceilometer.cmd.eventlet.polling

相应的,在ceilometer/cmd/eventlet/polling.py 文件中找到该函数,如下:

def main():
     service.prepare_service()
     os_service.launch(CONF, manager.AgentManager(CONF.polling_namespaces,
                                                CONF.pollster_list)).wait()

3. base.AgentManager 初始化

ceilometer/agent/base.py下找到AgentManager的初始化部分代码,部分如下所示:

from stevedore import extension

...
def __init__(self, namespaces, pollster_list, group_prefix=None):
    ...
    # 从配置文件中动态加载收集器Pollster
    extensions = (self._extensions('poll', namespace).extensions
                   for namespace in namespaces)
    ... 
    
    self.extensions = list(itertools.chain(*list(extensions))) + list(
         itertools.chain(*list(extensions_fb)))
    # 从配置文件中动态加载资源发现器Discover
    self.discovery_manager = self._extensions('discover')
    ...

 @staticmethod
 def _get_ext_mgr(namespace):
     def _catch_extension_load_error(mgr, ep, exc):
       ...

     return extension.ExtensionManager(
         namespace=namespace,
         invoke_on_load=True,
         on_load_failure_callback=_catch_extension_load_error,
     )

 def _extensions(self, category, agent_ns=None):
     namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
                  else 'ceilometer.%s' % category)
     return self._get_ext_mgr(namespace)
...
ceilometer.poll.central =
      ip.floating = ceilometer.network.floatingip:FloatingIPPollster
      image = ceilometer.image.glance:ImagePollster
      image.size = ceilometer.image.glance:ImageSizePollster
      ...
...   
ceilometer.discover =
      local_instances = ceilometer.compute.discovery:InstanceDiscovery
      endpoint = ceilometer.agent.discovery.endpoint:EndpointDiscovery
      tenant = ceilometer.agent.discovery.tenant:TenantDiscovery
      ...
 ...

4. base.AgentManager 启动

了解AgentManager初始化之后,再来看启动部分的代码实现:

def start(self):
    # 读取pipeline.yaml配置文件
    self.polling_manager = pipeline.setup_polling()
    ...
    # 
    self.pollster_timers = self.configure_polling_tasks()
    ...
...

下面分别介绍这两行代码的功能:

---
  sources:
      - name: meter_source
        interval: 600
        meters:
            - "*"
        sinks:
            - meter_sink
      - name: cpu_source
        ...
      ...
  sinks:
      - name: meter_sink
        transformers:
        publishers:
      ...
  ...

ceilometer中用pipeline配置文件的方式定义meter数据从收集到处理到发送的过程,在Polling Agent中我们只需要关心sources部分,在上述pipeline.setup_polling()中读取pipeline文件并解析封装其中的sources内容,供后面使用,并不做数据处理,数据处理的pipeline功能在notification组件中完成。

def configure_polling_tasks(self):
    ...
    pollster_timers = []
    # 创建PollingTask
    data = self.setup_polling_tasks()
    # PollingTask定时执行
    for interval, polling_task in data.items():
        delay_time = (interval + delay_polling_time if delay_start
                      else delay_polling_time)
        pollster_timers.append(self.tg.add_timer(interval,
                               self.interval_task,   #PollsterTask执行内容
                               initial_delay=delay_time,
                               task=polling_task)) 
    ...
    return pollster_timers

其中,setup_polling_tasks中新建PollingTask,并根据上一步中封装的sources内容,将每一个收集器Pollster根据其interval设置分配到不同的PollingTask中,interval相同的收集器会分配到同一个PollingTask中。之后每个PollingTask都根据其运行周期设置Timer定时执行。 注意,其中interval_task函数指定timer需要执行的任务。

5. PollingTask 执行

上边我们了解到PollingTask会定时执行,而interval_task中定义了他的内容:

@staticmethod
def interval_task(task):
     task.poll_and_notify()

def poll_and_notify(self):
    ...
    for source_name in self.pollster_matches:
       # 循环处理PollingTask中的每一个收集器Pollster
       for pollster in self.pollster_matches[source_name]:
           key = Resources.key(source_name, pollster)
                candidate_res = list(
                    self.resources[key].get(discovery_cache))
           # Discover发现可用的数据源
           # 其中discovery_cache作用是用于缓存已经查询的资源,如果没有在pipeline
           # 中指定discovery以及resource,则candidate_res为空
           # discovery_cache在首次执行下面的函数后将会赋予缓存的资源,用于循环中相同的
           # discovery_cach直接取得,不需要重复查询
           if not candidate_res and pollster.obj.default_discovery:
                candidate_res = self.manager.discover(
                    [pollster.obj.default_discovery], discovery_cache)
                    
                # Remove duplicated resources and black resources. Using
                # set() requires well defined __hash__ for each resource.
                # Since __eq__ is defined, 'not in' is safe here.
                # 周期性任务是根据pipeline中的时间间隔进行分组,在同一组中如果出现同一个meter,
                # 则第二次meter将会直接略过,因为polling_resources此时为空。
                polling_resources = []
                black_res = self.resources[key].blacklist
                history = poll_history.get(pollster.name, [])
                for x in candidate_res:
                    if x not in history:
                        history.append(x)
                        if x not in black_res:
                            polling_resources.append(x)
                poll_history[pollster.name] = history
                
                # If no resources, skip for this pollster
                if not polling_resources:
                    p_context = 'new ' if history else ''
                    LOG.info(_LI("Skip pollster %(name)s, no %(p_context)s"
                                 "resources found this cycle"),
                             {'name': pollster.name, 'p_context': p_context})
                    continue

             ... 

             try:
                 # 从数据源处拉取采样数据
                 samples = pollster.obj.get_samples(
                     manager=self.manager,
                     cache=cache,
                     resources=polling_resources
                 )
                 sample_batch = []

                 # 发送数据到消息队列
                 for sample in samples:
                     sample_dict = (
                         publisher_utils.meter_message_from_counter(
                             sample, self._telemetry_secret
                         ))
                     if self._batch:
                         sample_batch.append(sample_dict)
                     else:
                         self._send_notification([sample_dict])

                 if sample_batch:
                     self._send_notification(sample_batch)

            except 
                ...
    

可以看出,在这段代码中完成了比较核心的几个步骤: 1. 资源发现器Discover发现可用数据源; 2. 收集器Pollster拉取采样数据; 3. 发送数据到消息队列。

6. Pollster示例

上面介绍了Polling Agent中如何是如何加载Pollster执行数据的收集工作的。下面以获取image基本信息的ImagePollster为例,看一下具体的实现:

class _Base(plugin_base.PollsterBase):

     @property
     def default_discovery(self):
         return 'endpoint:%s' % cfg.CONF.service_types.glance
         #此处为endpoint:glance,从discover找到相应的处理类,即:
         endpoint = ceilometer.agent.discovery.endpoint:EndpointDiscovery
         并将glance作为参数传入。

     def get_glance_client(ksclient, endpoint):
         ...
    
     def _get_images(self, ksclient, endpoint):
         client = self.get_glance_client(ksclient, endpoint)
         ...
         return client.images.list(filters={"is_public": None}, **kwargs)

     def _iter_images(self, ksclient, cache, endpoint):
         key = '%s-images' % endpoint
         if key not in cache:
             cache[key] = list(self._get_images(ksclient, endpoint))
         return iter(cache[key])
class ImagePollster(_Base):
    def get_samples(self, manager, cache, resources):
        for endpoint in resources:
            for image in self._iter_images(manager.keystone, cache, endpoint):
                yield sample.Sample(
                    name='image',
                    type=sample.TYPE_GAUGE,
                    unit='image',
                    ...
                )


像上边介绍过的,Pollster需要实现两个接口:

7. Discover示例

class EndpointDiscovery(plugin.DiscoveryBase):
    """Discovery that supplies service endpoints.
    """

    @staticmethod
    def discover(manager, param=None):
        endpoints = manager.keystone.service_catalog.get_urls(
            service_type=param,
            endpoint_type=cfg.CONF.service_credentials.os_endpoint_type,
            region_name=cfg.CONF.service_credentials.os_region_name)
        if not endpoints:
            LOG.warning(_LW('No endpoints found for service %s'),
                        "<all services>" if param is None else param)
            return []
        return endpoints

可以看到上面的ImagePollster所指定的Discover中慧从keystone获取所有的glance的endpoint列表, 这些endpoint列表最终会作为数据来源传给ImagePollster的get_samples

8. 其他

除了上述提到的内容外,还有一些点需要注意:

核心实体

AgentManger

PollingTask

参考

官方文档:Ceilometer Architecture 

Github:Ceilometer Source Code

Ceilometer 源码学习 - Polling Agent(作者:catkang)

Table of Contents