房地产公司网站建设ppt东莞招聘网哪个平台比较好

pingmian/2025/10/12 8:33:12/文章来源:
房地产公司网站建设ppt,东莞招聘网哪个平台比较好,做网站跟桌面程序差别大吗,建设人才服务中心系列文章目录 物流实时数仓#xff1a;采集通道搭建 物流实时数仓#xff1a;数仓搭建 文章目录 系列文章目录前言一、IDEA环境准备1.pom.xml2.目录创建 二、代码编写1.log4j.properties2.CreateEnvUtil.java3.KafkaUtil.java4.OdsApp.java 三、代码测试总结 前言 现在我们…系列文章目录 物流实时数仓采集通道搭建 物流实时数仓数仓搭建 文章目录 系列文章目录前言一、IDEA环境准备1.pom.xml2.目录创建 二、代码编写1.log4j.properties2.CreateEnvUtil.java3.KafkaUtil.java4.OdsApp.java 三、代码测试总结 前言 现在我们开始进行数仓的搭建我们用Kafka来代替数仓的ods层。 基本流程为使用Flink从MySQL读取数据然后写入Kafka中 一、IDEA环境准备 1.pom.xml 写入项目需要的配置 propertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingjava.version1.8/java.versionflink.version1.17.0/flink.versionhadoop.version3.2.3/hadoop.versionflink-cdc.version2.3.0/flink-cdc.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-connector-kafka/artifactIdversion${flink.version}/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.68/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion${hadoop.version}/versionexclusionsexclusiongroupIdorg.slf4j/groupIdartifactIdslf4j-reload4j/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.25/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.25/version/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/version/dependencydependencygroupIdcom.ververica/groupIdartifactIdflink-connector-mysql-cdc/artifactIdversion${flink-cdc.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-runtime/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-loader/artifactIdversion${flink.version}/version/dependency/dependencies基本上项目需要的所有jar包都有了不够以后在加。 2.目录创建 按照以上目录结构进行目录创建 二、代码编写 1.log4j.properties log4j.rootLoggererror,stdout log4j.appender.stdoutorg.apache.log4j.ConsoleAppender log4j.appender.stdout.targetSystem.out log4j.appender.stdout.layoutorg.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern%d %p [%c] - %m%n2.CreateEnvUtil.java 这个文件中有两个方法 创建初始化Flink的env Flink连接mysql的MySqlSource package com.atguigu.tms.realtime.utils;import com.esotericsoftware.minlog.Log; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.kafka.connect.json.DecimalFormat; import org.apache.kafka.connect.json.JsonConverterConfig;import java.util.HashMap;public class CreateEnvUtil {public static StreamExecutionEnvironment getStreamEnv(String[] args) {// 1.1 指定流处理环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2.检查点相关设置// 2.1 开启检查点env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);// 2.2 设置检查点的超时时间env.getCheckpointConfig().setCheckpointTimeout(120000L);// 2.3 设置job取消之后 检查点是否保留env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2.4 设置两个检查点之间的最小时间间隔env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);// 2.5 设置重启策略env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));// 2.6 设置状态后端env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage(hdfs://hadoop102:8020/tms/ck);// 2.7 设置操作hdfs用户// 获取命令行参数ParameterTool parameterTool ParameterTool.fromArgs(args);String hdfsUserName parameterTool.get(hadoop-user-name, atguigu);System.setProperty(HADOOP_USER_NAME, hdfsUserName);return env;}public static MySqlSourceString getMysqlSource(String option, String serverId, String[] args) {ParameterTool parameterTool ParameterTool.fromArgs(args);String mysqlHostname parameterTool.get(hadoop-user-name, hadoop102);int mysqlPort Integer.parseInt(parameterTool.get(mysql-port, 3306));String mysqlUsername parameterTool.get(mysql-username, root);String mysqlPasswd parameterTool.get(mysql-passwd, 000000);option parameterTool.get(start-up-option, option);serverId parameterTool.get(server-id, serverId);// 创建配置信息 Map 集合将 Decimal 数据类型的解析格式配置 k-v 置于其中HashMap config new HashMap();config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());// 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema该 Schema 将用于 MysqlSource 的初始化JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema new JsonDebeziumDeserializationSchema(false, config);MySqlSourceBuilderString builder MySqlSource.Stringbuilder().hostname(mysqlHostname).port(mysqlPort).username(mysqlUsername).password(mysqlPasswd).deserializer(jsonDebeziumDeserializationSchema);switch (option) {// 读取实时数据case dwd:String[] dwdTables new String[]{tms.order_info,tms.order_cargo,tms.transport_task,tms.order_org_bound};return builder.databaseList(tms).tableList(dwdTables).startupOptions(StartupOptions.latest()).serverId(serverId).build();// 读取维度数据case realtime_dim:String[] realtimeDimTables new String[]{tms.user_info,tms.user_address,tms.base_complex,tms.base_dic,tms.base_region_info,tms.base_organ,tms.express_courier,tms.express_courier_complex,tms.employee_info,tms.line_base_shift,tms.line_base_info,tms.truck_driver,tms.truck_info,tms.truck_model,tms.truck_team};return builder.databaseList(tms).tableList(realtimeDimTables).startupOptions(StartupOptions.initial()).serverId(serverId).build();}Log.error(不支持操作类型);return null;} } 3.KafkaUtil.java 该文件中有一个方法创建Flink连接Kafka需要的Sink package com.atguigu.tms.realtime.utils;import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; import org.apache.flink.connector.kafka.sink.KafkaSink; import org.apache.kafka.clients.producer.ProducerConfig;public class KafkaUtil {private static final String KAFKA_SERVER hadoop102:9092,hadoop103:9092,hadoop104:9092;public static KafkaSinkString getKafkaSink(String topic, String transIdPrefix, String[] args) {// 将命令行参数对象封装为 ParameterTool 类对象ParameterTool parameterTool ParameterTool.fromArgs(args);// 提取命令行传入的 key 为 topic 的配置信息并将默认值指定为方法参数 topic// 当命令行没有指定 topic 时会采用默认值topic parameterTool.get(topic, topic);// 如果命令行没有指定主题名称且默认值为 null 则抛出异常if (topic null) {throw new IllegalArgumentException(主题名不可为空命令行传参为空且没有默认值!);}// 获取命令行传入的 key 为 bootstrap-servers 的配置信息并指定默认值String bootstrapServers parameterTool.get(bootstrap-severs, KAFKA_SERVER);// 获取命令行传入的 key 为 transaction-timeout 的配置信息并指定默认值String transactionTimeout parameterTool.get(transaction-timeout, 15 * 60 * 1000 );return KafkaSink.Stringbuilder().setBootstrapServers(bootstrapServers).setRecordSerializer(KafkaRecordSerializationSchema.builder().setTopic(topic).setValueSerializationSchema(new SimpleStringSchema()).build()).setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).setTransactionalIdPrefix(transIdPrefix).setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout).build();}public static KafkaSinkString getKafkaSink(String topic, String[] args) {return getKafkaSink(topic, topic _trans, args);} } 4.OdsApp.java Ods层的app创建负责读取和写入数据 package com.atguigu.tms.realtime.app.ods;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.atguigu.tms.realtime.utils.CreateEnvUtil; import com.atguigu.tms.realtime.utils.KafkaUtil; import com.esotericsoftware.minlog.Log; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector;public class OdsApp {public static void main(String[] args) throws Exception {// 1.获取流处理环境并指定检查点StreamExecutionEnvironment env CreateEnvUtil.getStreamEnv(args);env.setParallelism(4);// 2 使用FlinkCDC从MySQL中读取数据-事实数据String dwdOption dwd;String dwdServerId 6030;String dwdsourceName ods_app_dwd_source;mysqlToKafka(dwdOption, dwdServerId, dwdsourceName, env, args);// 3 使用FlinkCDC从MySQL中读取数据-维度数据String realtimeDimOption realtime_dim;String realtimeDimServerId 6040;String realtimeDimsourceName ods_app_realtimeDim_source;mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimsourceName, env, args);env.execute();}public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {MySqlSourceString MySqlSource CreateEnvUtil.getMysqlSource(option, serverId, args);SingleOutputStreamOperatorString dwdStrDS env.fromSource(MySqlSource, WatermarkStrategy.noWatermarks(), sourceName).setParallelism(1).uid(option sourceName);// 3 简单ETLSingleOutputStreamOperatorString processDS dwdStrDS.process(new ProcessFunctionString, String() {Overridepublic void processElement(String jsonStr, ProcessFunctionString, String.Context ctx, CollectorString out) {try {JSONObject jsonObj JSONObject.parseObject(jsonStr);if (jsonObj.getJSONObject(after) ! null !d.equals(jsonObj.getString(op))) { // System.out.println(jsonObj);Long tsMs jsonObj.getLong(ts_ms);jsonObj.put(ts, tsMs);jsonObj.remove(ts_ms);String jsonString jsonObj.toJSONString();out.collect(jsonString);}} catch (Exception e) {Log.error(从Flink-CDC得到的数据不是一个标准的json格式,e);}}}).setParallelism(1);// 4 按照主键进行分组避免出现乱序KeyedStreamString, String keyedDS processDS.keyBy((KeySelectorString, String) jsonStr - {JSONObject jsonObj JSON.parseObject(jsonStr);return jsonObj.getJSONObject(after).getString(id);});//将数据写入KafkakeyedDS.sinkTo(KafkaUtil.getKafkaSink(tms_ods, sourceName _transPre, args)).uid(option _ods_app_sink);} }三、代码测试 在虚拟机启动我们需要的组件目前需要hadoop、zk、kafka和MySQL。 先开一个消费者进行消费。 bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods然后运行OdsApp.java 他会先读取维度数据因为维度数据需要全量更新之前的数据。 当他消费结束后我们运行jar包获取事实数据。 java -jar tms-mock-2023-01-06.jar 如果能消费到新数据代表通道没问题ODS层创建完成。 总结 至此ODS搭建完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/pingmian/88433.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

