Telegraf 是一个用 Go 编写的代理程序,是收集和报告指标和数据的代理。可收集系统和服务的统计数据,并写入到 InfluxDB 数据库。Telegraf 具有内存占用小的特点,通过插件系统开发人员可轻松添加支持其他服务的扩展。
Telegraf是TICK Stack的一部分,是一个插件驱动的服务器代理,用于收集和报告指标。
Telegraf 集成了直接从其运行的容器和系统中提取各种指标,事件和日志,从第三方API提取指标,甚至通过StatsD和Kafka消费者服务监听指标。
它还具有输出插件,可将指标发送到各种其他数据存储,服务和消息队列,包括InfluxDB,Graphite,OpenTSDB,Datadog,Librato,Kafka,MQTT,NSQ等等。
Telegraf作为数据采集模块,需要安装至被监控的目标主机上。Telegraf设计目标是较小的内存使用,通过插件来构建各种服务和第三方组件的metrics收集
Telegraf由4个独立的插件驱动:
- Input Plugins
输入插件,收集系统、服务、第三方组件的数据。 - Processor Plugins
处理插件,转换、处理、过滤数据。 - Aggregator Plugins
聚合插件,数据特征聚合。 - Output Plugins
输出插件,写metrics数据。
在平台监控系统中,可以使用 Telegraf 采集多种组件的运行信息,而不需要自己手写脚本定时采集,大大降低数据获取的难度;且 Telegraf 配置极为简单,只要有基本的 Linux 基础即可快速上手。Telegraf 按照时间序列采集数据,数据结构中包含时序信息,时序数据库就是为此类数据设计而来,使用 Influxdb 可以针采集得到的数据完成各种分析计算操作。
telegraf接入influxdb
[root@node1 ~]# wget http://get.influxdb.org/telegraf/telegraf-0.11.1-1.x86_64.rpm [root@node1 ~]# rpm -ivh telegraf-0.11.1-1.x86_64.rpm [root@node1 ~]# systemctl start telegraf
[root@node1 ~]# vim /etc/telegraf/telegraf.conf## 修改内容如下:
[agent]
## Default data collection interval for all inputs
interval = "10s"
[[inputs.cpu]]
## no configuration options required
[[inputs.mem]]
## no configuration options required
[[outputs.influxdb]]urls = ["http://localhost:8086"] # required database = "telegraf" # requiredretention_policy = ""precision = "s"timeout = "5s"username = "telegraf"password = "password" [root@node1 ~]# systemctl restart telegraf
docker-compose.yml
version: '3' services:telegraf:image: telegrafvolumes:- ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
添加各种设备(SNMP、HTTP、MQTT、OPC UA)并扩展Telegraf配置:
[[inputs.opcua]]name = "opcua"endpoint = "opc.tcp://opcua_server:4840/freeopcua/server/"security_policy = "None"security_mode = "None"auth_method = "Anonymous"nodes = [{ name = "status", namespace = "2", identifier_type = "i", identifier = "2" }][[inputs.http_response]]interval = "10s"name_override = "http_metrics"urls = ["http://http_api:8080/metrics"]method = "GET"response_timeout = "5s"data_format = "json"[[inputs.snmp]]agents = ["snmp_agent"]version = 2community = "public"interval = "10s"timeout = "5s"[[inputs.snmp.field]]name = "cpu"oid = "1.3.6.1.4.1.2021.11.11.0"[[inputs.snmp.field]]name = "memory"oid = "1.3.6.1.4.1.2021.4.6.0"[[inputs.mqtt_consumer]]servers = ["tcp://mqtt_broker:1883"]topics = ["sensor/cpu/#", "sensor/mem/#"]data_format = "value"data_type = "float"
# 从RabbitMQ Management API获取指标
[[inputs.rabbitmq]]## RabbitMQ Management API 的URLurl = "http://localhost:15672"username = "telegraf"password = "your_secure_password"## 要采集的指标类型# 采集节点指标(内存、磁盘等)collect = ["connections", "queues", "exchange", "node", "overview"]## 可选:只监控特定的队列,使用正则匹配# queues_include = [".*"] # 监控所有队列# queues_include = ["important_queue", "task_.*"] # 监控指定队列[inputs.rabbitmq.tags]environment = "staging"source = "rabbitmq-cluster"
# 从Kafka主题消费消息
[[inputs.kafka_consumer]]## Kafka Broker 地址brokers = ["localhost:9092"]## 要消费的主题列表topics = ["app_metrics", "server_stats"]## 消费者组ID,用于偏移量管理consumer_group = "telegraf_consumers"## 数据格式:Kafka消息是二进制的,需要告诉Telegraf如何解析## 假设你的Kafka消息是JSON格式的InfluxDB Line Protocoldata_format = "influx"## 如果消息是JSON格式,但不是Line Protocol,可以这样解析:# data_format = "json"# json_query = "" # 如果JSON不是数组,需要指定一个查询来定位数据# tag_keys = ["host", "region"] # 指定哪些JSON字段作为Tag# json_string_fields = ["message"] # 指定哪些字段作为String类型字段## 可选:连接Kafka的认证信息(如果Kafka需要SASL/SSL)# sasl_username = "kafka-user"# sasl_password = "kafka-password"[inputs.kafka_consumer.tags]source = "kafka-cluster-1"data_topic = "app_metrics"
# 从PostgreSQL服务器获取指标
[[inputs.postgresql]]## 指定连接地址,可以同时监控多个数据库实例address = "host=localhost user=telegraf password=your_secure_password sslmode=disable"## 可选:指定要连接的数据库,默认为 'postgres'# databases = ["app_db", "postgres"]## 要采集的指标# 采集数据库大小、表统计、连接数等# 采集详细的查询统计(需要pg_stat_statements)[[inputs.postgresql.query]]query="SELECT * FROM pg_stat_database"measurement="pg_stat_database"[[inputs.postgresql.query]]query="SELECT * FROM pg_stat_statements"measurement="pg_stat_statements"# 这个查询可能返回大量数据,建议启用标签限制# withdbname = false# taglimit = 10[inputs.postgresql.tags]environment = "production"source = "postgres-primary"
# 读取MySQL的指标和统计信息
[[inputs.mysql]]## 指定MySQL服务器连接地址,%s会被替换为下面的数据库名servers = ["tcp(127.0.0.1:3306)/"]## 步骤1中创建的监控用户和密码username = "telegraf"password = "your_secure_password"## 要采集的指标列表# 采集全局状态metric_types = ["global_status", "innodb_metrics", "binary_logs", "table_schema", "user_statistics"]# 可选:指定要监控的数据库,如果为空则监控所有# databases = ["app_db", "test_db"]# 表架构指标采集的时间间隔(较慢,可以设置长一些)interval_slow = "30m"## 可选:添加标签,便于在InfluxDB中筛选[inputs.mysql.tags]environment = "production"source = "mysql-primary"
[[inputs.postgresql_extensible]]address = "host=localhost user=telegraf password=your_secure_password dbname=your_database_name"## 自定义查询 1:订单统计[[inputs.postgresql_extensible.query]]measurement = "order_metrics"sql = """SELECT COUNT(*) as total_orders,SUM(amount) as daily_revenue,COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending_orders,date_trunc('hour', created_at) as timeFROM orders WHERE created_at >= NOW() - INTERVAL '1 hour'GROUP BY time"""# 将 'time' 字段作为时间戳timestamp = "time"## 自定义查询 2:产品库存监控[[inputs.postgresql_extensible.query]]measurement = "inventory"sql = """SELECT product_id,product_name,quantity_in_stock,(quantity_in_stock < low_stock_threshold) as is_low_stockFROM products"""# 不指定 timestamp,使用 Telegraf 采集时间作为时间戳
[[inputs.mysql]]servers = ["tcp(localhost:3306)/your_database_name"]username = "telegraf"password = "your_secure_password"# 禁用默认的监控指标采集(可选,如果只想采集业务数据)# metric_types = []## 自定义指标查询[[inputs.mysql.metric_query]]# 查询名称,会作为 measurement 名称measurement = "user_metrics"# 自定义 SQL 查询query = """SELECT COUNT(*) as total_users,COUNT(CASE WHEN created_at >= CURDATE() THEN 1 END) as new_users_today,DATE(created_at) as dateFROM users GROUP BY DATE(created_at)"""# 指定时间戳字段(可选)# timestamp = "date"
[[outputs.influxdb_v2]]urls = ["http://influxdb:8086"]token = "replace-with-your-own-token"organization = "test-org"bucket = "metrics"
Telegraf 是 InfluxData 平台的核心数据采集组件。它本身不是一个数据源,而是一个拥有超过200个插件的采集器,可以从海量的数据源中拉取或接收数据,然后写入到 InfluxDB。
-
接入方式:通过配置 Telegraf 的 Input Plugins 和 Output Plugins。
-
支持的数据源(通过Telegraf):
-
系统指标:CPU、内存、磁盘、网络、进程(通过
cpu
,mem
,disk
,net
等插件)。 -
数据库:MySQL, PostgreSQL, MongoDB, Redis, Elasticsearch, SQL Server, Oracle 等。
-
消息队列:Kafka, RabbitMQ, MQTT(也可作为输入)。
-
云服务:AWS CloudWatch, Google Cloud Monitoring, Azure Monitor。
-
容器与编排:Docker, Kubernetes。
-
日志文件:通过
tail
插件读取日志文件。 -
API 数据:通过
http
插件从任何提供 JSON/XML 等格式的 REST API 拉取数据。 -
网络设备:通过
snmp
插件采集网络设备指标。 -
硬件传感器:通过
sensors
插件读取主板传感器数据。
-
特性 | Telegraf | Logstash |
---|---|---|
核心定位 | 指标收集代理 | 日志处理管道 |
开发背景 | InfluxData(时间序列数据库厂商) | Elastic(搜索和分析引擎厂商) |
架构设计 | 基于插件的代理,轻量级 | 基于 JVM 的完整处理管道 |
资源消耗 | 低内存(通常 10-50MB) | 高内存(通常 500MB-1GB+) |
性能 | 高吞吐,低延迟 | 中等吞吐,处理能力强 |
数据模型 | 主要为指标和时序数据 | 主要为日志和事件数据 |
配置复杂度 | 简单直观的 TOML 配置 | 灵活的 Ruby DSL 配置 |
Telegraf:
-
为时间序列数据而生,专门优化用于指标收集
-
与 InfluxDB 紧密集成,但支持多种输出
-
"Batteries included" 理念 - 开箱即用
Logstash:
-
为日志处理而生,是 ELK/ELK Stack 的核心组件
-
强调数据的解析、转换和丰富
-
"Pipeline" 理念 - 灵活可扩展的数据管道