Flink自定义函数UDF

案例用到的测试数据请参考文章:
Flink自定义Source模拟数据流
原文链接:https://blog.csdn.net/m0_52606060/article/details/135436048

概述

用户自定义函数(user-defined function,UDF),即用户可以根据自身需求,重新实现算子的逻辑。
用户自定义函数分为:函数类、匿名函数、富函数类

函数类(Function Classes)

Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
需求:用来从用户的订单数据中筛选订单金额大于50的内容:

方式一:通过匿名类来实现FilterFunction接口:

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 实现自定义接口FilterFunctionDataStream<Orders> streamOperator = ordersDataStreamSource.filter(new FilterFunction<Orders>() {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}});streamOperator.print();environment.execute();}
}

在这里插入图片描述

方式二: 实现FilterFunction接口

import com.zxl.bean.Orders;
import org.apache.flink.api.common.functions.FilterFunction;public class OrderFilter implements FilterFunction<Orders> {@Overridepublic boolean filter(Orders orders) throws Exception {//过滤金额大于10000元的订单if (orders.getOrder_amount() > 50) {return true;} else {return false;}}
}
import com.zxl.Functions.OrderFilter;
import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 返回类型记得修改为 DataStreamDataStream<Orders> operator = ordersDataStreamSource.filter(new OrderFilter());operator.print();environment.execute();}
}

在这里插入图片描述

方式三:采用匿名函数(Lambda)

//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());// TODO: 2024/1/7 函数使用Lambda表达式,不需要进行类型声明DataStream<Orders> streamOperator = ordersDataStreamSource.filter(orders -> orders.getOrder_amount() > 50);streamOperator.print();environment.execute();

在这里插入图片描述

富函数类(Rich Function Classes)

“富函数类”也是DataStream API提供的一个函数类的接口,所有的Flink函数类都有其Rich版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction等。
与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function有生命周期的概念。典型的生命周期方法有:
open()方法,是Rich Function的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如map()或者filter()方法被调用之前,open()会首先被调用。
close()方法,是生命周期中的最后一个调用的方法,类似于结束方法。一般用来做一些清理工作。
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如RichMapFunction中的map(),在每条数据到来后都会触发一次调用。

