🚀 作者 :“大数据小禅”
🚀 文章简介 :玩转Flink里面核心的Sink Operator实战
🚀 欢迎小伙伴们 点赞👍、收藏⭐、留言💬
目录导航
- Flink Sink Operator简介
- Flink 核心知识 Sink Operator速览
- Flink 自定义的Sink 连接Mysql存储商品订单案例实战
 
 
Flink Sink Operator简介
-  在Flink中,Sink Operator(也称为Sink Function或Sink)是指负责将DataStream或DataSet的数据发送到外部存储或外部系统的操作符。Sink Operator是Flink的数据输出端,它的作用是将处理过的数据写入目标位置,如数据库、文件系统、消息队列等。 
-  Sink Operator通过将数据传输到外部系统来完成最终的数据存储、展示或其他类型的处理。它可以将数据单个地或批量地发送到目标系统,具体取决于Sink操作符的实现。例如,可以将数据写入关系型数据库、NoSQL数据库、消息队列、文件系统等。 
-  在Flink中,可以使用预定义的Sink操作符,如addSink()方法,或自定义Sink函数来实现数据的输出。预定义的Sink操作符可以满足一般的输出需求,而自定义Sink函数可以根据具体的业务逻辑实现特定的输出操作。 
-  自定义Sink函数需要实现SinkFunction接口或RichSinkFunction抽象类,并重写其中的方法。这些方法包括open()、invoke()和close()等,用于初始化和管理连接,以及处理数据发送等操作。 
使用Sink Operator时,需要考虑以下几个方面:
- 目标系统的可用性和容错性:保证目标系统的可用性,并确保在故障发生时能够进行重试或恢复。
- 写入的一致性:根据需求选择适当的写入一致性级别,如精确一次(exactly-once)或最少一次(at-least-once)语义。
- 并行度和性能:根据目标系统的特性和可用资源,设置合适的并行度以提高任务并行处理和整体性能。
Flink 核心知识 Sink Operator速览
- Flink编程模型
  
- Sink 输出源 - 预定义 - writeAsText (过期)
 
- 自定义 - SinkFunction
- RichSinkFunction - Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
 
 
- flink官方提供 Bundle Connector - kafka、ES 等
 
- Apache Bahir - kafka、ES、Redis等
 
 
- 预定义 
Flink 自定义的Sink 连接Mysql存储商品订单案例实战
-  自定义 - SinkFunction
- RichSinkFunction - Rich相关的api更丰富,多了Open、Close方法,用于初始化连接等
 
 
-  Flink连接mysql的几种方式(都需要加jdbc驱动) - 方式一:自带flink-connector-jdbc 需要加依赖包
 <dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.12.0</version> </dependency>- 方式二:自定义sink
 
-  保存视频订单到Mysql CREATE TABLE `video_order` (`id` int(11) unsigned NOT NULL AUTO_INCREMENT,`user_id` int(11) DEFAULT NULL,`money` int(11) DEFAULT NULL,`title` varchar(32) DEFAULT NULL,`trade_no` varchar(64) DEFAULT NULL,`create_time` date DEFAULT NULL,PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;- 添加jdbc依赖
 <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.25</version> </dependency>- 编码
 public class MysqlSink extends RichSinkFunction<VideoOrder> {private Connection conn = null;private PreparedStatement ps = null;@Overridepublic void open(Configuration parameters) throws Exception {conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/xd_order?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=Asia/Shanghai", "root", "xdclass.net"); //url user passwdString sql = "INSERT INTO `video_order` (`user_id`, `money`, `title`, `trade_no`, `create_time`) VALUES(?,?,?,?,?);";ps = conn.prepareStatement(sql);}@Overridepublic void close() throws Exception {if (conn != null) {conn.close();}if (ps != null) {ps.close();}}@Overridepublic void invoke(VideoOrder videoOrder, Context context) throws Exception {//给ps中的?设置具体值ps.setInt(1,videoOrder.getUserId());ps.setInt(2,videoOrder.getMoney());ps.setString(3,videoOrder.getTitle());ps.setString(4,videoOrder.getTradeNo());ps.setDate(5,new Date(videoOrder.getCreateTime().getTime()));ps.executeUpdate();} } 
 