Series

Logistics Real-Time Data Warehouse: Building the Collection Pipeline
Logistics Real-Time Data Warehouse: Building the Data Warehouse

Contents

Series
Preface
I. IDEA Environment Setup
  1. pom.xml
  2. Creating the directory structure
II. Writing the Code
  1. log4j.properties
  2. CreateEnvUtil.java
  3. KafkaUtil.java
  4. OdsApp.java
III. Testing the Code
Summary

Preface

Now we start building the data warehouse, using Kafka in place of the ODS layer. The basic flow: Flink reads change data from MySQL and writes it into Kafka.

I. IDEA Environment Setup
1. pom.xml

Add the configuration the project needs:

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <flink.version>1.17.0</flink.version>
    <hadoop.version>3.2.3</hadoop.version>
    <flink-cdc.version>2.3.0</flink-cdc.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.68</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-reload4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>${flink-cdc.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-runtime</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-loader</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>

This covers essentially every jar the project needs; if anything turns out to be missing we can add it later.
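Before writing any real code, it can be worth checking that these dependencies actually resolve. The throwaway class below is only a sketch for that purpose; the class name SmokeTest and its package are mine, it is not part of the project.

// SmokeTest.java - a disposable check that the Flink jars above are on the classpath
package com.atguigu.tms.realtime;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SmokeTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // If 1, 2, 3 are printed, the core Flink dependencies resolved correctly.
        env.fromElements(1, 2, 3).print();
        env.execute("dependency-smoke-test");
    }
}

If this tiny job runs inside IDEA, the environment is ready and the class can be deleted.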
2. Creating the directory structure

Create the packages following the directory structure shown above; from the code below, that means com.atguigu.tms.realtime.utils for the utility classes, com.atguigu.tms.realtime.app.ods for the ODS application, and a log4j.properties file under src/main/resources.
II. Writing the Code
1. log4j.properties

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2. CreateEnvUtil.java
This class has two methods: getStreamEnv creates and initializes the Flink execution environment, and getMysqlSource builds the MySqlSource that Flink CDC uses to connect to MySQL.
package com.atguigu.tms.realtime.utils;

import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.HashMap;

public class CreateEnvUtil {
    public static StreamExecutionEnvironment getStreamEnv(String[] args) {
        // 1. Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Checkpoint settings
        // 2.1 Enable checkpointing
        env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 Checkpoint timeout
        env.getCheckpointConfig().setCheckpointTimeout(120000L);
        // 2.3 Keep externalized checkpoints when the job is cancelled
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
        // 2.5 Restart strategy
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
        // 2.6 State backend and checkpoint storage
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");
        // 2.7 User for HDFS access, taken from the command line with a default
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");
        System.setProperty("HADOOP_USER_NAME", hdfsUserName);

        return env;
    }

    public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {
        // MySQL connection settings, overridable from the command line
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String mysqlHostname = parameterTool.get("mysql-hostname", "hadoop102");
        int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));
        String mysqlUsername = parameterTool.get("mysql-username", "root");
        String mysqlPasswd = parameterTool.get("mysql-passwd", "000000");
        option = parameterTool.get("start-up-option", option);
        serverId = parameterTool.get("server-id", serverId);

        // Config map that makes the JSON converter render Decimal columns as plain numbers
        HashMap<String, Object> config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        // Pass the config to the JSON deserialization schema used to build the MySqlSource
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
                new JsonDebeziumDeserializationSchema(false, config);

        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUsername)
                .password(mysqlPasswd)
                .deserializer(jsonDebeziumDeserializationSchema);

        switch (option) {
            // Fact tables: read only new changes
            case "dwd":
                String[] dwdTables = new String[]{
                        "tms.order_info",
                        "tms.order_cargo",
                        "tms.transport_task",
                        "tms.order_org_bound"};
                return builder
                        .databaseList("tms")
                        .tableList(dwdTables)
                        .startupOptions(StartupOptions.latest())
                        .serverId(serverId)
                        .build();
            // Dimension tables: take a full snapshot first, then read changes
            case "realtime_dim":
                String[] realtimeDimTables = new String[]{
                        "tms.user_info",
                        "tms.user_address",
                        "tms.base_complex",
                        "tms.base_dic",
                        "tms.base_region_info",
                        "tms.base_organ",
                        "tms.express_courier",
                        "tms.express_courier_complex",
                        "tms.employee_info",
                        "tms.line_base_shift",
                        "tms.line_base_info",
                        "tms.truck_driver",
                        "tms.truck_info",
                        "tms.truck_model",
                        "tms.truck_team"};
                return builder
                        .databaseList("tms")
                        .tableList(realtimeDimTables)
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();
        }
        Log.error("Unsupported option type");
        return null;
    }
}
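For orientation: the records this MySqlSource hands to Flink are Debezium-style JSON change events, and the later ODS application relies on their "op", "after" and "ts_ms" fields; with the NUMERIC decimal format configured above, decimal columns arrive as plain numbers rather than Base64 strings. Below is only a sketch with invented values (the class CdcRecordShapeDemo is mine, not part of the project), showing how those fields are read with fastjson.

// CdcRecordShapeDemo.java - illustrates the shape of one change event; values are made up
package com.atguigu.tms.realtime.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class CdcRecordShapeDemo {
    public static void main(String[] args) {
        // "op" is c/u/d/r (create/update/delete/snapshot read),
        // "before"/"after" are the row images, "ts_ms" is the event timestamp.
        String sample = "{\"op\":\"c\",\"ts_ms\":1700000000000,\"before\":null,"
                + "\"after\":{\"id\":\"1\",\"amount\":19.9}}";
        JSONObject obj = JSON.parseObject(sample);
        System.out.println(obj.getString("op"));                        // c
        System.out.println(obj.getJSONObject("after").getString("id")); // 1
        System.out.println(obj.getLong("ts_ms"));                       // 1700000000000
    }
}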
3. KafkaUtil.java
This class has a single method, which creates the KafkaSink that Flink needs to write to Kafka.
package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaUtil {
    private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {
        // Wrap the command-line arguments in a ParameterTool
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // Take the "topic" argument from the command line, falling back to the method parameter
        // when no topic was passed on the command line
        topic = parameterTool.get("topic", topic);
        // If neither the command line nor the default supplies a topic, fail fast
        if (topic == null) {
            throw new IllegalArgumentException("Topic name must not be null: no command-line argument and no default value!");
        }
        // Kafka bootstrap servers, with a default
        String bootstrapServers = parameterTool.get("bootstrap-servers", KAFKA_SERVER);
        // Kafka transaction timeout, defaulting to 15 minutes
        String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");

        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setTransactionalIdPrefix(transIdPrefix)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout)
                .build();
    }

    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return getKafkaSink(topic, topic + "_trans", args);
    }
}
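Both utility classes read their overrides from the command line through ParameterTool, so a quick look at how that mechanism behaves may help. A minimal sketch follows; the class name ParameterToolDemo and the argument values are purely illustrative.

// ParameterToolDemo.java - how "--key value" pairs become the overrides the utils look up
package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.java.utils.ParameterTool;

public class ParameterToolDemo {
    public static void main(String[] args) {
        // Pretend these came from the command line: --topic tms_ods --transaction-timeout 600000
        String[] fakeArgs = {"--topic", "tms_ods", "--transaction-timeout", "600000"};
        ParameterTool tool = ParameterTool.fromArgs(fakeArgs);
        // The second argument of get() is the fallback used when the key is absent.
        System.out.println(tool.get("topic", "default_topic"));              // tms_ods
        System.out.println(tool.get("bootstrap-servers", "hadoop102:9092")); // hadoop102:9092
    }
}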
4. OdsApp.java
The ODS-layer application; it wires the utilities together and is responsible for reading the data and writing it out.
package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class OdsApp {
    public static void main(String[] args) throws Exception {
        // 1. Get the stream environment (checkpointing is configured inside CreateEnvUtil)
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);

        // 2. Use Flink CDC to read fact data from MySQL
        String dwdOption = "dwd";
        String dwdServerId = "6030";
        String dwdSourceName = "ods_app_dwd_source";
        mysqlToKafka(dwdOption, dwdServerId, dwdSourceName, env, args);

        // 3. Use Flink CDC to read dimension data from MySQL
        String realtimeDimOption = "realtime_dim";
        String realtimeDimServerId = "6040";
        String realtimeDimSourceName = "ods_app_realtimeDim_source";
        mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimSourceName, env, args);

        env.execute();
    }

    public static void mysqlToKafka(String option, String serverId, String sourceName,
                                    StreamExecutionEnvironment env, String[] args) {
        // 1. Build the MySqlSource and turn it into a stream of JSON strings
        MySqlSource<String> mySqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);
        SingleOutputStreamOperator<String> dwdStrDS = env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(1)
                .uid(option + sourceName);

        // 2. Simple ETL: keep valid JSON, drop delete records, rename ts_ms to ts
        SingleOutputStreamOperator<String> processDS = dwdStrDS.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
                try {
                    JSONObject jsonObj = JSONObject.parseObject(jsonStr);
                    if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
                        // System.out.println(jsonObj);
                        Long tsMs = jsonObj.getLong("ts_ms");
                        jsonObj.put("ts", tsMs);
                        jsonObj.remove("ts_ms");
                        String jsonString = jsonObj.toJSONString();
                        out.collect(jsonString);
                    }
                } catch (Exception e) {
                    Log.error("The record received from Flink CDC is not valid JSON", e);
                }
            }
        }).setParallelism(1);

        // 3. Key by primary key so that changes to the same row stay in order
        KeyedStream<String, String> keyedDS = processDS.keyBy((KeySelector<String, String>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            return jsonObj.getJSONObject("after").getString("id");
        });

        // 4. Write the records to Kafka
        keyedDS.sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args))
                .uid(option + "_ods_app_sink");
    }
}
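To make the "simple ETL" step concrete, here is a small sketch, separate from the project, of what it does to two sample records: an insert passes through with ts_ms renamed to ts, while a delete is filtered out. The class name EtlFilterDemo and the sample values are invented for illustration.

// EtlFilterDemo.java - same filter/rename rule as OdsApp, applied to two made-up records
package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSONObject;

public class EtlFilterDemo {
    // Keep non-delete records that carry an "after" image and rename ts_ms to ts;
    // return null for records that would be dropped.
    static String etl(String jsonStr) {
        JSONObject obj = JSONObject.parseObject(jsonStr);
        if (obj.getJSONObject("after") != null && !"d".equals(obj.getString("op"))) {
            obj.put("ts", obj.getLong("ts_ms"));
            obj.remove("ts_ms");
            return obj.toJSONString();
        }
        return null;
    }

    public static void main(String[] args) {
        String insert = "{\"op\":\"c\",\"ts_ms\":1700000000000,\"after\":{\"id\":\"1\"}}";
        String delete = "{\"op\":\"d\",\"ts_ms\":1700000000001,\"before\":{\"id\":\"1\"},\"after\":null}";
        System.out.println(etl(insert)); // the insert record, now carrying "ts" instead of "ts_ms"
        System.out.println(etl(delete)); // null: delete records never reach Kafka
    }
}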
III. Testing the Code

Start the components we need on the virtual machines; for now that is Hadoop, ZooKeeper, Kafka, and MySQL. Then start a console consumer on the target topic:

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods

Next run OdsApp.java. It reads the dimension data first, because the dimension tables need a full snapshot of the existing rows. Once that snapshot has been consumed, run the mock-data jar to generate fact data:

java -jar tms-mock-2023-01-06.jar

If the consumer receives the new records, the pipeline works and the ODS layer has been created successfully.

Summary

With that, the ODS layer is complete.