Gnocchi Source Code Study
For background on how Gnocchi works, see:
Official documentation: gnocchi
Chinese translation: gnocchi architecture
The Gnocchi component mainly processes the sample data sent by Ceilometer, deriving the corresponding measures according to the archive and aggregation policies, which reduces the time and space cost of storing the measures.
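For intuition, here is a minimal back-of-the-envelope sketch (plain Python; the numbers are the "low" policy definition quoted later in the process_new_measures walkthrough) of how an archive policy bounds storage: each (granularity, points) pair keeps a fixed number of aggregated points, no matter how many raw samples arrive.

# "low" archive policy: (granularity in seconds, number of points kept),
# numbers taken from the archive policy comment in process_new_measures.
low_policy = [(60, 60 * 24), (3600, 7 * 24), (3600 * 24, 365)]

for granularity, points in low_policy:
    retention_days = granularity * points / 86400.0
    print("granularity=%6ds keeps %5d points (~%.0f days of history)"
          % (granularity, points, retention_days))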
1. Components
- The scripts are the values of the console_scripts and wsgi_scripts configuration options; on startup, gnocchi-api and gnocchi-metricd are launched.
- gnocchi-statsd receives metric data sent over the UDP protocol; it is not used in this project.
console_scripts =
    gnocchi-upgrade = gnocchi.cli:upgrade
    gnocchi-statsd = gnocchi.cli:statsd
    gnocchi-metricd = gnocchi.cli:metricd
    carbonara-dump = gnocchi.carbonara:dump_archive_file
wsgi_scripts =
    gnocchi-api = gnocchi.rest.app:build_wsgi_app
2. gnocchi-api
Like the other components, it uses the WSGI framework to start the app and the service. Two authentication schemes are defined in api-paste.ini: noauth and keystone authentication (i.e. auth).
# Use gnocchi+auth in the pipeline if you want to use keystone authentication
[pipeline:main]
pipeline = gnocchi+noauth
Taking keystone authentication as an example:
[composite:gnocchi+auth]
use = egg:Paste#urlmap
/ = gnocchiversions_pipeline
/v1 = gnocchiv1+auth
[pipeline:gnocchiv1+auth]
pipeline = http_proxy_to_wsgi keystone_authtoken gnocchiv1
[app:gnocchiv1]
paste.app_factory = gnocchi.rest.app:app_factory
root = gnocchi.rest.V1Controller
Following the pipeline definition, requests are eventually dispatched to gnocchi.rest.V1Controller:
class V1Controller(object):
    def __init__(self):
        self.sub_controllers = {
            "search": SearchController(),
            "archive_policy": ArchivePoliciesController(),
            "archive_policy_rule": ArchivePolicyRulesController(),
            "metric": MetricsController(),
            "batch": BatchController(),
            "resource": ResourcesByTypeController(),
            "resource_type": ResourceTypesController(),
            "aggregation": AggregationController(),
            "capabilities": CapabilityController(),
            "status": StatusController(),
        }
        for name, ctrl in self.sub_controllers.items():
            setattr(self, name, ctrl)
- Each sub-controller handles the requests for one URL prefix; for example, "resource": ResourcesByTypeController() handles the requests under /v1/resource (see the usage sketch below).
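As a usage sketch (not from the Gnocchi source): posting measures to an existing metric through gnocchi-api. The endpoint, METRIC_ID and the identity headers are placeholders, assuming a local API with the noauth pipeline.

# Hedged sketch: push two measures to a metric over the REST API.
# METRIC_ID is a placeholder; the X-User-Id/X-Project-Id headers are an
# assumption about what the noauth middleware expects.
import requests

METRIC_ID = "00000000-0000-0000-0000-000000000000"
resp = requests.post(
    "http://localhost:8041/v1/metric/%s/measures" % METRIC_ID,
    headers={"X-User-Id": "admin", "X-Project-Id": "admin"},
    json=[{"timestamp": "2017-01-01T12:00:00", "value": 4.2},
          {"timestamp": "2017-01-01T12:01:00", "value": 4.6}])
resp.raise_for_status()  # measures are only queued here; metricd aggregates them later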
3. gnocchi-metricd: asynchronous aggregation
Starting from the script's entry point:
def metricd():
    conf = service.prepare_service()
    MetricdServiceManager(conf).run()
This calls the run method of MetricdServiceManager:
class MetricdServiceManager(cotyledon.ServiceManager):
    def __init__(self, conf):
        super(MetricdServiceManager, self).__init__()
        self.conf = conf
        self.queue = multiprocessing.Manager().Queue()

        self.add(MetricScheduler, args=(self.conf, self.queue))
        self.add(MetricProcessor, args=(self.conf, self.queue),
                 workers=conf.metricd.workers)
        self.add(MetricReporting, args=(self.conf,))
        self.add(MetricJanitor, args=(self.conf,))

    def run(self):
        super(MetricdServiceManager, self).run()
        self.queue.close()
The manager registers several service classes in this process, each handling a different task; together they make up the work of the gnocchi-metricd process:
MetricScheduler: schedules the metrics to be processed
MetricProcessor: the core class of gnocchi-metricd; processes the sample data and computes the measures according to the archive and aggregation policies
MetricReporting: reports the current measure-processing status
MetricJanitor: deletes the metric records from the database and the measures associated with them in the backend storage
They all inherit from the MetricProcessBase base class, whose run method is:
def _configure(self):
    self.store = storage.get_driver(self.conf)
    self.index = indexer.get_driver(self.conf)
    self.index.connect()

def run(self):
    self._configure()
    # Delay startup so workers are jittered.
    time.sleep(self.startup_delay)
    while not self._shutdown.is_set():
        with timeutils.StopWatch() as timer:
            self._run_job()
        self._shutdown.wait(max(0, self.interval_delay -
                                timer.elapsed()))
    self._shutdown_done.set()
run first executes _configure and then calls the subclass's _run_job on each iteration.
- MetricScheduler: schedules the metrics to be processed:
metrics = set(self.store.list_metric_with_measures_to_process(
    self.block_size, self.block_index))
if metrics and not self.queue.empty():
    # NOTE(gordc): drop metrics we previously process to avoid
    # handling twice
    number_of_scheduled_metrics = len(metrics)
    metrics = metrics - self.previously_scheduled_metrics
    ...
metrics = list(metrics)
for i in six.moves.range(0, len(metrics), self.BLOCK_SIZE):
    self.queue.put(metrics[i:i + self.BLOCK_SIZE])
self.previously_scheduled_metrics = set(metrics)
The selected metrics are put on the queue; MetricProcessor takes them back off the queue. previously_scheduled_metrics acts as a cache of the metrics scheduled in the previous round, preventing the same metric from being processed twice (see the sketch below).
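A standalone model of that dedup-and-batch logic (hypothetical metric ids; BLOCK_SIZE shrunk to 2 for readability):

# Standalone model of the scheduler's dedup + batching (hypothetical ids).
BLOCK_SIZE = 2
previously_scheduled = {"m1", "m2"}        # handled in the previous round

metrics = {"m2", "m3", "m4", "m5"}         # returned by the driver this round
metrics = metrics - previously_scheduled   # "m2" is dropped: already queued

metrics = sorted(metrics)                  # list(metrics) in the real code
for i in range(0, len(metrics), BLOCK_SIZE):
    print("queue.put(%s)" % metrics[i:i + BLOCK_SIZE])
previously_scheduled = set(metrics)        # cached for the next round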
- MetricProcessor:
def _run_job(self):
    try:
        try:
            # self.queue holds the metrics scheduled by the
            # MetricScheduler process.
            metrics = self.queue.get(block=True, timeout=10)
        except six.moves.queue.Empty:
            # NOTE(sileht): Allow the process to exit gracefully every
            # 10 seconds
            return
        self.store.process_background_tasks(self.index, metrics)
process_background_tasks in turn calls process_new_measures:
def process_new_measures(self, indexer, metrics_to_process, sync=False):
    metrics = indexer.list_metrics(ids=metrics_to_process)
    # Drop metrics that no longer exist in the indexer: we have
    # measures to process for them but they are not in the indexer
    # anymore.
    deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                          - set(m.id for m in metrics))
    for metric_id in deleted_metrics_id:
        ...
    for metric in metrics:
        lock = self._lock(metric.id)
        agg_methods = list(metric.archive_policy.aggregation_methods)
        if lock.acquire(blocking=sync):
            try:
                locksw = timeutils.StopWatch().start()
                LOG.debug("Processing measures for %s" % metric)
                # _process_measure_for_metric mainly wraps the raw measures
                # so they are easier to handle below. Note that after this
                # preprocessing it deletes the raw measures from the
                # backend storage.
                with self._process_measure_for_metric(metric) as measures:
                    ...
                    measures = sorted(measures, key=operator.itemgetter(0))
                    # max_block_size is the largest time window among the
                    # archive policy definitions. ts contains two kinds of
                    # data: the newly arrived raw measures, and
                    # aggregated_data, i.e. the unexpired historical raw
                    # values.
                    block_size = metric.archive_policy.max_block_size
                    try:
                        ts = self._get_unaggregated_timeserie_and_unserialize(  # noqa
                            metric,
                            block_size=block_size,
                            back_window=metric.archive_policy.back_window)
                    except storage.MetricDoesNotExist:
                        try:
                            self._create_metric(metric)
                        except storage.MetricAlreadyExists:
                            # Created in the mean time, do not worry
                            pass
                        ts = None
                    except CorruptionError as e:
                        LOG.error(e)
                        ts = None

                    if ts is None:
                        # This is the first time we treat measures for this
                        # metric, or data are corrupted, create a new one
                        ts = carbonara.BoundTimeSerie(
                            block_size=block_size,
                            back_window=metric.archive_policy.back_window)
                        current_first_block_timestamp = None
                    else:
                        current_first_block_timestamp = (
                            ts.first_block_timestamp()
                        )

                    # NOTE(jd) This is Python where you need such
                    # hack to pass a variable around a closure,
                    # sorry.
                    computed_points = {"number": 0}

                    def _map_add_measures(bound_timeserie):
                        # NOTE (gordc): bound_timeserie is entire set of
                        # unaggregated measures matching largest
                        # granularity. the following takes only the points
                        # affected by new measures for specific granularity
                        tstamp = max(bound_timeserie.first, measures[0][0])
                        computed_points['number'] = len(bound_timeserie)
                        # definition is a list; with the "low" policy it
                        # holds three items:
                        # ArchivePolicyItem(granularity=60, points=60 * 24)
                        # ArchivePolicyItem(granularity=3600, points=7 * 24)
                        # ArchivePolicyItem(granularity=3600 * 24, points=365)
                        for d in metric.archive_policy.definition:
                            ts = bound_timeserie.group_serie(
                                d.granularity, carbonara.round_timestamp(
                                    tstamp, d.granularity * 10e8))
                            self._map_in_thread(
                                # _add_measures drops the expired measures
                                # and aggregates the raw measures, computing
                                # one value per aggregation method.
                                self._add_measures,
                                ((aggregation, d, metric, ts,
                                  current_first_block_timestamp,
                                  bound_timeserie.first_block_timestamp())
                                 for aggregation in agg_methods))

                    with timeutils.StopWatch() as sw:
                        # set_values inserts the measures into ts; it passes
                        # the ts object (self) to _map_add_measures through
                        # before_truncate_callback and runs it, which starts
                        # the processing of the measures.
                        ts.set_values(
                            measures,
                            before_truncate_callback=_map_add_measures,
                            ignore_too_old_timestamps=True)
                    ...
                    # Since the raw measures were deleted from the backend
                    # during preprocessing, the unaggregated values have to
                    # be saved back here; they are needed to compute the
                    # aggregates when new measures arrive. This mainly
                    # stores the timestamps in ts and their raw values.
                    self._store_unaggregated_timeserie(metric, ts.serialize())
                    ...
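To make the group-and-aggregate step concrete, here is a simplified, self-contained model (not the Carbonara code itself) of what group_serie plus _add_measures achieve for one granularity: bucket the raw points by granularity, aggregate each bucket, and keep only the newest points buckets.

import collections

def aggregate(measures, granularity, points,
              method=lambda values: sum(values) / len(values)):
    """measures: iterable of (unix_timestamp, value) pairs."""
    buckets = collections.defaultdict(list)
    for ts, value in measures:
        # Round each timestamp down to its granularity bucket.
        buckets[ts - (ts % granularity)].append(value)
    series = sorted((ts, method(values)) for ts, values in buckets.items())
    # Keeping only the last `points` entries models the expiry that
    # _add_measures performs.
    return series[-points:]

raw = [(0, 1.0), (30, 3.0), (65, 10.0), (130, 4.0)]
print(aggregate(raw, granularity=60, points=2))  # [(60, 10.0), (120, 4.0)]

Each method in agg_methods (mean, min, max, ...) plays the role of method here, yielding one aggregated series per (aggregation method, granularity) pair.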
- MetricReporting: reports the current measure-processing status, e.g. the number of metrics and the number of pending samples:
def _run_job(self):
    try:
        report = self.store.measures_report(details=False)
        LOG.info("%d measurements bundles across %d "
                 "metrics wait to be processed.",
                 report['summary']['measures'],
                 report['summary']['metrics'])
This ends up calling _build_report, which each storage backend implements; taking Ceph as an example:
def _build_report(self, details):
    names = self._list_object_names_to_process()
    metrics = set()
    count = 0
    metric_details = defaultdict(int)
    for name in names:
        count += 1
        metric = name.split("_")[1]
        metrics.add(metric)
        if details:
            metric_details[metric] += 1
    return len(metrics), count, metric_details if details else None
It counts the metrics currently awaiting processing, the number of pending measures, and so on (see the parsing sketch below).
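A tiny sketch of that counting, assuming pending-measure object names of the form measure_<metric_id>_<suffix> (the layout implied by name.split("_")[1] above):

# Hypothetical object names following the measure_<metric_id>_<suffix>
# layout implied by the split above.
names = ["measure_abc_1", "measure_abc_2", "measure_def_1"]

metrics = {name.split("_")[1] for name in names}
print("%d measurements bundles across %d metrics" % (len(names), len(metrics)))
# -> 3 measurements bundles across 2 metrics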
- MetricJanitor: deletes the metric records from the database and their associated measures from the backend storage:
def _run_job(self):
    try:
        self.store.expunge_metrics(self.index)
which in turn calls expunge_metrics in storage/__init__.py:
def expunge_metrics(self, index, sync=False):
    """Remove deleted metrics

    :param index: An indexer to be used for querying metrics
    :param sync: If True, then delete everything synchronously and raise
                 on error
    :type sync: bool
    """
    # Note that this first queries the indexer (i.e. the database) for
    # the metric records whose status is 'delete'. The status is set to
    # 'delete' when a resource or a metric is deleted through the API,
    # as the API code shows: the delete operations exposed by the API
    # only mark the metric as 'delete'; the actual removal from the
    # database happens here, in this process.
    metrics_to_expunge = index.list_metrics(status='delete')
    for m in metrics_to_expunge:
        try:
            # Delete the measures associated with the metric from the
            # backend storage.
            self.delete_metric(m, sync)
        except Exception:
            if sync:
                raise
            LOG.error("Unable to expunge metric %s from storage" % m,
                      exc_info=True)
            continue
        try:
            # Delete the metric record from the database.
            index.expunge_metric(m.id)
        except indexer.NoSuchMetric:
            # It's possible another process deleted the metric in the mean
            # time, not a big deal
            pass
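For completeness, a hedged sketch of the API side referred to in the comments above: deleting a metric over REST only flips its status to 'delete', and MetricJanitor performs the real removal on a later run. METRIC_ID and the identity headers are placeholders.

# Hedged sketch: this DELETE only marks the metric status='delete' in the
# indexer; MetricJanitor expunges it asynchronously afterwards.
import requests

METRIC_ID = "00000000-0000-0000-0000-000000000000"
requests.delete(
    "http://localhost:8041/v1/metric/%s" % METRIC_ID,
    headers={"X-User-Id": "admin", "X-Project-Id": "admin"})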
4. gnocchi-statsd
gnocchi-statsd mainly receives measurement data over UDP (TCP support is still a TODO, as the code below notes) and writes the received data to the backend storage.
def start():
    conf = service.prepare_service()
    ...
    stats = Stats(conf)

    loop = asyncio.get_event_loop()
    # TODO(jd) Add TCP support
    # Create the endpoint that receives the UDP data.
    listen = loop.create_datagram_endpoint(
        lambda: StatsdServer(stats),
        local_addr=(conf.statsd.host, conf.statsd.port))

    # _flush saves the accumulated measures to the backend storage.
    def _flush():
        loop.call_later(conf.statsd.flush_delay, _flush)
        stats.flush()

    loop.call_later(conf.statsd.flush_delay, _flush)
    ...
    try:
        # Start listening.
        loop.run_forever()
The StatsdServer object wraps the Stats object; its datagram_received method does some preprocessing of the received data:
def datagram_received(self, data, addr):
    LOG.debug("Received data `%r' from %s" % (data, addr))
    try:
        messages = [m for m in data.decode().split("\n") if m]
    ...
    for message in messages:
        # Validate the message format; the formats accepted by the
        # statsd listener are documented on the statsd project page.
        metric = message.split("|")
        if len(metric) == 2:
            metric_name, metric_type = metric
            sampling = None
        elif len(metric) == 3:
            metric_name, metric_type, sampling = metric
        ...
        try:
            # Validates the value and, depending on metric_type, stores
            # it in self.times (type "ms"), self.gauges (type "g") or
            # self.counters (type "c"); flush() below persists these
            # three collections.
            self.stats.treat_metric(metric_name, metric_type,
                                    value, sampling)
        except Exception as e:
            LOG.error("Unable to treat metric %s: %s" % (message, str(e)))
stats.flush() stores the corresponding measures:
def flush(self):
    # The resource itself is created in the Stats __init__.
    resource = self.indexer.get_resource('generic',
                                         self.conf.statsd.resource_id,
                                         with_metrics=True)

    # These three dicts hold the data received by the listener.
    for metric_name, measure in itertools.chain(
            six.iteritems(self.gauges),
            six.iteritems(self.counters),
            six.iteritems(self.times)):
        try:
            # NOTE(jd) We avoid considering any concurrency here as statsd
            # is not designed to run in parallel and we do not envision
            # operators manipulating the resource/metrics using the Gnocchi
            # API at the same time.
            metric = resource.get_metric(metric_name)
            if not metric:
                ap_name = self._get_archive_policy_name(metric_name)
                # Create the metric through the indexer (backed by any
                # SQL database the indexer supports).
                metric = self.indexer.create_metric(
                    uuid.uuid4(),
                    self.conf.statsd.user_id,
                    self.conf.statsd.project_id,
                    archive_policy_name=ap_name,
                    name=metric_name,
                    resource_id=self.conf.statsd.resource_id)
            # Save the measure to the backend storage.
            self.storage.add_measures(metric, (measure,))
        except Exception as e:
            LOG.error("Unable to add measure %s: %s"
                      % (metric_name, e))
    self.reset()
From this point on, the data follows the same path as data received through the API: gnocchi-metricd performs the asynchronous aggregation.