成品网站 高端建设网站0基础需要学什么

bicheng/2026/1/19 18:04:24/文章来源:
成品网站 高端,建设网站0基础需要学什么,手机优化系统,杨邦胜酒店设计公司官网​ 本文将详细介绍Flink-CDC如何全量及增量采集Sqlserver数据源#xff0c;准备适配Sqlserver数据源的小伙伴们可以参考本文#xff0c;希望本文能给你带来一定的帮助。 一、Sqlserver的安装及开启事务日志 如果没有Sqlserver环境#xff0c;但你又想学习这块的内容#x…​ 本文将详细介绍Flink-CDC如何全量及增量采集Sqlserver数据源准备适配Sqlserver数据源的小伙伴们可以参考本文希望本文能给你带来一定的帮助。 一、Sqlserver的安装及开启事务日志 如果没有Sqlserver环境但你又想学习这块的内容那你只能自己动手通过docker安装一个 myself sqlserver来用作学习当然如果你有现成环境那就检查一下Sqlserver是否开启了代理(sqlagent.enabled)服务和CDC功能。 1.1 docker拉取镜像 看Github上写Flink-CDC 目前支持的Sqlserver版本为2012, 2014, 2016, 2017, 2019但我想全部拉到最新事实证明2022-latest 和latest是一样的因为imagId都是一致的且在后续测试也是没有问题的所以我在docker上拉取镜像时直接采用如下命令 docker pull mcr.microsoft.com/mssql/server:latest1.2 运行Sqlserver并设置代理 标准启动模式没什么好说的主要设置一下密码(密码要求比较严格建议直接在网上搜个随机密码生成器来搞一下)。 docker run -e ACCEPT_EULAY -e SA_PASSWORD${your_password} \-p 1433:1433 --name sqlserver \-d mcr.microsoft.com/mssql/server:latest设置代理sqlagent.enabled,代理设置完成后需要重启Sqlserver因为我们是docker安装的直接用docker restart sqlserver就行了。 [roothdp-01 ~]# docker exec -it --user root sqlserver bash root0274812d0c10:/# /opt/mssql/bin/mssql-conf set sqlagent.enabled true SQL Server needs to be restarted in order to apply this setting. Please run systemctl restart mssql-server.service. root0274812d0c10:/# exit exit [roothdp-01 ~]# docker restart sqlserver sqlserver1.3 启用CDC功能 按照如下步骤执行命令如果看到is_cdc_enabled 1则说明当前数据库 root0274812d0c10:/# /opt/mssql-tools/bin/sqlcmd -S localhost -U SA -P ${your_password} 1 create databases test; 2 go 1 use test; 2 go Changed database context to test. 1 EXEC sys.sp_cdc_enable_db; 2 go 1 SELECT is_cdc_enabled FROM sys.databases WHERE name test; 2 go is_cdc_enabled --------------1(1 rows affected) 1 CREATE TABLE t_info (id int,order_date date,purchaser int,quantity int,product_id int,PRIMARY KEY ([id])) 2 go 1 2 3 EXEC sys.sp_cdc_enable_table 4 source_schema dbo, 5 source_name t_info, 6 role_name cdc_role; 7 go Update mask evaluation will be disabled in net_changes_function because the CLR configuration option is disabled. Job cdc.zeus_capture started successfully. Job cdc.zeus_cleanup started successfully. 1 select * from t_info; 2 go id order_date purchaser quantity product_id ----------- ---------------- ----------- ----------- -----------(0 rows affected)1.4 检查CDC是否正常开启 用客户端连接Sqlserver查看test库下的INFORMATION_SCHEMA.TABLES中是否出现TABLE_SCHEMA cdc的表如果出现说明已经成功安装Sqlserver并启用了CDC。 1 use test; 2 go Changed database context to test. 1 select * from INFORMATION_SCHEMA.TABLES; 2 goTABLE_CATALOG TABLE_SCHEMA TABLE_NAME TABLE_TYPE test dbo user_info BASE TABLE test dbo systranschemas BASE TABLE test cdc change_tables BASE TABLE test cdc ddl_history BASE TABLE test cdc lsn_time_mapping BASE TABLE test cdc captured_columns BASE TABLE test cdc index_columns BASE TABLE test dbo orders BASE TABLE test cdc dbo_orders_CT BASE TABLE二、具体实现 2.1 Flik-CDC采集SqlServer主程序 添加依赖包 dependencygroupIdcom.ververica/groupIdartifactIdflink-connector-sqlserver-cdc/artifactIdversion3.0.0/version/dependency编写主函数 public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置全局并行度env.setParallelism(1);// 设置时间语义为ProcessingTimeenv.getConfig().setAutoWatermarkInterval(0);// 每隔60s启动一个检查点env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);// checkpoint最小间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);// checkpoint超时时间env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只允许一个checkpoint// env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// Flink处理程序被cancel后会保留Checkpoint数据// env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);SourceFunctionString sqlServerSource SqlServerSource.Stringbuilder().hostname(localhost).port(1433).username(SA).password().database(test).tableList(dbo.t_info).startupOptions(StartupOptions.initial()).debeziumProperties(getDebeziumProperties()).deserializer(new CustomerDeserializationSchemaSqlserver()).build();DataStreamSourceString dataStreamSource env.addSource(sqlServerSource, _transaction_log_source);dataStreamSource.print().setParallelism(1);env.execute(sqlserver-cdc-test);}public static Properties getDebeziumProperties() {Properties properties new Properties();properties.put(converters, sqlserverDebeziumConverter);properties.put(sqlserverDebeziumConverter.type, SqlserverDebeziumConverter);properties.put(sqlserverDebeziumConverter.database.type, sqlserver);// 自定义格式可选properties.put(sqlserverDebeziumConverter.format.datetime, yyyy-MM-dd HH:mm:ss);properties.put(sqlserverDebeziumConverter.format.date, yyyy-MM-dd);properties.put(sqlserverDebeziumConverter.format.time, HH:mm:ss);return properties;}2.2 自定义Sqlserver反序列化格式 Flink-CDC底层技术为debezium它捕获到Sqlserver数据变更(CRUD)的数据格式如下 #初始化 Struct{afterStruct{id1,order_date2024-01-30,purchaser1,quantity100,product_id1},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706574924473,snapshottrue,dbzeus,schemadbo,tableorders,commit_lsn0000002b:00002280:0003},opr,ts_ms1706603724432}#新增 Struct{afterStruct{id12,order_date2024-01-11,purchaser6,quantity233,product_id63},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706603786187,dbzeus,schemadbo,tableorders,change_lsn0000002b:00002480:0002,commit_lsn0000002b:00002480:0003,event_serial_no1},opc,ts_ms1706603788461}#更新 Struct{beforeStruct{id12,order_date2024-01-11,purchaser6,quantity233,product_id63},afterStruct{id12,order_date2024-01-11,purchaser8,quantity233,product_id63},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706603845603,dbzeus,schemadbo,tableorders,change_lsn0000002b:00002500:0002,commit_lsn0000002b:00002500:0003,event_serial_no2},opu,ts_ms1706603850134}#删除 Struct{beforeStruct{id11,order_date2024-01-11,purchaser6,quantity233,product_id63},sourceStruct{version1.9.7.Final,connectorsqlserver,namesqlserver_transaction_log_source,ts_ms1706603973023,dbzeus,schemadbo,tableorders,change_lsn0000002b:000025e8:0002,commit_lsn0000002b:000025e8:0005,event_serial_no1},opd,ts_ms1706603973859}因此可以根据自己需要自定义反序列化格式将数据按照标准统一数据输出下面是我自定义的格式供大家参考 import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONWriter; import com.ververica.cdc.debezium.DebeziumDeserializationSchema; import io.debezium.data.Envelope; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.util.Collector; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord;import java.util.HashMap; import java.util.Map;public class CustomerDeserializationSchemaSqlserver implements DebeziumDeserializationSchemaString {private static final long serialVersionUID -1L;Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) {MapString, Object resultMap new HashMap();String topic sourceRecord.topic();String[] split topic.split([.]);String database split[1];String table split[2];resultMap.put(db, database);resultMap.put(tableName, table);//获取操作类型Envelope.Operation operation Envelope.operationFor(sourceRecord);//获取数据本身Struct struct (Struct) sourceRecord.value();Struct after struct.getStruct(after);Struct before struct.getStruct(before);String op operation.name();resultMap.put(op, op);//新增,更新或者初始化if (op.equals(Envelope.Operation.CREATE.name()) || op.equals(Envelope.Operation.READ.name()) || op.equals(Envelope.Operation.UPDATE.name())) {JSONObject afterJson new JSONObject();if (after ! null) {Schema schema after.schema();for (Field field : schema.fields()) {afterJson.put(field.name(), after.get(field.name()));}resultMap.put(after, afterJson);}}if (op.equals(Envelope.Operation.DELETE.name())) {JSONObject beforeJson new JSONObject();if (before ! null) {Schema schema before.schema();for (Field field : schema.fields()) {beforeJson.put(field.name(), before.get(field.name()));}resultMap.put(before, beforeJson);}}collector.collect(JSON.toJSONString(resultMap, JSONWriter.Feature.FieldBased, JSONWriter.Feature.LargeObject));}Overridepublic TypeInformationString getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}}2.3 自定义日期格式转换器 debezium会将日期转为5位数字日期时间转为13位的数字因此我们需要根据Sqlserver的日期类型转换成标准的时期或者时间格式。Sqlserver的日期类型主要包含以下几种 字段类型快照类型(jdbc type)cdc类型(jdbc type)DATEjava.sql.Date(91)java.sql.Date(91)TIMEjava.sql.Timestamp(92)java.sql.Time(92)DATETIMEjava.sql.Timestamp(93)java.sql.Timestamp(93)DATETIME2java.sql.Timestamp(93)java.sql.Timestamp(93)DATETIMEOFFSETmicrosoft.sql.DateTimeOffset(-155)microsoft.sql.DateTimeOffset(-155)SMALLDATETIMEjava.sql.Timestamp(93)java.sql.Timestamp(93) import io.debezium.spi.converter.CustomConverter; import io.debezium.spi.converter.RelationalColumn; import org.apache.kafka.connect.data.SchemaBuilder; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; import java.util.Properties;Sl4j public class SqlserverDebeziumConverter implements CustomConverterSchemaBuilder, RelationalColumn {private static final String DATE_FORMAT yyyy-MM-dd;private static final String TIME_FORMAT HH:mm:ss;private static final String DATETIME_FORMAT yyyy-MM-dd HH:mm:ss;private DateTimeFormatter dateFormatter;private DateTimeFormatter timeFormatter;private DateTimeFormatter datetimeFormatter;private SchemaBuilder schemaBuilder;private String databaseType;private String schemaNamePrefix;Overridepublic void configure(Properties properties) {// 必填参数database.type只支持sqlserverthis.databaseType properties.getProperty(database.type);// 如果未设置或者设置的不是mysql、sqlserver则抛出异常。if (this.databaseType null || !this.databaseType.equals(sqlserver))) {throw new IllegalArgumentException(database.type 必须设置为sqlserver);}// 选填参数format.date、format.time、format.datetime。获取时间格式化的格式String dateFormat properties.getProperty(format.date, DATE_FORMAT);String timeFormat properties.getProperty(format.time, TIME_FORMAT);String datetimeFormat properties.getProperty(format.datetime, DATETIME_FORMAT);// 获取自身类的包名数据库类型为默认schema.nameString className this.getClass().getName();// 查看是否设置schema.name.prefixthis.schemaNamePrefix properties.getProperty(schema.name.prefix, className . this.databaseType);// 初始化时间格式化器dateFormatter DateTimeFormatter.ofPattern(dateFormat);timeFormatter DateTimeFormatter.ofPattern(timeFormat);datetimeFormatter DateTimeFormatter.ofPattern(datetimeFormat);}// sqlserver的转换器public void registerSqlserverConverter(String columnType, ConverterRegistrationSchemaBuilder converterRegistration) {String schemaName this.schemaNamePrefix . columnType.toLowerCase();schemaBuilder SchemaBuilder.string().name(schemaName);switch (columnType) {case DATE:converterRegistration.register(schemaBuilder, value - {if (value null) {return null;} else if (value instanceof java.sql.Date) {return dateFormatter.format(((java.sql.Date) value).toLocalDate());} else {return this.failConvert(value, schemaName);}});break;case TIME:converterRegistration.register(schemaBuilder, value - {if (value null) {return null;} else if (value instanceof java.sql.Time) {return timeFormatter.format(((java.sql.Time) value).toLocalTime());} else if (value instanceof java.sql.Timestamp) {return timeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime().toLocalTime());} else {return this.failConvert(value, schemaName);}});break;case DATETIME:case DATETIME2:case SMALLDATETIME:case DATETIMEOFFSET:converterRegistration.register(schemaBuilder, value - {if (value null) {return null;} else if (value instanceof java.sql.Timestamp) {return datetimeFormatter.format(((java.sql.Timestamp) value).toLocalDateTime());} else if (value instanceof microsoft.sql.DateTimeOffset) {microsoft.sql.DateTimeOffset dateTimeOffset (microsoft.sql.DateTimeOffset) value;return datetimeFormatter.format(dateTimeOffset.getOffsetDateTime().withOffsetSameInstant(ZoneOffset.UTC).toLocalDateTime());} else {return this.failConvert(value, schemaName);}});break;default:schemaBuilder null;break;}}Overridepublic void converterFor(RelationalColumn relationalColumn, ConverterRegistrationSchemaBuilder converterRegistration) {// 获取字段类型String columnType relationalColumn.typeName().toUpperCase();// 根据数据库类型调用不同的转换器if (this.databaseType.equals(sqlserver)) {this.registerSqlserverConverter(columnType, converterRegistration);} else {log.warn(不支持的数据库类型: {}, this.databaseType);schemaBuilder null;}}private String getClassName(Object value) {if (value null) {return null;}return value.getClass().getName();}// 类型转换失败时的日志打印private String failConvert(Object value, String type) {String valueClass this.getClassName(value);String valueString valueClass null ? null : value.toString();return valueString;} } 三、总计 目前Fink-CDC对这种增量采集传统数据库的技术已经封装的很好了并且官方也给了详细的操作教程但如果想要深入的学习一项技能个人觉得还是要从头到尾操作一遍一方面能够快速的提升自己另一方面发现问题时也能从不同的角度来思考解决方案希望本篇文章能够给大家带来一点帮助。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/bicheng/88797.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

