Flink之SideOutput(数据分流)

Flink在早期版本有一个split算子用来做数据分流使用的,但是在flink-1.12开始这个API就已经被删除了,在1.12版本以后我们是通过process算子来做数据分流的,这里就介绍一下如何使用prodess进行数据分流.

  • 代码
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 测流输出**/
public class FlinkSideOutput {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 这里使用的是自定义数据源为了方便测试,具体数据源根据自己的实际情况进行更换DataStreamSource<CustomizeBean> customizeSourceStream = env.addSource(new CustomizeSource());/*** 需求* 1. 将性别为M且爱好为'羽毛球运动爱好者'分到一个流* 2. 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'分到一个流* 3. 其他保留到主流**/SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性别String hobbit = value.getHobbit(); // 爱好if (gender.equals("M") && hobbit.equals("羽毛球运动爱好者")) {// 将性别为M且爱好为'羽毛球运动爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if (gender.equals("W") && (hobbit.equals("篮球运动爱好者") || hobbit.equals("钓鱼爱好者"))) {// 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)), value);} else {// 将剩下的数据保留在主流中out.collect(value);}}});// 获取'M-羽毛球'分流数据,这里也要加上类型声明DataStream<CustomizeBean> mSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)));// 打印'M-羽毛球'结果mSideOutput.print("M-羽毛球");// 获取'W-篮球/钓鱼'分流数据,这里也要加上类型声明DataStream<CustomizeBean> wSideOutput = processedStream.getSideOutput(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)));// 打印结果wSideOutput.print("W-篮球/钓鱼");// 主流数据打印结果processedStream.print("主数据流");env.execute("Side Output");}
}
  • 结果数据
主数据流:2> CustomizeBean(name=AAA-641, age=44, gender=W, hobbit=非遗文化爱好者)
主数据流:3> CustomizeBean(name=AAA-17, age=62, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-429, age=25, gender=W, hobbit=非遗文化爱好者)
主数据流:2> CustomizeBean(name=AAA-218, age=33, gender=M, hobbit=旅游爱好者)
主数据流:3> CustomizeBean(name=AAA-826, age=39, gender=M, hobbit=篮球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-190, age=31, gender=M, hobbit=旅游爱好者)
主数据流:2> CustomizeBean(name=AAA-266, age=32, gender=W, hobbit=网吧战神)
主数据流:3> CustomizeBean(name=AAA-106, age=70, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-911, age=50, gender=M, hobbit=网吧战神)
M-羽毛球:2> CustomizeBean(name=AAA-925, age=65, gender=M, hobbit=羽毛球运动爱好者)
主数据流:3> CustomizeBean(name=AAA-20, age=59, gender=M, hobbit=书法爱好者)
主数据流:1> CustomizeBean(name=AAA-409, age=79, gender=W, hobbit=天文知识爱好者)
主数据流:2> CustomizeBean(name=AAA-865, age=58, gender=W, hobbit=天文知识爱好者)
主数据流:3> CustomizeBean(name=AAA-898, age=33, gender=M, hobbit=天文知识爱好者)
主数据流:1> CustomizeBean(name=AAA-85, age=38, gender=W, hobbit=非遗文化爱好者)
主数据流:2> CustomizeBean(name=AAA-883, age=51, gender=M, hobbit=美食爱好者)
主数据流:3> CustomizeBean(name=AAA-243, age=37, gender=M, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-430, age=28, gender=W, hobbit=旅游爱好者)
主数据流:2> CustomizeBean(name=AAA-127, age=65, gender=W, hobbit=网吧战神)
W-篮球/钓鱼:3> CustomizeBean(name=AAA-986, age=52, gender=W, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-840, age=50, gender=W, hobbit=旅游爱好者)
M-羽毛球:2> CustomizeBean(name=AAA-196, age=34, gender=M, hobbit=羽毛球运动爱好者)
主数据流:3> CustomizeBean(name=AAA-142, age=46, gender=W, hobbit=乒乓球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-985, age=78, gender=W, hobbit=美食爱好者)
W-篮球/钓鱼:2> CustomizeBean(name=AAA-490, age=50, gender=W, hobbit=钓鱼爱好者)
主数据流:3> CustomizeBean(name=AAA-295, age=77, gender=M, hobbit=篮球运动爱好者)
主数据流:1> CustomizeBean(name=AAA-754, age=50, gender=M, hobbit=天文知识爱好者)
主数据流:2> CustomizeBean(name=AAA-249, age=35, gender=W, hobbit=羽毛球运动爱好者)
W-篮球/钓鱼:3> CustomizeBean(name=AAA-908, age=27, gender=W, hobbit=钓鱼爱好者)
主数据流:1> CustomizeBean(name=AAA-674, age=73, gender=M, hobbit=非遗文化爱好者)

