使用 SPL 高效实现 Flink SLS Connector 下推

作者:潘伟龙(豁朗)

背景

日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector,可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。

在阿里云 Flink 配置 SLS 作为源表时,默认会消费 SLS 的 Logstore 数据进行动态表的构建,在消费的过程中,可以指定起始时间点,消费的数据也是指定时间点以后的全量数据;在特定场景中,往往只需要对某类特征的日志或者日志的某些字段进行分析处理,此类需求可以通过 Flink SQL 的 WHERE 和 SELECT 完成,这样做有两个问题:

1)Connector 从源头拉取了过多不必要的数据行或者数据列造成了网络的开销;

2)这些不必要的数据需要在 Flink 中进行过滤投影计算,这些清洗工作并不是数据分析的关注的重点,造成了计算的浪费。

对于这种场景,有没有更好的办法呢?

答案是肯定的,SLS 推出了 SPL 语言, 可以高效的对日志数据的清洗,加工。 这种能力也集成在了日志消费场景,包括阿里云 Flink 中 SLS Connector,通过配置 SLS SPL 即可实现对数据的清洗规则,在减少网络传输的数据量的同时,也可以减少 Flink 端计算消耗。

接下来对 SPL 及 SPL 在阿里云 Flink SLS Connector 中应用进行介绍及举例。

SLS SPL 介绍

图片

SLS SPL 是日志服务推出的一款针对弱结构化的高性能日志处理语言,可以同时在 Logtail 端、查询扫描、流式消费场景使用,具有交互式、探索式、使用简洁等特点。

SPL 基本语法如下:

<data-source> 
| <spl-cmd> -option=<option> -option ... <expression>, ... as <output>, ...
| <spl-cmd> ...
| <spl-cmd> ...

< spl-cmd > 是 SPL 指令,支持行过滤、列扩展、列裁剪、正则取值、字段投影、数值计算、JSON、CSV 等半结构化数据处理,具体参考 SPL 指令 [ 1] 介绍,具体来说包括:

结构化数据 SQL 计算指令:

支持行过滤、列扩展、数值计算、SQL 函数调用

  • extend 通过 SQL 表达式计算结果产生新字段
  • where 根据 SQL 表达式计算结果过滤数据条目
*
| extend latency=cast(latency as BIGINT)
| where status='200' AND latency>100

字段操作指令:

支持字段投影、字段重名、列裁剪

  • project 保留与给定模式相匹配的字段、重命名指定字段
  • project-away 保留与给定模式相匹配的字段、重命名指定字段
  • project-rename 重命名指定字段,并原样保留其他所有字段
*
| project-away -wildcard "__tag__:*"
| project-rename __source__=remote_addr

非结构化数据提取指令:

支持 JSON、正则、CSV 等非结构化字段值处理

  • parse-regexp 提取指定字段中的正则表达式分组匹配信息
  • parse-json 提取指定字段中的第一层 JSON 信息
  • parse-csv 提取指定字段中的 CSV 格式信息
*
| parse-csv -delim='^_^' content as time, body
| parse-regexp body, '(\S+)\s+(\w+)' as msg, user

SPL 在 Flink SLS Connector 中的原理介绍

阿里云 Flink 支持 SLS Connector,通过 SLS Connector 实时拉取 SLS 中 Logstore 的数据,分析后的数据也可以实时写入 SLS,作为一个高性能计算引擎,Flink SQL 也在越来越广泛的应用在 Flink 计算中,借助 SQL 语法可以对结构化的数据进行分析。

在 SLS Connector 中,可以配置日志字段为 Flink SQL 中的 Table 字段,然后基于 SQL 进行数据分析;在未支持 SPL 配置之前,SLS Connector 会实时消费全量的日志数据到 Flink 计算平台,当前消费方式有如下特点:

  • 在 Flink 中计算的往往不需要所有的日志行,比如在安全场景中,可能仅需要符合某种特征的数据,需要进行日志进行过滤,事实上不需要的日志行也会被拉取,造成网络带宽的浪费。
  • 在 Flink 中计算的一般是特定的字段列,比如在 Logstore 中有 30 个字段,真正需要在 Flink 计算的可能仅有 10 个字段,全字段的拉取造成了网络带宽的浪费。

