45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
31、Flink的SQL Gateway介绍及示例
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1
40、Flink 的Apache Kafka connector(kafka sink的介绍及使用示例)-2
40、Flink 的Apache Kafka connector(kafka source 和sink 说明及使用示例) 完整版
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版
46、Flink 的table api与sql之配项列表及示例


Table of Contents

  • Flink 系列文章
  • 一、Flink Metrics System
    • 1、Registering Metrics
      • 1)、Metric Types
      • 2)、Counter
      • 3)、Gauge
      • 4)、Histogram
      • 5)、Meter


This article gives a brief introduction to the first part of Flink's metrics system: the metric types, together with code examples for each of the four types.
The topic is split into three parts:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版

This article assumes a working nc (netcat) for feeding the socket source.
It is organized into five parts: the metric types, followed by code implementations of the four types (Counter, Gauge, Histogram and Meter).
The examples were run on Flink 1.17.

一、Flink Metrics System

Flink exposes a metric system that allows gathering metrics and exposing them to external systems.
The Maven dependencies used by the examples in this article:

	<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
	</dependencies>

1、Registering Metrics

You can access the metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object on which you can create and register new metrics.

1)、Metric Types

Flink supports four kinds of metrics: Counters, Gauges, Histograms and Meters.

2)、Counter

A Counter is used to count things. The current value can be incremented or decremented using inc()/inc(long n) and dec()/dec(long n). You can create and register a Counter by calling counter(String name) on a MetricGroup.
The full demo below shows several implementation approaches for reference; a minimal sketch of the standard pattern comes first.
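The sketch registers the Counter in open() and calls inc() in the processing method. This is a minimal sketch; the class name and metric name are illustrative.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class LineCountingMapper extends RichMapFunction<String, String> {

	private transient Counter lineCounter;

	@Override
	public void open(Configuration config) throws Exception {
		// metrics are registered once, when the function is opened
		this.lineCounter = getRuntimeContext().getMetricGroup().counter("lineCounter");
	}

	@Override
	public String map(String value) throws Exception {
		// one record processed -> increment by one
		this.lineCounter.inc();
		return value;
	}
}

Note that the full demo below reads getCount() from the registered counter once in open() and then increments a plain long field, so the registered metric itself is never updated; calling inc()/dec() on the Counter, as in this sketch and in the commented-out LineMapper, is what metric reporters actually see.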

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsDemo {

//	public class LineMapper extends RichMapFunction<String, String> {
//		private transient Counter counter;
//
//		@Override
//		public void open(Configuration config) {
//			this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter");
//		}
//
//		@Override
//		public String map(String value) throws Exception {
//			this.counter.inc();
//			return value;
//		}
//	}

	public static void test1() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		env.setParallelism(1);

		// source
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Tuple2<String, Integer>> result = lines.flatMap(new FlatMapFunction<String, String>() {

			@Override
			public void flatMap(String value, Collector<String> out) throws Exception {
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(word);
				}
			}
		}).map(new MapFunction<String, Tuple2<String, Integer>>() {

			@Override
			public Tuple2<String, Integer> map(String value) throws Exception {
				return Tuple2.of(value, 1);
			}
		}).keyBy(t -> t.f0).sum(1);

//		SingleOutputStreamOperator<Tuple2<Integer, Integer>> result1 = lines.map(new RichMapFunction<String, Tuple2<Integer, Integer>>() {
//
//			@Override
//			public Tuple2<Integer, Integer> map(String value) throws Exception {
//				int subTaskId = getRuntimeContext().getIndexOfThisSubtask(); // subtask id / partition number
//				return new Tuple2(subTaskId, 1);
//			}
//			// group by subtask id / partition number and count the elements in each subtask/partition
//		}).keyBy(t -> t.f0).sum(1);

		// RichFlatMapFunction<IN, OUT>
		// Tuple3<String, Long, Integer>: input word, line count, total word count
		DataStream<Tuple3<String, Long, Integer>> result2 = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {
//			private transient Counter counter;
			private long result2LineCounter = 0;

			@Override
			public void open(Configuration config) {
//				this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter:");
				result2LineCounter = getRuntimeContext().getMetricGroup().counter("result2LineCounter:").getCount();
			}

			@Override
			public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
//				this.counter.inc();
				result2LineCounter++;
				System.out.println("计数器行数:" + result2LineCounter);
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(Tuple2.of(word, result2LineCounter));
				}
			}
		}).map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {

			@Override
			public Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {
//				Tuple3<String, Long, Integer> t = Tuple3.of(value.f0, value.f1, 1);
				return Tuple3.of(value.f0, value.f1, 1);
			}
		}).keyBy(t -> t.f0).sum(2);

		// sink
		result.print("result:");
		result2.print("result2:");

		env.execute();
	}

	public static void main(String[] args) throws Exception {
		test1();
//		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//		env.setParallelism(1);
//		DataStream<String> input = env.fromElements("a", "b", "c", "a", "b", "c");
//
//		input.keyBy(value -> value).map(new RichMapFunction<String, String>() {
//			private long count = 0;
//
//			@Override
//			public void open(Configuration parameters) throws Exception {
//				super.open(parameters);
//				count = getRuntimeContext().getMetricGroup().counter("myCounter").getCount();
//			}
//
//			@Override
//			public String map(String value) throws Exception {
//				count++;
//				return value + ": " + count;
//			}
//		}).print();
//
//		env.execute("Flink Count Counter Example");
	}
}
/// Verification ///
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
计数器行数:1
result:> (hello,1)
result2:> (hello,1,1)
result:> (123,1)
result2:> (123,1,1)
计数器行数:2
result2:> (alan,2,1)
result:> (alan,1)
result2:> (flink,2,1)
result:> (flink,1)
result2:> (good,2,1)
result:> (good,1)
计数器行数:3
result:> (alan_chan,1)
result2:> (alan_chan,3,1)
result:> (hi,1)
result2:> (hi,3,1)
result:> (flink,2)
result2:> (flink,2,2)

Alternatively, you can also use your own Counter implementation:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 */
public class TestMetricsDemo {

	public static void test2() throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		env.setParallelism(1);

		// source
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		// Tuple3<String, Long, Integer>: input word, line count, total word count
		DataStream<Tuple3<String, Long, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, Long>>() {
			private transient Counter counter;

			@Override
			public void open(Configuration config) {
				this.counter = getRuntimeContext().getMetricGroup().counter("result2LineCounter", new AlanCustomCounter());
			}

			@Override
			public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
				this.counter.inc();
//				result2LineCounter++;
				System.out.println("计数器行数:" + this.counter.getCount());
				String[] arr = value.split(",");
				for (String word : arr) {
					out.collect(Tuple2.of(word, this.counter.getCount()));
				}
			}
		}).map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {

			@Override
			public Tuple3<String, Long, Integer> map(Tuple2<String, Long> value) throws Exception {
				return Tuple3.of(value.f0, value.f1, 1);
			}
		}).keyBy(t -> t.f0).sum(2);

		// sink
		result.print("result:");

		env.execute();
	}

	public static class AlanCustomCounter implements Counter {
		private long count;

		@Override
		public void inc() {
			count += 2;
		}

		@Override
		public void inc(long n) {
			count += n;
		}

		@Override
		public void dec() {
			count -= 2;
		}

		@Override
		public void dec(long n) {
			count -= n;
		}

		@Override
		public long getCount() {
			return count;
		}
	}

	public static void main(String[] args) throws Exception {
		test2();
	}
}

/// Verification ///
// input
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

// console output
计数器行数:2
result:> (hello,2,1)
result:> (123,2,1)
计数器行数:4
result:> (alan,4,1)
result:> (flink,4,1)
result:> (good,4,1)
计数器行数:6
result:> (alan_chan,6,1)
result:> (hi,6,1)
result:> (flink,4,2)

3)、Gauge

A Gauge provides a value of any type on demand. To use a Gauge you must first create a class that implements the org.apache.flink.metrics.Gauge interface; there is no restriction on the type of the returned value. You can register a Gauge by calling gauge(String name, Gauge gauge) on a MetricGroup. A minimal sketch comes first, followed by the full runnable demo.
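The sketch registers the Gauge in open() and lets it return the current value of a field; the same idea appears commented out inside the full demo. Class and metric names are illustrative.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

public class ValueExposingMapper extends RichMapFunction<String, String> {

	private transient int valueToExpose = 0;

	@Override
	public void open(Configuration config) throws Exception {
		// the gauge is polled by the metric system and simply reports the field's current value
		getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
			@Override
			public Integer getValue() {
				return valueToExpose;
			}
		});
	}

	@Override
	public String map(String value) throws Exception {
		valueToExpose++;
		return value;
	}
}

The value is only read when a reporter polls the gauge, so getValue() should be cheap and free of side effects.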

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestMetricsGaugeDemo {
//	public class MyMapper extends RichMapFunction<String, String> {
//		private transient int valueToExpose = 0;
//
//		@Override
//		public void open(Configuration config) {
//			getRuntimeContext().getMetricGroup().gauge("MyGauge", new Gauge<Integer>() {
//				@Override
//				public Integer getValue() {
//					return valueToExpose;
//				}
//			});
//		}
//
//		@Override
//		public String map(String value) throws Exception {
//			valueToExpose++;
//			return value;
//		}
//	}public static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;System.out.println("计数器行数:" + result2LineCounter);String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test1();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

Reporters will turn the exposed object into a String, which means that a meaningful toString() implementation is required.

4)、Histogram

A Histogram measures the distribution of long values. You can register one by calling histogram(String name, Histogram histogram) on a MetricGroup.
The full example below implements the Histogram interface by hand, purely to demonstrate the process; a minimal sketch of the registration pattern comes first.
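This is a minimal sketch of registering and updating a histogram. It assumes the AlanHistogram implementation defined in the full demo below; any class implementing org.apache.flink.metrics.Histogram (for example the Dropwizard wrapper introduced later in this section) can be registered the same way. Class and metric names are illustrative.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;

public class ValueHistogramMapper extends RichMapFunction<Long, Long> {

	private transient Histogram histogram;

	@Override
	public void open(Configuration config) throws Exception {
		// AlanHistogram is the hand-written Histogram implementation from the full demo below
		this.histogram = getRuntimeContext().getMetricGroup().histogram("valueHistogram", new AlanHistogram());
	}

	@Override
	public Long map(Long value) throws Exception {
		// record every incoming value into the distribution
		this.histogram.update(value);
		return value;
	}
}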

import java.io.Serializable;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
//import com.codahale.metrics.Histogram;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestMetricsHistogramDemo {//	public class MyMapper extends RichMapFunction<Long, Long> {
//		private transient Histogram histogram;
//
//		@Override
//		public void open(Configuration config) {
//			this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());
//		}
//
//		@Override
//		public Long map(Long value) throws Exception {
//			this.histogram.update(value);
//			return value;
//		}
//	}public static class AlanHistogram implements Histogram {private CircularDoubleArray descriptiveStatistics = new CircularDoubleArray(10);;public AlanHistogram() {}public AlanHistogram(int windowSize) {this.descriptiveStatistics = new CircularDoubleArray(windowSize);}@Overridepublic void update(long value) {this.descriptiveStatistics.addValue(value);}@Overridepublic long getCount() {return this.descriptiveStatistics.getElementsSeen();}@Overridepublic HistogramStatistics getStatistics() {
//			return new DescriptiveStatisticsHistogramStatistics(this.descriptiveStatistics);return null;}class CircularDoubleArray implements Serializable {private static final long serialVersionUID = 1L;private final double[] backingArray;private int nextPos = 0;private boolean fullSize = false;private long elementsSeen = 0;CircularDoubleArray(int windowSize) {this.backingArray = new double[windowSize];}synchronized void addValue(double value) {backingArray[nextPos] = value;++elementsSeen;++nextPos;if (nextPos == backingArray.length) {nextPos = 0;fullSize = true;}}synchronized double[] toUnsortedArray() {final int size = getSize();double[] result = new double[size];System.arraycopy(backingArray, 0, result, 0, result.length);return result;}private synchronized int getSize() {return fullSize ? backingArray.length : nextPos;}private synchronized long getElementsSeen() {return elementsSeen;}}}public static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);// 此处仅仅示例this.histogram.getCount()的值,没有实际的意义System.out.println("计数器行数:" + result2LineCounter + "  histogram:" + this.histogram.getCount());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test1();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1  histogram:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2  histogram:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3  histogram:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

Flink does not provide a default implementation of Histogram, but it ships a wrapper that allows usage of Codahale/Dropwizard histograms. To use this wrapper, add the following dependency to your pom.xml:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-metrics-dropwizard</artifactId>
	<version>1.17.1</version>
</dependency>

The following example uses a Codahale/Dropwizard histogram through that wrapper:

import java.io.Serializable;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Gauge;
//import com.codahale.metrics.Histogram;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;import com.codahale.metrics.SlidingWindowReservoir;/*** @author alanchan**/
public class TestMetricsHistogramDemo {public static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));
//				this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new AlanHistogram());this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);// 此处仅仅示例this.histogram.getCount()的值,没有实际的意义System.out.println("计数器行数:" + result2LineCounter + "  histogram:" + this.histogram.getCount());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test2();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1  histogram:1
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2  histogram:2
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3  histogram:3
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

5)、Meter

A Meter measures an average throughput. An occurrence of an event can be registered with the markEvent() method; the occurrence of multiple events at the same time can be registered with markEvent(long n). You can register a Meter by calling meter(String name, Meter meter) on a MetricGroup.

The first full example below shows a hand-rolled Meter implementation. It is not particularly rigorous and only illustrates the interface; in practice the Dropwizard-based second example is the more common choice. A minimal sketch using Flink's built-in MeterView comes first.
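As a lighter alternative to hand-rolling a Meter, the sketch below uses org.apache.flink.metrics.MeterView, which ships with Flink's metrics core and derives a per-second rate from an internal counter over a configurable time span. This is a minimal sketch; the class name, metric name and the 60-second window are illustrative, and MeterView should be checked against your Flink version.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;

public class ThroughputMapper extends RichMapFunction<String, String> {

	private transient Meter meter;

	@Override
	public void open(Configuration config) throws Exception {
		// MeterView computes the per-second rate over the given time span (in seconds)
		this.meter = getRuntimeContext().getMetricGroup().meter("recordsPerSecond", new MeterView(60));
	}

	@Override
	public String map(String value) throws Exception {
		// mark one event per processed record; these events feed the rate calculation
		this.meter.markEvent();
		return value;
	}
}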

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;//import com.codahale.metrics.Meter;
import com.codahale.metrics.SlidingWindowReservoir;/*** @author alanchan**/
public class TestMetricsMeterDemo {public class MyMapper extends RichMapFunction<Long, Long> {private transient Meter meter;@Overridepublic void open(Configuration config) {this.meter = getRuntimeContext().getMetricGroup().meter("myMeter", new AlanMeter());}@Overridepublic Long map(Long value) throws Exception {this.meter.markEvent();return value;}}public static class AlanMeter implements Meter {/** The underlying counter maintaining the count. */private final Counter counter = new SimpleCounter();;/** The time-span over which the average is calculated. */private final int timeSpanInSeconds = 0;/** Circular array containing the history of values. */private final long[] values = null;;/** The index in the array for the current time. */private int time = 0;/** The last rate we computed. */private double currentRate = 0;@Overridepublic void markEvent() {this.counter.inc();}@Overridepublic void markEvent(long n) {this.counter.inc(n);}@Overridepublic long getCount() {return counter.getCount();}@Overridepublic double getRate() {return currentRate;}public void update() {time = (time + 1) % values.length;values[time] = counter.getCount();currentRate = ((double) (values[time] - values[(time + 1) % values.length]) / timeSpanInSeconds);}}public static void test1() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;private Meter meter;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new AlanMeter());}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);this.meter.markEvent();// 此处仅仅示例this.histogram.getCount()、this.meter.getRate()的值,没有实际的意义,具体使用以实际使用场景为准System.out.println("计数器行数:" + result2LineCounter + ",  histogram:" + this.histogram.getCount() + ",   meter.getRate:" + this.meter.getRate());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test1();}}///验证数据///
// 输入数据
[alanchan@server2 bin]$ nc -lk 9999
hello,123
alan,flink,good
alan_chan,hi,flink

//控制台输出:
计数器行数:1,  histogram:1,   meter.getRate:0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2,  histogram:2,   meter.getRate:0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3,  histogram:3,   meter.getRate:0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

Flink offers a wrapper that allows usage of Codahale/Dropwizard meters. To use this wrapper, add the following dependency to your pom.xml:

<dependency>
	<groupId>org.apache.flink</groupId>
	<artifactId>flink-metrics-dropwizard</artifactId>
	<version>1.17.1</version>
</dependency>

The following example registers a Meter through the Codahale/Dropwizard wrapper:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;//import com.codahale.metrics.Meter;
import com.codahale.metrics.SlidingWindowReservoir;/*** @author alanchan**/
public class TestMetricsMeterDemo {public static void test2() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);env.setParallelism(1);// sourceDataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);// transformation// RichFlatMapFunction<IN, OUT>// Tuple3<String, String, Integer> 输入的字符串,alan lines[行数],统计单词的总数DataStream<Tuple3<String, String, Integer>> result = lines.flatMap(new RichFlatMapFunction<String, Tuple2<String, String>>() {private long result2LineCounter = 0;private Gauge<String> gauge = null;private Histogram histogram = null;private Meter meter;@Overridepublic void open(Configuration config) {result2LineCounter = getRuntimeContext().getMetricGroup().counter("resultLineCounter:").getCount();gauge = getRuntimeContext().getMetricGroup().gauge("alanGauge", new Gauge<String>() {@Overridepublic String getValue() {return "alan lines[" + result2LineCounter + "]";}});com.codahale.metrics.Histogram dropwizardHistogram = new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500));this.histogram = getRuntimeContext().getMetricGroup().histogram("alanHistogram", new DropwizardHistogramWrapper(dropwizardHistogram));//				this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new AlanMeter());com.codahale.metrics.Meter dropwizardMeter = new com.codahale.metrics.Meter();this.meter = getRuntimeContext().getMetricGroup().meter("alanMeter", new DropwizardMeterWrapper(dropwizardMeter));}@Overridepublic void flatMap(String value, Collector<Tuple2<String, String>> out) throws Exception {result2LineCounter++;this.histogram.update(result2LineCounter * 3);this.meter.markEvent();// 此处仅仅示例this.histogram.getCount()、this.meter.getRate()的值,没有实际的意义,具体使用以实际使用场景为准System.out.println("计数器行数:" + result2LineCounter + ",  histogram:" + this.histogram.getCount() + ",   meter.getRate:" + this.meter.getRate());String[] arr = value.split(",");for (String word : arr) {out.collect(Tuple2.of(word, gauge.getValue()));}}}).map(new MapFunction<Tuple2<String, String>, Tuple3<String, String, Integer>>() {@Overridepublic Tuple3<String, String, Integer> map(Tuple2<String, String> value) throws Exception {return Tuple3.of(value.f0, value.f1, 1);}}).keyBy(t -> t.f0).sum(2);// sinkresult.print("result:");env.execute();}public static void main(String[] args) throws Exception {test2();}}//控制台输出:
计数器行数:1,  histogram:1,   meter.getRate:0.0
result:> (hello,alan lines[1],1)
result:> (123,alan lines[1],1)
计数器行数:2,  histogram:2,   meter.getRate:0.0
result:> (alan,alan lines[2],1)
result:> (flink,alan lines[2],1)
result:> (good,alan lines[2],1)
计数器行数:3,  histogram:3,   meter.getRate:0.0
result:> (alan_chan,alan lines[3],1)
result:> (hi,alan lines[3],1)
result:> (flink,alan lines[2],2)

To recap, this article gave a brief introduction to the first part of Flink's metrics system: the metric types and code examples for each of the four types.
The topic is split into three parts:
45、Flink 的指标体系介绍及验证(1)-指标类型及指标实现示例
45、Flink 的指标体系介绍及验证(2)-指标的scope、报告、系统指标以及追踪、api集成示例和dashboard集成
45、Flink 的指标体系介绍及验证(3)- 完整版