没备案网站如何通过百度联盟审核游戏网站开发视频

[安全]修复jquery低版本的xss安全漏洞,升级用最新版jquery; [新增]后台的登录页、欢迎页支持自定义模板文件; [新增]后台多语言列表管理支持手工同步文档数据; [新增]后台商品发布时,单规格商品支持会员折扣价的设置…

服务器网站模板贵阳网站建设培训

5.4 树和森林 5.4.1 树的存储结构 树的存储1:双亲表示法 用数组顺序存储各结点,每个结点中保存数据元素、指向双亲结点(父结点)的“指针” #define MAX_TREE_SIZE 100// 树的结点 typedef struct{ElemType data;int parent; }PTNode;// 树的类型 type…

河北衡水建设网站公司怎样做原创短视频网站

目标 为了让不同语言表达相同意思,所需的编码长度差不多,有了UTF-64编码。 现实 可是,今天刚发现:虽然不同语言用UTF-8表达相同意思,编码长度差很多,但是,压缩之后就差不多了。代码如下&…

贵阳网站建设平台网业云原神

先认识一下Object Object 类的 toString 方法 返回一个字符串,该字符串由类名(对象是该类的一个实例)、at 标记符“”和此对象哈希码的无符号十六进制表示组成。换句话说,该方法返回一个字符串,它的值等于:…

