Aodh 源码学习#

Aodh组件主要是处理与告警相关的功能,是从原先的ceilometer-alarm组件中独立出来,专门成立的Aodh组件。数据流程如下:

cover

1.组件组成

console_scripts =
    aodh-api = aodh.cmd.api:main
    aodh-dbsync = aodh.cmd.storage:dbsync
    aodh-expirer = aodh.cmd.storage:expirer
    aodh-evaluator = aodh.cmd.alarm:evaluator
    aodh-notifier = aodh.cmd.alarm:notifier
    aodh-listener = aodh.cmd.alarm:listener
    aodh-data-migration = aodh.cmd.data_migration:main

2. aodh-api

与其他组件一样,统一使用wsgi框架启动app以及服务,最终会调用到aodh\api\controllers\v2\root.py:

class V2Controller(object):
    """Version 2 API controller root."""

    alarms = alarms.AlarmsController()
    query = query.QueryController()
    capabilities = capabilities.CapabilitiesController()

3. aodh-evaluator

查看脚本起始调用:

def evaluator():
    conf = service.prepare_service()
    os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf)).wait()

最终会调用AlarmEvaluationService中的start方法,在AlarmEvaluationService初始化时会加载命令空间

中aodh.evaluator的插件,每种类型的插件定义了触发条件的规则,这些规则存在在支持sql的数据库,例如mysql。

if self.evaluators:
    interval = self.conf.evaluation_interval
    self.tg.add_timer(
        interval,
        self._evaluate_assigned_alarms,
        initial_delay=interval if delay_start else None)
if self.partition_coordinator.is_active():

在start函数中会执行周期性任务,默认为60秒,self._evaluate_assigned_alarms,根据alarm.type类型调用相应的evaluator插件,然后执行该obj的evaluate方法,用于判断是否可以触发告警。

try:
    self.evaluators[alarm.type].obj.evaluate(alarm)

以阈值alarm为例,对应的插件为:ThresholdEvaluator。

    def evaluate(self, alarm):
        if not self.within_time_constraint(alarm):
            LOG.debug('Attempted to evaluate alarm %s, but it is not '
                      'within its time constraint.', alarm.alarm_id)
            return

        state, trending_state, statistics, outside_count = self.evaluate_rule(
            alarm.rule)
        self._transition_alarm(alarm, state, trending_state, statistics,
                               outside_count)

evaluate_rule中会调用_statistics函数,此函数会调用ceilometer_client查询当前时间戳内的数据

def _statistics(self, rule, start, end):
    """Retrieve statistics over the current window."""
    after = dict(field='timestamp', op='ge', value=start)
    before = dict(field='timestamp', op='le', value=end)
    query = copy.copy(rule['query'])
    query.extend([before, after])
    LOG.debug('stats query %s', query)
    try:
        return self.cm_client.statistics.list(
            meter_name=rule['meter_name'], q=query,
            period=rule['period'])
    except Exception:
        LOG.exception(_('alarm stats retrieval failed'))
        return []

后续会根据定义的告警规则进行判断是否会触发告警

def evaluate_rule(self, alarm_rule):
    """Evaluate alarm rule.

    :returns: state, trending state and statistics.
    """
    start, end = self._bound_duration(alarm_rule)
    statistics = self._statistics(alarm_rule, start, end)
    statistics = self._sanitize(alarm_rule, statistics)
    sufficient = len(statistics) >= alarm_rule['evaluation_periods']
    if not sufficient:
        return evaluator.UNKNOWN, None, statistics, len(statistics)

    def _compare(value):
        op = COMPARATORS[alarm_rule['comparison_operator']]
        limit = alarm_rule['threshold']
        LOG.debug('comparing value %(value)s against threshold'
                  ' %(limit)s', {'value': value, 'limit': limit})
        return op(value, limit)

    compared = list(six.moves.map(_compare, statistics))
    ...

如果触发告警则会调用更新告警状态并发送

def _refresh(self, alarm, state, reason, reason_data, always_record=False):
    """Refresh alarm state."""
    try:
        previous = alarm.state
        alarm.state = state
        if previous != state or always_record:
            LOG.info(_('alarm %(id)s transitioning to %(state)s because '
                       '%(reason)s') % {'id': alarm.alarm_id,
                                        'state': state,
                                        'reason': reason})
            try:
                self._storage_conn.update_alarm(alarm)
            except storage.AlarmNotFound:
                LOG.warning(_LW("Skip updating this alarm's state, the"
                                "alarm: %s has been deleted"),
                            alarm.alarm_id)
            else:
                self._record_change(alarm)
            self.notifier.notify(alarm, previous, reason, reason_data)
        elif alarm.repeat_actions:
            self.notifier.notify(alarm, previous, reason, reason_data)