在以上场景中,可能会增加并不需要的网络流量和计算开销,基于这些特点,SLS 将 SPL 的能力集成到 SLS Connector 的新版本中,可以实现数据在到达 Flink 之前已经进行了行过滤和列裁剪,这些预处理能力内置在 SLS 服务端,可以达到同时节省网络流量与 Flink 计算(过滤、列裁剪)开销的目的。

原理对比

  • 未配置 SPL 语句时:Flink 会拉取 SLS 的全量日志数据(包含所有列、所有行)进行计算,如图 1。
  • 配置 SPL 语句时:SPL 可以对拉取到的数据如果 SPL 语句包含过滤及列裁剪等,Flink 拉取到的是进行过滤和列裁剪后部分数据进行计算,如图 2。

图片

在 Flink 中使用 SLS SPL

接下来以一个 Nginx 日志为例,来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,然后开启一个连续查询以观察效果。在实际使用过程中,可以直接修改 SLS 源表,保留其余分析和写出逻辑。

接下来介绍下阿里云 Flink 中使用 SPL 实现行过滤与列裁剪功能。

在 SLS 准备数据

  • 开通 SLS,在 SLS 创建 Project,Logstore,并创建具有消费 Logstore 的权限的账号 AK/SK。
  • 当前 Logstore 数据使用 SLS 的的 SLB 七层日志模拟接入方式产生模拟数据,其中包含 10 多个字段。

图片

模拟接入会持续产生随机的日志数据,日志内容示例如下:

{"__source__": "127.0.0.1","__tag__:__receive_time__": "1706531737","__time__": "1706531727","__topic__": "slb_layer7","body_bytes_sent": "3577","client_ip": "114.137.195.189","host": "www.pi.mock.com","http_host": "www.cwj.mock.com","http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0","request_length": "1662","request_method": "GET","request_time": "31","request_uri": "/request/path-0/file-3","scheme": "https","slbid": "slb-02","status": "200","upstream_addr": "42.63.187.102","upstream_response_time": "32","upstream_status": "200","vip_addr": "223.18.47.239"
}

Logstore 中 slbid 字段有两种值:slb-01 和 slb-02,对 15 分钟的日志数据进行 slbid 统计,可以发现 slb-01 与 slb-02 数量相当。

图片

行过滤场景

在数据处理中过滤数据是一种常见需求,在 Flink 中可以使用 filter 算子或者 SQL 中的 where 条件进行过滤,使用非常方便;但是在 Flink 使用 filter 算子,往往意味着数据已经通过网络进入 Flink 计算引擎中,全量的数据会消耗着网络带宽和 Flink 的计算性能,这种场景下,SLS SPL 为 Flink SLS Connector 提供了一种支持过滤“下推”的能力,通过配置 SLS Connector 的 query 语句中,过滤条件,即可实现过滤条件下推。避免全量数据传输和全量数据过滤计算。

图片

创建 SQL 作业

在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。

图片

在作业草稿中输入如下创建临时表的语句:

CREATE TEMPORARY TABLE sls_input(request_uri STRING,scheme STRING,slbid STRING,status STRING,`__topic__` STRING METADATA VIRTUAL,`__source__` STRING METADATA VIRTUAL,`__timestamp__` STRING METADATA VIRTUAL,__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,proctime as PROCTIME()
) WITH ('connector' = 'sls','endpoint' ='cn-beijing-intranet.log.aliyuncs.com','accessId' = '${ak}','accessKey' = '${sk}','starttime' = '2024-01-21 00:00:00','project' ='${project}','logstore' ='test-nginx-log','query' = '* | where slbid = ''slb-01'''
);
  • 这里为了演示方便,仅设置 request_uri、scheme、slbid、status 和一些元数据字段作为表字段。
  • a k 、 {ak}、 ak{sk}、${project} 替换为具有 Logstore 消费权限的账号。
  • endpoint:填写同地域的 SLS 的私网地址。
  • query:填写 SLS 的 SPL 语句,这里填写了 SPL 的过滤语句:* | where slbid = ‘‘slb-01’’,注意在阿里云 Flink 的 SQL 作业开发中,字符串需要使用英文单引号进行转义。

