【README】
本文记录了flink sink操作,输出目的存储器(中间件)包括
- kafka;
- es;
- db;
- 等等有很多;
- 本文只给出了 sink2kafka的代码;
本文使用的flink为 1.14.4 版本;
本文部分内容参考了 flink 官方文档,如下:
Kafka | Apache Flinkhttps://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/datastream/kafka/
【1】 flink sink2kafka
1)场景:
- 消费上游topic hello0415的数据,并把数据流输出到下游kafka topic hell0416;
- 如,我们在java框架中把数据库日志发送到 topic1 ,然后我想统计执行时间大于3秒的sql,则需要把筛选后的sql 发送到 下游 topic2, 就可以使用sink 来完成;
2)代码
/*** @Description flink流输出到kafka(下沉)* @author xiao tang* @version 1.0.0* @createTime 2022年04月16日*/
public class SinkTest1_Kafka {public static void main(String[] args) throws Exception {// 创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(4); // 设置全局并行度// 创建flink连接kafkaKafkaSource kafkaSource = KafkaSource.<String>builder().setValueOnlyDeserializer(new SimpleStringSchema()).setProperties(KafkaConsumerProps._INS.getProps()).setTopics("hello0415").setGroupId("flink").build();DataStream<String> kafkaStream = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafkaSource");// kafka生产者属性Properties kafkaProducerProps = new Properties();kafkaProducerProps.put(ProducerConfig.ACKS_CONFIG, "all");kafkaProducerProps.put(ProducerConfig.RETRIES_CONFIG, 3);kafkaProducerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 1 * KfkNumConst._1K);kafkaProducerProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);kafkaProducerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 32 * KfkNumConst._1M);// 把kafka数据流输出到(sink) topic hello0416KafkaSink<String> sink = KafkaSink.<String>builder().setKafkaProducerConfig(kafkaProducerProps).setBootstrapServers("192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092").setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic("hello0416").setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build();// 添加到sinkkafkaStream.sinkTo(sink);// 打印streamkafkaStream.print("kafkaStream");// 执行env.execute("kafkaSinkJob");}
}
效果:
【补充】
kafka 消费者属性
public enum KafkaConsumerProps {_INS;/* 1.创建kafka生产者的配置信息 */Properties props = new Properties();private KafkaConsumerProps() {/*2.指定连接的kafka集群, broker-list */props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.163.201:9092,192.168.163.202:9092,192.168.163.203:9092");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");}public Properties getProps() {return props;}
}