notifier包含两种方式,如果是AlarmNotifier则以sample级别发送到消息队列。

if conf.ipc_protocol == 'rpc':
    self.notifier = rpc.RPCAlarmNotifier(self.conf)
else:
    self.notifier = queue.AlarmNotifier(self.conf)

4.aodh-listener

启动脚本

def listener():
    conf = service.prepare_service()
    os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf)).wait()

与evaluator类似,只不过它是对于event事件进行alarm evaluate

def start(self):
    super(EventAlarmEvaluationService, self).start()
    self.listener = messaging.get_notification_listener(
        messaging.get_transport(self.conf),
        [oslo_messaging.Target(topic=self.conf.event_alarm_topic)],
        [EventAlarmEndpoint(self.evaluator)])
    self.listener.start()
    # Add a dummy thread to have wait() working
    self.tg.add_timer(604800, lambda: None)

与evaluator不同的是,listener的数据来源是消息队列,监听的topic为alarm.all,因此必须再ceilometer中eventpipeline中配置notifier://?topic=alarm.all这样才能监听到数据

class EventAlarmEndpoint(object):

    def __init__(self, evaluator):
        self.evaluator = evaluator

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        # TODO(r-mibu): requeue on error
        self.evaluator.evaluate_events(payload)

EventAlarmEndpoint中会处理sample级别的消息,调用

EventAlarmEvaluator.evaluate_events

—>_evaluate_alarm根据接收到的时间进行告警预测,如果触发告警,则更新告警状态并通过notifier发送告警

def _evaluate_alarm(self, alarm, event):
    ...
    if alarm.fired_and_no_repeat():
        LOG.debug('Skip evaluation of the alarm id=%s which have already '
                  'fired.', alarm.id)
        return

    if not alarm.event_type_to_watch(event.obj['event_type']):
        LOG.debug('Aborting evaluation of the alarm (id=%s) since '
                  'event_type is not matched.', alarm.id)
        return

    def _compare(condition):
        v = event.get_value(condition['field'])
        LOG.debug('Comparing value=%(v)s against condition=%(c)s .',
                  {'v': v, 'c': condition})
        return condition['op'](v, condition['value'])

    for condition in alarm.query:
        if not _compare(condition):
            LOG.debug('Aborting evaluation of the alarm due to '
                      'unmet condition=%s .', condition)
            return

    self._fire_alarm(alarm, event)

5.aodh-notifier

启动脚本

def notifier():
    conf = service.prepare_service()
    os_service.launch(conf, notifier_svc.AlarmNotifierService(conf)).wait()

初始化

def __init__(self, conf):
    super(AlarmNotifierService, self).__init__()
    transport = messaging.get_transport(conf)

    self.notifiers = extension.ExtensionManager(
        self.NOTIFIER_EXTENSIONS_NAMESPACE,
        invoke_on_load=True,
        invoke_args=(conf,))

    if conf.ipc_protocol == 'rpc':
        self.ipc = 'rpc'
        self.rpc_server = messaging.get_rpc_server(
            conf, transport, conf.notifier_rpc_topic, self)
    else:
        self.ipc = 'queue'
        target = oslo_messaging.Target(topic=conf.notifier_topic)
        self.listener = messaging.get_notification_listener(
            transport, [target],
            [AlarmEndpoint(self.notifiers)])

加载命名空间中的”aodh.notifier”中的处理类作为notifier的Endpoint,对应处理类,notifier支持以下方式进行告警的分发:

http、https、log、zaqar、test、trust+http、trust+https、trust+zaqar

class AlarmEndpoint(object):

    def __init__(self, notifiers):
        self.notifiers = notifiers

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        """Endpoint for alarm notifications"""
        process_alarm(self.notifiers, payload)

AlarmEndpoint中处理消息队列中sample级别的消息,调用process_alarm来处理告警,最终会调用endpoint中的notify方法,根据alarm中的action解析调用的相应类处理。

def _handle_action(notifiers, action, alarm_id, alarm_name, severity,
                   previous, current, reason, reason_data):
    ...

    try:
        action = netutils.urlsplit(action)
    except Exception:
        LOG.error(
            _("Unable to parse action %(action)s for alarm %(alarm_id)s"),
            {'action': action, 'alarm_id': alarm_id})
        return

    try:
        notifier = notifiers[action.scheme].obj
    except KeyError:
        ...

    try:
        LOG.debug("Notifying alarm %(id)s with action %(act)s",
                  {'id': alarm_id, 'act': action})
        notifier.notify(action, alarm_id, alarm_name, severity,
                        previous, current, reason, reason_data)
    except Exception:
        LOG.exception(_("Unable to notify alarm %s"), alarm_id)
        return

6.注意点

Table of Contents