8、Flink 在 source 处生成水位线 和 在 source 之后生成水位线案例

1、AtSourceGenerateWatermark
注意:从 Flink 1.17开始,FLIP-27 源框架支持拆分级别的水印对齐。

import java.time.Duration;public class _02_AtSourceGenerateWatermark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();KafkaSource<String> kafkaSource = KafkaSource.<String>builder().setBootstrapServers("my-broker").setTopics("my-topic").setGroupId("my-group").setStartingOffsets(OffsetsInitializer.earliest()).setValueOnlyDeserializer(new SimpleStringSchema()).build();DataStreamSource<String> source = env.fromSource(kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafka_source", TypeInformation.of(new TypeHint<String>() {}));source.print();env.execute();}
}

2、在 source 之后生成水位线

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;public class _03_AfterSourceGenerateWatermark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source = env.socketTextStream("localhost", 8888);SingleOutputStreamOperator<_01_MyEvent> eventMap = source.map(new MapFunction<String, _01_MyEvent>() {@Overridepublic _01_MyEvent map(String value) throws Exception {String[] fields = value.split(",");return new _01_MyEvent(Integer.parseInt(fields[0]),fields[1],Long.parseLong(fields[2]));}});SingleOutputStreamOperator<_01_MyEvent> timestampsAndWatermarks = eventMap.assignTimestampsAndWatermarks(WatermarkStrategy.<_01_MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<_01_MyEvent>() {@Overridepublic long extractTimestamp(_01_MyEvent element, long recordTimestamp) {return element.getEventTime();}}));timestampsAndWatermarks.print();env.execute();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/830364.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

数字化wms仓库管理软件,实现企业仓储信息共享与智慧运行-亿发

在经济飞速发展的今天&#xff0c;企业面临着客户需求多样化、质量和交期要求提高以及激烈的市场竞争等挑战。在这样的背景下&#xff0c;许多企业开始考虑采用数字化仓储WMS系统来解决这些问题。 数字化仓储WMS系统通过打造高效、规范的仓库管理体系&#xff0c;实现了对产品…

爱普生晶振在物联网LoRa通讯中的应用

LoRa 是LPWAN通信技术中的一种&#xff0c;是美国Semtech公司采用和推广的一种基于扩频技术的超远距离无线传输方案。这一方案改变了以往关于传输距离与功耗的折衷考虑方式&#xff0c;为用户提供一种简单的能实现远距离、长电池寿命、大容量的系统&#xff0c;进而扩展传感网络…

神经网络高效训练:优化GPU受限环境下的大规模CSV数据处理指南

最近训练模型,需要加载wifi sci data 数据量特别大,直接干爆内存,训练也特别慢,快放弃了!随后冷静下来,然后靠着多年的经验,来进行层层优化,随诞生了这篇博客。 背景介绍 机器学习模型的训练通常需要大量的数据,尤其是对于深度神经网络模型。然而,当数据集非常庞大时…

网络之路29:三层链路聚合

正文共&#xff1a;1666 字 17 图&#xff0c;预估阅读时间&#xff1a;3 分钟 目录 网络之路第一章&#xff1a;Windows系统中的网络 0、序言 1、Windows系统中的网络1.1、桌面中的网卡1.2、命令行中的网卡1.3、路由表1.4、家用路由器 网络之路第二章&#xff1a;认识企业设备…

新质生产力实践,我用chatgpt开发网站

是的&#xff0c;我用chatgpt开发了一个网站&#xff0c;很轻松。 我之前一点不懂前端&#xff0c;也没有网站开发的代码基础&#xff0c;纯正的0基础。 从0开始到最后成品上线&#xff0c;时间总计起来大致一共花了2-3周的时间。 初始想法我是想给我公司开发一个网站&#…

【弱监督语义分割】AllSpark:从transformer中的未标记特征重生标记特征,用于半监督语义分割

AllSpark: Reborn Labeled Features from Unlabeled in Transformer for Semi-Supervised Semantic Segmentation 摘要&#xff1a; 目前最先进的方法是用真实标签训练标注数据&#xff0c;用伪标签训练未标注数据。然而&#xff0c;这两个训练流程是分开的&#xff0c;这就使…

Android数据恢复:如何在手机上恢复丢失的文件和照片

我们都有 我们错误地从手机中删除重要内容的时刻。确实如此 不一定是我们的错。其他人可以对您的手机数据执行此操作 有意或无意。这在某个时间点发生在我们所有人身上。 但是&#xff0c;今天市场上有各种各样的软件可以 帮助恢复已删除的文件。这些类型的软件被归类为数据恢复…

Pandas数据可视化 - Matplotlib、Seaborn、Pandas Plot、Plotly

可视化工具介绍 让我们一起探讨Matplotlib、Seaborn、Pandas Plot和Plotly这四个数据可视化库的优缺点以及各自的适用场景。这有助于你根据不同的需求选择合适的工具。 1. Matplotlib 优点: 功能强大&#xff1a;几乎可以用于绘制任何静态、动画和交互式图表。高度可定制&a…

