网站开发工作方案文稿写作网站

news/2025/9/30 1:33:15/文章来源:
网站开发工作方案,文稿写作网站,计算机编程是做网站,泉州企业网站维护制作上一篇我们使用keyby后发现数据严重倾斜 https://datamining.blog.csdn.net/article/details/105316728 大概看下问题所在#xff0c;大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题 之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题 之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计那么把相同的key放在不同的subtask如何统计 我们看下图只画了主要部分 1.首先将key打散我们加入将key转化为 key-随机数 ,保证数据散列 2.对打散后的数据进行聚合统计这时我们会得到数据比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10) 3.将散列key还原成我们之前传入的key这时我们的到数据是聚合统计后的结果不是最初的原数据 4.二次keyby进行结果统计输出到addSink 直接看实现代码 import org.apache.flink.api.common.functions.AggregateFunction import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.api.scala.typeutils.Types import org.apache.flink.streaming.api.functions.KeyedProcessFunction import org.apache.flink.streaming.api.functions.windowing.WindowFunction import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.util.Collectorobject ProcessFunctionScalaV2 {def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] env.socketTextStream(localhost, 9999)val typeAndData: DataStream[(String, Long)] stream.map(x (x.split(,)(0), x.split(,)(1).toLong))val dataStream: DataStream[(String, Long)] typeAndData.map(x (x._1 - scala.util.Random.nextInt(100), x._2))val keyByAgg: DataStream[DataJast] dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print(第一次keyby输出)val result: DataStream[DataJast] keyByAgg.map(data {val newKey: String data.key.substring(0, data.key.indexOf(-))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print(第二次keyby输出)env.execute()}case class DataJast(key :String,count:Long)//计算keyby后每个Window中的数据总和class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] {override def createAccumulator(): DataJast {println(初始化)DataJast(null,0)}override def add(value: (String, Long), accumulator: DataJast): DataJast {if(accumulator.keynull){printf(第一次加载,key:%s,value:%d\n,value._1,value._2)DataJast(value._1,value._2)}else{printf(数据累加,key:%s,value:%d\n,value._1,accumulator.countvalue._2)DataJast(value._1,accumulator.count value._2)}}override def getResult(accumulator: DataJast): DataJast {println(返回结果accumulator)accumulator}override def merge(a: DataJast, b: DataJast): DataJast {DataJast(a.key,a.countb.count)}}/*** 实现* 根据key分类统计每个key进来的数据量定期统计数量*/class MyProcessFunction extends KeyedProcessFunction[String,DataJast,DataJast]{val delayTime : Long 1000L * 30lazy val valueState:ValueState[Long] getRuntimeContext.getState[Long](new ValueStateDescriptor[Long](ccount,classOf[Long]))override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit {if(valueState.value()0){valueState.update(value.count)printf(运行task:%s,第一次初始化数量:%s\n,getRuntimeContext.getIndexOfThisSubtask,value.count)val currentTime: Long ctx.timerService().currentProcessingTime()//注册定时器ctx.timerService().registerProcessingTimeTimer(currentTime delayTime)}else{valueState.update(valueState.value()value.count)printf(运行task:%s,更新统计结果:%s\n ,getRuntimeContext.getIndexOfThisSubtask,valueState.value())}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit {//定时器执行可加入业务操作printf(运行task:%s,触发定时器,30秒内数据一共,key:%s,value:%s\n,getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value())//定时统计完成初始化统计数据valueState.update(0)//注册定时器val currentTime: Long ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime delayTime)}}}对key进行散列  val dataStream: DataStream[(String, Long)] typeAndData.map(x (x._1 - scala.util.Random.nextInt(100), x._2)) 设置窗口滚动时间每隔十秒统计一次每隔key下的数据总量 val keyByAgg: DataStream[DataJast] dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate())keyByAgg.print(第一次keyby输出) 还原key并进行二次keyby对数据总量进行累加 val result: DataStream[DataJast] keyByAgg.map(data {val newKey: String data.key.substring(0, data.key.indexOf(-))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction()) 我们看下优化后的状态 先看下第一map直接从端口拿数据这不涉及keyby所以这个没影响 再看下第一次keyby后的结果因为我们散列后flink根据哈希进行分配所以数据不是百分之百平均但是很明显基本上已经均衡了不会出现这里1一条那里1条这种状况 再看下第二次keyby这里会发现我们ID的2的subtask有820条数据其他的没有数据这里是正常现象因为我们是对第一次聚合后的数据进行keyby统计所以这里的数据大小会非常小比如我们原始数据一条数据有1M大小1000条数据就1个G业务往往还有其他操作我们再第一次keyby 散列时处理其他逻辑比如ETL等等操作最终将统计结果输出给第二次keyby很可能1个G的数据最终只有1kb这比我们将1个G的数据放在一个subtask中处理好很多。 上面我们自定义了MyProcessFunction方法设置每30秒执行一次实际业务场景我们可能会设置一小时执行一次。 至此我们既保证了数据定时统计也保证了数据不倾斜问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/922395.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

