kafka + flink +mysql 案例

假设你有两个Kafka主题:user_activities_topicproduct_views_topic,并且你希望将user_activities_topic中的数据写入到user_activities表,而将product_views_topic中的数据写入到product_views表。

maven

<dependencies><!-- Apache Flink dependencies --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.14.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.14.0</version></dependency><!-- MySQL JDBC Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency>
</dependencies>

Flink Job 示例代码

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;import java.util.Properties;public class MultipleKafkaToFlinkToMysql {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置Kafka消费者属性Properties kafkaProperties = new Properties();kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");kafkaProperties.setProperty("group.id", "test");// 创建第一个Kafka消费者 (User Activities)FlinkKafkaConsumer<String> userActivitiesConsumer = new FlinkKafkaConsumer<>("user_activities_topic",new SimpleStringSchema(),kafkaProperties);// 创建第二个Kafka消费者 (Product Views)FlinkKafkaConsumer<String> productViewsConsumer = new FlinkKafkaConsumer<>("product_views_topic",new SimpleStringSchema(),kafkaProperties);// 从Kafka获取用户活动数据流env.addSource(userActivitiesConsumer).map(value -> {String[] parts = value.split(",");return new UserActivity(parts[0], parts[1]);}).addSink(JdbcSink.sink("INSERT INTO user_activities (user_id, activity) VALUES (?, ?)",(statement, userActivity) -> {statement.setString(1, userActivity.userId);statement.setString(2, userActivity.activity);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));// 从Kafka获取产品浏览数据流env.addSource(productViewsConsumer).map(value -> {String[] parts = value.split(",");return new ProductView(parts[0], Integer.parseInt(parts[1]));}).addSink(JdbcSink.sink("INSERT INTO product_views (user_id, product_id) VALUES (?, ?)",(statement, productView) -> {statement.setString(1, productView.userId);statement.setInt(2, productView.productId);},new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");}// 用户活动类public static class UserActivity {public String userId;public String activity;public UserActivity(String userId, String activity) {this.userId = userId;this.activity = activity;}}// 产品浏览类public static class ProductView {public String userId;public int productId;public ProductView(String userId, int productId) {this.userId = userId;this.productId = productId;}}
}

当处理多个消费者和表时,直接为每个消费者编写独立的代码会导致代码冗长且难以维护。为了提高代码的可维护性和扩展性,可以采用一些设计模式和抽象方法来简化代码结构。以下是一些改进策略:

### 1. 使用工厂模式和配置文件

通过使用工厂模式和配置文件,可以将不同Kafka主题和MySQL表的映射关系抽象出来,从而减少重复代码。

### 2. 示例代码重构

下面是一个示例,展示了如何通过配置文件和工厂模式来管理多个Kafka消费者和相应的MySQL输出。

#### 2.1 配置文件 (`application.yaml`)

首先,定义一个配置文件来描述每个消费者的配置信息,包括Kafka主题、目标MySQL表名以及字段映射等。

consumers:- name: user_activities_consumerkafka_topic: user_activities_topicmysql_table: user_activitiesfields:- { index: 0, column: user_id }- { index: 1, column: activity }- name: product_views_consumerkafka_topic: product_views_topicmysql_table: product_viewsfields:- { index: 0, column: user_id }- { index: 1, column: product_id }

#### 2.2 工厂类 (`ConsumerFactory.java`)