用OpenCV先去除边框线,以提升OCR准确率

在OpenCV的魔力下&#xff0c;我们如魔法师般巧妙地抹去表格的边框线&#xff0c;让文字如诗如画地跃然纸上。 首先&#xff0c;我们挥动魔杖&#xff0c;将五彩斑斓的图像转化为单一的灰度世界&#xff0c;如同将一幅绚丽的油画化为水墨画&#xff0c;通过cv2.cvtColor()函数的…

寝室快修|基于SprinBoot+vue的贵工程寝室快修小程序(源码+数据库+文档)

贵工程寝室快修目录 目录 基于SprinBootvue的贵工程寝室快修小程序 一、前言 二、系统设计 三、系统功能设计 1学生信息管理 2 在线报修管理 3公告信息管理 4论坛信息管理 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&a…

结构方程模型【SEM】:非线性、非正态、交互作用及分类变量分析

张老师&#xff08;研究员&#xff09;&#xff0c;长期从事R语言结构方程模型、群落生态学、保护生物学、景观生态学和生态模型方面的研究和教学工作&#xff0c;已发表了多篇论文&#xff0c;拥有丰富的科研及实践经验。 利用结构方程模型建模往往遇到很多‘特殊’情况&…

Excel 批量创建sheet页

参考资料 最巧妙的Excel批量创建工作表方法 一. 需求 ⏹有如下模板&#xff0c;现想根据提供的姓名&#xff0c;批量创建sheet页&#xff0c;要求每个sheet页拥有相同的模板 二. 通过透视表&#xff0c;批量创建sheet页面 ⏹如下图所示的步骤&#xff0c;创建透视表后&#…

人工 VS AGV无人搬运机器人,AGV赋能中国智能制造

agv 机器人作为智能制造的重要抓手&#xff0c;正在渗透到各个传统行业&#xff0c;成为我国制造业转型升级的焦点。未来&#xff0c;智能AGV将不仅仅是简单的把货物搬运到指定的位置&#xff0c;而是要把5G技术、大数据、物联网、云计算等贯穿于产品的设计中&#xff0c;让智能…

《动手学深度学习(Pytorch版)》Task03:线性神经网络——4.29打卡

《动手学深度学习&#xff08;Pytorch版&#xff09;》Task03&#xff1a;线性神经网络 线性回归基本元素线性模型损失函数随机梯度下降 正态分布与平方损失 线性回归的从零开始实现读取数据集初始化模型参数定义模型定义损失函数定义优化算法训练 线性回归的简洁实现读取数据集…

帕累托森林李朝政博士受聘「天工开物开源基金会」专家顾问

导语&#xff1a; 开源铸造了当前最前沿的科技引擎。开源驱动了软件生态&#xff0c;也以指数级速度驱动硬件生态。 3月中旬&#xff0c;天工开物开源基金会授予李朝政博士专家顾问&#xff0c;表彰他积极推动参与中国智能软件生态的建设&#xff0c;期待一起共筑未来新生态。…

Python_AI库 Pandas的时间序列操作详解

Python_AI库 Pandas的时间序列操作详解 本文默认读者具备以下技能&#xff1a; 熟悉python基础知识&#xff0c;vscode或其它编辑工具 了解pandas,matplotlib的基础操作 具备自主扩展学习能力 在数据分析和处理中&#xff0c;时间序列数据是一类常见且重要的数据类型。大量的…

CSS实现各种优惠券效果

一、左半圆效果 <style style"text/css">.coupon {width: 240px;height: 100px;margin-top: 15px;background-color: #ff6347;-webkit-mask: radial-gradient(circle at left center, transparent 20px, red 20px); } </style><div class"coupon…

TruLens

文章目录 一、关于 TruLensHow it works 二、安装三、快速使用Get DataInCreate Vector StoreBuild RAG from scratchSet up feedback functions.Construct the appRun the app 一、关于 TruLens Evaluate and Track LLM Applications 官网&#xff1a;https://www.trulens.o…

linux,从零安装mysql 8.0.30 ,并且更新至mysql 8.0.36

前言&#xff1a; 系统使用的CentOS 7&#xff0c;系统默认最小安装。 一、基础配置 配置虚拟机IP&#xff0c;需要更改的内容&#xff0c;如下红框中 修改之后 至此&#xff0c;基础配置完成。注意&#xff1a;此处虚拟机网络适配器使用的是&#xff1a;桥接模式 二、软件…

掌握Lazada自养号测评技巧,轻松提升产品销量与排名

Lazada店铺销量不佳&#xff0c;时常让卖家们感到困扰。然而&#xff0c;仅仅感叹和自我安慰并不能解决问题。作为卖家&#xff0c;我们需要专注于打牢基础&#xff0c;尤其是要深入了解Lazada店铺测评的益处及其运用技巧。通过巧妙地结合运营策略和测评方法&#xff0c;我们可…