文章目录
- 一、技术
- 二、构建SpringBoot工程
- 2.1 创建maven工程并配置 pom.xml文件
- 2.2 编写配置文件 application.yml
- 2.3 编写配置文件 application.properties
- 2.4 开发主启动类
- 2.5 开发配置类
- 三、测试抽取Hive、HDFS元数据
- 四、将抽取的元数据存储到MySQL
- 4.1 引入依赖
- 4.2 配置application.yml
- 4.3 创建元数据信息Bean
- 4.4 定义Service
- 4.5 创建Mapper
- 4.6 测试
一、技术
SpringBoot + Mybatis Plus
二、构建SpringBoot工程
2.1 创建maven工程并配置 pom.xml文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.17</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <groupId>com.songshuang</groupId>
    <artifactId>dwmeta</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- 必须 ,用于开发一个web项目-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 测试必须加 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- 连接hive的元数据服务 -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-metastore</artifactId>
            <version>3.1.2</version>
        </dependency>

        <!-- json处理 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.72</version>
        </dependency>
    </dependencies>
</project>
2.2 编写配置文件 application.yml
null
2.3 编写配置文件 application.properties
hive.client.uri:hive元数据服务metastore地址
hdfs.admin.user:hdfs用户
hdfs.uri:hdfs NameNode RPC端口
hive.client.uri=thrift://hadoop102:9083
hdfs.admin.user=hadoop
hdfs.uri=hdfs://hadoop102:9820
2.4 开发主启动类
package com.songshuang.dga;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;//当前这个类是App的主启动类
@SpringBootApplication
public class MainApp {public static void main(String[] args) {//启动appSpringApplication.run(MainApp.class, args);}
}
2.5 开发配置类
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;import java.net.URI;/*所有的客户端,都应该随用随建,用完就关。*/
@Configuration
public class DgaConfig {@Value("${hive.client.uri}")private String hiveUri;@Bean@Scope("prototype")public HiveMetaStoreClient createHiveMetastoreClient(){org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();//客户端连接服务端,配置地址和端口conf.set("hive.metastore.uris",hiveUri);try {HiveMetaStoreClient client = new HiveMetaStoreClient(conf);return client;} catch (MetaException e) {throw new RuntimeException(e);}}@Value("${hdfs.admin.user}")private String hdfsAdmin;@Value("${hdfs.uri}")private String hdfsUri;@Bean@Scope("prototype")public FileSystem createHDFSClient(){try {FileSystem hdfsClient = FileSystem.get(new URI(hdfsUri), new org.apache.hadoop.conf.Configuration(), hdfsAdmin);return hdfsClient;} catch (Exception e) {throw new RuntimeException(e);}}
}
三、测试抽取Hive、HDFS元数据
连接Metastore服务抽取Hive元数据;连接NameNode抽取HDFS元数据;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.thrift.TException;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.ApplicationContext;import java.io.IOException;/*** @date 2024/1/29 16:27*/@SpringBootTest
public class MetaTest {@Autowiredprivate ApplicationContext context;@Testpublic void testHiveClient() throws TException {HiveMetaStoreClient client = context.getBean(HiveMetaStoreClient.class);//获取库下所有的表System.out.println(client.getAllTables("dw_ods"));//获取某张表的元数据信息System.out.println(client.getTable("dw_ods", "ods_activity_info_full"));client.close();}@Testpublic void testHDFSClient() throws IOException {//1.获取hdfs客户端FileSystem hdfsClient = context.getBean(FileSystem.class);//2.遍历tableMetaInfos,为每一个TableMetaInfo补充hdfs的元数据信息FsStatus status = hdfsClient.getStatus();long capacity = status.getCapacity();long remaining = status.getRemaining();long used = status.getUsed();System.out.println("capacity:" + capacity + "remaining:" + remaining + "used:" + used );}
}
四、将抽取的元数据存储到MySQL
4.1 引入依赖
<!-- 使用springboot插件,不会和springboot的其他插件冲突了 --><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>1.2.15</version></dependency><!-- 驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.27</version></dependency><!-- 动态数据源切换,允许使用一个注解,可以切换Dao查询的数据源内置了数据库连接池,会和之前配置的Druid冲突--><dependency><groupId>com.baomidou</groupId><artifactId>dynamic-datasource-spring-boot-starter</artifactId><version>2.5.8</version></dependency><!-- 注释掉mybatis,否则会冲突 --><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.4.1</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-generator</artifactId><version>3.5.3.1</version></dependency><dependency><groupId>org.apache.velocity</groupId><artifactId>velocity-engine-core</artifactId><version>2.3</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId></dependency>
4.2 配置application.yml
spring:
  datasource:
    dynamic:
      primary: dga #设置默认的数据源或者数据源组
      strict: false #严格匹配数据源,默认false. true未匹配到指定数据源时抛异常,false使用默认数据源
      datasource:
        dga:
          url: jdbc:mysql://mall:3306/dga?useSSL=false&useUnicode=true&characterEncoding=UTF-8
          username: root
          password: "123456"
          driver-class-name: com.mysql.cj.jdbc.Driver
          druid:
            initial-size: 5
            max-active: 20
            max-wait: 60000
            min-idle: 5
            test-on-borrow: true
            test-on-return: false
            test-while-idle: true
  autoconfigure:
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
mybatis-plus:
  mapper-locations: classpath*:/sqls/*Mapper.xml
  configuration:
    mapUnderscoreToCamelCase: true
logging:
  level:
    com:
      songshuang:
        dga:
          meta:
            mapper: debug
server:
  port: 80
4.3 创建元数据信息Bean
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.sql.Timestamp;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** <p>* 元数据表附加信息* </p>** @since 2024-01-29*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("table_meta_info_extra")
public class TableMetaInfoExtra implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Primary key; the id type is {@code IdType.AUTO}. */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    /** Table name. */
    private String tableName;