哪个网站可以做拼图装饰行业网站模板

本组件一般用于展示头像的地方,如个人中心,或者评论列表页的用户头像展示等场所。 #平台差异说明 App(vue)App(nvue)H5小程序√√√√ #基本使用 通过src指定头像的路径即可简单使用,如果传…

网站建设公司宣传文案网站开发 入门 pdf

一、前言 前面几篇文章介绍了微服务核心的两个组件:注册中心和网关,今天我们来思考一下微服务如何拆分,微服务拆分难度在于粒度和层次,粒度太大拆分的意义不大,粒度太小开发、调试、运维会有很多坑。 二、微服务划分…

芜湖网站设计公司赤峰酒店网站建设哪家便宜

文章目录 前言一、树型结构1.1概念1.2 知识点1.3 树的表示形式1.4 树的应用 二、二叉树2.1 概念2.2 两种特殊的二叉树2.3 二叉树的性质2.4 二叉树的存储2.5 二叉树的基本操作2.5.1 二叉树的遍历2.5.2 二叉树的基本操作 前言 对学习的二叉树的知识进行总结。 一、树型结构 1.1…

网页制作与网站建设wordpress获取点赞数

1.横向对标参数 厂商华为车型极狐阿尔法S全新HI版上市时间2022/9/23方案13V5R3L+1DMS摄像头前视摄像头*4【双目+长焦+广角】侧视摄像头*4后视摄像头*1环视摄像头*4