三门峡市建设局官方网站网络舆情监测与预警系统通过对海量

青少年软件编程(Python)等级考试试卷(一级) 一、单选题(共25题,共50分) 1. 可以对Python代码进行多行注释的是?( ) A. #

US$164 Scorpio-LK Emulators SLK-02 for Tango Key Programmer including Authorization

Scorpio-LK Emulators SLK-02 for TangoPrice including authrization and the emulator together.Quick Referrence Table Package Includes:1pc x Scorpio-LK Emulators SLK-02 for TangoPictures of Scorpio-LK Emu…

做网站图片素材在线编辑做投票页面什么网站好

QUESTION:Sublime Text 3无法安装Package Control插件的解决? ANSWER: 为了更准确的定位问题,建议插件在安装前开启控制台(快捷键Ctrl~),同时在开启debug模式,这样可以在安装过程中了解哪一步出了问题,然后有针对性…

US$54 New Design DB25 Adapter for CG PRO 9S12 Programmer

New Design DB25 Adapter for CG PRO 9S12 ProgrammerFunctions: Operating Motorola Series(Freescale)This is New Design DB25 Adapter, please make sure the connector of your CG Pro 9S12 is New.We will arran…

购物网站后台设计无法转换中文wordpress

通过 Kubeflow XGBoost Training Operator 支持在 Kubernetes 上进行分布式 XGBoost 训练和批量预测。 操作步骤 为在 Kubernetes 集群上运行 XGBoost 作业,执行以下步骤: 在 Kubernetes 集群上安装 XGBoost Operator。 XGBoost Operator 旨在管理 XGB…

网站备案时间周期一般多久网站开发实用技术2.8.5

