Aodh 源码学习#
Aodh组件主要是处理与告警相关的功能,是从原先的ceilometer-alarm组件中独立出来,专门成立的Aodh组件。数据流程如下:
- aodh-evaluator 根据告警类型从ceilometer接口中或者gnochhi中获取,当数据触发告警规则时将告警发送到rpc或者notification
- aodh-listener 定义了监控event的告警,数据是从直接监听notification中获取,当监听的event触发告警时发送到rpc或者notification
- aodh-notifier 从rpc或者notification中获取告警并进行发送到外部系统例如http、https、Log等
1.组件组成
- 脚本为console_scripts 配置项中的值,启动时配置启动aodh-api、aodh-evaluator、aodh-listener、aodh-notifier
console_scripts =
aodh-api = aodh.cmd.api:main
aodh-dbsync = aodh.cmd.storage:dbsync
aodh-expirer = aodh.cmd.storage:expirer
aodh-evaluator = aodh.cmd.alarm:evaluator
aodh-notifier = aodh.cmd.alarm:notifier
aodh-listener = aodh.cmd.alarm:listener
aodh-data-migration = aodh.cmd.data_migration:main
2. aodh-api
与其他组件一样,统一使用wsgi框架启动app以及服务,最终会调用到aodh\api\controllers\v2\root.py:
class V2Controller(object):
"""Version 2 API controller root."""
alarms = alarms.AlarmsController()
query = query.QueryController()
capabilities = capabilities.CapabilitiesController()
- AlarmsController处理中Alarm相关的请求,例如创建alarm、简单过滤查询alarm;
- QueryController处理历史告警查询,以及复合条件alarm查询
- CapabilitiesController提供查询底层storage对于api支持的能力。
3. aodh-evaluator
查看脚本起始调用:
def evaluator():
conf = service.prepare_service()
os_service.launch(conf, evaluator_svc.AlarmEvaluationService(conf)).wait()
最终会调用AlarmEvaluationService中的start方法,在AlarmEvaluationService初始化时会加载命令空间
中aodh.evaluator的插件,每种类型的插件定义了触发条件的规则,这些规则存在在支持sql的数据库,例如mysql。
if self.evaluators:
interval = self.conf.evaluation_interval
self.tg.add_timer(
interval,
self._evaluate_assigned_alarms,
initial_delay=interval if delay_start else None)
if self.partition_coordinator.is_active():
在start函数中会执行周期性任务,默认为60秒,self._evaluate_assigned_alarms,根据alarm.type类型调用相应的evaluator插件,然后执行该obj的evaluate方法,用于判断是否可以触发告警。
try:
self.evaluators[alarm.type].obj.evaluate(alarm)
以阈值alarm为例,对应的插件为:ThresholdEvaluator。
def evaluate(self, alarm):
if not self.within_time_constraint(alarm):
LOG.debug('Attempted to evaluate alarm %s, but it is not '
'within its time constraint.', alarm.alarm_id)
return
state, trending_state, statistics, outside_count = self.evaluate_rule(
alarm.rule)
self._transition_alarm(alarm, state, trending_state, statistics,
outside_count)
evaluate_rule中会调用_statistics函数,此函数会调用ceilometer_client查询当前时间戳内的数据
def _statistics(self, rule, start, end):
"""Retrieve statistics over the current window."""
after = dict(field='timestamp', op='ge', value=start)
before = dict(field='timestamp', op='le', value=end)
query = copy.copy(rule['query'])
query.extend([before, after])
LOG.debug('stats query %s', query)
try:
return self.cm_client.statistics.list(
meter_name=rule['meter_name'], q=query,
period=rule['period'])
except Exception:
LOG.exception(_('alarm stats retrieval failed'))
return []
后续会根据定义的告警规则进行判断是否会触发告警
def evaluate_rule(self, alarm_rule):
"""Evaluate alarm rule.
:returns: state, trending state and statistics.
"""
start, end = self._bound_duration(alarm_rule)
statistics = self._statistics(alarm_rule, start, end)
statistics = self._sanitize(alarm_rule, statistics)
sufficient = len(statistics) >= alarm_rule['evaluation_periods']
if not sufficient:
return evaluator.UNKNOWN, None, statistics, len(statistics)
def _compare(value):
op = COMPARATORS[alarm_rule['comparison_operator']]
limit = alarm_rule['threshold']
LOG.debug('comparing value %(value)s against threshold'
' %(limit)s', {'value': value, 'limit': limit})
return op(value, limit)
compared = list(six.moves.map(_compare, statistics))
...
如果触发告警则会调用更新告警状态并发送
def _refresh(self, alarm, state, reason, reason_data, always_record=False):
"""Refresh alarm state."""
try:
previous = alarm.state
alarm.state = state
if previous != state or always_record:
LOG.info(_('alarm %(id)s transitioning to %(state)s because '
'%(reason)s') % {'id': alarm.alarm_id,
'state': state,
'reason': reason})
try:
self._storage_conn.update_alarm(alarm)
except storage.AlarmNotFound:
LOG.warning(_LW("Skip updating this alarm's state, the"
"alarm: %s has been deleted"),
alarm.alarm_id)
else:
self._record_change(alarm)
self.notifier.notify(alarm, previous, reason, reason_data)
elif alarm.repeat_actions:
self.notifier.notify(alarm, previous, reason, reason_data)
notifier包含两种方式,如果是AlarmNotifier则以sample级别发送到消息队列。
if conf.ipc_protocol == 'rpc':
self.notifier = rpc.RPCAlarmNotifier(self.conf)
else:
self.notifier = queue.AlarmNotifier(self.conf)
4.aodh-listener
启动脚本
def listener():
conf = service.prepare_service()
os_service.launch(conf, event_svc.EventAlarmEvaluationService(conf)).wait()
与evaluator类似,只不过它是对于event事件进行alarm evaluate
def start(self):
super(EventAlarmEvaluationService, self).start()
self.listener = messaging.get_notification_listener(
messaging.get_transport(self.conf),
[oslo_messaging.Target(topic=self.conf.event_alarm_topic)],
[EventAlarmEndpoint(self.evaluator)])
self.listener.start()
# Add a dummy thread to have wait() working
self.tg.add_timer(604800, lambda: None)
与evaluator不同的是,listener的数据来源是消息队列,监听的topic为alarm.all,因此必须再ceilometer中eventpipeline中配置notifier://?topic=alarm.all这样才能监听到数据
class EventAlarmEndpoint(object):
def __init__(self, evaluator):
self.evaluator = evaluator
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
# TODO(r-mibu): requeue on error
self.evaluator.evaluate_events(payload)
EventAlarmEndpoint中会处理sample级别的消息,调用
EventAlarmEvaluator.evaluate_events
—>_evaluate_alarm根据接收到的时间进行告警预测,如果触发告警,则更新告警状态并通过notifier发送告警
def _evaluate_alarm(self, alarm, event):
...
if alarm.fired_and_no_repeat():
LOG.debug('Skip evaluation of the alarm id=%s which have already '
'fired.', alarm.id)
return
if not alarm.event_type_to_watch(event.obj['event_type']):
LOG.debug('Aborting evaluation of the alarm (id=%s) since '
'event_type is not matched.', alarm.id)
return
def _compare(condition):
v = event.get_value(condition['field'])
LOG.debug('Comparing value=%(v)s against condition=%(c)s .',
{'v': v, 'c': condition})
return condition['op'](v, condition['value'])
for condition in alarm.query:
if not _compare(condition):
LOG.debug('Aborting evaluation of the alarm due to '
'unmet condition=%s .', condition)
return
self._fire_alarm(alarm, event)
5.aodh-notifier
启动脚本
def notifier():
conf = service.prepare_service()
os_service.launch(conf, notifier_svc.AlarmNotifierService(conf)).wait()
初始化
def __init__(self, conf):
super(AlarmNotifierService, self).__init__()
transport = messaging.get_transport(conf)
self.notifiers = extension.ExtensionManager(
self.NOTIFIER_EXTENSIONS_NAMESPACE,
invoke_on_load=True,
invoke_args=(conf,))
if conf.ipc_protocol == 'rpc':
self.ipc = 'rpc'
self.rpc_server = messaging.get_rpc_server(
conf, transport, conf.notifier_rpc_topic, self)
else:
self.ipc = 'queue'
target = oslo_messaging.Target(topic=conf.notifier_topic)
self.listener = messaging.get_notification_listener(
transport, [target],
[AlarmEndpoint(self.notifiers)])
加载命名空间中的”aodh.notifier”中的处理类作为notifier的Endpoint,对应处理类,notifier支持以下方式进行告警的分发:
http、https、log、zaqar、test、trust+http、trust+https、trust+zaqar
class AlarmEndpoint(object):
def __init__(self, notifiers):
self.notifiers = notifiers
def sample(self, ctxt, publisher_id, event_type, payload, metadata):
"""Endpoint for alarm notifications"""
process_alarm(self.notifiers, payload)
AlarmEndpoint中处理消息队列中sample级别的消息,调用process_alarm来处理告警,最终会调用endpoint中的notify方法,根据alarm中的action解析调用的相应类处理。
def _handle_action(notifiers, action, alarm_id, alarm_name, severity,
previous, current, reason, reason_data):
...
try:
action = netutils.urlsplit(action)
except Exception:
LOG.error(
_("Unable to parse action %(action)s for alarm %(alarm_id)s"),
{'action': action, 'alarm_id': alarm_id})
return
try:
notifier = notifiers[action.scheme].obj
except KeyError:
...
try:
LOG.debug("Notifying alarm %(id)s with action %(act)s",
{'id': alarm_id, 'act': action})
notifier.notify(action, alarm_id, alarm_name, severity,
previous, current, reason, reason_data)
except Exception:
LOG.exception(_("Unable to notify alarm %s"), alarm_id)
return
6.注意点
-
在aodh-evaluator中,如果是gnocchi类型的告警评估,在_statistics函数会调用gnocchi_client来获取数据,而不是从ceilometer的api获取。
-
aodh中支持sql,并不支持mongodb,原因如下:
- aodh只记录与alarm相关的数据,数据量不会像ceilometer那么大,sql数据库就可以很好的满足
- openstack其他的组件中默认都是使用sql数据库,为了一致性,去掉了对hbase以及mongodb的支持。
- 为了是aodh代码更为简洁,减少对数据库代码的维护工作量以及测试。
- 更过关注功能性,避免当有新特性时,要考虑多种driver的实现
-
combination 组合告警 与其他告警一起通过逻辑条件来生成,针对多个告警进行组合
composite 复合告警 是通过不同的告警触发条件来生成告警,针对单个条件来组成单个告警