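Source Code Analysis of SkyWalking's Alarm Module: Tracing Its Whole Story from Start to Finish
This article is based on SkyWalking 8.4.0, and all quoted code has been annotated with comments.
For an introduction to the alarm module and how to use it, see "【弄nèng - Skywalking】Getting Started (Part 3): Using the SkyWalking Alarm Module".
Official documentation: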
http://skywalking.apache.org/docs/main/v8.4.0/en/setup/backend/backend-alarm/
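Classes involved in the alarm module's processing logic, and their responsibilities
org.apache.skywalking.oap.server.core.alarm.AlarmModule, extends the abstract class ModuleDefine
The alarm module defines the main entry point for alarm implementers; SkyWalking supports pluggable alarm implementations.
org.apache.skywalking.oap.server.core.alarm.provider.AlarmModuleProvider, extends the abstract class ModuleProvider and is the provider of the Alarm module. It is invoked by ModuleDefine during the prepare stage, where prepare() performs the setup work: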
```java
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
    Reader applicationReader;
    try {
        // 1. Load alarm-settings.yml
        applicationReader = ResourceUtils.read("alarm-settings.yml");
    } catch (FileNotFoundException e) {
        throw new ModuleStartException("can't load alarm-settings.yml", e);
    }
    RulesReader reader = new RulesReader(applicationReader);
    // Parse alarm-settings.yml into Rules, which holds List<AlarmRule> rules and List<String> webhooks
    Rules rules = reader.readRules();

    // Wrap the rules in an AlarmRulesWatcher, which also builds the RunningRule collections
    alarmRulesWatcher = new AlarmRulesWatcher(rules, this);

    notifyHandler = new NotifyHandler(alarmRulesWatcher);
    // Inject the persistence callback
    notifyHandler.init(new AlarmStandardPersistence());
    // Register notifyHandler as this provider's implementation of the MetricsNotify service
    this.registerServiceImplementation(MetricsNotify.class, notifyHandler);
}
```
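AlarmRulesWatcher converts Rules into RunningRules and builds two maps: Map<String, List<RunningRule>> runningContext, which maps each metricsName to its RunningRule collection,
and Map<AlarmRule, RunningRule> alarmRuleRunningRuleMap, which maps each rule to its RunningRule.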
-notify(Rules newRules)
```java
void notify(Rules newRules) {
    Map<AlarmRule, RunningRule> newAlarmRuleRunningRuleMap = new HashMap<>();
    Map<String, List<RunningRule>> newRunningContext = new HashMap<>();

    newRules.getRules().forEach(rule -> {
        /*
         * If there is already an alarm rule that is the same as the new one, we'll reuse its
         * corresponding runningRule, to keep its history metrics
         */
        RunningRule runningRule = alarmRuleRunningRuleMap.getOrDefault(rule, new RunningRule(rule));

        newAlarmRuleRunningRuleMap.put(rule, runningRule);

        String metricsName = rule.getMetricsName();

        List<RunningRule> runningRules = newRunningContext.computeIfAbsent(metricsName, key -> new ArrayList<>());
        runningRules.add(runningRule);
    });

    this.rules = newRules;
    this.runningContext = newRunningContext;
    this.alarmRuleRunningRuleMap = newAlarmRuleRunningRuleMap;
    log.info("Update alarm rules to {}", rules);
}
```
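The reuse step in notify() is what lets a rule reload keep window history: an unchanged rule must compare equal to the old one so that the lookup finds its existing RunningRule. Below is a minimal, self-contained sketch of that rebuild-and-swap pattern. RuleKeySketch, RunningStateSketch and RuleRegistrySketch are hypothetical stand-ins, not SkyWalking classes, and the history field only simulates the window contents.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for AlarmRule: equality over all fields is what makes reuse possible
final class RuleKeySketch {
    final String ruleName;
    final String metricsName;
    final int threshold;

    RuleKeySketch(String ruleName, String metricsName, int threshold) {
        this.ruleName = ruleName;
        this.metricsName = metricsName;
        this.threshold = threshold;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof RuleKeySketch)) return false;
        RuleKeySketch other = (RuleKeySketch) o;
        return threshold == other.threshold
            && ruleName.equals(other.ruleName)
            && metricsName.equals(other.metricsName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ruleName, metricsName, threshold);
    }
}

// Hypothetical stand-in for RunningRule; history simulates the values collected in its window
final class RunningStateSketch {
    final RuleKeySketch rule;
    int history;

    RunningStateSketch(RuleKeySketch rule) {
        this.rule = rule;
    }
}

final class RuleRegistrySketch {
    private Map<RuleKeySketch, RunningStateSketch> ruleToState = new HashMap<>();
    private Map<String, List<RunningStateSketch>> byMetricsName = new HashMap<>();

    void reload(List<RuleKeySketch> newRules) {
        Map<RuleKeySketch, RunningStateSketch> nextRuleToState = new HashMap<>();
        Map<String, List<RunningStateSketch>> nextByMetricsName = new HashMap<>();
        for (RuleKeySketch rule : newRules) {
            // Reuse the old state for an unchanged rule, otherwise start fresh
            RunningStateSketch state = ruleToState.get(rule);
            if (state == null) {
                state = new RunningStateSketch(rule);
            }
            nextRuleToState.put(rule, state);
            nextByMetricsName.computeIfAbsent(rule.metricsName, k -> new ArrayList<>()).add(state);
        }
        // Replace both maps at once, mirroring the field swap at the end of notify()
        ruleToState = nextRuleToState;
        byMetricsName = nextByMetricsName;
    }

    List<RunningStateSketch> find(String metricsName) {
        return byMetricsName.get(metricsName);
    }
}
```

One small difference from the original: the sketch allocates a new state only on a cache miss, whereas getOrDefault(rule, new RunningRule(rule)) evaluates its default argument on every call and simply discards it when the rule is reused.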
NotifyHandler: the implementation class of the alarm service, which handles the alarm logic.
Fields:
Methods:
in(): adds the metrics value to the rule's Window. It is invoked at the end of NotifyHandler.notify(Metrics metrics):
```java
@Override
public void notify(Metrics metrics) {
    WithMetadata withMetadata = (WithMetadata) metrics;
    MetricsMetaInfo meta = withMetadata.getMeta();
    int scope = meta.getScope();

    // Only metrics in the service, instance, endpoint and relation scope catalogs are checked for alarms
    if (!DefaultScopeDefine.inServiceCatalog(scope)
        && !DefaultScopeDefine.inServiceInstanceCatalog(scope)
        && !DefaultScopeDefine.inEndpointCatalog(scope)
        && !DefaultScopeDefine.inServiceRelationCatalog(scope)
        && !DefaultScopeDefine.inServiceInstanceRelationCatalog(scope)
        && !DefaultScopeDefine.inEndpointRelationCatalog(scope)) {
        return;
    }

    // Build the alarm metadata (metrics name, entity id, readable name) for the entity's scope
    MetaInAlarm metaInAlarm;
    if (DefaultScopeDefine.inServiceCatalog(scope)) {
        final String serviceId = meta.getId();
        final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
            serviceId);
        ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm();
        serviceMetaInAlarm.setMetricsName(meta.getMetricsName());
        serviceMetaInAlarm.setId(serviceId);
        serviceMetaInAlarm.setName(serviceIDDefinition.getName());
        metaInAlarm = serviceMetaInAlarm;
    } else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
        final String instanceId = meta.getId();
        final IDManager.ServiceInstanceID.InstanceIDDefinition instanceIDDefinition = IDManager.ServiceInstanceID.analysisId(
            instanceId);
        final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
            instanceIDDefinition.getServiceId());
        ServiceInstanceMetaInAlarm instanceMetaInAlarm = new ServiceInstanceMetaInAlarm();
        instanceMetaInAlarm.setMetricsName(meta.getMetricsName());
        instanceMetaInAlarm.setId(instanceId);
        instanceMetaInAlarm.setName(instanceIDDefinition.getName() + " of " + serviceIDDefinition.getName());
        metaInAlarm = instanceMetaInAlarm;
    } else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
        final String endpointId = meta.getId();
        final IDManager.EndpointID.EndpointIDDefinition endpointIDDefinition = IDManager.EndpointID.analysisId(
            endpointId);
        final IDManager.ServiceID.ServiceIDDefinition serviceIDDefinition = IDManager.ServiceID.analysisId(
            endpointIDDefinition.getServiceId());
        EndpointMetaInAlarm endpointMetaInAlarm = new EndpointMetaInAlarm();
        endpointMetaInAlarm.setMetricsName(meta.getMetricsName());
        endpointMetaInAlarm.setId(meta.getId());
        endpointMetaInAlarm.setName(
            endpointIDDefinition.getEndpointName() + " in " + serviceIDDefinition.getName());
        metaInAlarm = endpointMetaInAlarm;
    } else if (DefaultScopeDefine.inServiceRelationCatalog(scope)) {
        final String serviceRelationId = meta.getId();
        final IDManager.ServiceID.ServiceRelationDefine serviceRelationDefine = IDManager.ServiceID.analysisRelationId(
            serviceRelationId);
        final IDManager.ServiceID.ServiceIDDefinition sourceIdDefinition = IDManager.ServiceID.analysisId(
            serviceRelationDefine.getSourceId());
        final IDManager.ServiceID.ServiceIDDefinition destIdDefinition = IDManager.ServiceID.analysisId(
            serviceRelationDefine.getDestId());
        ServiceRelationMetaInAlarm serviceRelationMetaInAlarm = new ServiceRelationMetaInAlarm();
        serviceRelationMetaInAlarm.setMetricsName(meta.getMetricsName());
        serviceRelationMetaInAlarm.setId(serviceRelationId);
        serviceRelationMetaInAlarm.setName(sourceIdDefinition.getName() + " to " + destIdDefinition.getName());
        metaInAlarm = serviceRelationMetaInAlarm;
    } else if (DefaultScopeDefine.inServiceInstanceRelationCatalog(scope)) {
        final String instanceRelationId = meta.getId();
        final IDManager.ServiceInstanceID.ServiceInstanceRelationDefine serviceRelationDefine = IDManager.ServiceInstanceID.analysisRelationId(
            instanceRelationId);
        final IDManager.ServiceInstanceID.InstanceIDDefinition sourceIdDefinition = IDManager.ServiceInstanceID.analysisId(
            serviceRelationDefine.getSourceId());
        final IDManager.ServiceID.ServiceIDDefinition sourceServiceId = IDManager.ServiceID.analysisId(
            sourceIdDefinition.getServiceId());
        final IDManager.ServiceInstanceID.InstanceIDDefinition destIdDefinition = IDManager.ServiceInstanceID.analysisId(
            serviceRelationDefine.getDestId());
        final IDManager.ServiceID.ServiceIDDefinition destServiceId = IDManager.ServiceID.analysisId(
            destIdDefinition.getServiceId());
        ServiceInstanceRelationMetaInAlarm instanceRelationMetaInAlarm = new ServiceInstanceRelationMetaInAlarm();
        instanceRelationMetaInAlarm.setMetricsName(meta.getMetricsName());
        instanceRelationMetaInAlarm.setId(instanceRelationId);
        instanceRelationMetaInAlarm.setName(sourceIdDefinition.getName() + " of " + sourceServiceId.getName()
                                                + " to " + destIdDefinition.getName() + " of " + destServiceId.getName());
        metaInAlarm = instanceRelationMetaInAlarm;
    } else if (DefaultScopeDefine.inEndpointRelationCatalog(scope)) {
        final String endpointRelationId = meta.getId();
        final IDManager.EndpointID.EndpointRelationDefine endpointRelationDefine = IDManager.EndpointID.analysisRelationId(
            endpointRelationId);
        final IDManager.ServiceID.ServiceIDDefinition sourceService = IDManager.ServiceID.analysisId(
            endpointRelationDefine.getSourceServiceId());
        final IDManager.ServiceID.ServiceIDDefinition destService = IDManager.ServiceID.analysisId(
            endpointRelationDefine.getDestServiceId());
        EndpointRelationMetaInAlarm endpointRelationMetaInAlarm = new EndpointRelationMetaInAlarm();
        endpointRelationMetaInAlarm.setMetricsName(meta.getMetricsName());
        endpointRelationMetaInAlarm.setId(endpointRelationId);
        endpointRelationMetaInAlarm.setName(endpointRelationDefine.getSource() + " in " + sourceService.getName()
                                                + " to " + endpointRelationDefine.getDest() + " in " + destService.getName());
        metaInAlarm = endpointRelationMetaInAlarm;
    } else {
        return;
    }

    // The RunningRule collection registered for this metrics name
    List<RunningRule> runningRules = core.findRunningRule(meta.getMetricsName());
    if (runningRules == null) {
        return;
    }

    // For each rule: if the metrics' entity (service, endpoint, etc.) is in the rule's include list,
    // the value is added to that RunningRule's values according to its timeBucket
    runningRules.forEach(rule -> rule.in(metaInAlarm, metrics));
}
```
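The readable names built in notify() follow a fixed pattern per scope: "<instance> of <service>", "<endpoint> in <service>", and "<source> to <dest>" for relations. The sketch below decodes a service ID and an instance ID into those names. The ID layout it uses (a Base64-encoded service name followed by "." and a flag, and an instance ID formed as serviceId + "_" + Base64-encoded instance name) is an assumption made for illustration only; check IDManager in your SkyWalking version before relying on it. AlarmNameSketch is a hypothetical class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative decoder for alarm display names.
// ASSUMED ID layout (verify against IDManager):
//   serviceId  = base64(serviceName) + "." + flag
//   instanceId = serviceId + "_" + base64(instanceName)
final class AlarmNameSketch {
    static String encode(String name) {
        return Base64.getEncoder().encodeToString(name.getBytes(StandardCharsets.UTF_8));
    }

    private static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    // Service scope: the readable name is just the service name.
    // Standard Base64 never contains '.', so the last '.' separates the flag.
    static String serviceName(String serviceId) {
        return decode(serviceId.substring(0, serviceId.lastIndexOf('.')));
    }

    // Instance scope: "<instance> of <service>", the same shape notify() builds.
    // Standard Base64 never contains '_', so the first '_' separates the two parts.
    static String instanceName(String instanceId) {
        int sep = instanceId.indexOf('_');
        String serviceId = instanceId.substring(0, sep);
        String instance = decode(instanceId.substring(sep + 1));
        return instance + " of " + serviceName(serviceId);
    }
}
```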
RunningRule: the alarm-rule execution class, which computes the metric values.
Methods:
```java
public Optional<AlarmMessage> checkAlarm() {
    if (isMatch()) {
        /*
         * When
         * 1. Metrics value threshold triggers alarm by rule
         * 2. Counter reaches the count threshold;
         * 3. Isn't in silence stage, judged by SilenceCountdown(!=0).
         */
        counter++;
        if (counter >= countThreshold && silenceCountdown < 1) {
            silenceCountdown = silencePeriod;
            return Optional.of(new AlarmMessage());
        } else {
            silenceCountdown--;
        }
    } else {
        silenceCountdown--;
        if (counter > 0) {
            counter--;
        }
    }
    return Optional.empty();
}
```
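To see how the count threshold and the silence period interact, here is a standalone re-implementation of the counter/silence-countdown logic above. It assumes both counters start at 0 and replaces isMatch() with a boolean argument; SilenceSketch is a hypothetical name, not a SkyWalking class.

```java
// Standalone copy of the counter / silence-countdown logic from checkAlarm().
// Assumptions: counters start at 0, and isMatch() is passed in as a boolean.
final class SilenceSketch {
    private final int countThreshold;
    private final int silencePeriod;
    private int counter = 0;
    private int silenceCountdown = 0;

    SilenceSketch(int countThreshold, int silencePeriod) {
        this.countThreshold = countThreshold;
        this.silencePeriod = silencePeriod;
    }

    /** Returns true when this check would emit an alarm message. */
    boolean check(boolean matched) {
        if (matched) {
            counter++;
            if (counter >= countThreshold && silenceCountdown < 1) {
                silenceCountdown = silencePeriod; // start the silence period
                return true;
            }
            silenceCountdown--;
        } else {
            silenceCountdown--;
            if (counter > 0) {
                counter--; // a non-matching check slowly "forgives" earlier breaches
            }
        }
        return false;
    }
}
```

With countThreshold = 2 and silencePeriod = 3, a metric that breaches on every check alarms on the 2nd check, stays silent for the next three, and alarms again on the 6th.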
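A metrics window based on the alarm rule's period (AlarmRule#period). The window slides with time and keeps only the most recent N (= period) buckets.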
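The sliding behaviour can be pictured as a fixed array of period slots, one per minute, that shifts left as time advances. This is a simplified sketch: it indexes slots by plain minute numbers and stores Long values, whereas SkyWalking's window works on timeBuckets and Metrics objects, so SlidingWindowSketch and its methods are hypothetical.

```java
import java.util.Arrays;

// Simplified sliding window: one slot per minute, the last slot is "endMinute".
final class SlidingWindowSketch {
    private final Long[] values; // null means "no data for that minute"
    private long endMinute;

    SlidingWindowSketch(int period, long startMinute) {
        this.values = new Long[period];
        this.endMinute = startMinute;
    }

    /** Slide forward so the last slot represents {@code minute}; older slots fall off. */
    void moveTo(long minute) {
        long steps = minute - endMinute;
        if (steps <= 0) {
            return;
        }
        int n = values.length;
        if (steps >= n) {
            Arrays.fill(values, null); // everything has expired
        } else {
            int s = (int) steps;
            System.arraycopy(values, s, values, 0, n - s); // shift left by s slots
            Arrays.fill(values, n - s, n, null);           // fresh empty slots at the end
        }
        endMinute = minute;
    }

    /** Put a value in its minute's slot; values outside the window are dropped. */
    void add(long minute, long value) {
        long offset = endMinute - minute;
        if (offset < 0 || offset >= values.length) {
            return;
        }
        values[values.length - 1 - (int) offset] = value;
    }

    Long[] snapshot() {
        return values.clone();
    }
}
```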
Methods:
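The data entry point of the alarm service: it checks whether the alarm module has been initialized, looks up NotifyHandler, and passes the data on.
AlarmNotifyWorker, the alarm notify worker: it receives Metrics and simply routes them to the alarm core. The alarm work runs after MetricsAggregateWorker has completed L1 aggregation.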
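The entrance and the worker are deliberately thin: the worker forwards, and the entrance drops data when no alarm module is present and otherwise resolves the handler once. The sketch below shows only that routing shape. AlarmEntranceSketch and AlarmNotifyWorkerSketch are hypothetical; the boolean flag and the Supplier stand in for the real module lookup, and the lazy initialization here is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical entrance: drops data without an alarm module, resolves the handler lazily
final class AlarmEntranceSketch<M> {
    private final boolean alarmModuleLoaded;           // stands in for "is the alarm module present?"
    private final Supplier<Consumer<M>> handlerLookup; // stands in for looking up NotifyHandler
    private Consumer<M> handler;                       // resolved on first use (not thread-safe)

    AlarmEntranceSketch(boolean alarmModuleLoaded, Supplier<Consumer<M>> handlerLookup) {
        this.alarmModuleLoaded = alarmModuleLoaded;
        this.handlerLookup = handlerLookup;
    }

    void forward(M metrics) {
        if (!alarmModuleLoaded) {
            return; // no alarm module installed: nothing to do
        }
        if (handler == null) {
            handler = handlerLookup.get(); // look the handler up only once
        }
        handler.accept(metrics);
    }
}

// The worker does no computation of its own; it only routes metrics to the entrance
final class AlarmNotifyWorkerSketch<M> {
    private final AlarmEntranceSketch<M> entrance;

    AlarmNotifyWorkerSketch(AlarmEntranceSketch<M> entrance) {
        this.entrance = entrance;
    }

    void in(M metrics) {
        entrance.forward(metrics);
    }
}
```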
Callbacks, including webhooks, persistence, and so on:
```java
allCallbacks.add(new WebhookCallback(alarmRulesWatcher));
allCallbacks.add(new GRPCCallback(alarmRulesWatcher));
allCallbacks.add(new SlackhookCallback(alarmRulesWatcher));
allCallbacks.add(new WechatHookCallback(alarmRulesWatcher));
allCallbacks.add(new DingtalkHookCallback(alarmRulesWatcher));
allCallbacks.add(new FeishuHookCallback(alarmRulesWatcher));
```
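All registered callbacks receive the same list of messages. The start() method shown next drops messages flagged onlyAsCondition (those that exist only as inputs to composite rules) before calling doAlarm() on each callback. Here is a minimal sketch of that filter-then-fan-out step, using java.util.function.Consumer in place of the AlarmCallback interface; AlarmMsgSketch and CallbackFanOutSketch are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;

// Hypothetical message type with only the fields this sketch needs
final class AlarmMsgSketch {
    final String text;
    final boolean onlyAsCondition; // true: used only by composite rules, never sent out

    AlarmMsgSketch(String text, boolean onlyAsCondition) {
        this.text = text;
        this.onlyAsCondition = onlyAsCondition;
    }
}

final class CallbackFanOutSketch {
    private final List<Consumer<List<AlarmMsgSketch>>> callbacks = new ArrayList<>();

    void register(Consumer<List<AlarmMsgSketch>> callback) {
        callbacks.add(callback);
    }

    void dispatch(List<AlarmMsgSketch> messages) {
        // Drop condition-only messages first
        List<AlarmMsgSketch> filtered = messages.stream()
            .filter(m -> !m.onlyAsCondition)
            .collect(Collectors.toList());
        // Every callback (webhook, gRPC, Slack, ...) receives the same filtered list
        callbacks.forEach(cb -> cb.accept(filtered));
    }
}
```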
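AlarmCore, the alarm core: based on the alarm settings, it holds the metric values of certain time windows, and it uses its internal timer trigger together with the alarm rules to decide whether to send alarms to the database and webhook(s).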
```java
public void start(List<AlarmCallback> allCallbacks) {
    LocalDateTime now = LocalDateTime.now();
    lastExecuteTime = now;
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
        try {
            final List<AlarmMessage> alarmMessageList = new ArrayList<>(30);
            LocalDateTime checkTime = LocalDateTime.now();
            int minutes = Minutes.minutesBetween(lastExecuteTime, checkTime).getMinutes();
            boolean[] hasExecute = new boolean[]{false};
            // Iterate over every RunningRule (i.e. every window)
            alarmRulesWatcher.getRunningContext().values().forEach(ruleList -> ruleList.forEach(runningRule -> {
                // Only when at least one whole minute has passed since the last execution
                if (minutes > 0) {
                    // Slide the window forward to checkTime
                    runningRule.moveTo(checkTime);
                    /*
                     * Don't run in the first quarter per min, avoid to trigger false alarm.
                     */
                    // i.e. only check once the second-of-minute is past 15
                    if (checkTime.getSecondOfMinute() > 15) {
                        hasExecute[0] = true;
                        // runningRule.check() evaluates the rule and produces alarm messages
                        alarmMessageList.addAll(runningRule.check());
                    }
                }
            }));
            // Set the last execute time, and make sure the second is `00`, such as: 18:30:00
            if (hasExecute[0]) {
                lastExecuteTime = checkTime.minusSeconds(checkTime.getSecondOfMinute());
            }

            // If any alarms were produced, evaluate the composite rules
            if (alarmMessageList.size() > 0) {
                if (alarmRulesWatcher.getCompositeRules().size() > 0) {
                    // Composite alarm handling
                    List<AlarmMessage> messages = alarmRulesWatcher.getCompositeRuleEvaluator()
                        .evaluate(alarmRulesWatcher.getCompositeRules(), alarmMessageList);
                    alarmMessageList.addAll(messages);
                }
                // Drop condition-only messages, then hand the rest to every callback
                List<AlarmMessage> filteredMessages = alarmMessageList.stream()
                    .filter(msg -> !msg.isOnlyAsCondition())
                    .collect(Collectors.toList());
                allCallbacks.forEach(callback -> callback.doAlarm(filteredMessages));
            }
        } catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
        }
    }, 10, 10, TimeUnit.SECONDS);
}
```
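The timing rules in start() are easy to misread: the task fires every 10 seconds, but rules are checked only once at least one whole minute has passed since lastExecuteTime and the current second is past 15, after which lastExecuteTime is pulled back to the start of that minute. The sketch below isolates just that decision using java.time instead of the Joda-Time types above. AlarmTickSketch is hypothetical; it assumes at least one rule is configured (otherwise the original never updates lastExecuteTime) and it ignores moveTo(), which the original calls whenever minutes > 0 regardless of the second.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Isolates the "should rules be checked at this tick?" decision from start().
final class AlarmTickSketch {
    private LocalDateTime lastExecuteTime;

    AlarmTickSketch(LocalDateTime start) {
        this.lastExecuteTime = start;
    }

    /** Returns true if the rules would be checked at this tick. */
    boolean tick(LocalDateTime checkTime) {
        long minutes = Duration.between(lastExecuteTime, checkTime).toMinutes();
        // Skip the first quarter of each minute to avoid false alarms
        if (minutes > 0 && checkTime.getSecond() > 15) {
            // Align to the start of the minute, e.g. 18:31:20 becomes 18:31:00
            lastExecuteTime = checkTime.truncatedTo(ChronoUnit.MINUTES);
            return true;
        }
        return false;
    }
}
```

truncatedTo(ChronoUnit.MINUTES) also clears sub-second precision, which the minusSeconds() call in the original leaves in place; for this decision the difference is negligible.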
Those are the classes involved in alarm processing. The overall alarm processing flow is as follows:
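Classes involved in dynamically reloading the alarm module's rules, and their responsibilities
During the prepare stage the alarm module initializes its alarm rules: AlarmModuleProvider loads alarm-settings.yml.
When alarm-settings.yml is supplied through dynamic configuration instead, the processing flow and the classes involved are as follows:
For more on SkyWalking dynamic configuration, see the official documentation: Dynamic Configuration.
We use gRPC-based dynamic configuration as the example. The other implementations work on the same principle and differ only in how they receive the data.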
ConfigWatcherRegister: the default config-watcher register, implementing DynamicConfigurationService
Methods:
```java
void configSync() {
    // Actively pull the configuration; readConfig() has a different implementation for each data source
    Optional<ConfigTable> configTable = readConfig(register.keys());

    // Config table would be null if no change detected from the implementation.
    configTable.ifPresent(config -> {
        config.getItems().forEach(item -> {
            // e.g. alarm.default.alarm-settings
            String itemName = item.getName();
            WatcherHolder holder = register.get(itemName);
            if (holder != null) {
                ConfigChangeWatcher watcher = holder.getWatcher();
                String newItemValue = item.getValue();
                if (newItemValue == null) {
                    if (watcher.value() != null) {
                        // Notify watcher, the new value is null with delete event type.
                        watcher.notify(
                            new ConfigChangeWatcher.ConfigChangeEvent(null, ConfigChangeWatcher.EventType.DELETE));
                    } else {
                        // Don't need to notify, stay in null.
                    }
                } else {
                    if (!newItemValue.equals(watcher.value())) {
                        // The value changed: notify the watcher. For alarms this ends up in
                        // AlarmRulesWatcher.notify(), which rebuilds the RunningRule collections
                        watcher.notify(new ConfigChangeWatcher.ConfigChangeEvent(
                            newItemValue,
                            ConfigChangeWatcher.EventType.MODIFY
                        ));
                    } else {
                        // Don't need to notify, stay in the same config value.
                    }
                }
            } else {
                LOGGER.warn("Config {} from configuration center, doesn't match any watcher, ignore.", itemName);
            }
        });

        LOGGER.trace("Current configurations after the sync." + LINE_SEPARATOR + register.toString());
    });
}
```
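Stripped of logging and lookups, configSync() makes one of three decisions per item: send DELETE when the value disappears, send MODIFY when it differs from the watcher's current value, or do nothing. A minimal sketch of that decision table; ConfigSyncSketch and its EventType are hypothetical stand-ins for ConfigChangeWatcher.EventType.

```java
import java.util.Optional;

// Decision table of configSync(): which event (if any) a watcher receives.
final class ConfigSyncSketch {
    enum EventType { MODIFY, DELETE }

    static Optional<EventType> decide(String currentValue, String newValue) {
        if (newValue == null) {
            // Value removed from the config center: only notify if the watcher had a value
            return currentValue != null ? Optional.of(EventType.DELETE) : Optional.empty();
        }
        // Any difference (including first-time values) is reported as MODIFY
        return newValue.equals(currentValue) ? Optional.empty() : Optional.of(EventType.MODIFY);
    }
}
```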
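So far we have covered alarm processing and the dynamic configuration of alarm rules. In alarm processing, NotifyHandler.notify(Metrics metrics) receives Metrics and handles the alarm logic. But who passes those Metrics in? Next, let's trace the flow back to where it all starts.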
The class that starts the flow
AnalyzerModuleProvider, the analyzer provider: it loads oal/core.oal and parses it into an OALDefine.
```java
@Override
public void start() throws ModuleStartException {
    // load official analysis (i.e. load the OAL scripts)
    getManager().find(CoreModule.NAME)
                .provider()
                .getService(OALEngineLoaderService.class)
                .load(CoreOALDefine.INSTANCE);

    DynamicConfigurationService dynamicConfigurationService = getManager().find(ConfigurationModule.NAME)
                                                                          .provider()
                                                                          .getService(
                                                                              DynamicConfigurationService.class);
    dynamicConfigurationService.registerConfigChangeWatcher(thresholds);
    dynamicConfigurationService.registerConfigChangeWatcher(uninstrumentedGatewaysConfig);
    dynamicConfigurationService.registerConfigChangeWatcher(traceSampleRateWatcher);

    segmentParserService.setListenerManager(listenerManager());

    processService.start(meterConfigs);
}
```
OALEngineLoaderService, the OAL loading class
-load(): parses the OALDefine
```java
public void load(OALDefine define) throws ModuleStartException {
    if (oalDefineSet.contains(define)) {
        // each oal define will only be activated once
        return;
    }
    try {
        OALEngine engine = loadOALEngine(define);
        StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);
        // Set the @Stream annotation listener
        engine.setStreamListener(streamAnnotationListener);
        // Set the dispatcher listener
        engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)
                                                  .provider()
                                                  .getService(SourceReceiver.class)
                                                  .getDispatcherDetectorListener());
        // Calls OALRuntime.start()
        engine.start(OALEngineLoaderService.class.getClassLoader());
        // Calls OALRuntime.notifyAllListeners()
        engine.notifyAllListeners();

        oalDefineSet.add(define);
    } catch (ReflectiveOperationException | OALCompileException e) {
        throw new ModuleStartException(e.getMessage(), e);
    }
}
```
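OAL Runtime is the class-generation engine: it loads the classes generated from the OAL script definitions, and the runtime itself is loaded dynamically.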
notifyAllListeners()
```java
@Override
public void notifyAllListeners() throws ModuleStartException {
    for (Class metricsClass : metricsClasses) {
        try {
            streamAnnotationListener.notify(metricsClass);
        } catch (StorageException e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }
    for (Class dispatcherClass : dispatcherClasses) {
        try {
            dispatcherDetectorListener.addIfAsSourceDispatcher(dispatcherClass);
        } catch (Exception e) {
            throw new ModuleStartException(e.getMessage(), e);
        }
    }
}
```
StreamAnnotationListener, the @Stream annotation handler: it dispatches on the Processor type. The metricsClasses generated above carry @Stream with processor = MetricsStreamProcessor.
notify()
```java
@Override
public void notify(Class aClass) throws StorageException {
    if (aClass.isAnnotationPresent(Stream.class)) {
        Stream stream = (Stream) aClass.getAnnotation(Stream.class);

        if (stream.processor().equals(RecordStreamProcessor.class)) {
            RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
            MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(TopNStreamProcessor.class)) {
            TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(NoneStreamProcessor.class)) {
            NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else if (stream.processor().equals(ManagementStreamProcessor.class)) {
            ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
        } else {
            throw new UnexpectedException("Unknown stream processor.");
        }
    } else {
        throw new UnexpectedException(
            "Stream annotation listener could only parse the class present stream annotation.");
    }
}
```
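notify() relies on the @Stream annotation being retained at runtime and on comparing the processor class it names. The sketch below reproduces that dispatch with a hypothetical @StreamSketch annotation and two marker processor classes; instead of calling create(), it only records which processor would have been chosen.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

// Hypothetical annotation mirroring the idea of @Stream(processor = ...)
@Retention(RetentionPolicy.RUNTIME) // must be RUNTIME, or getAnnotation() returns null
@Target(ElementType.TYPE)
@interface StreamSketch {
    Class<?> processor();
}

final class MetricsProcessorSketch { }

final class RecordProcessorSketch { }

final class StreamListenerSketch {
    final List<String> created = new ArrayList<>();

    void notify(Class<?> aClass) {
        StreamSketch stream = aClass.getAnnotation(StreamSketch.class);
        if (stream == null) {
            throw new IllegalArgumentException("Class has no @StreamSketch annotation: " + aClass.getName());
        }
        // Dispatch on the processor class named in the annotation
        if (stream.processor() == MetricsProcessorSketch.class) {
            created.add("metrics:" + aClass.getSimpleName());
        } else if (stream.processor() == RecordProcessorSketch.class) {
            created.add("record:" + aClass.getSimpleName());
        } else {
            throw new IllegalStateException("Unknown stream processor.");
        }
    }
}

// Stand-in for an OAL-generated metrics class
@StreamSketch(processor = MetricsProcessorSketch.class)
final class ServiceRespTimeMetricsSketch { }
```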
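MetricsStreamProcessor: the stream-computing processor that aggregates metrics streams
Fields:
Methods:
MetricsAggregateWorker provides in-memory metrics merging. This aggregation is called L1 aggregation: it merges data right after the receiver has analyzed it. Metrics that belong to the same entity, metrics type and time bucket are merged by L1 aggregation into a single metrics object, reducing unnecessary memory and network usage.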
```java
private void onWork(List<Metrics> metricsList) {
    metricsList.forEach(metrics -> {
        aggregationCounter.inc();
        // Merge into the cache; metrics with the same key are folded together via Metrics#combine()
        mergeDataCache.accept(metrics);
    });

    // Hand the merged metrics to the next worker
    mergeDataCache.read().forEach(
        data -> {
            if (log.isDebugEnabled()) {
                log.debug(data.toString());
            }
            nextWorker.in(data);
        }
    );
}
```
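The effect of L1 aggregation is easiest to see with a concrete metric. The sketch below is a simplified stand-in for mergeDataCache: a map keyed by entity and time bucket whose values are folded together with a combine() method, then drained in one read(). AvgMetricSketch and L1MergeCacheSketch are hypothetical; a real Metrics object carries more state, and its key also includes the metrics type.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical average metric keyed by entity + time bucket
final class AvgMetricSketch {
    final String entityId;
    final long timeBucket;
    long sum;
    long count;

    AvgMetricSketch(String entityId, long timeBucket, long sum, long count) {
        this.entityId = entityId;
        this.timeBucket = timeBucket;
        this.sum = sum;
        this.count = count;
    }

    String key() {
        return entityId + "@" + timeBucket;
    }

    // Plays the role of Metrics#combine(): fold another sample into this one
    void combine(AvgMetricSketch other) {
        sum += other.sum;
        count += other.count;
    }

    long value() {
        return count == 0 ? 0 : sum / count;
    }
}

final class L1MergeCacheSketch {
    private final Map<String, AvgMetricSketch> cache = new LinkedHashMap<>();

    void accept(AvgMetricSketch metric) {
        // Same entity and time bucket: merge into the existing object instead of keeping both
        cache.merge(metric.key(), metric, (existing, incoming) -> {
            existing.combine(incoming);
            return existing;
        });
    }

    /** Drain the merged metrics, ready to be passed to the next worker. */
    List<AvgMetricSketch> read() {
        List<AvgMetricSketch> out = new ArrayList<>(cache.values());
        cache.clear();
        return out;
    }
}
```

Three incoming samples for two services in the same minute leave the cache as two objects, which is exactly the memory and network saving the paragraph above describes.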
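Back to section 1, Alarm processing: AlarmNotifyWorker.
This links up with the alarm source entry in Figure 1, and with that the whole alarm flow comes full circle!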