1、Broadcast State 案例
规则流(localhost:8888,格式:规则名,类型A,类型B):1,a,b [规则名为 1,匹配类型为 a 或 b 的图形]
图形流(localhost:9999,格式:颜色,类型):green,a [颜色为 green、类型为 a 的图形]
问题:如果数据流先于规则流到达,此时广播状态中还没有任何规则,数据会匹配不上 => 当规则为空时,先将数据缓冲到键控状态(ListState)中,之后有规则时再对缓冲的数据进行匹配
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;import java.util.*;public class _06_BroadcastState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 接收广播的规则数据SingleOutputStreamOperator<_06_Rule> ruleStream = env.socketTextStream("localhost", 8888).map(new MapFunction<String, _06_Rule>() {@Overridepublic _06_Rule map(String value) throws Exception {String[] fields = value.split(",");return new _06_Rule(fields[0], new Tuple2<>(fields[1], fields[2]));}});// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构MapStateDescriptor<String, _06_Rule> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint<_06_Rule>() {}));// 广播流,广播规则并且创建 broadcast stateBroadcastStream<_06_Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);// 接收图形数据SingleOutputStreamOperator<_06_Shape> shapeStream = env.socketTextStream("localhost", 9999).map(new MapFunction<String, _06_Shape>() {@Overridepublic _06_Shape map(String value) throws Exception {String[] fields = value.split(",");return new _06_Shape(fields[0], fields[1]);}});shapeStream.keyBy(_06_Shape::getColour).connect(ruleBroadcastStream).process(new KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>() {
// private transient ValueState<List<_06_Shape>> dataBuffer;private transient ListState<_06_Shape> dataBuffer;@Overridepublic void open(Configuration parameters) throws Exception {
// ValueStateDescriptor<List<_06_Shape>> dataListStateDescriptor = new ValueStateDescriptor<>("dataBuffer", TypeInformation.of(new TypeHint<List<_06_Shape>>() {
// }));
//
// dataBuffer = getRuntimeContext().getState(dataListStateDescriptor);ListStateDescriptor<_06_Shape> listStateDescriptor = new ListStateDescriptor<>("dataBuffer", _06_Shape.class);dataBuffer = getRuntimeContext().getListState(listStateDescriptor);}@Overridepublic void processElement(_06_Shape value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {// 获取广播的规则数据System.out.println("输入的数据颜色为=>" + value.getColour() + ",类型为=>" + value.getType());ReadOnlyBroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);Iterator<Map.Entry<String, _06_Rule>> iterator = broadcastState.immutableEntries().iterator();if (iterator.hasNext()) {// 使用 ValueState// 先从缓存中读取数据进行匹配
// List<_06_Shape> shapeList = dataBuffer.value();// 多并行度时,防止某个并行度无数据导致报错
// if (shapeList != null) {
// if (!shapeList.isEmpty()) {
// for (_06_Shape shape : shapeList) {
// System.out.println("被缓冲的数据开始进行处理=>" + shape);
// // 从事件数据中继续匹配
// while (iterator.hasNext()) {
// Map.Entry<String, _06_Rule> rule = iterator.next();
// if (Objects.equals(rule.getValue().getRule().f0, shape.getType()) || Objects.equals(rule.getValue().getRule().f1, shape.getType())) {
// out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());
// }
// }
// }
//
// shapeList.clear();
// }
// }// 使用 ListStateIterator<_06_Shape> dataIterator = dataBuffer.get().iterator();while (dataIterator.hasNext()){_06_Shape shape = dataIterator.next();System.out.println("被缓冲的数据开始进行处理=>" + shape);while (iterator.hasNext()) {Map.Entry<String, _06_Rule> rule = iterator.next();if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());}}dataIterator.remove();}// 从事件数据中继续匹配while (iterator.hasNext()) {Map.Entry<String, _06_Rule> rule = iterator.next();if (Objects.equals(rule.getValue().getRule().f0, value.getType()) || Objects.equals(rule.getValue().getRule().f1, value.getType())) {out.collect("匹配上的数据为=>" + value + "匹配上的规则名称为=>" + rule.getValue().getRuleName());}}} else {System.out.println("此时规则流中无规则,先缓冲数据流");// 使用 listStatedataBuffer.add(value);// 使用 valueState
// List<_06_Shape> shapeList = dataBuffer.value();
// if (shapeList == null) {
// shapeList = new ArrayList<>();
// }
// shapeList.add(value);
// dataBuffer.update(shapeList);}}@Overridepublic void processBroadcastElement(_06_Rule value, KeyedBroadcastProcessFunction<String, _06_Shape, _06_Rule, String>.Context ctx, Collector<String> out) throws Exception {// 获取广播流输入的数据,存入广播状态System.out.println("输入的规则名称为=>" + value.getRuleName() + ",规则为=>" + value.getRule());BroadcastState<String, _06_Rule> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);broadcastState.put(value.getRuleName(), value);}}).print();env.execute();}
}
2、POJO 类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.java.tuple.Tuple2;

import java.io.Serializable;

/**
 * A matching rule consisting of a name and a pair of accepted shape types.
 *
 * <p>Lombok's {@code @Data} generates getters, setters, equals/hashCode and toString. The
 * constructor annotations generate a no-arg constructor and an all-args constructor whose
 * arguments follow the field order below ({@code ruleName}, then {@code rule}).
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class _06_Rule implements Serializable {

    /** Name identifying this rule. */
    private String ruleName;

    /** The two shape types this rule accepts ({@code f0} or {@code f1}). */
    private Tuple2<String, String> rule;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * A shape event described by its colour and its type.
 *
 * <p>Lombok's {@code @Data} generates getters (e.g. {@code getColour()}), setters,
 * equals/hashCode and toString. The constructor annotations generate a no-arg constructor and an
 * all-args constructor whose arguments follow the field order below ({@code colour}, then
 * {@code type}).
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class _06_Shape implements Serializable {

    /** Colour of the shape. */
    private String colour;

    /** Type of the shape. */
    private String type;
}