    /** Database (schema) name. */
    private String schemaName;

    /** Technical owner. */
    private String tecOwnerUserName;

    /** Business owner. */
    private String busiOwnerUserName;

    /** Storage lifecycle type. */
    private String lifecycleType;

    /** Lifecycle in days. */
    private Long lifecycleDays;

    /** Security level. */
    private String securityLevel;

    /** Data-warehouse layer the table belongs to. */
    private String dwLevel;

    /** Creation time (generated automatically). */
    private Timestamp createTime;

    /** Last update time (generated automatically). */
    private Timestamp updateTime;
}
4.4 定义Service
import com.baomidou.mybatisplus.extension.service.IService;
import com.songshuang.dga.meta.bean.TableMetaInfoExtra;
import org.apache.hadoop.hive.metastore.api.MetaException;public interface TableMetaInfoExtraService extends IService<TableMetaInfoExtra> {//生成所有表的辅助信息。void initMetaInfoExtra(String db) throws MetaException;}
import com.songshuang.dga.config.MetaConstant;
import com.songshuang.dga.meta.bean.TableMetaInfoExtra;
import com.songshuang.dga.meta.mapper.TableMetaInfoExtraMapper;
import com.songshuang.dga.meta.service.TableMetaInfoExtraService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;import java.sql.Timestamp;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;/*** <p>* 元数据表附加信息 服务实现类* </p>** @since 2024-01-29*/
@Service
public class TableMetaInfoExtraServiceImpl extends ServiceImpl<TableMetaInfoExtraMapper, TableMetaInfoExtra> implements TableMetaInfoExtraService {@Autowiredprivate ApplicationContext context;/*辅助信息不经常变动。只有当你去创建新表的时候,才需要想数据库中写入新表的辅助信息。如果一张表已经有了辅助信息,无需写入调用initMetaInfoExtra(),只需要写入新表(今天刚创建表的元数据信息)*/@Overridepublic void initMetaInfoExtra(String db) throws MetaException {//查询当前db中的新表//第一步: 先查询table_meta_info_extra中当前db已经有信息的表。 老表Set<String> existsTableNames = list(new QueryWrapper<TableMetaInfoExtra>().eq("schema_name", db)).stream().map(info -> info.getTableName()).collect(Collectors.toSet());//第二步: 查询db下所有的表,根据老表,过滤得到新表HiveMetaStoreClient client = context.getBean(HiveMetaStoreClient.class);List<String> allTables = client.getAllTables(db);List<String> newTables = allTables.stream().filter(name -> !existsTableNames.contains(name)).collect(Collectors.toList());//为新表生成辅助信息,存入到数据库中List<TableMetaInfoExtra> infos = newTables.stream().map(name -> {TableMetaInfoExtra extra = new TableMetaInfoExtra();extra.setSchemaName(db);extra.setTableName(name);//其他的信息应该由员工手动录入,这里为了后续方便,初始化一些默认值,假设员工已经录入了initExtraInfo(extra);extra.setCreateTime(new Timestamp(System.currentTimeMillis()));return extra;}).collect(Collectors.toList());saveBatch(infos);}private void initExtraInfo(TableMetaInfoExtra extra) {String [] bon = {"张三","李四","王五","赵六"};String [] ton = {"张小三","李中四","王大五","赵老六"};extra.setBusiOwnerUserName(bon[RandomUtils.nextInt(0,bon.length)]);extra.setTecOwnerUserName(ton[RandomUtils.nextInt(0,ton.length)]);extra.setLifecycleType(MetaConstant.LIFECYCLE_TYPE_UNSET);extra.setLifecycleDays(-1l);extra.setSecurityLevel(MetaConstant.SECURITY_LEVEL_UNSET);extra.setDwLevel(extra.getTableName().substring(0,3).toUpperCase());}
}
4.5 创建Mapper
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.songshuang.dga.meta.bean.TableMetaInfoExtra;
import org.apache.ibatis.annotations.Mapper;/*** @date 2024/1/29 19:44*/
/**
 * MyBatis mapper for {@link TableMetaInfoExtra}.
 *
 * <p>Declares no methods of its own; everything it offers is inherited from
 * {@code BaseMapper<TableMetaInfoExtra>}.
 */
@Mapper
public interface TableMetaInfoExtraMapper extends BaseMapper<TableMetaInfoExtra> {
}
4.6 测试
@Autowired
private TableMetaInfoExtraService extraService;

/**
 * Runs the auxiliary-information initialisation for database {@code dw_ods}.
 *
 * @throws Exception if the initialisation fails
 */
@Test
public void testExtraInfo() throws Exception {
    extraService.initMetaInfoExtra("dw_ods");
}