【CCF BDCI 2023】多模态多方对话场景下的发言人识别 Baseline 0.71 NLP 部分 概述NLP 简介文本处理词嵌入上下文理解 文本数据加载to_device 函数构造数据加载样本数量 len获取样本 getitem 分词构造函数调用函数轮次嵌入 RobertaRoberta 创新点NSP (Next Sentence Prediction…

最牛的视频网站建设网页设计的流程是什么

RapidClick是一款简单实用的自动点击软件。它可以模拟鼠标点击操作,以便快速、连续地点击屏幕上的特定位置。该软件通常用于自动执行重复性的点击任务或加快某些操作的速度。 以下是RapidClick可能提供的一些主要功能和特点: 自动点击功能:R…

门户网站开发用什么框架好商务网站开发开题报告

简介: 随着 5G/ 芯片 / 区块链等等新技术的不断成熟、云计算的普及和云原生时代带来的诸多便捷,开发者和架构师们眼前的挑战也不再只是 0-1 的建设问题,技术如何更多地带来业务价值成为了一个值得讨论的话题。阿里巴巴集团研究员,…

MMU的作用

内存管理单元(MMU)是计算机系统中用于管理内存访问的硬件组件,它具有以下多方面的好处: 内存保护隔离不同进程:在多任务操作系统中,MMU 可以为每个进程分配独立的虚拟地址空间,使得不同进程之间的内存空间相互隔…

知名网站开发哪里有运营一款app的费用

TCP是一个有状态通讯协议,所谓的有状态是指通信过程中通信的双方各自维护连接的状态。一、TCP keepalive先简单回顾一下TCP连接建立和断开的整个过程。(这里主要考虑主流程,关于丢包、拥塞、窗口、失败重试等情况后面详细讨论。)首先是客户端发送syn(Syn…

US$16 Yanhua BMW F/G Chassis Odometer Wiring Harness

Yanhua BMW F/G Chassis Odometer Wiring HarnessGood helper of resetting odometer.Function: Use BMW F/G chassis odometer wiring harness connects the odometer and power the wiring harness. Press the wake…

List-To-Table

List-To-Table导航 (返回顶部)1. List 2. List.txt-List.xlsx2.1 添加软件ID[Sid] 2.2 构造2维表格结构(包含第一列和第一行的字段的空表) 2.3 获取具体信息(填充表格信息)3. XLOOKUP3.1 语法及返回 3.2 参数 3.3 测试…

linux 添加唤醒词

在RK3588 Ubuntu主板上实现FunASR离线语音唤醒系统 RK3588是一款性能强大的SoC,搭配FunASR可以很好地实现离线语音唤醒功能。以下是完整的实施方案: 1. 硬件准备 音频输入设备选择 由于您的主板针脚图未显示专用音频…

US$980 Xhorse VVDI2 BMW OBD + CAS4 +FEM/BDC Functions Full BMW License

Xhorse VVDI2 BMW OBD + CAS4 +FEM/BDC Functions Full BMW LicenseLicense Includes:VB-01 BMW OBDVB-02 BMW CAS4VB-03 BMW FEM/BDCIf you buy VVDI2 Basic or VVDI2 VAG and need to get VVDI2 BMW functions for B…

学网站开发可以创业吗聊天网站模板

MySQLMySQL 是最流行的关系型数据库管理系统,在 WEB 应用方面 MySQL 是最好的 RDBMS(Relational Database Management System:关系数据库管理系统)应用软件之一YUM 安装mysql1、下载 YUM 仓库文件打开网址: https://dev.mysql.com/downloads/…

做电销有什么资料网站深圳宝安区是什么风险

首先,我们通过一张图片来了解一下Oracle数据库的内存结构,如下:每个数据库实例有两个关联的内存结构—系统全局区(SGA),程序全局区(PGA)。系统全局(SGA):一组共享的内存结构(称为SGA 组件),其中包含一个OracleDB 实例的…

建立自己的网站费用广告运营具体是做什么

案例:给"ls -l"命令,设置别名通过”ll“快速访问 1、在项目根目录底下查看有无.bash_profile文件,注意这个是个隐藏文件,需要使用ls -a命令查看: 没有.bash_profile新建一个文件, 在最后添加一行…

做软件外包的网站怎么查看小程序的开发公司

maven-antrun-plugin允许我们在各种maven构建阶段中运行ant目标。 我将专门为具有开发环境的开发人员解释maven-antrun-plugin的非常实际的用法。 通常,使用maven build,您会将项目捆绑到war文件或ear文件中。 您可以使用maven-antrun-plugin直接将此w…

【python】1.基础入门

官网安装可能会有下载较慢的问题 win安装记得勾选 “添加到path”终端运行pythonMAC注意:输入python3,否则不识别 写一行python代码并执行:python3回车后执行注:mac按control+D或者输入exit() -- 退出python代码运…

测速网站开发下沙网站建设

修改version https://developers.weixin.qq.com/miniprogram/dev/framework/plugin/using.html