网站建设网站排名优化徐州专业建站公司

一、概述 方法区是一个供各线程共享的运行时内存区域。它存储了每一个类的结构信息,例如运行时常量池(Runtime Constant Pool)、字段和方法数据、构造函数和普通方法的字节码内容。上面讲的是规范,在不同的虚拟机里面实现是不一样…

学院后勤处网站建设方案书本地企业网站建设服务

翻译自 Separator 本章介绍如何使用分隔符组织JavaFX应用程序的UI组件。 SeparatorJavaFX API中可用的类表示水平或垂直分隔线。它用于划分应用程序用户界面的元素,不会产生任何操作。但是,您可以设置样式,对其应用视觉效果,甚…

网站建设公司小程序开发wordpress男同

📚博客主页:爱敲代码的小杨. ✨专栏:《Java SE语法》 | 《数据结构与算法》 | 《C生万物》 ❤️感谢大家点赞👍🏻收藏⭐评论✍🏻,您的三连就是我持续更新的动力❤️ 🙏小杨水平有…

最新购物网站建设框架淘客联盟推广平台

一、设计要求 用QT做一个聊天室, 制作一个服务器和客户端。可以进行注册、登录, 登陆成功后可以使用昵称进行发送、接收消息。 能根据昵称、聊天内容查询历史记录,也可以查询全部聊天记录。 。 二、客户端三级ui界面 三、项目代码 //在…

