Gnocchi Source Code Study

For background on how Gnocchi works, see:

Official documentation: gnocchi

Chinese translation: gnocchi architecture

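The Gnocchi component mainly processes the sample data sent by ceilometer. It turns that data into measures according to the archive policy and the aggregation policy, which reduces the time and space cost of working with measures.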
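
To make the idea of aggregating samples concrete, here is a minimal sketch that groups raw (timestamp, value) samples into fixed-width time buckets and reduces each bucket to one value. It is not Gnocchi's carbonara code: the `aggregate` function, the `granularity` parameter, and the sample data are assumptions made for illustration.

```python
from collections import defaultdict
from statistics import mean


def aggregate(samples, granularity, method=mean):
    """Group (timestamp, value) samples into fixed-width time buckets.

    `granularity` is the bucket width in seconds; `method` reduces the
    values that fall into one bucket to a single number.
    """
    buckets = defaultdict(list)
    for timestamp, value in samples:
        # Round each timestamp down to the start of its bucket.
        bucket_start = timestamp - (timestamp % granularity)
        buckets[bucket_start].append(value)
    return [(start, method(values)) for start, values in sorted(buckets.items())]


# Hypothetical samples: five points spread over two one-minute buckets.
samples = [(0, 1.0), (20, 3.0), (59, 5.0), (60, 10.0), (90, 20.0)]
per_minute_mean = aggregate(samples, granularity=60)
per_minute_max = aggregate(samples, granularity=60, method=max)
```

Five raw points collapse into two stored points per aggregation method, which is where the saving in space and query time comes from.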

1. Components

console_scripts =
    gnocchi-upgrade = gnocchi.cli:upgrade
    gnocchi-statsd = gnocchi.cli:statsd
    gnocchi-metricd = gnocchi.cli:metricd
    carbonara-dump = gnocchi.carbonara:dump_archive_file
wsgi_scripts =
    gnocchi-api = gnocchi.rest.app:build_wsgi_app

2. gnocchi-api

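Like other components, it uses the WSGI framework to start the app and the service. api-paste.ini offers two authentication modes: noauth and keystone authentication (that is, auth).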

# Use gnocchi+auth in the pipeline if you want to use keystone authentication
[pipeline:main]
pipeline = gnocchi+noauth

Taking keystone authentication as an example:

[composite:gnocchi+auth]
use = egg:Paste#urlmap
/ = gnocchiversions_pipeline
/v1 = gnocchiv1+auth

[pipeline:gnocchiv1+auth]
pipeline = http_proxy_to_wsgi keystone_authtoken gnocchiv1

[app:gnocchiv1]
paste.app_factory = gnocchi.rest.app:app_factory
root = gnocchi.rest.V1Controller
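
A pipeline line such as the one above can be read as function composition: each filter wraps the application that follows it. The sketch below shows that idea with plain WSGI callables. It does not use Paste Deploy, and `fake_auth_filter`, `v1_app`, `build_pipeline`, and `call` are hypothetical names; the filter only checks that a token header is present and is not a substitute for keystone_authtoken.

```python
def v1_app(environ, start_response):
    """Stand-in for the final application at the end of the pipeline."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"v1 as " + environ["demo.user"].encode()]


def fake_auth_filter(app):
    """Hypothetical filter: rejects requests that carry no token header."""
    def middleware(environ, start_response):
        token = environ.get("HTTP_X_AUTH_TOKEN")
        if not token:
            start_response("401 Unauthorized", [("Content-Type", "text/plain")])
            return [b"authentication required"]
        environ["demo.user"] = "user-for-" + token
        return app(environ, start_response)
    return middleware


def build_pipeline(filters, app):
    """Wrap `app` so that the first filter in the list runs first."""
    for filter_factory in reversed(filters):
        app = filter_factory(app)
    return app


def call(app, environ):
    """Invoke a WSGI app and return (status, body) for inspection."""
    captured = {}

    def start_response(status, headers):
        captured["status"] = status

    body = b"".join(app(environ, start_response))
    return captured["status"], body


pipeline = build_pipeline([fake_auth_filter], v1_app)
```

Calling `call(pipeline, {})` is rejected by the filter before the application runs, while a request that carries `HTTP_X_AUTH_TOKEN` reaches `v1_app`.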

Following the pipeline definition, the request finally reaches gnocchi.rest.V1Controller:

class V1Controller(object):

    def __init__(self):
        self.sub_controllers = {
            "search": SearchController(),
            "archive_policy": ArchivePoliciesController(),
            "archive_policy_rule": ArchivePolicyRulesController(),
            "metric": MetricsController(),
            "batch": BatchController(),
            "resource": ResourcesByTypeController(),
            "resource_type": ResourceTypesController(),
            "aggregation": AggregationController(),
            "capabilities": CapabilityController(),
            "status": StatusController(),
        }
        for name, ctrl in self.sub_controllers.items():
            setattr(self, name, ctrl)
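
The setattr loop matters because it turns each dictionary key into an attribute, which lets an object-dispatch router resolve a URL segment with a plain attribute lookup. The following sketch illustrates that mechanism under the assumption that routing walks attributes segment by segment; the `dispatch` helper and the two toy controllers are hypothetical and are not Gnocchi's classes.

```python
class MetricsController:
    def get(self):
        return "list of metrics"


class StatusController:
    def get(self):
        return "status"


class RootController:
    def __init__(self):
        self.sub_controllers = {
            "metric": MetricsController(),
            "status": StatusController(),
        }
        # Expose each sub-controller as an attribute named after its path segment.
        for name, ctrl in self.sub_controllers.items():
            setattr(self, name, ctrl)


def dispatch(root, path):
    """Walk the controller tree one path segment at a time."""
    node = root
    for segment in path.strip("/").split("/"):
        node = getattr(node, segment, None)
        if node is None:
            return None  # no controller registered for this segment
    return node.get() if hasattr(node, "get") else None
```

With this layout, adding a new top-level URL only requires one more entry in `sub_controllers`.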

3. gnocchi-metricd: asynchronous aggregation

Start from the function that the script entry point calls:

def metricd():
    conf = service.prepare_service()
    MetricdServiceManager(conf).run()

This calls the run method of MetricdServiceManager:

class MetricdServiceManager(cotyledon.ServiceManager):
    def __init__(self, conf):
        super(MetricdServiceManager, self).__init__()
        self.conf = conf
        self.queue = multiprocessing.Manager().Queue()

        self.add(MetricScheduler, args=(self.conf, self.queue))
        self.add(MetricProcessor, args=(self.conf, self.queue),
                 workers=conf.metricd.workers)
        self.add(MetricReporting, args=(self.conf,))
        self.add(MetricJanitor, args=(self.conf,))

    def run(self):
        super(MetricdServiceManager, self).run()
        self.queue.close()

Several classes are added to this process, each handling a different task. Together they make up the work that the gnocchi-metricd process performs:

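MetricScheduler: metric scheduling; it selects the metrics that need to be processed.

MetricProcessor: the core class of gnocchi-metricd; it processes the sample data and computes measures according to the archive and aggregation policies.

MetricReporting: reports the current measure-processing status.

MetricJanitor: deletes the database records associated with a metric, along with the measures associated with it in the backend storage.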
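
The scheduler and the processors share one queue: the scheduler puts work on it and each worker takes work off it. The sketch below shows that producer/consumer shape with threads and `queue.Queue` from the standard library. It is a simplification, not the cotyledon and multiprocessing setup shown above; the batching, the `None` sentinel used for shutdown, and the upper-casing that stands in for "processing" are all assumptions.

```python
import queue
import threading


def scheduler(work_queue, metric_ids, batch_size):
    """Split the pending metric ids into batches and enqueue them."""
    for i in range(0, len(metric_ids), batch_size):
        work_queue.put(metric_ids[i:i + batch_size])


def processor(work_queue, results, lock):
    """Consume batches until a None sentinel arrives."""
    while True:
        batch = work_queue.get()
        if batch is None:
            break
        with lock:
            # Placeholder for the real aggregation work.
            results.extend(metric_id.upper() for metric_id in batch)


def run(metric_ids, workers=2, batch_size=2):
    work_queue = queue.Queue()
    results, lock = [], threading.Lock()
    threads = [
        threading.Thread(target=processor, args=(work_queue, results, lock))
        for _ in range(workers)
    ]
    for thread in threads:
        thread.start()
    scheduler(work_queue, metric_ids, batch_size)
    for _ in threads:
        work_queue.put(None)  # one sentinel per worker
    for thread in threads:
        thread.join()
    return sorted(results)
```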

They all inherit from the MetricProcessBase base class, whose run method is:

def _configure(self):
    self.store = storage.get_driver(self.conf)
    self.index = indexer.get_driver(self.conf)
    self.index.connect()

def run(self):
    self._configure()
    # Delay startup so workers are jittered.
    time.sleep(self.startup_delay)

    while not self._shutdown.is_set():
        with timeutils.StopWatch() as timer:
            self._run_job()
            self._shutdown.wait(max(0, self.interval_delay -
                                    timer.elapsed()))
    self._shutdown_done.set()

run first calls _configure, and then calls _run_job on every iteration of the loop.
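
The loop above keeps a roughly fixed period by waiting only for the part of the interval that the job did not use, and it waits on an event so that a shutdown request wakes it immediately. Here is a self-contained sketch of that pattern using only the standard library. `PeriodicWorker` and its `stop` method are hypothetical names, `time.monotonic` replaces the StopWatch helper, and the job simply counts its runs.

```python
import threading
import time


class PeriodicWorker:
    def __init__(self, interval_delay, startup_delay=0.0):
        self.interval_delay = interval_delay
        self.startup_delay = startup_delay
        self.runs = 0
        self._shutdown = threading.Event()
        self._shutdown_done = threading.Event()

    def _run_job(self):
        self.runs += 1  # placeholder for real work

    def run(self):
        time.sleep(self.startup_delay)
        while not self._shutdown.is_set():
            started = time.monotonic()
            self._run_job()
            elapsed = time.monotonic() - started
            # Wait only for the rest of the interval; wake early on shutdown.
            self._shutdown.wait(max(0, self.interval_delay - elapsed))
        self._shutdown_done.set()

    def stop(self):
        self._shutdown.set()
        self._shutdown_done.wait()


worker = PeriodicWorker(interval_delay=0.01)
thread = threading.Thread(target=worker.run)
thread.start()
time.sleep(0.1)
worker.stop()
thread.join()
```

Using `Event.wait` instead of `time.sleep` for the pause is the design choice worth noting: a plain sleep would delay shutdown by up to one full interval.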

4. gnocchi-statsd

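gnocchi-statsd receives data over UDP and stores it in the backend storage. TCP is not supported in the code shown here, which only carries a TODO for it.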

def start():
    conf = service.prepare_service()
    ...
    stats = Stats(conf)

    loop = asyncio.get_event_loop()
    # TODO(jd) Add TCP support
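    # Create the UDP datagram endpoint (TCP is not supported yet, see the TODO above)
    listen = loop.create_datagram_endpoint(
        lambda: StatsdServer(stats),
        local_addr=(conf.statsd.host, conf.statsd.port))
    # _flush saves the measures to the backend storage and re-schedules itself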
    def _flush():
        loop.call_later(conf.statsd.flush_delay, _flush)
        stats.flush()
        
    loop.call_later(conf.statsd.flush_delay, _flush)
    ...
    try:
        # Start listening
        loop.run_forever()

The StatsdServer object holds the Stats object; its datagram_received method pre-processes the received data:

def datagram_received(self, data, addr):
    LOG.debug("Received data `%r' from %s" % (data, addr))
    try:
        messages = [m for m in data.decode().split("\n") if m]
        ...
    for message in messages:
        # Validate the message format; see the official documentation for the formats statsd accepts
        metric = message.split("|")
        if len(metric) == 2:
            metric_name, metric_type = metric
            sampling = None
        elif len(metric) == 3:
            metric_name, metric_type, sampling = metric
        ...
        try:
            # Validate the value and, depending on metric_type, store it in
            # self.times (type ms), self.gauges (type g), or self.counters (type c);
            # flush() later stores the data held in these three objects
            self.stats.treat_metric(metric_name, metric_type,
                                    value, sampling)
        except Exception as e:
            LOG.error("Unable to treat metric %s: %s" % (message, str(e)))
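
The excerpt elides some of its parsing steps, so the following standalone sketch shows one way to parse the common statsd line format, `name:value|type` with an optional `|@rate` field. It is not the elided Gnocchi code: `parse_statsd_line` and `parse_datagram` are hypothetical helpers, and the decision to accept only the three types mentioned above (c, g, ms) is an assumption.

```python
def parse_statsd_line(line):
    """Parse 'name:value|type' or 'name:value|type|@rate' into a dict."""
    parts = line.split("|")
    if len(parts) == 2:
        name_and_value, metric_type = parts
        sampling = None
    elif len(parts) == 3:
        name_and_value, metric_type, raw_sampling = parts
        if not raw_sampling.startswith("@"):
            raise ValueError("sampling must look like '@0.1': %r" % line)
        sampling = float(raw_sampling[1:])
    else:
        raise ValueError("expected 2 or 3 '|'-separated fields: %r" % line)

    name, sep, raw_value = name_and_value.partition(":")
    if not sep or not name:
        raise ValueError("expected 'name:value': %r" % line)
    if metric_type not in ("c", "g", "ms"):
        raise ValueError("unsupported metric type: %r" % metric_type)
    return {"name": name, "type": metric_type,
            "value": float(raw_value), "sampling": sampling}


def parse_datagram(data):
    """Decode a UDP payload and parse every non-empty line in it."""
    return [parse_statsd_line(m) for m in data.decode().split("\n") if m]
```

For example, `parse_datagram(b"requests:1|c|@0.5\nlatency:320|ms\n")` yields one counter entry with a sampling rate of 0.5 and one timer entry without a sampling rate.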

stats.flush() stores the accumulated measures:

def flush(self):
    # The resource itself is created in the Stats initializer
    resource = self.indexer.get_resource('generic',
                                         self.conf.statsd.resource_id,
                                         with_metrics=True)

    # These three objects hold the data received by the listener
    for metric_name, measure in itertools.chain(
            six.iteritems(self.gauges),
            six.iteritems(self.counters),
            six.iteritems(self.times)):
        try:
            # NOTE(jd) We avoid considering any concurrency here as statsd
            # is not designed to run in parallel and we do not envision
            # operators manipulating the resource/metrics using the Gnocchi
            # API at the same time.
            metric = resource.get_metric(metric_name)
            if not metric:
                ap_name = self._get_archive_policy_name(metric_name)
                # Create the metric through the indexer (a SQL database backend)
                metric = self.indexer.create_metric(
                    uuid.uuid4(),
                    self.conf.statsd.user_id,
                    self.conf.statsd.project_id,
                    archive_policy_name=ap_name,
                    name=metric_name,
                    resource_id=self.conf.statsd.resource_id)
            # Save the measure through the backend storage driver
            self.storage.add_measures(metric, (measure,))
        except Exception as e:
            LOG.error("Unable to add measure %s: %s"
                      % (metric_name, e))

    self.reset()
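
The accumulate-then-flush cycle can be sketched without any Gnocchi dependency. In the toy class below, a list stands in for the storage backend and `Measure` is a local namedtuple rather than Gnocchi's own type. The accumulation rules are assumptions chosen for illustration: gauges and timers keep the latest value, and counters add `value / sampling` until the next flush.

```python
import time
from collections import namedtuple

Measure = namedtuple("Measure", ["timestamp", "value"])


class InMemoryStats:
    def __init__(self, clock=time.time):
        self._clock = clock
        self.stored = []  # stand-in for the storage backend
        self.reset()

    def reset(self):
        self.gauges, self.counters, self.times = {}, {}, {}

    def treat_metric(self, name, metric_type, value, sampling=None):
        now = self._clock()
        if metric_type == "g":
            self.gauges[name] = Measure(now, value)
        elif metric_type == "ms":
            self.times[name] = Measure(now, value)
        elif metric_type == "c":
            # Assumed rule: scale by 1/sampling and accumulate until flush.
            rate = 1.0 if sampling is None else sampling
            previous = self.counters[name].value if name in self.counters else 0.0
            self.counters[name] = Measure(now, previous + value / rate)
        else:
            raise ValueError("unknown metric type: %r" % metric_type)

    def flush(self):
        # Write one measure per metric name, then start a fresh window.
        for table in (self.gauges, self.counters, self.times):
            for name, measure in table.items():
                self.stored.append((name, measure))
        self.reset()
```

Because each flush writes at most one measure per metric name, the flush delay bounds how often the backend is written to, regardless of how many datagrams arrive.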

From here on, the data is handled the same way as data received by the API: it is handed to gnocchi-metricd for asynchronous aggregation.