美色商城 网站建设乐陵森洁新能源有限公司电话

系列文章目录 星际争霸之小霸王之小蜜蜂(十一)--杀杀杀 星际争霸之小霸王之小蜜蜂(十)--鼠道 星际争霸之小霸王之小蜜蜂(九)--狂鼠之灾 星际争霸之小霸王之小蜜蜂(八)--蓝皮鼠和大…

网站改了关键词如何删除网站黑链

一、hiredis源码安装说明 本文创作基于 hiredisv1.2.0版本 1.简介 hiredis是一个用于与Redis交互的C语言客户端库。它提供了一组简单易用的API,使开发人员可以轻松地连接到Redis服务器,并执行各种操作,如设置和获取键值对、执行命令、订阅和…

福州网站建设求职简历莱芜做网站

目录 一,计算机网络背景 二,网络协议初识 三,网络传输基本流程 四,网络中的地址管理 一,计算机网络背景 网络发展 独立模式,计算机之间相互独立;网络互联,多台计算机连接在一起…

上海建设工程施工许可证查询网站智慧团建网站登录密码

一、创建项目 1、创建一个spring-boot的项目 2、创建三个模块file、system、gateway模块 3、file和system分别配置启动信息,并且创建一个简单的控制器 server.port9000 spring.application.namefile server.servlet.context-path/file4、在根目录下引入依赖 <properties&g…

