当前位置:   article > 正文

实现Flume多维度metrics测量的解决方案_flumemetricsmgr

flumemetricsmgr

由于公司大数据架构是,flume收集所有数据,流到kafka。
kafka自带相关metrics测量,而flume没有,导致数据定位发生困难。
为此,特地研究了kafka metrics的相关源码模块,将其实现原理移植到flume

kafka metrics 实现方式

kafka 相关测量模块主要在入口处,是用scala语言实现。
通过研究,发现kafka 是用 开源测量库 yammer metrics 2.2.0,实现了count gague、histogram、meter、timer 五种测量类型

增强flume测量功能

在flume Interceptor 位置,编写自定义的Interceptor拦截器。模拟kafka的实现,也利用yammer metrics 2.2.0。

先定义全局的MetricsRegistry

其实不定义也行yammer metrics 2.2.0有默认的MetricsRegistry,为防止受干扰,自定义了下

package global;

import com.yammer.metrics.core.MetricsRegistry;

/**
 * Created by shen.xiangxiang on 2017/4/21.
 */
public class GlobalMetricRegistry {

    public static  MetricsRegistry flumeMetricsRegistry = new MetricsRegistry();

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
编写Interceptor

编写自定义的Interceptor ,对经过flume的每一条event 进行测量
关键是覆写intercept() 方法
此处贴出样例代码,读者可以根据自己的业务场景做相应的修改

@Override
    public Event intercept(Event event) {
        log.debug("HeaderTopicInterceptor intercept start");
        Event eventOut;
        Map<String, String> eventHeader = new HashMap<String, String>();
        // 获得当前消息头
        Map<String, String> eventInputHeader = event.getHeaders();
        // 提取消息头的消息类型字段
        String messageType = "";
        if (eventInputHeader.containsKey("mt")) {
            messageType = eventInputHeader.get("mt");
            if (messageType.equals("TEXT")) {
                // 文本消息,增加主题键
                String eventBody = new String(event.getBody());
                // split eventBody by '|'
                String[] eventBodySplit = eventBody.split("\\|");
                if (eventBodySplit.length >= 6) {
                    // check first item in eventBodySplit is 'V2'
                    if (eventBodySplit[0].equals(DEFAULT_TEXT_MSG_HEADER_VERSION)) {
                        // set Topic Key as the third item in eventBodySplit
                        // TODO: 增加一个计数器
                        // 判断subject是否合法 subject = domain.dataType dataType即topic
                        boolean illegal = false;
                        String topic = eventBodySplit[2];
                        String businessDomain = eventBodySplit[3];
                        String _subject = businessDomain + "." + topic;
                        String timestampString = eventBodySplit[4];
                        SimpleDateFormat timestampFormater = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

                        //用于测量
                        Map<String, String> mapEvent = new HashMap<String, String>();
                        mapEvent.put("domain", businessDomain);
                        mapEvent.put("topic", topic);
                        mapEvent.put("id", ID);
                        mapEvent.put("mt", messageType);
                        String scope = bulidScope(mapEvent);


                        try {
                            timestampFormater.setLenient(false);
                            timestampFormater.parse(timestampString);
                            if (SubjectWhitelistUtil.subjects.contains(_subject)) {
                                illegal = true;
                            }
                        } catch (ParseException e) {

                        }

                        if (illegal) {
                            eventHeader.put(KafkaSink.TOPIC_HDR, topic);

                            //message count 统计
                            Meter meterMessage = GlobalMetricRegistry.flumeMetricsRegistry.newMeter(new MetricName(getClass(), "flume_FilterSuccessMessages", scope), "messages", TimeUnit.SECONDS);
                            meterMessage.mark(1);

                            //message byte
                            Meter meterBytes = GlobalMetricRegistry.flumeMetricsRegistry.newMeter(new MetricName(getClass(), "flume_FilterSuccessBytes", scope), "bytes", TimeUnit.SECONDS);
                            meterBytes.mark(event.getBody().length);

                            Histogram hisBytes = GlobalMetricRegistry.flumeMetricsRegistry.newHistogram(new MetricName(getClass(), "flume_FilterSuccessBytesHis", scope), false);
                            hisBytes.update(event.getBody().length);


                        } else {
                            eventHeader.put(KafkaSink.TOPIC_HDR, DEFAULT_UNKNOWN_TOPIC);

                            mapEvent.put("dropReason", "notInSubjectWhitelist");
                            scope = bulidScope(mapEvent);

                            //message count 统计
                            Meter meterMessage = GlobalMetricRegistry.flumeMetricsRegistry.newMeter(new MetricName(getClass(), "flume_FilterDropMessages", scope), "messages", TimeUnit.SECONDS);
                            meterMessage.mark(1);

                            //message byte
                            Meter meterBytes = GlobalMetricRegistry.flumeMetricsRegistry.newMeter(new MetricName(getClass(), "flume_FilterDropBytes", scope), "bytes", TimeUnit.SECONDS);
                            meterBytes.mark(event.getBody().length);

                            Histogram hisBytes = GlobalMetricRegistry.flumeMetricsRegistry.newHistogram(new MetricName(getClass(), "flume_FilterDropBytesHis", scope), false);
                            hisBytes.update(event.getBody().length);

                        }


                    } 
                } 
            } 
        } else {
            // Cannot find message type
            // TODO: 增加一个计数器
            eventHeader.put(KafkaSink.TOPIC_HDR, DEFAULT_UNKNOWN_TOPIC);
        }

        // set Event
        eventOut = EventBuilder.withBody(event.getBody(), eventHeader);
        log.debug("HeaderTopicInterceptor intercept end");
        return eventOut;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97

封装的bulidScope() 方法

public String bulidScope(Map<String, String> mapBusiness) {
        String scope = "";
        Iterator<Map.Entry<String, String>> entries = mapBusiness.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<String, String> entry = entries.next();
            String strKey = entry.getKey();
            String strVal = entry.getValue();
            scope += strKey + "." + strVal + ".";
        }
        return scope.substring(0, scope.length() - 1);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
创建监控服务 InfluxMetricsServer

样例代码如下


package org.apache.flume.instrumentation.http;

import global.GlobalMetricRegistry;
import org.apache.flume.Context;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.MonitorService;
import org.apache.flume.instrumentation.http.tool.HostUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * A Monitor service implementation that runs a web server on a configurable
 * port and returns the metrics for components in JSON format.
 * <p>
 * Optional parameters:
 * <p>
 * <tt>port</tt> : The port on which the server should listen to.
 * <p>
 * Returns metrics in the following format:
 * <p>
 *
 * {
 * <p>
 * "componentName1":{"metric1" : "metricValue1","metric2":"metricValue2"}
 * <p>
 * "componentName1":{"metric3" : "metricValue3","metric4":"metricValue4"}
 * <p>
 */
public class InfluxMetricsServer implements MonitorService {

    private static Logger logger = LoggerFactory
            .getLogger(InfluxMetricsServer.class);

    public final String CONF_HOST = "host";
    public final String CONF_POLL_FREQUENCY = "pollFrequency";
    public final int DEFAULT_POLL_FREQUENCY = 60;
    public final String CONF_VERSION = "version";
    public final String DEFAULT_VERSION = "1.0.0";
    public final String CONF_INSTANCE_ID = "instanceId";

    private String path = "http://";
    private int pollFrequency;
    private String version;

//  private ScheduledExecutorService service = Executors
//          .newSingleThreadScheduledExecutor();

    public static String localhost = null;
    public static String instanceId = null;



    public void configure(Context context) {
        // http地址
        String host = context.getString(this.CONF_HOST);



        if ((host == null) || host.isEmpty()) {
            throw new ConfigurationException("Hosts list cannot be empty.");
        }



        path += host;
        // 频率/s
        this.pollFrequency = context.getInteger(this.CONF_POLL_FREQUENCY,
                this.DEFAULT_POLL_FREQUENCY);
        // 测量消息schema版本
        this.version = context.getString(this.CONF_VERSION,
                this.DEFAULT_VERSION);
        // 实例id
        localhost = HostUtils.getHostName();

        //实例id
        instanceId = context.getString(this.CONF_INSTANCE_ID,"unknown");
//      if (instanceId == null || instanceId.isEmpty()) {
//          throw new ConfigurationException("instanceId list cannot be empty.");
//      }
        logger.info("InfluxMetricsServer configure finished");
    }

    @Override
    public void start() {



//      if (service.isShutdown() || service.isTerminated()) {
//          service = Executors.newSingleThreadScheduledExecutor();
//      }
//      logger.info("InfluxMetricsServer start before scheduleWithFixedDelay");
//      service.scheduleWithFixedDelay(this.task(), 0, pollFrequency, TimeUnit.SECONDS);
//      logger.info("InfluxMetricsServer start after scheduleWithFixedDelay");

        //TODO  增加 report
        logger.info(" enable InfluxReporter");
//      InfluxReporter reporter = new InfluxReporter(GlobalMetricRegistry.flumeMetricsRegistry,path, MetricPredicate.ALL);
//      reporter.start(pollFrequency, TimeUnit.SECONDS);

        InfluxReporter.enable(GlobalMetricRegistry.flumeMetricsRegistry, pollFrequency, path ,TimeUnit.SECONDS);

        logger.info(" after InfluxReporter");


    }

    @Override
    public void stop() {
        logger.info("InfluxMetricsServer configure stop");
    }



}



  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
report出metrics

上述监控I服务nfluxMetricsServer样例代码使用了 InfluxReporter
样例代码如下,同时也可以参考我之前的blog《基于yammer metrics 开发InfluxReport》: https://blog.csdn.net/zhixingheyi_tian/article/details/77848432


package org.apache.flume.instrumentation.http;

import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.*;
import com.yammer.metrics.core.Timer;
import com.yammer.metrics.reporting.AbstractPollingReporter;
import com.yammer.metrics.stats.Snapshot;
import org.apache.flume.instrumentation.http.tool.HostUtils;
import org.apache.flume.instrumentation.util.JMXPollUtil;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.concurrent.TimeUnit;


/**
 * Created by shen.xiangxiang on 2017/4/20.
 */

/**
 * 继承 AbstractPollingReporter,实现influx输出格式及httpreport方式
 */

public class InfluxReporter extends AbstractPollingReporter implements MetricProcessor<InfluxReporter.Context> {



    /**
     * 创建 enable
     * Enables the console reporter for the default metrics registry, and causes it to print to
     * influx with the specified period.
     *
     * @param period the period between successive outputs
     * @param unit   the time unit of {@code period}
     */
    public static void enable(long period, String hostPort ,TimeUnit unit) {
        enable(Metrics.defaultRegistry(),period, hostPort,unit);
    }

    /**
     * Enables the console reporter for the given metrics registry, and causes it to print to influx
     * with the specified period and unrestricted output.
     *
     * @param metricsRegistry the metrics registry
     * @param period          the period between successive outputs
     * @param unit            the time unit of {@code period}
     */
    public static void enable(MetricsRegistry metricsRegistry, long period, String hostPort ,TimeUnit unit) {
        final InfluxReporter reporter = new InfluxReporter(metricsRegistry,
                hostPort,
                MetricPredicate.ALL);
        reporter.start(period, unit);
    }


    private static final Logger LOG = LoggerFactory.getLogger(InfluxReporter.class);

    private static final MetricPredicate DEFAULT_METRIC_PREDICATE = MetricPredicate.ALL;

    private BatchPoints batchPoints;

    //传入 host名
    private String hostport;

    //本地 host
    private String tag_host;

    private InfluxDB influxDBclient;

    private Clock clock;

    private final MetricPredicate predicate;

    private Context context;

    /**
     *
     * @param hostPort,
     * 快捷构造 ,for the default metrics registry,
     */
    public InfluxReporter(String hostPort) {
        this(Metrics.defaultRegistry(), hostPort, MetricPredicate.ALL);
    }


    /**
     * Creates a new {@link AbstractPollingReporter} instance.
     **/
    public InfluxReporter(MetricsRegistry metricsRegistry, String hostPort, MetricPredicate predicate) {
        super(metricsRegistry, "influx-reporter");
        this.hostport = hostPort;
        this.influxDBclient = InfluxDBFactory.connect(hostPort);
        this.clock = Clock.defaultClock();
        this.tag_host = HostUtils.getHostName();
        this.predicate = predicate;
        this.context = new Context() {
            @Override
            public long getTime() {
                return InfluxReporter.this.clock.time();
            }
        };
    }


    @Override
    public void run() {
        try {
            // BatchPoints 构造参数  均没有用,随便填,因为数据最终以http协议发出
            this.batchPoints = BatchPoints
                    .database("Metrics")
                    .retentionPolicy("autogen")
                    .consistency(InfluxDB.ConsistencyLevel.ALL)
                    .build();
            printRegularMetrics(context);
            //TODO
            processFlumeCounter();

            //TODO
//            LOG.info("batchPoints.lineProtocol():" + batchPoints.lineProtocol());
            this.influxDBclient.write(batchPoints);
        } catch (Exception e) {
            LOG.error("Cannot send metrics to InfluxDB {}", e);
            LOG.error("error batchPoints.lineProtocol():" + batchPoints.lineProtocol());
        }
    }



    private void printRegularMetrics(final Context context) {

        //TODO
        LOG.info("getMetricsRegistry().groupedMetrics(DEFAULT_METRIC_PREDICATE).size():" + getMetricsRegistry().groupedMetrics(DEFAULT_METRIC_PREDICATE).size());

        for (Map.Entry<String, SortedMap<MetricName, Metric>> entry : getMetricsRegistry().groupedMetrics(DEFAULT_METRIC_PREDICATE).entrySet()) {
            for (Map.Entry<MetricName, Metric> subEntry : entry.getValue().entrySet()) {
                final MetricName metricName = subEntry.getKey();
                final Metric metric = subEntry.getValue();
                if (predicate.matches(metricName, metric)){
                    try {
                        metric.processWith(this, subEntry.getKey(), context);
                    } catch (Exception ignored) {
                        LOG.error("Error printing regular metrics:", ignored);
                    }
                }
            }
        }
    }


    public void processGauge(MetricName name, Gauge<?> gauge, Context context) throws Exception {

        Point.Builder pointbuilder = buildMetricsPoint(name, context);
        pointbuilder.tag("metric_type", "gague");

        Object fieldValue = gauge.value();
        String fieldName = "value";
        // Long Interger 统一转Float型,以防止 schema冲突
        if( fieldValue instanceof Float)
            pointbuilder.addField(fieldName, (Float)fieldValue);
        else if( fieldValue instanceof Double)
            pointbuilder.addField(fieldName, (Double)fieldValue);
        else if( fieldValue instanceof Long)
            pointbuilder.addField(fieldName, Float.valueOf(((Long)fieldValue).toString()));
        else if( fieldValue instanceof Integer)
            pointbuilder.addField(fieldName, Float.valueOf(((Integer)fieldValue).toString()));
        else if( fieldValue instanceof String)
            pointbuilder.addField(fieldName, (String)fieldValue);
        else
            return;
        batchPoints.point(pointbuilder.build());
    }

    @Override
    public void processCounter(MetricName metricName, Counter counter, Context context) throws Exception {

        Point.Builder pointbuilder = buildMetricsPoint(metricName, context);
        pointbuilder.tag("metric_type", "counter");

        pointbuilder.addField("count", counter.count());
        batchPoints.point(pointbuilder.build());

    }


    @Override
    public void processMeter(MetricName metricName, Metered meter, Context context) throws Exception {

        Point.Builder pointbuilder = buildMetricsPoint(metricName, context);
        pointbuilder.tag("metric_type", "meter");
        pointbuilder.tag("eventType", meter.eventType());


        pointbuilder.addField("count", meter.count());
        pointbuilder.addField("meanRate", meter.meanRate());
        pointbuilder.addField("1MinuteRate", meter.oneMinuteRate());
        pointbuilder.addField("5MinuteRate", meter.fiveMinuteRate());
        pointbuilder.addField("15MinuteRate", meter.fifteenMinuteRate());


        batchPoints.point(pointbuilder.build());

    }



    @Override
    public void processHistogram(MetricName metricName, Histogram histogram, Context context) throws Exception {
        final Snapshot snapshot = histogram.getSnapshot();

        Point.Builder pointbuilder = buildMetricsPoint(metricName, context);
        pointbuilder.tag("metric_type", "histogram");

        pointbuilder.addField("max", histogram.max());
        pointbuilder.addField("mean", histogram.mean());
        pointbuilder.addField("min", histogram.min());
        pointbuilder.addField("stddev", histogram.max());
        pointbuilder.addField("sum", histogram.sum());

        pointbuilder.addField("median", snapshot.getMedian());
        pointbuilder.addField("p75", snapshot.get75thPercentile());
        pointbuilder.addField("p95", snapshot.get95thPercentile());
        pointbuilder.addField("p98", snapshot.get98thPercentile());
        pointbuilder.addField("p99", snapshot.get99thPercentile());
        pointbuilder.addField("p999", snapshot.get999thPercentile());

        batchPoints.point(pointbuilder.build());

    }

    public void processTimer(MetricName metricName, Timer timer, Context context) throws Exception {
        final Snapshot snapshot = timer.getSnapshot();

        Point.Builder pointbuilder = buildMetricsPoint(metricName, context);
        pointbuilder.tag("metric_type", "timer");


        pointbuilder.addField("count", timer.count());
        pointbuilder.addField("meanRate", timer.meanRate());
        pointbuilder.addField("1MinuteRate", timer.oneMinuteRate());
        pointbuilder.addField("5MinuteRate", timer.fiveMinuteRate());
        pointbuilder.addField("15MinuteRate", timer.fifteenMinuteRate());


        pointbuilder.addField("max", timer.max());
        pointbuilder.addField("mean", timer.mean());
        pointbuilder.addField("min", timer.min());
        pointbuilder.addField("stddev", timer.max());
        pointbuilder.addField("sum", timer.sum());

        pointbuilder.addField("median", snapshot.getMedian());
        pointbuilder.addField("p75", snapshot.get75thPercentile());
        pointbuilder.addField("p95", snapshot.get95thPercentile());
        pointbuilder.addField("p98", snapshot.get98thPercentile());
        pointbuilder.addField("p99", snapshot.get99thPercentile());
        pointbuilder.addField("p999", snapshot.get999thPercentile());




        batchPoints.point(pointbuilder.build());
    }


    private Point.Builder buildMetricsPoint(MetricName metricName, Context context) {

        //name要注意规范,加上前缀
        Point.Builder pointbuilder = Point.measurement(metricName.getName())
                .time(context.getTime(), TimeUnit.MILLISECONDS)
                .tag("group", metricName.getGroup())
                .tag("type", metricName.getType())
                //自动获取  host
                .tag("host", tag_host);


        //扩展区域
        if( metricName.hasScope() ) {
            String scope = metricName.getScope();

            List<String> scopes = Arrays.asList(scope.split("\\."));
            if( scopes.size() % 2 == 0) {
                Iterator<String> iterator = scopes.iterator();
                while (iterator.hasNext()) {
                    pointbuilder.tag(iterator.next(), iterator.next());
                }
            }
            else pointbuilder.tag("scope", scope);
        }
        return pointbuilder;
    }


    public interface Context {

        long getTime();
    }

    //TODO 定制化吐出flumecounter
    public void processFlumeCounter() {

        Map<String, Map<String, String>> metricsMap = JMXPollUtil
                .getAllMBeans();
        LOG.info("metricsMap:" + metricsMap);
        long timestamp = new Date().getTime();

        //参数写死 因为其不起作用
//        BatchPoints batchPoints = BatchPoints
//                .database("FlumeMetrics")
//                .retentionPolicy("autogen")
//                .consistency(InfluxDB.ConsistencyLevel.ALL)
//                .build();

        try {
            for (String component : metricsMap.keySet()) {
                Map<String, String> attributeMap = metricsMap
                        .get(component);
                String type = attributeMap.get("Type");
                attributeMap.remove("Type");
                attributeMap.remove("StopTime");
                attributeMap.remove("StartTime");
                attributeMap.remove("ChannelFillPercentage");
                for (String attribute : attributeMap.keySet()) {

                    //TODO
//                    LOG.info("attribute:" + attribute);
//                    LOG.info("componentname:" + component.replace(type + ".", ""));
//                    LOG.info("type:" + type);


                    Point.Builder pointbuilder = Point.measurement("flume_counter")
                            .time(timestamp, TimeUnit.MILLISECONDS)
                            .tag("name", attribute)
                            .tag("host", tag_host)
                            .tag("componentname", component.replace(type + ".", ""))
                            .tag("type", type)
                            .tag("group", "flume.agent")
                            .tag("metric_type", "counter");

                    pointbuilder.addField("values", Long.valueOf(attributeMap.get(attribute)));

                    batchPoints.point(pointbuilder.build());

                }
            }
        }
        catch(IllegalArgumentException ille)
        {
            ille.printStackTrace();
            LOG.error("error metricsMap:" + metricsMap);
            LOG.error("error:" + ille);
            LOG.error("ille.getMessage():" + ille.getMessage());
        }
        catch(Exception e)
        {
            e.printStackTrace();
            LOG.error("error metricsMap:" + metricsMap);
            LOG.error("e.getMessage():" + e.getMessage());
        }


//        influxDBclient.write(batchPoints);
        LOG.info("InfluxMetricsServer run onetime");

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215
  • 216
  • 217
  • 218
  • 219
  • 220
  • 221
  • 222
  • 223
  • 224
  • 225
  • 226
  • 227
  • 228
  • 229
  • 230
  • 231
  • 232
  • 233
  • 234
  • 235
  • 236
  • 237
  • 238
  • 239
  • 240
  • 241
  • 242
  • 243
  • 244
  • 245
  • 246
  • 247
  • 248
  • 249
  • 250
  • 251
  • 252
  • 253
  • 254
  • 255
  • 256
  • 257
  • 258
  • 259
  • 260
  • 261
  • 262
  • 263
  • 264
  • 265
  • 266
  • 267
  • 268
  • 269
  • 270
  • 271
  • 272
  • 273
  • 274
  • 275
  • 276
  • 277
  • 278
  • 279
  • 280
  • 281
  • 282
  • 283
  • 284
  • 285
  • 286
  • 287
  • 288
  • 289
  • 290
  • 291
  • 292
  • 293
  • 294
  • 295
  • 296
  • 297
  • 298
  • 299
  • 300
  • 301
  • 302
  • 303
  • 304
  • 305
  • 306
  • 307
  • 308
  • 309
  • 310
  • 311
  • 312
  • 313
  • 314
  • 315
  • 316
  • 317
  • 318
  • 319
  • 320
  • 321
  • 322
  • 323
  • 324
  • 325
  • 326
  • 327
  • 328
  • 329
  • 330
  • 331
  • 332
  • 333
  • 334
  • 335
  • 336
  • 337
  • 338
  • 339
  • 340
  • 341
  • 342
  • 343
  • 344
  • 345
  • 346
  • 347
  • 348
  • 349
  • 350
  • 351
  • 352
  • 353
  • 354
  • 355
  • 356
  • 357
  • 358
  • 359
  • 360
  • 361
  • 362
  • 363
  • 364
  • 365
  • 366
  • 367
  • 368
  • 369
  • 370
  • 371
  • 372

总结

此处演示了如何增强flume metrics测量的功能,有了测量之后还需要report出去。
主要有三段sample代码,分享给读者及同仁。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/952456
推荐阅读
相关标签
  

闽ICP备14008679号