连续查询及效果

在作业中输入分析语句,按照 slbid 进行聚合查询,动态查询会根据日志的变化,实时刷新数字。

SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid

点击右上角调试按钮,进行调试,可以看到结果中 slbid 的字段值,始终是 slb-01。

图片

可以看出设置了 SPL 语句后,sls_input 仅包含 slbid=‘slb-01’ 的数据,其他不符合条件的数据被过滤掉了。

流量对比

使用 SPL 后,可以看出在 SLS 的写流量不变的情况下,Flink 对 SLS 的读流量有大幅度下降;同时在过滤占主要很多 Flink CU 的场景下,经过过滤后,Flink CU 也会有相应的降低。

图片

列裁剪场景

在数据处理中列裁剪也是一种常见需求,在原始数据中,往往会有全量的字段,但是实际的计算只需要特定的字段;类似需要在 Flink 中可以使用 project 算子或者 SQL 中的 select 进行列裁剪与变换,使用 Flink 使用 project 算子,往往意味着数据已经通过网络进入 Flink 计算引擎中,全量的数据会消耗着网络带宽和 Flink 的计算性能,这种场景下,SLS SPL 为 Flink SLS Connector 提供了一种支持投影下推的能力,通过配置 SLS Connector 的 query 参数,即可实现投影字段下推。避免全量数据传输和全量数据过滤计算。

创建 SQL 作业

创建步骤同行过滤场景,在作业草稿中输入如下创建临时表的语句,这里 query 参数配置进行了修改,在过滤的基础上增加了投影语句,可以实现从 SLS 服务端仅拉取特定字段的内容。

CREATE TEMPORARY TABLE sls_input(request_uri STRING,scheme STRING,slbid STRING,status STRING,`__topic__` STRING METADATA VIRTUAL,`__source__` STRING METADATA VIRTUAL,`__timestamp__` STRING METADATA VIRTUAL,__tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,proctime as PROCTIME()
) WITH ('connector' = 'sls','endpoint' ='cn-beijing-intranet.log.aliyuncs.com','accessId' = '${ak}','accessKey' = '${sk}','starttime' = '2024-01-21 00:00:00','project' ='${project}','logstore' ='test-nginx-log','query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
);

为了效果,下面分行展示语句中配置,在 Flink 语句中任然需要单行配置。

* 
| where slbid = ''slb-01'' 
| project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"

上面使用了 SLS SPL 的管道式语法来实现数据过滤后投影的操作,类似 Unix 管道,使用|符号将不同指令进行分割,上一条指令的输出作为下一条指令的输入,最后的指令的输出表示整个管道的输出。

连续查询及效果

图片

在作业中输入分析语句,可以看到,结果与行过滤场景结果类似。

SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid

🔔 注意: 这里与行过滤不同的是,上面的行过滤场景会返回全量的字段,而当前的语句令 SLS Connector 只返回特定的字段,再次减少了数据的网络传输。

SPL 还可以做什么

  • 上述实例中演示了使用 SLS SPL 的过滤和投影功能来实现 SLS Connector 的“下推”功能,可以有效地减少网络流量和 Flink CU 的使用。可以避免在 Flink 进行计算之前,进行额外的过滤和投影计算消耗。
  • SLS SPL 的功能不止于过滤与投影,SLS SPL 完整支持的语法可以参考文档:SPL 指令 [ 1] 。同时,SPL管道式语法已全面支持在 Flink Connector 中进行配置。
  • SLS SPL 支持对于数据进行预处理,比如正则字段、JSON 字段,CSV 字段展开;数据格式转换,列的增加和减少;过滤等。除了用于消费场景,在 SLS 的 Scan 模式与采集端都会应用场景,以便用户在采集端、消费端都可以使用 SPL 的能力。

相关链接:

[1] SPL 指令

https://help.aliyun.com/zh/sls/user-guide/spl-instruction?spm=a2c4g.11186623.0.0.33f35a3dl8g8KD