垡头网站建设网站及微站建设合同验收

在没有封面图片的时候&#xff0c;会随机显示出几张色彩不同的风格图片。这样整个效果就好很多&#xff0c;也能减少工作量&#xff0c;毕竟我们没太多时间去找封面图处理。我就想着用Eyoucms能执行PHP的功能&#xff0c;用rand随机下这几张图片出来&#xff0c;我用mt_rand&am…

番禺区大石做网站苏州手机网站设计

我们多次被问到使用Enigma Protector保护软件免遭破解和逆向工程的最佳方法是什么&#xff1f;在这里我将解释保护常用应用程序的技巧是什么。 许多开发人员认为&#xff0c;如果他们只需单击“保护”按钮&#xff0c;保护程序就会自动完成所有操作&#xff0c;无需嵌入额外的…

宝安网站设计项目没有网站怎么推广

功能&#xff1a;ArrayList去除集合中字符串的重复值(字符串的内容相同)&#xff0c;思路&#xff1a;创建新集合方式。 第一种编译运行没问题&#xff0c;第二种写法出错&#xff0c;原因是不可以两次使用it.next()。 错误提示&#xff1a;Exception in thread "main&q…

建设一个视频网站己18虚拟现实企业解决方案

什么是 Raneto &#xff1f; Raneto 是一个开源知识库平台&#xff0c;它使用静态 Markdown 文件来支持您的知识库。 官方提供了 doc & demo 网站&#xff0c;即是帮助文档&#xff0c;也是个 demo&#xff0c;地址&#xff1a;https://docs.raneto.com 准备 项目使用con…

