Ceilometer Source Code Study - Collection Component

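The collection component (the ceilometer-collector service) receives samples over the message queue and over UDP, and stores them in the configured backend (a database, a file, Gnocchi, and so on).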

1. Entry point

console_scripts =
    ...
    ceilometer-collector = ceilometer.cmd.collector:main
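
A console_scripts entry has the form "command = module:function". As a rough illustration of how such a string maps to a callable, here is a minimal sketch; resolve_entry_point is a hypothetical helper written for this post, not the code that setuptools generates, and json:dumps is only a stand-in target so the example runs without Ceilometer installed.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve a 'package.module:attribute' string to the object it names."""
    module_name, _, attr_path = spec.partition(":")
    obj = importlib.import_module(module_name)
    # Walk dotted attribute paths such as "Class.method" one step at a time.
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj


# Stand-in for "ceilometer.cmd.collector:main".
dumps = resolve_entry_point("json:dumps")
print(dumps({"service": "ceilometer-collector"}))
```

Running the ceilometer-collector command therefore amounts to importing ceilometer.cmd.collector and calling its main function.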

2. ceilometer.cmd.collector:main

The function is defined in ceilometer/cmd/collector.py:

def main():
    service.prepare_service()
    os_service.launch(CONF, collector.CollectorService(),
                      workers=CONF.collector.workers).wait()

3. Starting the component

def start(self):
    """Bind the UDP socket and handle incoming data."""
    # ensure dispatcher is configured before starting other services
    dispatcher_managers = dispatcher.load_dispatcher_manager()
    (self.meter_manager, self.event_manager) = dispatcher_managers
    ...
    if cfg.CONF.collector.udp_address:
        self.tg.add_thread(self.start_udp)
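
The dispatcher managers are loaded once per dispatcher type (meter or event). The loader derives the entry-point namespace and the option name from that type:

namespace = 'ceilometer.dispatcher.%s' % dispatcher_type
conf_name = '%s_dispatchers' % dispatcher_type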

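This reads the entries registered under the ceilometer.dispatcher.meter and ceilometer.dispatcher.event namespaces in setup.cfg, and uses conf_name to select the matching handler classes. Taking meter as an example: conf_name = meter_dispatchers, so the value of the meter_dispatchers option is read from the configuration file (default=['database']). The dispatchers loaded this way back the message queue endpoint that receives and processes samples.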
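
The selection logic can be sketched without Ceilometer or stevedore. In the sketch below, REGISTRY, the three dispatcher classes, and load_dispatchers are hypothetical stand-ins for the setup.cfg entry points and the real loader; only the namespace and conf_name expressions come from the source.

```python
class DatabaseDispatcher:
    name = "database"


class FileDispatcher:
    name = "file"


class GnocchiDispatcher:
    name = "gnocchi"


# Hypothetical stand-in for the entry points registered in setup.cfg.
REGISTRY = {
    "ceilometer.dispatcher.meter": {
        "database": DatabaseDispatcher,
        "file": FileDispatcher,
        "gnocchi": GnocchiDispatcher,
    },
    "ceilometer.dispatcher.event": {
        "database": DatabaseDispatcher,
        "file": FileDispatcher,
    },
}


def load_dispatchers(dispatcher_type, conf):
    """Instantiate the registered dispatchers that the configuration enables."""
    namespace = "ceilometer.dispatcher.%s" % dispatcher_type
    conf_name = "%s_dispatchers" % dispatcher_type
    enabled = conf.get(conf_name, ["database"])  # same default as the option
    registered = REGISTRY.get(namespace, {})
    return [registered[name]() for name in enabled if name in registered]


conf = {"meter_dispatchers": ["database", "gnocchi"]}
print([d.name for d in load_dispatchers("meter", conf)])
```

A name that is configured but not registered is silently skipped in this sketch; how the real loader reports that case is not covered here.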

if list(self.meter_manager):
    sample_target = oslo_messaging.Target(
        topic=cfg.CONF.publisher_notifier.metering_topic)
    self.sample_listener = (
        messaging.get_batch_notification_listener(
            transport, [sample_target],
            [SampleEndpoint(self.meter_manager)],
            allow_requeue=True,
            batch_size=cfg.CONF.collector.batch_size,
            batch_timeout=cfg.CONF.collector.batch_timeout))
    self.sample_listener.start()

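The topic is "metering" (the value of publisher_notifier.metering_topic), which is the topic the notification component specifies when it publishes to the MQ queue. The collection component listens on the same topic and registers SampleEndpoint to handle the messages.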

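The parent class of SampleEndpoint is CollectorEndpoint, which defines a sample method, so it only handles messages at the sample priority level. It handles them by calling the method named by self.method on every dispatcher: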

self.dispatcher_manager.map_method(self.method, samples)

SampleEndpoint is defined with:

method = 'record_metering_data'
ep_type = 'sample'

So record_metering_data is ultimately the method called to record the data.
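
To make the dispatch by method name concrete, here is a minimal sketch. DispatcherManager and MemoryDispatcher are hypothetical stand-ins for the real extension manager and a real dispatcher, and the message shape (a dict with a "payload" list) is an assumption made for illustration; only the method and ep_type attributes and the map_method call come from the source.

```python
class DispatcherManager:
    """Hypothetical stand-in: fans a method call out to every dispatcher."""

    def __init__(self, dispatchers):
        self.dispatchers = list(dispatchers)

    def map_method(self, method_name, *args, **kwargs):
        return [getattr(d, method_name)(*args, **kwargs)
                for d in self.dispatchers]


class MemoryDispatcher:
    """Hypothetical dispatcher that keeps samples in a list."""

    def __init__(self):
        self.stored = []

    def record_metering_data(self, data):
        self.stored.extend(data)


class CollectorEndpoint:
    def __init__(self, dispatcher_manager):
        self.dispatcher_manager = dispatcher_manager

    def sample(self, messages):
        # Flatten the payloads of a batch, then hand them to all dispatchers.
        samples = [s for message in messages for s in message["payload"]]
        self.dispatcher_manager.map_method(self.method, samples)


class SampleEndpoint(CollectorEndpoint):
    method = 'record_metering_data'
    ep_type = 'sample'


dispatcher = MemoryDispatcher()
endpoint = SampleEndpoint(DispatcherManager([dispatcher]))
endpoint.sample([{"payload": [{"counter_name": "cpu"}]},
                 {"payload": [{"counter_name": "memory"}]}])
print(len(dispatcher.stored))
```

Because the method is looked up by name, an endpoint for another message type only needs to set a different method attribute.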

4. Database dispatch (the default)

5. Gnocchi dispatch

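Initialization mainly sets up the configuration and obtains a Keystone client and a Gnocchi client, which are used to connect to the Gnocchi service. Specifying the URL directly is now deprecated; the Gnocchi endpoint is discovered automatically through Keystone.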

for resource_id, samples_of_resource in resource_grouped_samples:
    ...
    for metric_name, samples in metric_grouped_samples:
        stats['metrics'] += 1

        samples = list(samples)
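        # gnocchi_resources.yaml lists the resources Gnocchi accepts, with their
        # resource_type, metrics and attributes; a meter that is not defined in
        # that file is ignored.
        # Note: metric_name selects one matching resource definition, and every
        # metric defined for that resource (rd.metrics) is then added, even if
        # this batch of samples does not contain that metric.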
        rd = self._get_resource_definition_from_metric(metric_name)
        if rd is None:
            LOG.warning(_LW("metric %s is not handled by Gnocchi") %
                        metric_name)
            continue
        if rd.cfg.get("ignore"):
            continue

        res_info['resource_type'] = rd.cfg['resource_type']
        res_info.setdefault("resource", {}).update({
            "id": resource_id,
            "user_id": samples[0]['user_id'],
            "project_id": samples[0]['project_id'],
            "metrics": rd.metrics,
        })

        # A meter in Ceilometer corresponds to the name attribute of a metric in Gnocchi
        for sample in samples:
            ...
            unit = sample['counter_unit']
            metric = sample['counter_name']
            # The first time data is sent, batch_measures takes the exception
            # branch and creates the resource. The metrics present in this
            # batch get a unit; all the others have a unit of Null.
            res_info['resource']['metrics'][metric]['unit'] = unit

        ...
        for gnocchi_id, info in gnocchi_data.items():
            resource = info["resource"]
            resource_type = info["resource_type"]
            resource_extra = info["resource_extra"]
            if not resource_extra:
                continue
            try:
                # Once samples for the resource's other metrics arrive, the
                # resource is updated; only then do the unit fields mentioned
                # above hold real units instead of Null.
                self._if_not_cached("update", resource_type, resource,
                                    self._update_resource, resource_extra)
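
The two nested loops above walk samples grouped first by resource and then by metric name. The grouping step itself is elided in the excerpt; the sketch below shows one way to build it with itertools.groupby, which is an assumption about the approach rather than a copy of the dispatcher. group_samples and the sample values are made up for illustration.

```python
import itertools
import operator

SAMPLES = [
    {"resource_id": "vm-2", "counter_name": "cpu", "counter_volume": 7.0},
    {"resource_id": "vm-1", "counter_name": "memory", "counter_volume": 512.0},
    {"resource_id": "vm-1", "counter_name": "cpu", "counter_volume": 3.0},
    {"resource_id": "vm-1", "counter_name": "cpu", "counter_volume": 4.0},
]


def group_samples(data):
    """Return {resource_id: {metric_name: [volumes]}} for a flat sample list."""
    by_resource = operator.itemgetter("resource_id")
    by_metric = operator.itemgetter("counter_name")
    grouped = {}
    # groupby only merges adjacent items, so each level is sorted first.
    for resource_id, of_resource in itertools.groupby(
            sorted(data, key=by_resource), key=by_resource):
        metrics = {}
        for metric_name, of_metric in itertools.groupby(
                sorted(of_resource, key=by_metric), key=by_metric):
            metrics[metric_name] = [s["counter_volume"] for s in of_metric]
        grouped[resource_id] = metrics
    return grouped


print(group_samples(SAMPLES))
```

Python's sort is stable, so samples of the same metric keep their arrival order within each group.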

Then self.batch_measures(measures, gnocchi_data, stats) is called to send the data.

def batch_measures(self, measures, resource_infos, stats):
    # NOTE(sileht): We don't care about error here, we want
    # resources metadata always been updated
    try:
        # Send the batched measures request through the Gnocchi client
        self._gnocchi.metric.batch_resources_metrics_measures(measures)
    except gnocchi_exc.BadRequest as e:
        # The first time data is sent, the metric does not yet exist in
        # Gnocchi, so this exception branch is taken.
        # resource_infos contains all of the metric entries.
        m = self.RE_UNKNOW_METRICS.match(six.text_type(e))
        if m is None:
            raise

        # NOTE(sileht): Create all missing resources and metrics
        metric_list = self.RE_UNKNOW_METRICS_LIST.findall(m.group(1))
        gnocchi_ids_freshly_handled = set()
        for gnocchi_id, metric_name in metric_list:
            if gnocchi_id in gnocchi_ids_freshly_handled:
                continue
            resource = resource_infos[gnocchi_id]['resource']
            resource_type = resource_infos[gnocchi_id]['resource_type']
            try:
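                # The resource is created first, and it carries a resource_type.
                # Resource types are created by running gnocchi-upgrade when
                # Gnocchi is initialized, or ceilometer-upgrade when Ceilometer
                # is initialized.
                # Creating a resource also creates its metrics: in Gnocchi a
                # metric is an attribute of a resource and corresponds to a
                # meter, and one resource can contain several metrics.
                # Use gnocchi resource show to inspect the result.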
                self._if_not_cached("create", resource_type, resource,
                                    self._create_resource)
            except gnocchi_exc.ResourceAlreadyExists:
                metric = {'resource_id': resource['id'],
                          'name': metric_name}
                metric.update(resource["metrics"][metric_name])
                try:
                    # This branch is normally not reached
                    self._gnocchi.metric.create(metric)
                ...
        # NOTE(sileht): we have created missing resources/metrics,
        # now retry to post measures
        # Call again to post the measures
        self._gnocchi.metric.batch_resources_metrics_measures(measures)
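
The control flow of batch_measures (post, parse the error for unknown metrics, create what is missing, post again) can be sketched against an in-memory fake. Everything here is hypothetical and written for illustration: FakeGnocchi, UnknownMetrics, the error message format, and the two regular expressions are not the Gnocchi client API or the patterns used by the real dispatcher.

```python
import re


class UnknownMetrics(Exception):
    """Hypothetical stand-in for the BadRequest raised by the Gnocchi client."""


class FakeGnocchi:
    """In-memory fake that rejects measures for metrics it does not know."""

    def __init__(self):
        self.metrics = set()  # known (resource_id, metric_name) pairs
        self.stored = []

    def create_resource(self, resource_id, metric_names):
        # Creating a resource creates all of its defined metrics with it.
        for name in metric_names:
            self.metrics.add((resource_id, name))

    def batch_measures(self, measures):
        missing = sorted((rid, name)
                         for rid, per_metric in measures.items()
                         for name in per_metric
                         if (rid, name) not in self.metrics)
        if missing:
            raise UnknownMetrics("Unknown metrics: " + ", ".join(
                "%s/%s" % pair for pair in missing))
        self.stored.append(measures)


RE_UNKNOWN = re.compile(r"Unknown metrics: (.*)")
RE_PAIR = re.compile(r"([^/ ,]+)/([^, ]+)")


def post_with_retry(client, measures, resource_infos):
    try:
        client.batch_measures(measures)
    except UnknownMetrics as exc:
        match = RE_UNKNOWN.match(str(exc))
        if match is None:
            raise
        handled = set()
        for resource_id, _metric in RE_PAIR.findall(match.group(1)):
            if resource_id in handled:
                continue  # one create per resource covers all its metrics
            client.create_resource(resource_id,
                                   resource_infos[resource_id]["metrics"])
            handled.add(resource_id)
        # Missing resources and metrics now exist, so retry the post.
        client.batch_measures(measures)


client = FakeGnocchi()
measures = {"vm-1": {"cpu": [1.0]}}
infos = {"vm-1": {"metrics": ["cpu", "memory"]}}
post_with_retry(client, measures, infos)
print(sorted(client.metrics))
```

In this sketch the memory metric is created although no memory sample was sent, which mirrors the point made earlier: every metric in the resource definition is created together with the resource.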