[2] 日志服务概述

https://help.aliyun.com/zh/sls/product-overview/what-is-log-service

[3] SPL 概述

https://help.aliyun.com/zh/sls/user-guide/spl-overview

[4] 阿里云 Flink Connector SLS

https://help.aliyun.com/zh/flink/developer-reference/log-service-connector

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/736385.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软信天成:超全整理!数据资产入表基础篇

自2024年1月1日&#xff0c;《企业数据资源相关会计处理暂行规定》正式实施&#xff0c;以下简称《暂行规定》。该规定根据《中华人民共和国会计法》和企业会计准则等相关规定&#xff0c;由财政部制定&#xff0c;旨在规范企业数据资源相关会计处理&#xff0c;强化相关会计信…

打字通小游戏制作教程:用HTML5和JavaScript提升打字速度

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

Redis主从架构和管道Lua(一)

Redis主从架构 架构 Redis主从工作原理 如果为master配置了一个slave,不管这个slave是否是第一次连接上Master,它都会发送一个PSYNC命令给master请求复制数据。master受到PSYNC命令&#xff0c;会在后台进行数据持久化通过bgsave生成最新的 RDB快照文件&#xff0c;持久化期间…

C# chart曲线控件专题

1.控件基本设置 chart1.ChartAreas[0].AxisY.IsStartedFromZero false; //设置Y轴自适应chart1.Series["瞬时值"].BorderWidth 2; // 设置曲线宽度为2个像素&#xff0c;注意[]中写入的Series的Namechart1.Series["瞬时值"].Color Color.Red; // 设置曲…

Java 集合类的高级特性介绍

在 Java 编程中&#xff0c;了解集合类的高级特性对于编写高效和可维护的代码至关重要。以下是一些你应该知道的 Java 集合类的高级特性&#xff0c;以及简单的例子来说明它们的用法。 1. 迭代器&#xff08;Iterators&#xff09;和列表迭代器&#xff08;ListIterators&#…

Babel:现代JavaScript的桥梁

&#x1f90d; 前端开发工程师、技术日更博主、已过CET6 &#x1f368; 阿珊和她的猫_CSDN博客专家、23年度博客之星前端领域TOP1 &#x1f560; 牛客高级专题作者、打造专栏《前端面试必备》 、《2024面试高频手撕题》 &#x1f35a; 蓝桥云课签约作者、上架课程《Vue.js 和 E…

基于YOLOv8深度学习的路面坑洞检测与分割系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标分割

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

如何入驻1688跨境市场,拿新赛道下的百万流量!|1688API接口一键链接国内外

✦ ✦ ✦ 前言 1688是国内领先源头厂货直销平台&#xff0c;拥有2亿用户、6500万专业卖家&#xff0c;覆盖57大行业、年交易额︎8000亿&#xff0c;100万中小企业已入驻。刚刚升为阿里第一批战略级创新业务的1688&#xff0c;又被曝出新消息。近期&#xff0c;关于“阿里16…