创建一个工厂类,根据配置文件中的信息动态生成消费者并设置其数据处理逻辑。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;import java.util.Properties;
import java.util.List;
import java.util.Map;public class ConsumerFactory {public static void createAndRegisterConsumers(StreamExecutionEnvironment env, List<Map<String, Object>> consumers) {Properties kafkaProperties = new Properties();kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");kafkaProperties.setProperty("group.id", "test");for (Map<String, Object> consumerConfig : consumers) {String kafkaTopic = (String) consumerConfig.get("kafka_topic");String mysqlTable = (String) consumerConfig.get("mysql_table");List<Map<String, Object>> fields = (List<Map<String, Object>>) consumerConfig.get("fields");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(kafkaTopic,new SimpleStringSchema(),kafkaProperties);env.addSource(kafkaConsumer).map(value -> parseMessage(value, fields)).addSink(JdbcSink.sink(generateInsertSQL(mysqlTable, fields),(statement, record) -> populateStatement(statement, record, fields),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));}}private static Map<String, Object> parseMessage(String value, List<Map<String, Object>> fields) {String[] parts = value.split(",");return fields.stream().collect(Collectors.toMap(field -> (String) field.get("column"),field -> parts[(Integer) field.get("index")]));}private static String generateInsertSQL(String table, List<Map<String, Object>> fields) {StringBuilder columns = new StringBuilder();StringBuilder placeholders = new StringBuilder();for (int i = 0; i < fields.size(); i++) {if (i > 0) {columns.append(", ");placeholders.append(", ");}columns.append(fields.get(i).get("column"));placeholders.append("?");}return "INSERT INTO " + table + " (" + columns.toString() + ") VALUES (" + placeholders.toString() + ")";}private static void populateStatement(java.sql.PreparedStatement statement, Map<String, Object> record, List<Map<String, Object>> fields) throws Exception {for (int i = 0; i < fields.size(); i++) {String column = (String) fields.get(i).get("column");Object value = record.get(column);if (value instanceof Integer) {statement.setInt(i + 1, (Integer) value);} else if (value instanceof String) {statement.setString(i + 1, (String) value);}// 其他类型可以根据需要添加}}
}

#### 2.3 主程序 (`Main.java`)

在主程序中加载配置文件,并调用工厂类来注册所有消费者。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.yaml.snakeyaml.Yaml;import java.io.InputStream;
import java.util.List;
import java.util.Map;public class Main {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Yaml yaml = new Yaml();InputStream inputStream = Main.class.getClassLoader().getResourceAsStream("application.yaml");Map<String, Object> config = yaml.load(inputStream);List<Map<String, Object>> consumers = (List<Map<String, Object>>) config.get("consumers");ConsumerFactory.createAndRegisterConsumers(env, consumers);env.execute("Multiple Kafka to Multiple MySQL Tables with Flink");}
}

### 关键点解释

1. **配置文件**:通过配置文件定义每个消费者的信息,使得添加新的消费者变得简单,只需修改配置文件即可。
   
2. **工厂模式**:使用工厂类 `ConsumerFactory` 根据配置动态创建消费者,并为其设置数据处理逻辑和输出目标。

3. **通用的数据处理逻辑**:`parseMessage` 方法根据配置文件中的字段映射解析消息,`generateInsertSQL` 和 `populateStatement` 方法则用于生成插入SQL语句和填充PreparedStatement。

4. **扩展性**:这种设计方式非常灵活,易于扩展。如果需要增加新的消费者或修改现有消费者的配置,只需更新配置文件而无需更改代码逻辑。

这种方法不仅减少了代码量,还提高了代码的可维护性和扩展性,使得系统更容易管理和维护。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/72893.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

远程登录客户端软件 CTerm 发布了 v4.0.0

有时候我们需要远程登录到 Linux/Unix 服务器&#xff0c;这方面使用最广泛的客户端软件是 PuTTY&#xff0c;不过它是全英文的&#xff0c;而且是单窗口的&#xff0c;有时候显得不那么方便。 CTerm (Clever Terminal) 是一个 Windows 平台下支持 Telnet 和 SSH 协议进行远程…

从李佳琦团队看新型用工:灵活就业如何重构组织架构?

2022年“双11”期间&#xff0c;李佳琦直播间累计销售额突破115亿元&#xff08;来源&#xff1a;新腕数据《2022双11直播电商战报》&#xff09;&#xff0c;其背后团队规模约400人&#xff0c;但全职员工仅占35%&#xff0c;其余65%为外包选品团队、兼职客服、第三方MCN机构人…