import com.zxl.bean.Orders;
import com.zxl.datas.OrdersData;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class DemoTest {public static void main(String[] args) throws Exception {//创建Flink流处理执行环境StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();//设置并行度为1environment.setParallelism(1);//调用Flink自定义Source// TODO: 2024/1/6 订单数据DataStreamSource<Orders> ordersDataStreamSource = environment.addSource(new OrdersData());ordersDataStreamSource.print();// TODO: 2024/1/7 接口类型第一个是传入类型,第二个是输出类型DataStream<String> operator = ordersDataStreamSource.map(new RichMapFunction<Orders, String>() {@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期开始");}@Overridepublic String map(Orders orders) throws Exception {return orders.getOrder_date().toString()+"字符串";}@Overridepublic void close() throws Exception {super.close();System.out.println("索引是:" + getRuntimeContext().getIndexOfThisSubtask() + " 的任务的生命周期结束");}});operator.print();environment.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/602971.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ResNet论文阅读和简单实现

论文&#xff1a;https://arxiv.org/pdf/1512.03385.pdf Deep Residual Learning for Image Recognition 本模块主要是阅读论文&#xff0c;会做简单的翻译&#xff08;至少满足我自己能看明白&#xff09;。 Introduction 由上图可见&#xff0c;在20层和56层的网络上训练的…

听GPT 讲Rust源代码--compiler(29)

File: rust/compiler/rustc_const_eval/src/util/check_validity_requirement.rs 在Rust编译器的源代码中&#xff0c;rust/compiler/rustc_const_eval/src/util/check_validity_requirement.rs文件的作用是进行验证要求的检查。具体而言&#xff0c;该文件定义了函数check_val…

服务器GPU温度过高挂掉排查记录

服务器GPU挂掉 跑深度学习的代码的时候发现中断了。通过命令查看&#xff1a; nvidia-smi显示 Unable to determine the device handle for GPU 0000:01:00.0: Unknown Error。感觉很莫名其妙。通过重启大法之后&#xff0c;又能用一段时间。 shutdown -r now但是过了一个小…

配置cendos 安装docker 配置阿里云国内加速

由于我安装的cendos是镜像版。已经被配置好了。所以只需要更新相关配置信息即可。 输入 yum update自动更新所有配置 更新完成后输入 yum list docker-ce --showduplicates | sort -r 自动查询所有可用的docker版本 输入 yum install docker-ce docker-ce-cli container…

营业执照代办网站源码 工商注册代账公司模板源码 公司注册和企业资质业务办理网站

这款大气蓝色的网站响应式模板专门为工商记账、公司注册和企业资质业务办理公司设计,包含了16个页面,非常适合正在寻找一个高质量网站模板的企业使用。 该模板使用响应式设计,使其能够在各种设备上呈现良好的用户体验,包括PC、平板和手机等。同时,页面设计非常美观,允许…

netcore html to pdf

一、新建项目&#xff1a;QuestPDFDemo <PackageReference Include"NReco.PdfGenerator" Version"1.2.1" /> 二、上代码 using Microsoft.AspNetCore.Mvc; using Microsoft.Extensions.Logging;using QuestPDFDemo.Models; using System; using Sys…

解决使用localhost或127.0.01模拟CORS失效

解决使用localhost或127.0.01模拟CORS失效 前言问题发现问题解决 前言 CORS (Cross-Origin Resource Sharing) 指的是一种机制&#xff0c;它允许不同源的网页请求访问另一个源服务器上的某些资源。通常情况下&#xff0c;如果 JavaScript 代码在一个源中发起了 AJAX 请求&…

【Qt- C++ Qml 交互】

Qt编程指南 VX&#xff1a;hao541022348 ■ 将C对象注册到 QML中&#xff0c;在QML使用C对象&#xff08;Q_INVOKABLE宏&#xff09;■ C对象注册到元对象系统■ Q_INVOKABLE 宏■ 演示步骤 ■ 将C对象注册到 QML中&#xff0c;在QML使用C对象&#xff08;Q_PROPERTY宏 属性绑定…

基于SpringBoot的名城小区物业管理系统

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SpringBoot的名城小区物业管理系统,…

数据分析——快递电商

一、任务目标 1、任务 总体目的——对账 本项目解决同时使用多个快递发货&#xff0c;部分隔离区域出现不同程度涨价等情形下&#xff0c;如何快速准确核对账单的问题。 1、在订单表中新增一列【运费差异核对】来表示订单运费实际有多少差异&#xff0c;结果为数值。 2、将…

微服务实战系列之Filter

前言 Filter&#xff0c;又名过滤器&#xff0c;当然不是我们日常中见到的&#xff0c;诸如此类构件&#xff1a; 而应该是微服务中常使用的&#xff0c;诸如此类&#xff08;图片来自官网&#xff0c;点击可查看原图&#xff09;&#xff1a; 一般用于字符编码转换&#xf…

最大子数组和【DP】

Problem: 53. 最大子数组和 文章目录 思路 & 解题方法复杂度Code 思路 & 解题方法 以前常见的线性DP&#xff0c;题目简单&#xff0c;可以不需要用数组来存储dp的值。 复杂度 时间复杂度: 添加时间复杂度, 示例&#xff1a; O ( n ) O(n) O(n) 空间复杂度: 添加空间…

Java Swing手搓童年坦克大战游戏(II)

文章目录 0.初衷1.创建游戏窗口2.创建坦克3.实现坦克移动和发射炮弹4.创建地图4.1关于地图瓦片的尺寸遇到的问题 5.坦克与障碍物的碰撞处理5.1碰撞检测5.2坦克与地图中的瓦片碰撞5.3坦克相互碰撞5.4坦克碰见炮弹5.5坦克拐弯 6.道具6.1星星6.2炸弹6.3钟表6.4城堡6.5坦克6.6无敌圈…

《3D数学基础-图形和游戏开发》阅读笔记 | 3D数学基础 (学习中 1.6更新)

文章目录 3D数学基础矢量/向量什么是向量点与矢量的关系 向量基础运算 向量加法向量基础运算 数乘 线性组合 - 坐标系的基如果选择不同的基向量会怎么样&#xff1f;- 张成(Span)的空间三维向量的张成空间线性相关与线性相关 矩阵与线性变换矩阵-几何意义线性变换矩阵乘法与线性…

业务数据技术中台概念与相互关系

随着企业数字化转型和发展模式的转变,企业的应用架构建设模式主要为数据+中台(平台)+应用,这里的企业就涵盖互联网大企业和传统企业,大家都在开展必做的事情,即数字化时代的企业数字化转型 。同时,正好最近在做顶层设计和数字化转型整体应用架构设计,就梳理了一下中台发展…

Kafka消息存储

一、层次结构 具体到某个broker上则是, 数据目录/分区名/日志相关文件集合。其中日志文件集合内包括.log文件, index索引文件和.timeindex时间戳索引文件。 二、.log 结构 .log中记录具体的消息。一般消息由header和body组成, 这点儿在Kafka消息中也同样适用。 message MES…

qt自定义控件的封装

刚学了一个很有意思的东西,前面学了list,Tree,Table三大控件和一部分常用基础控件,但感觉没啥意思,就是用别人的直接用,刚学了一个自定义控件的封装,流程如下: 想把两个不相关的组件封装在一块,直接用ui不行,所以先新添加了qt设计师页面,新添加了一个SmallWidget *ui 在smal…

Mongodb使用指定索引删除数据

回顾Mongodb删除语法 db.collection.deleteMany(<filter>,{writeConcern: <document>,collation: <document>,hint: <document|string>} ) 删除语法中&#xff0c;除了指定过滤器外&#xff0c;还可以指定写入策略&#xff0c;字符序和使用的索引。 …

【Leetcode】230. 二叉搜索树中第K小的元素

一、题目 1、题目描述 给定一个二叉搜索树的根节点 root ,和一个整数 k ,请你设计一个算法查找其中第 k 个最小元素(从 1 开始计数)。 示例1: 输入:root = [3,1,4,null,2], k = 1 输出:1示例2: 输入:root = [5,3,6,2,4,null,null,1], k = 3 输出:3提示: 树中…

字节跳动基础架构SRE-Copilot获得2023 CCF国际AIOps挑战赛冠军

近日&#xff0c;2023 CCF国际AIOps挑战赛决赛暨“大模型时代的AIOps”研讨会在北京成功举办&#xff0c;活动吸引了来自互联网、运营商、科研院所、高校、软硬件厂商等领域多名专家学者参与&#xff0c;为智能运维的前沿学术研究、落地生产实践打开了新思路。决赛中&#xff0…