通过结果内容可以看到数据完全按照我们分流的逻辑进行输出的,如果想在主数据流中讲所有数据保留下来,Collector<Object> out单独拎出来即可,也就是不加到判断逻辑中,代码如下,这里就只展示部分代码了

SingleOutputStreamOperator<CustomizeBean> processedStream = customizeSourceStream.process(new ProcessFunction<CustomizeBean, CustomizeBean>() {@Overridepublic void processElement(CustomizeBean value, ProcessFunction<CustomizeBean, CustomizeBean>.Context ctx, Collector<CustomizeBean> out) throws Exception {String gender = value.getGender(); // 性别String hobbit = value.getHobbit(); // 爱好// 将所有数据保留在主流中out.collect(value);// 开始进行分流处理if (gender.equals("M") && hobbit.equals("羽毛球运动爱好者")) {// 将性别为M且爱好为'羽毛球运动爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("M-羽毛球", TypeInformation.of(CustomizeBean.class)), value);} else if ((gender.equals("W") && (hobbit.equals("篮球运动爱好者")) || (gender.equals("W") && hobbit.equals("钓鱼爱好者")))) {// 将性别为W且爱好为'篮球运动爱好者'或'钓鱼爱好者'进行分流, 注意这里要声明类型,Java无法自行推断ctx.output(new OutputTag<CustomizeBean>("W-篮球/钓鱼", TypeInformation.of(CustomizeBean.class)), value);}}});

所有的内容到这里就结束了.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/28144.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【网络编程】实现一个简单多线程版本TCP服务器(附源码)

TCP多线程 &#x1f335;预备知识&#x1f384; Accept函数&#x1f332;字节序转换函数&#x1f333;listen函数 &#x1f334;代码&#x1f331;Log.hpp&#x1f33f;Makefile☘️TCPClient.cc&#x1f340;TCPServer.cc&#x1f38d; util.hpp &#x1f335;预备知识 &…

RabbitMQ - 简单案例

目录 0.引用 1.Hello world 2.轮训分发消息 2.1 抽取工具类 2.2 启动两个工作线程接受消息 2.4 结果展示 3.消息应答 3.1 自动应答 3.2 手动消息应答的方法 3.3 消息自动重新入队 3.4 消息手动应答代码 4.RabbitMQ 持久化 4.1 队列如何实现持久化 4.2 消息实现持久化 5.不…

对 Promise 的理解

Promise 是异步编程的一种解决方案&#xff0c;它是一个对象&#xff0c;可以获取异步 操作的消息&#xff0c;他的出现大大改善了异步编程的困境&#xff0c;避免了地狱回调&#xff0c; 它比传统的解决方案回调函数和事件更合理和更强大。 所谓 Promise&#xff0c;简单说就…

算法通关村——透彻理解二分查找

1. 循环法 public static int binarySearch(int[] arr, int low, int high, int target) {while (low < high) {// 这样写主要是避免溢出的情况&#xff0c;以及>>优先级小于&#xff0c;避免出现死循环int mid low ((high - low) >> 1);if (arr[mid] target…

7.1 动手实现AlexNet

AlexNet引入了dropput层 代码 import torch from torch import nn from d2l import torch as d2lnet nn.Sequential(# 样本数为1,通道数为96,11x11的卷积核,步幅为4&#xff0c;减少输出的高度和深度。 LeNet的通道数才6&#xff0c;此处96&#xff0c;为什么要增加这么多通…

MIT 6.830数据库系统 -- lab six

MIT 6.830数据库系统 -- lab six 项目拉取引言steal/no-force策略redo log与undo log日志格式和检查点 开始回滚练习1&#xff1a;LogFile.rollback() 恢复练习2&#xff1a;LogFile.recover() 测试结果疑问点分析 项目拉取 原项目使用ant进行项目构建&#xff0c;我已经更改为…

在python中使用nvidia的VPF库对RTSP流进行硬解码并使用opencv进行显示

解码并处理视频流的多线程应用 随着视频处理技术的不断发展&#xff0c;越来越多的应用需要对视频流进行解码和处理。在本文中&#xff0c;我们将介绍一个基于Python的多线程应用程序&#xff0c;该应用程序可以解码并处理多个RTSP视频流&#xff0c;同时利用GPU加速&#xff0…

use gnustep objective-c

first app #import <Foundation/Foundation.h>int main(int argc, const char * argv[]) {NSAutoreleasePool *pool [NSAutoreleasePool new];NSLog("first start");[pool drain];return 0; }tech 专注于概念&#xff0c;而不是迷失在语言技术细节中编程语言…

微服务技术栈(1.0)

微服务技术栈 认识微服务 单体架构 单体架构&#xff1a;将业务的所有功能集中在一个项目中开发&#xff0c;打成一个包部署 优点&#xff1a; 架构简单部署成本低 缺点&#xff1a; 耦合度高 分布式架构 分布式架构&#xff1a;根据业务功能对系统进行拆分&#xff0c…

npm四种下载方式的区别

npm install moduleName 命令 安装模块到项目node_modules目录下。 不会将模块依赖写入devDependencies或dependencies 节点。 运行 npm install 初始化项目时不会下载模块。npm install -g moduleName 命令 安装模块到全局&#xff0c;不会在项目node_modules目录中保存模块包…

如何在 Spring Boot 中集成日志框架 SLF4J、Log4j

文章目录 具体步骤附录 笔者的操作环境&#xff1a; Spring Cloud Alibaba&#xff1a;2022.0.0.0-RC2 Spring Cloud&#xff1a;2022.0.0 Spring Boot&#xff1a;3.0.2 Nacos 2.2.3 Maven 3.8.3 JDK 17.0.7 IntelliJ IDEA 2022.3.1 (Ultimate Edition) 具体步骤 因为 …

Java课题笔记~ 使用 Spring 的事务注解管理事务(掌握)

通过Transactional 注解方式&#xff0c;可将事务织入到相应 public 方法中&#xff0c;实现事务管理。 Transactional 的所有可选属性如下所示&#xff1a; propagation&#xff1a;用于设置事务传播属性。该属性类型为 Propagation 枚举&#xff0c; 默认值为 Propagation.R…

【学习日记】【FreeRTOS】链表结构体及函数详解

写在前面 本文主要是对于 FreeRTOS 中链表相关内容的详细解释&#xff0c;代码大部分参考了野火FreeRTOS教程配套源码&#xff0c;作了一小部分修改。 一、结构体定义 主要包含三种结构体&#xff1a; 普通节点结构体结尾节点&#xff08;mini节点&#xff09;结构体链表结…

awk经典实战、正则表达式

目录 1.筛选给定时间范围内的日志 2.统计独立IP 案列 需求 代码 运行结果 3.根据某字段去重 案例 运行结果 4.正则表达式 1&#xff09;认识正则 2&#xff09;匹配字符 3&#xff09;匹配次数 4&#xff09;位置锚定&#xff1a;定位出现的位置 5&#xff09;分组…

ESP32 Max30102 (3)修复心率误差

1. 运行效果 2. 新建修复心率误差.py 代码如下: from machine import sleep, SoftI2C, Pin, Timer from utime import ticks_diff, ticks_us from max30102 import MAX30102, MAX30105_PULSE_AMP_MEDIUM from hrcalc import calc_hr_and_spo2BEATS = 0 # 存储心率 FINGER_F…

如何识别手机是否有灵动岛(dynamic island)

如何识别手机是否有灵动岛&#xff08;dynamic island&#xff09; 灵动岛是苹果2022年9月推出的iPhone 14 Pro、iPhone 14 Pro Max首次出现&#xff0c;操作系统最低是iOS16.0。带灵动岛的手机在竖屏时顶部工具栏大于等于51像素。 #define isHaveDynamicIsland ({ BOOL isH…

Taro保存图片到手机

萌新亚历山大啊&#xff0c;搞了一下午&#xff0c;真多坑 Taro.downloadFile({url: res,filePath: Taro.env.USER_DATA_PATH /xcxcode.jpg,success: res > {if (res.statusCode 200) {console.log(res)const tempFilePath res.filePath; // 获取下载的临时文件路径// …

编程(Leetcode, SQL)知识

有志者&#xff0c;事竟成 1. Nim游戏 2. sql中求连续时间的问题 思路&#xff1a;一般来说&#xff0c;有两种方向。一种如果是日期的格式&#xff0c;求连续时间可以先提取出日期中的月份或者天&#xff0c;然后减去row_number()生成的rank&#xff0c;以此来计算分组或者不…

微信小程序的项目解构

视频链接 黑马程序员前端微信小程序开发教程&#xff0c;微信小程序从基础到发布全流程_企业级商城实战(含uni-app项目多端部署)_哔哩哔哩_bilibili 接口文档 https://www.escook.cn/docs-uni-shop/mds/1.start.html 1&#xff1a;微信小程序宿主环境 1&#xff1a;常见的宿…

如何创建Google test shared library(dll)和static library(lib),并编写测试用例

从Github下载google test源码确保本地安装Visual Studio和CMake GUI&#xff0c;本次测试使用VS2017及Cmake GUI 3.20.5解压googletest-main&#xff0c;打开Cmake GUI&#xff0c;配置源码路径&#xff08;googletest-main路径&#xff09;&#xff0c;和生成路径&#xff08;…