微软程序的打包格式MSIX

MSIX 微软推出的MSIX格式是其为统一Windows应用程序打包和部署而设计的新一代安装包格式&#xff0c;具有以下核心特点和进展&#xff1a; 1. 推出背景与时间线 MSIX最初于2018年在微软Build大会上宣布&#xff0c;并在同年7月发布预览版打包工具&#xff0c;10月正式版上线…

AFL++安装

学习fuzzing也几天了&#xff0c;今天记录AFL的安装及使用 一、实验环境 虚拟机&#xff1a;ubuntu20.04 当然也可以uname -a去看自己的版本号 二、AFL安装 1.先更新一下工具 sudo apt update2.安装AFL必要的一些依赖&#xff0c;例如编译工具&#xff08;如 build-essen…

【STM32】ADC功能-单通道多通道(学习笔记)

本章结合上一节内容复习更好理解【江协科技STM32】ADC数模转换器-学习笔记-CSDN博客 一、ADC单通道 接线图 ADC初始化 ①RCC开启时钟&#xff0c;包括ADC和GPIO的时钟&#xff0c;另外ADCCLK的分频器也要配置 ②配置GPIO,&#xff0c;把需要用的GPIO配置成模拟输入模式&am…

基于YOLO11深度学习的运动品牌LOGO检测与识别系统【python源码+Pyqt5界面+数据集+训练代码】

《------往期经典推荐------》 一、AI应用软件开发实战专栏【链接】 项目名称项目名称1.【人脸识别与管理系统开发】2.【车牌识别与自动收费管理系统开发】3.【手势识别系统开发】4.【人脸面部活体检测系统开发】5.【图片风格快速迁移软件开发】6.【人脸表表情识别系统】7.【…

当前主流的大模型训练与推理框架的全面汇总

以下是当前主流的大模型训练与推理框架的全面汇总 以下是更新后包含 SGLang 的大模型训练与推理框架列表&#xff0c;并对分类和示例进行了优化&#xff1a; 一、通用深度学习推理框架 TensorRT-LLM 特点&#xff1a;NVIDIA推出的针对Transformer类模型的优化框架&#xff0c;支…

Linux学习(八)(服务管理(检查服务状态,开始/停止服务,检查服务日志,创建新服务))

服务管理 Linux 中的服务管理是指控制 Linux 在启动和关闭计算机的过程中启动和停止的服务&#xff08;或“守护程序”&#xff09;的系统。这些服务执行各种功能&#xff0c;并提供未附加到用户界面的进程。 Linux 系统&#xff0c;尤其是系统管理员&#xff0c;通常需要管理…

ElasticSearch 分词器介绍及测试:Standard(标准分词器)、English(英文分词器)、Chinese(中文分词器)、IK(IK 分词器)

ElasticSearch 分词器介绍及测试&#xff1a;Standard&#xff08;标准分词器&#xff09;、English&#xff08;英文分词器&#xff09;、Chinese&#xff08;中文分词器&#xff09;、IK&#xff08;IK 分词器&#xff09; ElasticSearch 分词器介绍及测试1. Standard Analyz…

【计算机网络】确认家庭网络是千兆/百兆带宽并排查问题

要确认你的带宽是千兆&#xff08;1000Mbps&#xff09;还是百兆&#xff08;100Mbps&#xff09;&#xff0c;可以通过以下方法逐步排查&#xff1a; 一、检查物理设备 1. 查看路由器和光猫的网口 千兆网口&#xff1a;路由器或光猫的网口旁通常会标注 “10/100/1000M” 或 …

[数据分享第七弹]全球洪水相关数据集

洪水是一种常见的自然灾害&#xff0c;在全球范围内造成了极为严重的威胁。近年来&#xff0c;针对洪水事件的检测分析&#xff0c;以及对于洪水灾害和灾后恢复能力的研究日渐增多&#xff0c;也产生了众多洪水数据集。今天&#xff0c;我们一起来收集整理一下相关数据集。&…