做网站流量是什么wordpress去掉尾巴

在Windows 10上,你可以通过多种方式管理飞行模式,在本指南中,我将概述完成此配置的步骤。飞行模式是一种允许你通过单一选项快速禁用无线设备(如Wi-Fi、蓝牙、近场通信(NFC)、全球定位系统(GPS)和蜂窝)的功能。通常,当你在飞机上,或者你在禁止使用无线电信号的地方时…

网站制作与设计知识点建站平台备案

搬以前写的博客【2014-03-01 08:09】 图像连通域标记算法研究 ConnectedComponent Labeling 最近在研究一篇复杂下背景文字检测的论文。 “Detecting Text in Natural Scenes with Stroke Width Transform ” CPVR 2010的文章,它主要探讨利用文字内…

网站做进一步优化wordpress文字摘要

目录 1.DML(数据操作语言)1.添加数据2.修改数据3.删除数据 2.DQL(数据查询语言)1.DQL-语法2.基本查询3.条件查询(WHERE)1.语法:2.条件:3.案例: 4.聚合函数1.介绍2.常见聚合函数3.语法4.案例 5.分组查询(GROUP BY&#…

郑州专业网站建设公司网站制作教程切片

飞机乘坐流程/怎么坐飞机 编写原因对象人员经历背景飞机乘坐流程流程梗概订票去往机场办理登记牌/托运行李安检登机转机 飞行中下机 后记 编写原因 从上家单位裸辞,大概率下次不会找频繁出差的工作了,而日常出行应该也不会考虑飞机这种交通工具&#xf…

中国中小企业网站建设情况网站分类表

资源 Lua - Joe DFs Builds 或者在文章附加资源下载。 使用方法 在当前文件夹打开文件夹,使用cmd。srglue.exe srlua.exe in.lua out.exe 或 srglue srlua.exe in.lua out.exe in.lua:指用进行打包的lua文件。out.exe:指输出的exe文件的…

南京网站建设培训wordpress 热搜词

目录 一.前后端传输数据的编码格式(contentType) 1.form表单 2.编码格式 3.Ajax 4.代码演示 后端 前端HTML 二.Ajax发送JSON格式数据 1.引入 后端 前端 2.后端 接收到的数据为空 解决办法 3.request方法判断Ajax 4.总结 前端在通过ajax…

在线考试网站开发模板建站合同

本文分享主要描述了几种书写SQL时常见的一些隐藏错误,主要包括:在运算符中使用null值、在聚合数据时使用null值、求平均值时使用判断条件、滤条件中使用and和or、查询的列字段之间缺少逗号分隔、inner join与left join。都是一些比较细节的点&#xff0c…

vue可以做pc端网站吗seo实战密码读后感

3D裸眼技术大多处于研发阶段,它的研发分两个方向,一是硬件设备的研发,二为显示内容的处理研发。第二种已经开始小范围的商业运用。大众消费者接触的不多。从技术上来看,3D裸眼可分为光屏障式(Barrier)、柱状透镜(Lenticular Lens)…