1、它到底做了什么
- Source 并行运行:有多少个 source 并发子任务,就把
Long的序列切成多少段(sub-sequence) - 你提供一个
GeneratorFunction<Long, OUT>:把输入的 index(Long)映射成任意事件类型 - 每个 subtask 内部有序,但全局顺序取决于并行度(parallelism)
一句话:Flink 负责发 index,你负责把 index 变成事件。
2、最小可跑示例:生成 0~999 的字符串
importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.streaming.api.datastream.DataStreamSource;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;publicclassDataGenDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();GeneratorFunction<Long,String>generator=index->"Number: "+index;longnumberOfRecords=1000;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,numberOfRecords,Types.STRING);DataStreamSource<String>stream=env.fromSource(source,WatermarkStrategy.noWatermarks(),"Generator Source");stream.print();env.execute("datagen-demo");}}要点:
- 并行度为 1 时输出是严格
"Number: 0"到"Number: 999"顺序 - 并行度 > 1 时:每个 subtask 内部仍然按序,但不同 subtask 的结果交织输出
3、限速:控制总吞吐(全局每秒不超过 N 条)
importorg.apache.flink.api.common.functions.GeneratorFunction;importorg.apache.flink.api.common.typeinfo.Types;importorg.apache.flink.connector.datagen.source.DataGeneratorSource;importorg.apache.flink.connector.datagen.source.RateLimiterStrategy;GeneratorFunction<Long,String>generator=index->"Number: "+index;DataGeneratorSource<String>source=newDataGeneratorSource<>(generator,Long.MAX_VALUE,RateLimiterStrategy.perSecond(100),// 全部 source subtasks 加起来 <= 100 条/sTypes.STRING);适用场景:
- 你想模拟“上游流量”但又不想把本机打爆
- 做算子性能对比、Backpressure 观察、checkpoint 行为观察
4、有界/无界:它“永远是 bounded”,但可以“看起来无界”
- 语义上永远是 bounded(理论上会结束)
- 但
numberOfRecords = Long.MAX_VALUE基本等同“不会结束”(实践上像 unbounded)
建议:
- 要跑有限数据:考虑 BATCH mode,更贴近离线回放
- 要模拟持续输入:用
Long.MAX_VALUE+ rate limit
5、容错语义:at-least-once / end-to-end exactly-once 能不能保证?
可以,但有个硬条件:
GeneratorFunction必须对输入 index 完全确定性
也就是:同一个 index 永远生成同样的输出。
反例(会破坏确定性):
random()、System.currentTimeMillis()、读外部可变配置、读网络请求结果
正确做法:
- 用 index 推导数据(例如 hash(index) 生成用户、金额、状态)
- 或者用固定 seed 的伪随机:
new Random(index)(每个 index 固定)
6、Watermark:也可以在 Source 侧发“确定性水位线”
默认例子用noWatermarks(),但你完全可以:
- 在生成事件里带 eventTime
- 配合自定义
WatermarkStrategy生成 deterministic watermarks
适合做 event-time 窗口、乱序、迟到数据的测试演示。