深入探讨AI-Ops架构 第一讲 - 运维的进化历程以及未来发展趋势

首先&#xff0c;让我们一起回顾运维的进化之路&#xff0c;然后再深入探讨AI-Ops架构的细节。 运维的进化历程 1. AI 大范围普及前的运维状态 (传统运维) 在AI技术尚未广泛渗透到运维领域之前&#xff0c;我们称之为传统运维&#xff0c;其主要特点是&#xff1a; 人工驱动…

Hive-数据倾斜优化

数据倾斜的原因 1&#xff09;key分布不均匀&#xff0c;本质上就是业务数据有可能会存在倾斜 2&#xff09;某些SQL语句本身就有数据倾斜 关键词 情形 后果 Join A、其中一个表较小&#xff0c;但是key集中; B、两张表都是大表&#xff0c;key不均 分发到…

番外篇 - Docker的使用

一、Docker的介绍 Docker 是一个开源的应用容器引擎&#xff0c;基于 Go 语言 并遵从Apache2.0协议开源。 Docker 可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中&#xff0c;然后发布到任何流行的 Linux 机器上&#xff0c;也可以实现虚拟化。 容器是完…

深度学习与普通神经网络有何区别?

深度学习与普通神经网络的主要区别体现在以下几个方面&#xff1a; 一、结构复杂度 普通神经网络&#xff1a;通常指浅层结构&#xff0c;层数较少&#xff0c;一般为2-3层&#xff0c;包括输入层、一个或多个隐藏层、输出层。深度学习&#xff1a;强调通过5层以上的深度架构…

RuleOS:区块链开发的“新引擎”,点燃Web3创新之火

RuleOS&#xff1a;区块链开发的“新引擎”&#xff0c;点燃Web3创新之火 在区块链技术的浪潮中&#xff0c;RuleOS宛如一台强劲的“新引擎”&#xff0c;为个人和企业开发去中心化应用&#xff08;DApp&#xff09;注入了前所未有的动力。它以独特的设计理念和强大的功能特性&…

c# MimeEntity修改邮件附件名称

在C#中&#xff0c;当你使用如MimeKit库来处理电子邮件时&#xff0c;你可以通过修改MimeEntity的ContentDisposition属性来更改邮件附件的名称。以下是如何做到这一点的步骤&#xff1a; 1. 添加MimeKit引用 首先&#xff0c;确保你的项目中已经添加了MimeKit库。如果你使用…

Windows编译环境搭建(MSYS2\MinGW\cmake)

我的音视频/流媒体开源项目(github) 一、基础环境搭建 1.1 MSYS2\MinGW 参考&#xff1a;1. 基于MSYS2的Mingw-w64 GCC搭建Windows下C开发环境_msys2使用mingw64编译 在Widndows系统上&#xff0c;使用gcc工具链&#xff08;g&#xff09;进行C程序开发&#xff1f;可以的&a…

TikTok美国战略升级:聚焦美食旅行,本地化服务如何重塑市场格局

平台深耕本土内容生态&#xff0c;餐饮旅游创作者迎流量红利&#xff0c;算法推荐机制激发地域经济新活力 过去一年&#xff0c;TikTok在美国市场的动作频频引发行业关注。从早期以娱乐、舞蹈为主的全球化内容&#xff0c;到如今将资源向美食、旅行两大垂类倾斜&#xff0c;这…

Unity Dots环境配置

文章目录 前言环境配置1.新建Unity 工程2.安装Entities包2.安装EntitiesGraphics包3.安装URP渲染管线 Dots窗口 前言 DOTS&#xff08;Data-Oriented Technology Stack&#xff09;是Unity推出的一种用于开发高性能游戏和应用的数据导向技术栈&#xff0c;包含三大核心组件&am…