网站开发工作方案文稿写作网站
news/
2025/9/30 1:33:15/
文章来源:
网站开发工作方案,文稿写作网站,计算机编程是做网站,泉州企业网站维护制作上一篇我们使用keyby后发现数据严重倾斜 https://datamining.blog.csdn.net/article/details/105316728 大概看下问题所在#xff0c;大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题
之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计大量数据在一个subtask中运行 这里我们使用两阶段keyby 解决该问题
之前的问题如下图所示 我们期望的是 但我们的需要根据key进行聚合统计那么把相同的key放在不同的subtask如何统计
我们看下图只画了主要部分
1.首先将key打散我们加入将key转化为 key-随机数 ,保证数据散列
2.对打散后的数据进行聚合统计这时我们会得到数据比如 : (key1-12,1),(key1-13,19),(key1-1,20),(key2-123,11),(key2-123,10)
3.将散列key还原成我们之前传入的key这时我们的到数据是聚合统计后的结果不是最初的原数据
4.二次keyby进行结果统计输出到addSink 直接看实现代码
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collectorobject ProcessFunctionScalaV2 {def main(args: Array[String]): Unit {val env: StreamExecutionEnvironment StreamExecutionEnvironment.getExecutionEnvironmentenv.enableCheckpointing(2000)val stream: DataStream[String] env.socketTextStream(localhost, 9999)val typeAndData: DataStream[(String, Long)] stream.map(x (x.split(,)(0), x.split(,)(1).toLong))val dataStream: DataStream[(String, Long)] typeAndData.map(x (x._1 - scala.util.Random.nextInt(100), x._2))val keyByAgg: DataStream[DataJast] dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new CountAggregate())keyByAgg.print(第一次keyby输出)val result: DataStream[DataJast] keyByAgg.map(data {val newKey: String data.key.substring(0, data.key.indexOf(-))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction())result.print(第二次keyby输出)env.execute()}case class DataJast(key :String,count:Long)//计算keyby后每个Window中的数据总和class CountAggregate extends AggregateFunction[(String, Long),DataJast, DataJast] {override def createAccumulator(): DataJast {println(初始化)DataJast(null,0)}override def add(value: (String, Long), accumulator: DataJast): DataJast {if(accumulator.keynull){printf(第一次加载,key:%s,value:%d\n,value._1,value._2)DataJast(value._1,value._2)}else{printf(数据累加,key:%s,value:%d\n,value._1,accumulator.countvalue._2)DataJast(value._1,accumulator.count value._2)}}override def getResult(accumulator: DataJast): DataJast {println(返回结果accumulator)accumulator}override def merge(a: DataJast, b: DataJast): DataJast {DataJast(a.key,a.countb.count)}}/*** 实现* 根据key分类统计每个key进来的数据量定期统计数量*/class MyProcessFunction extends KeyedProcessFunction[String,DataJast,DataJast]{val delayTime : Long 1000L * 30lazy val valueState:ValueState[Long] getRuntimeContext.getState[Long](new ValueStateDescriptor[Long](ccount,classOf[Long]))override def processElement(value: DataJast, ctx: KeyedProcessFunction[String, DataJast, DataJast]#Context, out: Collector[DataJast]): Unit {if(valueState.value()0){valueState.update(value.count)printf(运行task:%s,第一次初始化数量:%s\n,getRuntimeContext.getIndexOfThisSubtask,value.count)val currentTime: Long ctx.timerService().currentProcessingTime()//注册定时器ctx.timerService().registerProcessingTimeTimer(currentTime delayTime)}else{valueState.update(valueState.value()value.count)printf(运行task:%s,更新统计结果:%s\n ,getRuntimeContext.getIndexOfThisSubtask,valueState.value())}}override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, DataJast, DataJast]#OnTimerContext, out: Collector[DataJast]): Unit {//定时器执行可加入业务操作printf(运行task:%s,触发定时器,30秒内数据一共,key:%s,value:%s\n,getRuntimeContext.getIndexOfThisSubtask,ctx.getCurrentKey,valueState.value())//定时统计完成初始化统计数据valueState.update(0)//注册定时器val currentTime: Long ctx.timerService().currentProcessingTime()ctx.timerService().registerProcessingTimeTimer(currentTime delayTime)}}}对key进行散列 val dataStream: DataStream[(String, Long)] typeAndData.map(x (x._1 - scala.util.Random.nextInt(100), x._2)) 设置窗口滚动时间每隔十秒统计一次每隔key下的数据总量 val keyByAgg: DataStream[DataJast] dataStream.keyBy(_._1).timeWindow(Time.seconds(10)).aggregate(new AverageAggregate())keyByAgg.print(第一次keyby输出)
还原key并进行二次keyby对数据总量进行累加 val result: DataStream[DataJast] keyByAgg.map(data {val newKey: String data.key.substring(0, data.key.indexOf(-))println(newKey)DataJast(newKey, data.count)}).keyBy(_.key).process(new MyProcessFunction()) 我们看下优化后的状态
先看下第一map直接从端口拿数据这不涉及keyby所以这个没影响 再看下第一次keyby后的结果因为我们散列后flink根据哈希进行分配所以数据不是百分之百平均但是很明显基本上已经均衡了不会出现这里1一条那里1条这种状况 再看下第二次keyby这里会发现我们ID的2的subtask有820条数据其他的没有数据这里是正常现象因为我们是对第一次聚合后的数据进行keyby统计所以这里的数据大小会非常小比如我们原始数据一条数据有1M大小1000条数据就1个G业务往往还有其他操作我们再第一次keyby 散列时处理其他逻辑比如ETL等等操作最终将统计结果输出给第二次keyby很可能1个G的数据最终只有1kb这比我们将1个G的数据放在一个subtask中处理好很多。
上面我们自定义了MyProcessFunction方法设置每30秒执行一次实际业务场景我们可能会设置一小时执行一次。 至此我们既保证了数据定时统计也保证了数据不倾斜问题。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/922395.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!