Socket通信Demo(Unity客户端和C#)

新建一个Unity项目新建脚本编写客户端 using System.Net.Sockets; using System.Net; using System; using System.Text;public class Client : MonoBehaviour {private Socket socket;//定义用来存消息的容器private byte[] buffer new byte[1024];// Start is called befor…

八股文打卡day27——数据库(4)

面试题&#xff1a;讲一下事务的隔离级别&#xff1f; 我的回答&#xff1a; 因为事务之间的隔离性&#xff0c;造成了一些问题&#xff0c;比如说&#xff1a;脏读、不可重复读和幻读&#xff08;虚读&#xff09;。 1.什么叫脏读&#xff1f; 就是一个事务读取到了另一个事…

使用GraaVIM打包Linux平台本地镜像

1.创建实例&#xff0c;在WindTerm上面连接云服务器 2.安装Lrzsz文件上传工具 yum install lrzsz 3.上传打好的jar包 lrz 使用ls命令查看是否上传成功 3.安装gcc等环境 sudo yum install gcc glibc-devel zlib-devel 4.下载安装配置Linux下的GraaVIM、native-image 下载链…

RocketMQ入门指南:从零开始学习分布式消息队列技术

RocketMQ 1. MQ介绍1.1 为什么要用MQ1.2 MQ的优点和缺点1.3 各种MQ产品的比较 2. RocketMQ快速入门2.1 准备工作2.1.1 下载RocketMQ2.2.2 环境要求 2.2 安装RocketMQ2.2.1 安装步骤2.2.2 目录介绍 2.3 启动RocketMQ2.4 测试RocketMQ2.4.1 发送消息2.4.2 接收消息 2.5 关闭Rocke…

一个简单的回调函数

回调是一种常见的编程模式&#xff0c;其中一个函数被传递给另一个函数&#xff0c;以便在某个事件发生时执行。以下是一个简单的C回调的例子&#xff0c;其中一个函数接受一个回调函数作为参数&#xff0c;并在特定条件下调用它&#xff1a; #include <iostream>// 定义…

模板不存在:./Application/Home/View/OnContact/Index.html 错误位置

模板不存在:./Application/Home/View/OnContact/Index.html 错误位置FILE: /home/huimingdedhpucixmaihndged5e/wwwroot/ThinkPHP123/Library/Think/View.class.php  LINE: 110 TRACE#0 /home/huimingdedhpucixmaihndged5e/wwwroot/ThinkPHP123/Library/Think/View.class.php(…

案例分析04篇:一篇文章搞定软考设计模式(2024年软考高级系统架构设计师冲刺知识点总结)

设计模式 设计模式篇在软考中也经常考到,尤其是定义部分,要背下来;其中创建型模式、结构型模式、行为型模式要结合顺口溜背下来。 1.1 设计模式的定义 设计模式是一套被反复使用、多人知晓、经过分类的经验总结。 设计模式可理解为对某一类问题的通用解决方案。例如工厂模式…

接收服务端请求,WebSocket 并非唯一选择!(含:ChatGPT 流推送原理解析)

前端训练营&#xff1a;1v1私教&#xff0c;终身辅导计划&#xff0c;帮你拿到满意的 offer。 已帮助数百位同学拿到了中大厂 offer。欢迎来撩~~~~~~~~ Hello&#xff0c;大家好&#xff0c;我是 Sunday。 说到推送数据&#xff0c;大家可能首先想到的是 WebSocket。 事实上&…

STM32利用标准库的方式输出PWM(proteus仿真)

首先打开proteus仿真软件&#xff0c;绘制电路图&#xff1a; 其中示波器的添加很简单的&#xff0c;看图&#xff1a; 再来看看咱们最后程序的效果&#xff1a; 下面就是程序代码了&#xff0c;新建两个文件PWM.c和PWM.h文件&#xff0c;所属关系如图&#xff1a; 整个的编程思…

LCR 120. 寻找文件副本(数组)

设备中存有 n 个文件&#xff0c;文件 id 记于数组 documents。若文件 id 相同&#xff0c;则定义为该文件存在副本。请返回任一存在副本的文件 id。 示例 1&#xff1a; 输入&#xff1a;documents [2, 5, 3, 0, 5, 0] 输出&#xff1a;0 或 5提示&#xff1a; 0 ≤ docum…

蓝桥杯刷题(三)

一、P8752 [蓝桥杯 2021 省 B2] 特殊年份&#xff08;洛谷&#xff09; 题目描述 今年是 2021 年&#xff0c;2021 这个数字非常特殊, 它的千位和十位相等, 个位比百位大 1&#xff0c;我们称满足这样条件的年份为特殊年份。 输入 5 个年份&#xff0c;请计算这里面有多少个…

VBA_NZ系列工具NZ02:VBA读取PDF使用说明

我的教程一共九套及VBA汉英手册一部&#xff0c;分为初级、中级、高级三大部分。是对VBA的系统讲解&#xff0c;从简单的入门&#xff0c;到数据库&#xff0c;到字典&#xff0c;到高级的网抓及类的应用。大家在学习的过程中可能会存在困惑&#xff0c;这么多知识点该如何组织…