网站开发主要任务网站建设选哪个

前几天老爸在小区垃圾桶旁边捡回来一个旧茶几&#xff0c;又脏又破都掉漆了&#xff0c;捡回来用不了还占地方&#xff0c;他却非说要动手将它改成个小沙发。那么麻烦还不如直接买一个&#xff0c;但最后改出来的效果还不错&#xff0c;关键是便宜&#xff0c;不到50块钱比定制…

网站开发技术与功能 需求说明牡丹江商城网站建设

文章目录环境准备使用的系统软件磁盘目录安装libfastcommon安装FastDFS安装fastdfs-nginx-module安装nginx单机部署tracker配置storage配置client测试配置nginx访问分布式部署tracker配置storage配置client测试配置nginx访问启动防火墙trackerstoragenginx检测集群说明配置文件…

网站建设策划书1万字网络设置网站

文章目录 1、启动、关闭、挂起、恢复&#xff08;电源&#xff09;2、更多虚拟机操作2.1 电源设置2.2 硬件参数设置2.3 状态栏2.4 全屏显示 3、快照与系统恢复4、桌面环境5、文件系统6、用户目录7、创建目录和文件8、命令行&#xff1a;文件列表ls 9、命令行&#xff1a;切换目…

网站建设目的和功能定位广告投放平台

目录 前言 一.简介 二.优缺点 三.Element完成登录注册 1. 环境配置及前端演示 1.1 安装Element-UI模块 1.2 安装axios和qs(发送get请求和post请求) 1.3 导入依赖 2 页面布局 2.1组件与界面 3.方法实现功能数据交互 3.1 通过方法进行页面跳转 3.2 axios发送get请求 …

合肥建设官方网站网页设计师培训水公司

文 | Ryan都已经2021年了&#xff0c;互联网已经今非昔比&#xff0c;20年前纯文本的日子已经一去不复返&#xff0c;文字已经满足不了网页、文章的需求&#xff0c;绝大部分都会有着精心设计的表格、图片&#xff0c;甚至视频。PDF文档这种富文本格式拥有更加复杂的结构信息&a…

北京市建设工程造价管理处 网站wordpress管理导航栏目

问题描述 最近安装了MYSQL8&#xff0c;遇到了各种问题&#xff0c;总体汇总一下&#xff0c;凡是无法启动就是my.cnf和初始化的参数不匹配。 第一种 启动前设置了mysqld --initialize --usermysql --lower-case-table-names1&#xff0c;my.cnf文件却没有修改就去启动。 第…

在哪可以建一个网站沂水网站建设

引言 Redis是一款基于内存的键值对数据库&#xff0c;提供了多种数据结构存储数据&#xff0c;存取数据的速度还非常快&#xff0c;除了这些优点它还提供了其他特色功能&#xff0c;比如&#xff1a;管道、lua脚本、发布订阅模型 本篇文章主要描述发布订阅模型&#xff0c;将…

免费微信网站开发前端技术包括哪些

大家都知道电脑使用起来非常方便&#xff0c;但遇见ps如何画直线的时候就非常头疼了&#xff0c;如果你是第一次遇到ps如何画直线&#xff0c;怎么样才能快速解决ps如何画直线带来的烦恼呢&#xff1f;小编为大家收集了很多关于ps如何画直线问题的解决方法&#xff0c;下面请看…

益阳市网站建设交换友情链接的意义是什么

随着科技的快速发展&#xff0c;现在的人们越来越注重自己的卫生问题&#xff0c;不仅在吃上面会注重卫生问题&#xff0c;在用的上面也会更加严格要求&#xff0c;而衣服做为我们最贴身的东西&#xff0c;我们对它的要求也会更加高&#xff0c;所以最近这几年较火爆的无疑是内…

广州手机网站制作天津网站建设揭秘

命名规则&#xff1a;表名_字段名1、需要加索引的字段&#xff0c;要在where条件中2、数据量少的字段不需要加索引3、如果where条件中是OR关系&#xff0c;加索引不起作用4、符合最左原则尽量不要用or&#xff0c;如果可以用union代替&#xff0c;则一定要代替https://segmentf…