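Contents
I. Naming Conventions
1. Business System Abbreviations
2. Data Source Abbreviations
3. Data Warehouse Layers
4. Business Segment Abbreviations
5. Data Domain Abbreviations
6. Business Process Abbreviations
7. Table Naming
General rules:
II. Field Conventions
1. Hive Column Types
2. ClickHouse (CK) Column Types
3. Default Values
4. Audit Columns
III. Metric Conventions
IV. Modeling Conventions
General rules:
1. STG Layer Modeling
2. ODS Layer Modeling
3. DIM Layer Modeling
4. DWD Layer Modeling
5. DWS Layer Modeling
6. ADS Layer Modeling
V. Table Creation Conventions
General rules:
1. STG Layer Table Creation
2. ODS Layer Table Creation
3. DIM Layer Table Creation
4. DWD Layer Table Creation
5. DWS Layer Table Creation
6. ADS Layer Table Creation
7. DAS Layer Table Creation
VI. Code Conventions
General rules:
1. STG Layer Code
2. ODS Layer Code
3. DIM Layer Code
4. DWD Layer Code
5. DWS Layer Code
6. ADS Layer Code
7. DAS Layer Code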
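I. Naming Conventions
1. Business System Abbreviations
Naming rule: 3 to 6 characters, consisting of English letters and digits
Examples:
No | Abbreviation | English Name | Description |
1 | WMS | Warehouse Management System | Warehouse management system |
2 | MES | Manufacturing Execution System | Production management system |
3 | SCM | Supply Chain Management | Supply chain management |
4 | CRM | Customer Relationship Management | Customer management system |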
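The naming rule for abbreviations can be checked mechanically. The sketch below is one possible check, assuming the rule means 3 to 6 ASCII letters or digits; the function name `is_valid_abbreviation` is hypothetical and not part of this standard.

```python
import re

# Assumption: the rule means 3 to 6 ASCII alphanumeric characters.
ABBREVIATION_RE = re.compile(r"[A-Za-z0-9]{3,6}")


def is_valid_abbreviation(name: str) -> bool:
    """Return True when name consists of 3 to 6 ASCII letters or digits."""
    return ABBREVIATION_RE.fullmatch(name) is not None


if __name__ == "__main__":
    for candidate in ["WMS", "WMSDB", "CUST", "DB", "FINANCE"]:
        print(candidate, is_valid_abbreviation(candidate))
```

The same check would apply to the data source, business segment, data domain, and business process abbreviations, since they share the naming rule.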
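2. Data Source Abbreviations
Naming rule: 3 to 6 characters, consisting of English letters and digits
Examples:
No | Abbreviation | Description |
1 | WMSDB | Warehouse management database |
3. Data Warehouse Layers
No | Abbreviation | Description |
1 | STG | Staging (source-aligned) layer |
2 | ODS | Operational data layer |
3 | DIM | Common dimension layer |
4 | DWD | Detail layer |
5 | DWS | Metrics (summary) layer |
6 | ADS | Application layer |
4. Business Segment Abbreviations
Naming rule: 3 to 6 characters, consisting of English letters and digits
Examples:
No | Abbreviation | Description |
1 | SAL | Sales segment |
2 | FMS | Finance segment |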
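5. Data Domain Abbreviations
Naming rule: 3 to 6 characters, consisting of English letters and digits
Examples:
No | Abbreviation | Description |
1 | CUST | Customer domain |
2 | TRAD | Trade domain |
6. Business Process Abbreviations
Naming rule: 3 to 6 characters, consisting of English letters and digits
Examples:
No | Abbreviation | Description |
1 | ORD | Order placement |
2 | PAY | Payment |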
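7. Table Naming
General rules:
- Incremental/full flag: i = incremental, f = full
- Refresh cycle flag: ss = real time, mm = minute, hh = hour, d = day, w = week, m = month, q = quarter, y = year
- Statistical period range abbreviations: nd = last n days, td = history to date, sd = year start to date, cw = calendar week
No | Layer | Naming Pattern | Example |
1 | STG | STG_{source_db_abbr}_{source_table}_{refresh_cycle_flag}{incr_full_flag} | stg_ord_t_brand_di |
2 | ODS | ODS_{source_db_abbr}_{source_table}_{refresh_cycle_flag}{incr_full_flag} | ods_ord_t_brand_df |
3 | DIM | DIM_{business_segment/pub}_{custom_table_name}_{refresh_cycle_flag}{incr_full_flag} | dim_mes_factory_df |
4 | DWD | DWD_{business_segment}_{data_domain}_{business_process}_{custom_table_name}_{refresh_cycle_flag}{incr_full_flag} | dwd_sal_trad_ord_pos_order_df |
5 | DWS | DWS_{business_segment}_{data_domain}_{stat_date}_{custom_table_name}_{stat_period_range}{refresh_cycle_flag} | dws_sal_trad_confirm_date_shop_retn_order_1d |
6 | ADS | ADS_{business_segment}_{data_domain}_{custom_table_name}_{stat_period_range}{refresh_cycle_flag} | ads_sal_trad_c_order_analysis_1d |
7 | TMP | TMP_{table_name} | tmp_dim_mes_factory_df |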
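As an illustration of how the patterns compose, here is a minimal sketch that assembles a DWD table name and splits the trailing `{refresh cycle}{incremental/full}` token of an STG, ODS, DIM, or DWD name. The helper names `build_dwd_table_name` and `split_suffix` are hypothetical, and lowercase output is an assumption taken from the example column.

```python
REFRESH_CYCLES = {"ss", "mm", "hh", "d", "w", "m", "q", "y"}
LOAD_FLAGS = {"i", "f"}  # i = incremental, f = full


def build_dwd_table_name(segment, domain, process, custom_name, refresh_cycle, load_flag):
    """Assemble a DWD table name from its parts, lowercased like the examples."""
    if refresh_cycle not in REFRESH_CYCLES:
        raise ValueError(f"unknown refresh cycle: {refresh_cycle}")
    if load_flag not in LOAD_FLAGS:
        raise ValueError(f"unknown load flag: {load_flag}")
    parts = ["dwd", segment, domain, process, custom_name, refresh_cycle + load_flag]
    return "_".join(parts).lower()


def split_suffix(table_name):
    """Split the last token of a table name into (refresh_cycle, load_flag)."""
    suffix = table_name.rsplit("_", 1)[-1].lower()
    refresh_cycle, load_flag = suffix[:-1], suffix[-1:]
    if refresh_cycle not in REFRESH_CYCLES or load_flag not in LOAD_FLAGS:
        raise ValueError(f"suffix does not follow the convention: {suffix}")
    return refresh_cycle, load_flag


if __name__ == "__main__":
    print(build_dwd_table_name("SAL", "TRAD", "ORD", "pos_order", "d", "f"))
    print(split_suffix("stg_ord_t_brand_di"))
```

DWS and ADS names end in a statistical period token such as `1d` rather than a load flag, so `split_suffix` deliberately rejects them.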
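II. Field Conventions
1. Hive Column Types
No | Type Name | Column Type | Precision |
1 | String | string | |
2 | Long integer | bigint | |
3 | Small integer | tinyint | |
4 | Date | string | |
5 | Decimal number | decimal | (18,4),(18,8) |
2. ClickHouse (CK) Column Types
No | Type Name | Column Type | Precision |
1 | String | String | |
2 | Long integer | Int64 | |
3 | Small integer | Int8 | |
4 | Date (year, month, day) | Date | |
5 | Date and time (to the second) | DateTime | |
6 | Decimal number | Decimal | (18,4),(18,8) |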
3. Default Values
No | Value Kind | Default Value |
1 | Metric value | 0 |
2 | Dimension numeric value | -99 |
3 | Time | 1970-01-01 00:00:00 |
4 | Date | 1970-01-01 |
5 | String | None |
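The SQL examples later in this document call `defaultv(column, default)` to apply these defaults; that function is not defined here and is presumably a custom UDF. As a rough Python analogue of the table above, under the assumption that only NULL (here `None`) is replaced and empty strings are kept:

```python
# Defaults copied from the table above; the kind names are hypothetical labels.
DEFAULTS = {
    "metric": 0,
    "dimension_number": -99,
    "time": "1970-01-01 00:00:00",
    "date": "1970-01-01",
    "string": "None",
}


def fill_default(value, kind):
    """Return value unchanged unless it is None, in which case return the default for kind."""
    if kind not in DEFAULTS:
        raise ValueError(f"unknown value kind: {kind}")
    return DEFAULTS[kind] if value is None else value


if __name__ == "__main__":
    print(fill_default(None, "metric"))
    print(fill_default(None, "string"))
    print(fill_default("2024-06-11", "date"))
```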
4. Audit Columns
Layer | Column | Comment | Type |
STG | source_db | Source database name | string |
STG | imported_time | Time the program imported the row | string |
STG | inserted_time | First insert time in target | string |
STG | updated_time | Last update time in target | string |
ODS | source_db | Source database name | string |
ODS | is_deleted | Whether (physically) deleted | tinyint |
ODS | inserted_time | First insert time in target | string |
ODS | updated_time | Last update time in target | string |
DIM | is_deleted | Whether (physically) deleted | tinyint |
DIM | is_current (zipper table) | Whether this is the current version | tinyint |
DIM | start_time (zipper table) | Date the version takes effect | string |
DIM | end_time (zipper table) | Date the version ends | string |
DIM | inserted_time | First insert time in target | string |
DIM | updated_time | Last update time in target | string |
DWD | is_deleted | Whether (physically) deleted | tinyint |
DWD | inserted_time | First insert time in target | string |
DWD | updated_time | Last update time in target | string |
DWS | inserted_time | First insert time in target | string |
DWS | updated_time | Last update time in target | string |
ADS | inserted_time | First insert time in target | string |
ADS | updated_time | Last update time in target | string |
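III. Metric Conventions
Metrics fall into three categories:
Atomic metric: a measure in a fact table. Atomic metric = business process + measure
Derived metric: a measure processed to meet a business requirement. Derived metric = modifier + atomic metric + statistical period range + refresh cycle flag
Composite metric: a measure computed from one or more derived metrics with a mathematical formula. Composite metric = derived metrics + formula
Naming rule: {modifier}_{metric_name}_{stat_period_range}{refresh_cycle_flag}_{stat_time_seq}{mutability_flag}
Statistical time sequence: identifies which time the statistic is based on, for example appointment time or order time.
Value range: a-z
Mutability flag: indicates, based on the statistical time, whether the fact is already complete or still in progress
i = immutable; v = mutable
Example: pay_sale_amt_1d_ai
Meaning: the amount of orders paid in the last 1 day, counted by order payment time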
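To make the naming rule concrete, the following sketch splits a metric name into its parts. It treats the period token (for example `1d`, `td`, `sd`, `cw`) as a single unit, which is an assumption, and it does not try to separate the modifier from the metric name because that needs a vocabulary this document does not list. `parse_metric_name` is a hypothetical helper.

```python
import re

# name _ period _ (time sequence letter)(mutability flag)
METRIC_RE = re.compile(
    r"(?P<name>[a-z][a-z0-9_]*?)_"
    r"(?P<period>\d+[a-z]{1,2}|td|sd|cw)_"
    r"(?P<seq>[a-z])(?P<flag>[iv])"
)


def parse_metric_name(metric):
    """Return the parts of a metric name, or None if it does not follow the rule."""
    match = METRIC_RE.fullmatch(metric)
    if match is None:
        return None
    parts = match.groupdict()
    parts["mutable"] = parts.pop("flag") == "v"
    return parts


if __name__ == "__main__":
    print(parse_metric_name("pay_sale_amt_1d_ai"))
    print(parse_metric_name("total_amt"))
```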
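IV. Modeling Conventions
General rules:
- Every table and every column must have a comment
- Column types must follow the standard data types in the field conventions
- Key types:
  - Primary key (PK): the table's primary key
  - Foreign key (FK): a foreign key of the table
  - Business key (BUSK): determines the data granularity from a business perspective
  - Change timestamp (CDCK): the column used for incremental updates
  - Audit key (AUK): an audit column of the table
  - Partition key (PTK): a partition column of the table
- Except for audit and partition columns, every column must come from a source table column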
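1. STG Layer Modeling
- Table storage format: textfile
- Column order matches the source database
- Add the partition column dt
2. ODS Layer Modeling
- Table storage format: orc
- Data compression: snappy
- Write mode: truncate, merge
- Column order matches the source database
- Add the partition column dt
3. DIM Layer Modeling
- Table storage format: orc
- Data compression: snappy
- Write mode: truncate, linked
- Order columns by group: primary key, foreign key, dictionary, attribute, metric, audit, partition
- In the foreign key and dictionary groups, store the related codes and names redundantly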
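The column group ordering used by the DIM, DWD, and DWS layers can be applied with a stable sort. This is only a sketch: the group labels (`pk`, `fk`, `dict`, `attr`, `metric`, `audit`, `partition`) and the function `order_columns` are hypothetical names chosen for illustration.

```python
# Group order from the rule: primary key, foreign key, dictionary,
# attribute, metric, audit, partition.
GROUP_ORDER = ["pk", "fk", "dict", "attr", "metric", "audit", "partition"]


def order_columns(columns):
    """Sort (column_name, group) pairs by group order, keeping the input order within a group."""
    rank = {group: index for index, group in enumerate(GROUP_ORDER)}
    return [name for name, group in sorted(columns, key=lambda item: rank[item[1]])]


if __name__ == "__main__":
    unordered = [
        ("inserted_time", "audit"),
        ("total_amt", "metric"),
        ("order_id", "pk"),
        ("order_status_code", "dict"),
        ("commodity_id", "fk"),
        ("order_status_name", "dict"),
    ]
    print(order_columns(unordered))
```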
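4. DWD Layer Modeling
- Table storage format: orc
- Data compression: snappy
- Write mode: merge
- Order columns by group: primary key, foreign key, dictionary, attribute, metric, audit, partition
- The foreign key group stores IDs only, keeps the data granularity consistent, and records which dimension table each ID references
- In the dictionary group, store the related codes and names redundantly
5. DWS Layer Modeling
- Table storage format: orc
- Data compression: snappy
- Order columns by group: primary key, foreign key, dictionary, attribute, metric, audit, partition
- The foreign key group stores IDs only, keeps the data granularity consistent, and records which dimension table each ID references
- In the dictionary group, store the related codes and names redundantly
- Add the partition column dt
6. ADS Layer Modeling
- Table storage format: orc
- Data compression: snappy
- Order columns according to the business requirement
- In the dictionary group, store the related codes and names redundantly
- Add the partition column dt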
V. Table Creation Conventions
General rules:
- Prefix every table name with its database name
- Wrap database, table, and column names in backticks
- Write SQL keywords in uppercase
1. STG Layer Table Creation
Example:
DROP TABLE IF EXISTS `stg`.`stg_ord_t_brand_df`;
CREATE TABLE IF NOT EXISTS `stg`.`stg_ord_t_brand_df` (
    `id` bigint COMMENT 'Primary key'
    ,`brand_china_name` string COMMENT 'Brand name (Chinese)'
    ,`brand_english_name` string COMMENT 'Brand name (English)'
    ,`brand_desc` string COMMENT 'Brand description'
    ,`band_logo` string COMMENT 'Brand logo'
    ,`status` tinyint COMMENT '1 = valid, 0 = invalid'
    ,`create_time` string COMMENT 'Creation time'
    ,`update_time` string COMMENT 'Modification time'
    ,`create_id` bigint COMMENT 'Creator ID'
    ,`source_db` string COMMENT 'Source database name'
    ,`imported_time` string COMMENT 'Import time into target'
    ,`inserted_time` string COMMENT 'First insert time in target'
    ,`updated_time` string COMMENT 'Last update time in target'
) COMMENT 'Brand table'
PARTITIONED BY (`dt` string COMMENT 'Partition') -- daily partition: yyyy-MM-dd
STORED AS textfile;
2. ODS Layer Table Creation
Example:
DROP TABLE IF EXISTS `ods`.`ods_ord_t_brand_df`;
CREATE TABLE IF NOT EXISTS `ods`.`ods_ord_t_brand_df` (
    `id` bigint COMMENT 'Primary key'
    ,`brand_china_name` string COMMENT 'Brand name (Chinese)'
    ,`brand_english_name` string COMMENT 'Brand name (English)'
    ,`brand_desc` string COMMENT 'Brand description'
    ,`band_logo` string COMMENT 'Brand logo'
    ,`status` tinyint COMMENT '1 = valid, 0 = invalid'
    ,`create_time` string COMMENT 'Creation time'
    ,`update_time` string COMMENT 'Modification time'
    ,`create_id` bigint COMMENT 'Creator ID'
    ,`source_db` string COMMENT 'Source database name'
    ,`is_deleted` tinyint COMMENT 'Whether (physically) deleted'
    ,`inserted_time` string COMMENT 'First insert time in target'
    ,`updated_time` string COMMENT 'Last update time in target'
) COMMENT 'Brand table'
PARTITIONED BY (`dt` string COMMENT 'Partition') -- daily partition: yyyy-MM-dd
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY");
3. DIM Layer Table Creation
- For a zipper table, insert one default record
Example:
DROP TABLE IF EXISTS `dim`.`dim_sal_commodity_df`;
CREATE TABLE IF NOT EXISTS `dim`.`dim_sal_commodity_df` (
    `commodity_id` bigint COMMENT 'Primary key'
    ,`brand_id` bigint COMMENT 'Brand ID'
    ,`brand_cn_name` string COMMENT 'Brand name (Chinese)'
    ,`commodity_no` string COMMENT 'Product code'
    ,`commodity_name` string COMMENT 'Product name'
    ,`commodity_desc` string COMMENT 'Product description'
    ,`commodity_color` string COMMENT 'Color'
    ,`commodity_length` decimal(18,4) COMMENT 'Length'
    ,`commodity_width` decimal(18,4) COMMENT 'Width'
    ,`commodity_height` decimal(18,4) COMMENT 'Height'
    ,`commodity_weight` decimal(18,4) COMMENT 'Weight'
    ,`retail_price` decimal(18,4) COMMENT 'Retail price'
    ,`cost_price` decimal(18,4) COMMENT 'Cost price'
    ,`launch_date` string COMMENT 'Launch date'
    ,`delisting_date` string COMMENT 'Delisting date'
    ,`origin` string COMMENT 'Place of origin'
    ,`create_id` bigint COMMENT 'Operator ID'
    ,`update_id` bigint COMMENT 'Updater ID'
    ,`create_time` string COMMENT 'Creation time'
    ,`update_time` string COMMENT 'Update time'
    ,`is_deleted` tinyint COMMENT 'Whether (physically) deleted'
    ,`is_current` tinyint COMMENT 'Whether this is the current version'
    ,`start_date` string COMMENT 'Date the version takes effect'
    ,`end_date` string COMMENT 'Date the version ends'
    ,`inserted_time` string COMMENT 'First insert time in target'
    ,`updated_time` string COMMENT 'Last update time in target'
) COMMENT 'Commodity dimension table'
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY");
INSERT INTO `dim`.`dim_sal_commodity_df` (
    `commodity_id`,`brand_id`,`brand_cn_name`,`commodity_no`,`commodity_name`,`commodity_desc`,`commodity_color`
    ,`commodity_length`,`commodity_width`,`commodity_height`,`commodity_weight`,`retail_price`,`cost_price`
    ,`launch_date`,`delisting_date`,`origin`,`create_id`,`update_id`,`create_time`,`update_time`
    ,`is_deleted`,`is_current`,`start_date`,`end_date`,`inserted_time`,`updated_time`
) VALUES (
    -99,-99,'None','None','None','None','None'
    ,0,0,0,0,0,0
    ,'1970-01-01','1970-01-01','None',-99,-99,'1970-01-01 00:00:00','1970-01-01 00:00:00'
    ,0,1,'1970-01-01','9999-12-31'
    ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss'),DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')
);
4. DWD Layer Table Creation
Example:
DROP TABLE IF EXISTS `dwd`.`dwd_sal_trd_ord_order_df`;
CREATE TABLE IF NOT EXISTS `dwd`.`dwd_sal_trd_ord_order_df` (
    `order_id` bigint COMMENT 'Primary key ID'
    ,`commodity_id` bigint COMMENT 'Commodity ID'
    ,`user_id` bigint COMMENT 'Consumer (C-end) user ID'
    ,`order_status_code` bigint COMMENT 'Order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
    ,`order_status_name` string COMMENT 'Order status name (see order_status_code)'
    ,`pay_type_code` bigint COMMENT 'Payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
    ,`pay_type_name` string COMMENT 'Payment method name (see pay_type_code)'
    ,`pay_status_code` bigint COMMENT 'Payment status (1=awaiting payment, 2=paid)'
    ,`pay_status_name` string COMMENT 'Payment status name (see pay_status_code)'
    ,`total_amt` decimal(18,4) COMMENT 'Total order amount (sum after line-item allocation)'
    ,`total_qty` decimal(18,4) COMMENT 'Total number of items'
    ,`create_id` bigint COMMENT 'Creator'
    ,`create_time` string COMMENT 'Creation time'
    ,`update_id` bigint COMMENT 'Updater'
    ,`update_time` string COMMENT 'Update time'
    ,`is_deleted` tinyint COMMENT 'Whether (physically) deleted'
    ,`inserted_time` string COMMENT 'First insert time in target'
    ,`updated_time` string COMMENT 'Last update time in target'
) COMMENT 'Order table'
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY");
5. DWS Layer Table Creation
Example:
DROP TABLE IF EXISTS `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d`;
CREATE TABLE IF NOT EXISTS `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d` (
    `shop_id` bigint COMMENT 'Store ID'
    ,`member_id` bigint COMMENT 'Member ID'
    ,`commodity_id` bigint COMMENT 'Commodity ID'
    ,`source_type_code` tinyint COMMENT 'Order source (1 = online, 2 = offline)'
    ,`source_type_name` string COMMENT 'Order source: 1 = APP, 2 = POS, 3 = mini program, 4 = Ele.me, 5 = fresh food APP, 6 = group buying, 7 = JD Daojia'
    ,`order_id` bigint COMMENT 'Original order ID'
    ,`return_order_id` bigint COMMENT 'Return order ID'
    ,`return_order_no` string COMMENT 'Return order number'
    ,`delivery_retn_amt_1d_ai` decimal(18,4) COMMENT 'Total amount of successfully delivered returns, last 1 day'
    ,`delivery_retn_cmd_cnt_1d_ai` decimal(18,4) COMMENT 'Number of items in successfully delivered returns, last 1 day'
    ,`origin_sale_amt_1d_ai` decimal(18,4) COMMENT 'Amount at original price, last 1 day'
    ,`trade_amt_1d_ai` decimal(18,4) COMMENT 'Total amount received, last 1 day'
    ,`real_delivery_amt_1d_ai` decimal(18,4) COMMENT 'Total amount actually shipped, last 1 day'
    ,`inserted_time` string COMMENT 'First insert time in target'
    ,`updated_time` string COMMENT 'Last update time in target'
) COMMENT 'Summary of online return orders by return completion date, last 1 day'
PARTITIONED BY (`dt` string COMMENT 'Partition') -- daily partition: yyyy-MM-dd
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY");
6. ADS Layer Table Creation
Example:
DROP TABLE IF EXISTS `ads`.`ads_sal_trd_pay_date_order_sum_1d`;
CREATE TABLE IF NOT EXISTS `ads`.`ads_sal_trd_pay_date_order_sum_1d` (
    `report_date` string COMMENT 'Report date'
    ,`commodity_id` bigint COMMENT 'Commodity ID'
    ,`user_id` bigint COMMENT 'Consumer (C-end) user ID'
    ,`order_status_code` tinyint COMMENT 'Order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
    ,`order_status_name` string COMMENT 'Order status name (see order_status_code)'
    ,`pay_type_code` tinyint COMMENT 'Payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
    ,`pay_type_name` string COMMENT 'Payment method name (see pay_type_code)'
    ,`pay_status_code` tinyint COMMENT 'Payment status (1=awaiting payment, 2=paid)'
    ,`pay_status_name` string COMMENT 'Payment status name (see pay_status_code)'
    ,`total_amt_1d_ai` decimal(18,4) COMMENT 'Total order amount (sum after line-item allocation)'
    ,`total_qty_1d_ai` decimal(18,4) COMMENT 'Total number of items'
    ,`inserted_time` string COMMENT 'First insert time in target'
    ,`updated_time` string COMMENT 'Last update time in target'
) COMMENT 'Order summary by payment date, last 1 day'
PARTITIONED BY (`dt` string COMMENT 'Partition') -- daily partition: yyyy-MM-dd
STORED AS orc
TBLPROPERTIES ("orc.compress"="SNAPPY");
7. DAS Layer Table Creation
- The table name matches the ADS-layer table name
- The column order matches the ADS layer
- Set the sort key from the business key
- Set the data lifecycle (TTL) according to the business requirement
Example:
DROP TABLE IF EXISTS `das`.`ads_sal_trd_pay_date_order_sum_1d`;
CREATE TABLE IF NOT EXISTS `das`.`ads_sal_trd_pay_date_order_sum_1d` (
    `report_date` Date COMMENT 'Report date'
    ,`commodity_id` Int64 COMMENT 'Commodity ID'
    ,`user_id` Int64 COMMENT 'Consumer (C-end) user ID'
    ,`order_status_code` Int8 COMMENT 'Order status (0=cancelled, 1=awaiting payment, 2=awaiting delivery, 3=in delivery, 4=delivered, 5=delivery failed)'
    ,`order_status_name` String COMMENT 'Order status name (see order_status_code)'
    ,`pay_type_code` Int8 COMMENT 'Payment method: 1=Alipay, 2=WeChat, 3=card (online), 4=online payment/points payment, 5=UnionPay QuickPass'
    ,`pay_type_name` String COMMENT 'Payment method name (see pay_type_code)'
    ,`pay_status_code` Int8 COMMENT 'Payment status (1=awaiting payment, 2=paid)'
    ,`pay_status_name` String COMMENT 'Payment status name (see pay_status_code)'
    ,`total_amt_1d_ai` Decimal(18,4) COMMENT 'Total order amount (sum after line-item allocation)'
    ,`total_qty_1d_ai` Decimal(18,4) COMMENT 'Total number of items'
    ,`inserted_time` DateTime COMMENT 'First insert time in target'
    ,`updated_time` DateTime COMMENT 'Last update time in target'
)
ENGINE = MergeTree()
ORDER BY (`report_date`,`commodity_id`,`user_id`)
PARTITION BY toYYYYMMDD(`report_date`)
TTL `report_date` + INTERVAL 765 DAY;
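VI. Code Conventions
General rules:
- Code must be re-runnable without changing the result
- Each layer may read only from the layer directly upstream of it; cross-layer queries are not allowed.
- Prefix every table name with its database name
- Wrap database, table, and column names in backticks
- Write SQL keywords in uppercase
- Pass the partition name as a parameter
- Specify the queue name
- Add the following header comments: author, creation date, description, target table, source tables, and change log (change date, changed by, change description)
1. STG Layer Code
- Pass the source and target databases as parameters
- The STG layer uses two synchronization modes:
  - Incremental sync: when the table is large and has an update timestamp
  - Full sync: when the table is small or lacks an update timestamp
Example:
Incremental script: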
{"job": {"setting": {"speed": {"channel":4}},"content": [{
"reader": {"name": "mysqlreader", "parameter": {"username": "$USER_NAME", "password": "$PASSWORD", "connection": [{"jdbcUrl": ["$JDBCURL"],
"querySql": ["select `id` ,`commodity_code` ,`commodity_name` ,`commodity_describe` ,`brand_id` ,`retail_price` ,`cost_price` ,`launch_date` ,`delisting_date` ,`commodity_color` ,`create_time` ,`update_time` ,`origin` ,`length` ,`width` ,`height` ,`weight` ,`create_id` ,`update_id` ,`status` ,'order',now(),now(),now() from t_commodity WHERE update_time BETWEEN str_to_date( '$START_TIME', '%Y-%m-%d %H:%i:%s' ) AND str_to_date( '$END_TIME', '%Y-%m-%d %H:%i:%s' )"]}]}},
"writer": {"name": "hdfswriter", "parameter": {
"column": [{"name": "id","type": "bigint"},{"name": "commodity_code","type": "string"},{"name": "commodity_name","type": "string"},{"name": "commodity_describe","type": "string"},{"name": "brand_id","type": "bigint"},{"name": "retail_price","type": "double"},{"name": "cost_price","type": "double"},{"name": "launch_date","type": "string"},{"name": "delisting_date","type": "string"},{"name": "commodity_color","type": "string"},{"name": "create_time","type": "string"},{"name": "update_time","type": "string"},{"name": "origin","type": "string"},{"name": "length","type": "double"},{"name": "width","type": "double"},{"name": "height","type": "double"},{"name": "weight","type": "double"},{"name": "create_id","type": "bigint"},{"name": "update_id","type": "bigint"},{"name": "status","type": "tinyint"},{"name": "source_db","type": "string"},{"name": "imported_time","type": "string"},{"name": "inserted_time","type": "string"},{"name": "updated_time","type": "string"}],
"hadoopConfig":{"dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider","dfs.ha.namenodes.mycluster": "nn1,nn2","dfs.namenode.rpc-address.mycluster.nn1": "node101:8020","dfs.namenode.rpc-address.mycluster.nn2": "node102:8020","dfs.nameservices": "mycluster"},
"compress": "", "defaultFS": "$HDFS_URL", "fieldDelimiter": "\u0001", "fileName": "$TRGT_TABLE_NAME", "fileType": "text", "path": "${HIVE_DB_DIR}stg.db/$TRGT_TABLE_NAME/dt=$PARTITION", "writeMode": "truncate"}}}]}
}
Full script:
{"job": {"setting": {"speed": {"channel":4}},"content": [{
"reader": {"name": "mysqlreader", "parameter": {"username": "$USER_NAME", "password": "$PASSWORD", "splitPk": "id",
"column": ["id" ,"brand_china_name" ,"brand_english_name" ,"brand_desc" ,"band_logo" ,"status" ,"create_time" ,"update_time" ,"create_id" ,"'order'","now()","now()","now()" ],
"connection": [{"jdbcUrl": ["$JDBCURL"], "table": ["t_brand"]}]}},
"writer": {"name": "hdfswriter", "parameter": {
"column": [{"name": "id","type": "bigint"},{"name": "brand_china_name","type": "string"},{"name": "brand_english_name","type": "string"},{"name": "brand_desc","type": "string"},{"name": "band_logo","type": "string"},{"name": "status","type": "tinyint"},{"name": "create_time","type": "string"},{"name": "update_time","type": "string"},{"name": "create_id","type": "bigint"},{"name": "source_db","type": "string"},{"name": "imported_time","type": "string"},{"name": "inserted_time","type": "string"},{"name": "updated_time","type": "string"}],
"hadoopConfig":{"dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider","dfs.ha.namenodes.mycluster": "nn1,nn2","dfs.namenode.rpc-address.mycluster.nn1": "node101:8020","dfs.namenode.rpc-address.mycluster.nn2": "node102:8020","dfs.nameservices": "mycluster"},
"defaultFS": "$HDFS_URL", "fieldDelimiter": "\u0001", "fileName": "$TRGT_TABLE_NAME", "fileType": "text", "path": "${HIVE_DB_DIR}stg.db/$TRGT_TABLE_NAME/dt=$PARTITION", "writeMode": "truncate"}}}]}
}
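2. ODS Layer Code
The ODS layer uses two synchronization modes:
- Full overwrite: use when rows are deleted frequently and only current data is needed, because merging would accumulate large amounts of redundant, useless data
- Data merge: the default in most cases
Example:
Full overwrite:
-- ================================================
-- Author: Administrator
-- Created: 2024-06-11
-- Description: Sync the ODS-layer brand table
-- Target table: ods_ord_t_brand_df
-- Source table: stg_ord_t_brand_df
-- Change log:
-- Date:      Changed by:      Change:
-- ================================================
set mapreduce.job.queuename=hive;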
INSERT OVERWRITE TABLE `ods`.`ods_ord_t_brand_df` PARTITION (`dt`='${PARTITITON}')
SELECT t1.`id` AS `id`
      ,t1.`brand_china_name` AS `brand_china_name`
      ,t1.`brand_english_name` AS `brand_english_name`
      ,t1.`brand_desc` AS `brand_desc`
      ,t1.`band_logo` AS `band_logo`
      ,t1.`status` AS `status`
      ,t1.`create_time` AS `create_time`
      ,t1.`update_time` AS `update_time`
      ,t1.`create_id` AS `create_id`
      ,t1.`source_db` AS `source_db`
      ,0 AS `is_deleted`
      ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `inserted_time`
      ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `updated_time`
FROM `stg`.`stg_ord_t_brand_df` t1 WHERE `dt`='${PARTITITON}';
Data merge:
-- ================================================
-- Author: Administrator
-- Created: 2024-06-11
-- Description: Sync the ODS-layer commodity table
-- Target table: ods_ord_t_commodity_df
-- Source table: stg_ord_t_commodity_di
-- Change log:
-- Date:      Changed by:      Change:
-- ================================================
set mapreduce.job.queuename=hive;
INSERT OVERWRITE TABLE `ods`.`ods_ord_t_commodity_df` PARTITION (`dt`='${PARTITITON}')
SELECT COALESCE(t2.`id`,t1.`id`) AS `id` ,COALESCE(t2.`commodity_code`,t1.`commodity_code`) AS `commodity_code` ,COALESCE(t2.`commodity_name`,t1.`commodity_name`) AS `commodity_name` ,COALESCE(t2.`commodity_describe`,t1.`commodity_describe`) AS `commodity_describe` ,COALESCE(t2.`brand_id`,t1.`brand_id`) AS `brand_id` ,COALESCE(t2.`retail_price`,t1.`retail_price`) AS `retail_price` ,COALESCE(t2.`cost_price`,t1.`cost_price`) AS `cost_price` ,COALESCE(t2.`launch_date`,t1.`launch_date`) AS `launch_date` ,COALESCE(t2.`delisting_date`,t1.`delisting_date`) AS `delisting_date` ,COALESCE(t2.`commodity_color`,t1.`commodity_color`) AS `commodity_color` ,COALESCE(t2.`create_time`,t1.`create_time`) AS `create_time` ,COALESCE(t2.`update_time`,t1.`update_time`) AS `update_time` ,COALESCE(t2.`origin`,t1.`origin`) AS `origin` ,COALESCE(t2.`length`,t1.`length`) AS `length` ,COALESCE(t2.`width`,t1.`width`) AS `width` ,COALESCE(t2.`height`,t1.`height`) AS `height` ,COALESCE(t2.`weight`,t1.`weight`) AS `weight` ,COALESCE(t2.`create_id`,t1.`create_id`) AS `create_id` ,COALESCE(t2.`update_id`,t1.`update_id`) AS `update_id` ,COALESCE(t2.`status`,t1.`status`) AS `status`,COALESCE(t2.`source_db`,t1.`source_db`) AS `source_db` ,COALESCE(t1.`is_deleted`, 0) AS `is_deleted`,COALESCE(t1.`inserted_time`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')) AS `inserted_time`,CASE WHEN t2.`update_time` >t1.`update_time` THEN DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') ELSE COALESCE(t1.`updated_time`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss')) END AS `updated_time`
FROM (SELECT * FROM `ods`.`ods_ord_t_commodity_df` WHERE `dt`='${PRE_PARTITITON}') t1
FULL JOIN (SELECT * FROM `stg`.`stg_ord_t_commodity_di` WHERE `dt`='${PARTITITON}') t2
ON t1.`id`=t2.`id`;
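The merge logic in the statement above (full join on the key, prefer the incoming value, keep the original `inserted_time`, refresh `updated_time` only when the source row is newer) can be paraphrased in Python. This is a sketch of the same rules on in-memory dictionaries, not a replacement for the HiveQL; `merge_snapshot`, `merge_rows`, and the `business_columns` argument are hypothetical names.

```python
def coalesce(*values):
    """Return the first value that is not None, like SQL COALESCE."""
    for value in values:
        if value is not None:
            return value
    return None


def merge_rows(old, new, now, business_columns):
    """Merge one key: old is yesterday's ODS row, new is today's STG row (either may be None)."""
    old = old or {}
    new = new or {}
    row = {col: coalesce(new.get(col), old.get(col)) for col in business_columns}
    row["is_deleted"] = coalesce(old.get("is_deleted"), 0)
    row["inserted_time"] = coalesce(old.get("inserted_time"), now)
    newer = (
        new.get("update_time") is not None
        and old.get("update_time") is not None
        and new["update_time"] > old["update_time"]
    )
    row["updated_time"] = now if newer else coalesce(old.get("updated_time"), now)
    return row


def merge_snapshot(previous, increment, now, business_columns):
    """Full outer join of two {id: row} mappings, merged row by row."""
    keys = sorted(set(previous) | set(increment))
    return {
        key: merge_rows(previous.get(key), increment.get(key), now, business_columns)
        for key in keys
    }
```

Rows present only in the previous partition are carried over unchanged, which is what makes the result a full snapshot even though the STG input is incremental.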
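3. DIM Layer Code
- No column may contain NULL; fill NULLs with the default values
- The DIM layer uses two synchronization modes:
  - Full overwrite: when only current data is used and history is not needed
  - Zipper table: the default in most cases
Example:
Full overwrite:
-- ================================================
-- Author: Administrator
-- Created: 2024-06-11
-- Description: Sync the DIM-layer brand dimension table
-- Target table: dim_sal_brand_df
-- Source table: ods_ord_t_brand_df
-- Change log:
-- Date:      Changed by:      Change:
-- ================================================
set mapreduce.job.queuename=hive;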
INSERT OVERWRITE TABLE `dim`.`dim_sal_brand_df`
SELECT defaultv(t1.`id`,-99) AS `brand_id` ,defaultv(t1.`brand_china_name`,'None') AS `brand_cn_name` ,defaultv(t1.`brand_english_name`,'None') AS `brand_en_name` ,defaultv(t1.`brand_desc`,'None') AS `brand_desc` ,defaultv(t1.`band_logo`,'None') AS `band_logo` ,defaultv(t1.`status`,-99) AS `band_status` ,defaultv(t1.`create_time`,'1970-01-01 00:00:00') AS `create_time` ,defaultv(t1.`update_time`,'1970-01-01 00:00:00') AS `update_time` ,defaultv(t1.`create_id`,-99) AS `create_id` ,0 AS `is_deleted`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `inserted_time`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `updated_time`
FROM `ods`.`ods_ord_t_brand_df` t1
WHERE t1.`dt` = '${PARTITITON}';
Zipper table:
-- ================================================
-- Author: Administrator
-- Created: 2024-06-11
-- Description: Sync the DIM-layer commodity dimension table
-- Target table: dim_sal_commodity_df
-- Source tables: ods_ord_t_commodity_df, ods_ord_t_brand_df
-- Change log:
-- Date:      Changed by:      Change:
-- ================================================
set mapreduce.job.queuename=hive;
WITH `tmp_dim_sal_commodity_df` AS (
SELECT COALESCE(t2.`commodity_id`,t1.`commodity_id`) AS `commodity_id`
      ,COALESCE(t2.`brand_id`,t1.`brand_id`) AS `brand_id`
      ,COALESCE(t2.`brand_cn_name`,t1.`brand_cn_name`) AS `brand_cn_name`
      ,COALESCE(t2.`commodity_no`,t1.`commodity_no`) AS `commodity_no`
      ,COALESCE(t2.`commodity_name`,t1.`commodity_name`) AS `commodity_name`
      ,COALESCE(t2.`commodity_desc`,t1.`commodity_desc`) AS `commodity_desc`
      ,COALESCE(t2.`commodity_color`,t1.`commodity_color`) AS `commodity_color`
      ,COALESCE(t2.`commodity_length`,t1.`commodity_length`) AS `commodity_length`
      ,COALESCE(t2.`commodity_width`,t1.`commodity_width`) AS `commodity_width`
      ,COALESCE(t2.`commodity_height`,t1.`commodity_height`) AS `commodity_height`
      ,COALESCE(t2.`commodity_weight`,t1.`commodity_weight`) AS `commodity_weight`
      ,COALESCE(t2.`retail_price`,t1.`retail_price`) AS `retail_price`
      ,COALESCE(t2.`cost_price`,t1.`cost_price`) AS `cost_price`
      ,COALESCE(t2.`launch_date`,t1.`launch_date`) AS `launch_date`
      ,COALESCE(t2.`delisting_date`,t1.`delisting_date`) AS `delisting_date`
      ,COALESCE(t2.`origin`,t1.`origin`) AS `origin`
      ,COALESCE(t2.`create_id`,t1.`create_id`) AS `create_id`
      ,COALESCE(t2.`update_id`,t1.`update_id`) AS `update_id`
      ,COALESCE(t2.`create_time`,t1.`create_time`) AS `create_time`
      ,COALESCE(t2.`update_time`,t1.`update_time`) AS `update_time`
      ,COALESCE(t2.`is_deleted`, t1.`is_deleted`) AS `is_deleted`
      ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `inserted_time`
      ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `updated_time`
      ,CASE WHEN t1.`commodity_id` IS NULL THEN 'insert' ELSE 'update' END AS `flag`
FROM (SELECT * FROM `dim`.`dim_sal_commodity_df` WHERE `is_current` =1) t1
FULL JOIN (
    SELECT defaultv(t1.`id`,-99) AS `commodity_id`
          ,defaultv(t1.`brand_id`,-99) AS `brand_id`
          ,defaultv(t2.`brand_china_name`,'None') AS `brand_cn_name`
          ,defaultv(t1.`commodity_code`,'None') AS `commodity_no`
          ,defaultv(t1.`commodity_name`,'None') AS `commodity_name`
          ,defaultv(t1.`commodity_describe`,'None') AS `commodity_desc`
          ,defaultv(t1.`commodity_color`,'None') AS `commodity_color`
          ,defaultv(t1.`length`,0) AS `commodity_length`
          ,defaultv(t1.`width`,0) AS `commodity_width`
          ,defaultv(t1.`height`,0) AS `commodity_height`
          ,defaultv(t1.`weight`,0) AS `commodity_weight`
          ,defaultv(t1.`retail_price`,0) AS `retail_price`
          ,defaultv(t1.`cost_price`,0) AS `cost_price`
          ,defaultv(t1.`launch_date`,'1970-01-01') AS `launch_date`
          ,defaultv(t1.`delisting_date`,'1970-01-01') AS `delisting_date`
          ,defaultv(t1.`origin`,'None') AS `origin`
          ,defaultv(t1.`create_id`,-99) AS `create_id`
          ,defaultv(t1.`update_id`,-99) AS `update_id`
          ,defaultv(t1.`create_time`,'1970-01-01 00:00:00') AS `create_time`
          ,defaultv(t1.`update_time`,'1970-01-01 00:00:00') AS `update_time`
          ,t1.`is_deleted` ,t1.`inserted_time` ,t1.`updated_time`
    FROM `ods`.`ods_ord_t_commodity_df` t1
    LEFT JOIN `ods`.`ods_ord_t_brand_df` t2 ON t1.`brand_id`=t2.`id` AND t2.`dt` ='${PARTITITON}'
    WHERE t1.`dt`='${PARTITITON}') t2
ON t1.`commodity_id`=t2.`commodity_id`
WHERE MD5(CONCAT( t2.`brand_id` ,t2.`brand_cn_name` ,t2.`commodity_no` ,t2.`commodity_name` ,t2.`commodity_desc` ,t2.`commodity_color` ,t2.`commodity_length` ,t2.`commodity_width` ,t2.`commodity_height` ,t2.`commodity_weight` ,t2.`retail_price` ,t2.`cost_price` ,t2.`launch_date` ,t2.`delisting_date` ,t2.`origin` ,t2.`create_id` ,t2.`update_id` ,t2.`create_time` ,t2.`update_time` ,t2.`is_deleted`
)) <> MD5(CONCAT( t1.`brand_id` ,t1.`brand_cn_name` ,t1.`commodity_no` ,t1.`commodity_name` ,t1.`commodity_desc` ,t1.`commodity_color` ,t1.`commodity_length` ,t1.`commodity_width` ,t1.`commodity_height` ,t1.`commodity_weight` ,t1.`retail_price` ,t1.`cost_price` ,t1.`launch_date` ,t1.`delisting_date` ,t1.`origin` ,t1.`create_id` ,t1.`update_id` ,t1.`create_time` ,t1.`update_time` ,t1.`is_deleted`
)) OR t1.`commodity_id` IS NULL
)
INSERT OVERWRITE TABLE `dim`.`dim_sal_commodity_df`
SELECT * FROM
(SELECT t1.`commodity_id` ,t1.`brand_id` ,t1.`brand_cn_name` ,t1.`commodity_no` ,t1.`commodity_name` ,t1.`commodity_desc` ,t1.`commodity_color`
       ,t1.`commodity_length` ,t1.`commodity_width` ,t1.`commodity_height` ,t1.`commodity_weight` ,t1.`retail_price` ,t1.`cost_price`
       ,t1.`launch_date` ,t1.`delisting_date` ,t1.`origin` ,t1.`create_id` ,t1.`update_id` ,t1.`create_time` ,t1.`update_time` ,t1.`is_deleted`
       ,CASE WHEN t1.`end_date` = '9999-12-31' AND t2.`commodity_id` IS NOT NULL THEN 0 ELSE t1.`is_current` END AS `is_current`
       ,t1.`start_date`
       ,CASE WHEN t1.`end_date` = '9999-12-31' AND t2.`commodity_id` IS NOT NULL THEN DATE_FORMAT(DATE_ADD(CURRENT_TIMESTAMP, -1), 'yyyy-MM-dd') ELSE t1.`end_date` END AS `end_date`
       ,t1.`inserted_time` ,t1.`updated_time`
 FROM `dim`.`dim_sal_commodity_df` t1
 LEFT JOIN (SELECT `commodity_id` FROM `tmp_dim_sal_commodity_df` WHERE `flag`='update') t2
 ON t1.`commodity_id` = t2.`commodity_id`
 UNION ALL
 SELECT t1.`commodity_id` ,t1.`brand_id` ,t1.`brand_cn_name` ,t1.`commodity_no` ,t1.`commodity_name` ,t1.`commodity_desc` ,t1.`commodity_color`
       ,t1.`commodity_length` ,t1.`commodity_width` ,t1.`commodity_height` ,t1.`commodity_weight` ,t1.`retail_price` ,t1.`cost_price`
       ,t1.`launch_date` ,t1.`delisting_date` ,t1.`origin` ,t1.`create_id` ,t1.`update_id` ,t1.`create_time` ,t1.`update_time`
       ,0 `is_deleted` ,1 `is_current`
       ,CASE WHEN `flag`='insert' THEN '1970-01-01' ELSE DATE_FORMAT(DATE_ADD(CURRENT_TIMESTAMP, -1), 'yyyy-MM-dd') END AS `start_date`
       ,'9999-12-31' AS `end_date`
       ,`inserted_time` ,`updated_time`
 FROM `tmp_dim_sal_commodity_df` t1
) a;
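The zipper-table statement above does three things: it finds commodities that are new or whose attributes changed, closes the open version of each changed commodity, and appends a new open version. The following Python sketch restates those steps on in-memory data so the flow is easier to follow. It compares attributes directly instead of hashing them with MD5, and `apply_zipper` and its arguments are hypothetical names.

```python
OPEN_END = "9999-12-31"  # end_date of a version that is still current


def apply_zipper(history, source, yesterday):
    """history: list of version dicts; source: {commodity_id: attribute dict}.

    Returns the new history: changed versions are closed and new versions appended.
    """
    current = {v["commodity_id"]: v for v in history if v["is_current"] == 1}

    # Step 1: classify each source row as insert, update, or unchanged.
    changed = {}
    for commodity_id, attrs in source.items():
        old = current.get(commodity_id)
        if old is None:
            changed[commodity_id] = "insert"
        elif any(old.get(key) != value for key, value in attrs.items()):
            changed[commodity_id] = "update"

    # Step 2: close the open version of every updated commodity.
    result = []
    for version in history:
        version = dict(version)
        if version["end_date"] == OPEN_END and changed.get(version["commodity_id"]) == "update":
            version["is_current"] = 0
            version["end_date"] = yesterday
        result.append(version)

    # Step 3: append a new open version for every inserted or updated commodity.
    for commodity_id, flag in changed.items():
        start_date = "1970-01-01" if flag == "insert" else yesterday
        result.append(dict(source[commodity_id], commodity_id=commodity_id, is_deleted=0,
                           is_current=1, start_date=start_date, end_date=OPEN_END))
    return result
```

As in the SQL, a brand-new commodity starts at `1970-01-01`, while a changed commodity's new version starts on the same date its previous version was closed.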
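4. DWD Layer Code
- Pass the update time range as parameters
- No column may contain NULL; fill NULLs with the default values
- When joining a large table, add a time range condition instead of scanning the whole table
Example:
-- ================================================
-- Author: Administrator
-- Created: 2024-06-11
-- Description: Sync the DWD-layer order table
-- Target table: dwd_sal_trd_ord_order_df
-- Source tables: ods_ord_t_order_df, dim_pub_dictionary
-- Change log:
-- Date:      Changed by:      Change:
-- ================================================
set mapreduce.job.queuename=hive;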
INSERT OVERWRITE TABLE `dwd`.`dwd_sal_trd_ord_order_df`
SELECT COALESCE(t2.`order_id`,t1.`order_id`) AS `order_id` ,COALESCE(t2.`commodity_id`,t1.`commodity_id`) AS `commodity_id` ,COALESCE(t2.`user_id`,t1.`user_id`) AS `user_id` ,COALESCE(t2.`order_status_code`,t1.`order_status_code`) AS `order_status_code` ,COALESCE(t2.`order_status_name`,t1.`order_status_name`) AS `order_status_name` ,COALESCE(t2.`pay_type_code`,t1.`pay_type_code`) AS `pay_type_code` ,COALESCE(t2.`pay_type_name`,t1.`pay_type_name`) AS `pay_type_name` ,COALESCE(t2.`pay_status_code`,t1.`pay_status_code`) AS `pay_status_code` ,COALESCE(t2.`pay_status_name`,t1.`pay_status_name`) AS `pay_status_name` ,COALESCE(t2.`total_amt`,t1.`total_amt`) AS `total_amt` ,COALESCE(t2.`total_qty`,t1.`total_qty`) AS `total_qty` ,COALESCE(t2.`create_id`,t1.`create_id`) AS `create_id` ,COALESCE(t2.`create_time`,t1.`create_time`) AS `create_time` ,COALESCE(t2.`update_id`,t1.`update_id`) AS `update_id` ,COALESCE(t2.`update_time`,t1.`update_time`) AS `update_time`,COALESCE(t2.`is_deleted`, t1.`is_deleted`) AS `is_deleted`,COALESCE(t1.`inserted_time`, t2.`inserted_time`) AS `inserted_time`,COALESCE(t2.`updated_time`, t1.`updated_time`) AS `updated_time`
FROM (SELECT * FROM `dwd`.`dwd_sal_trd_ord_order_df`) t1
FULL JOIN (
    SELECT defaultv(t1.`id`,-99) AS `order_id`
          ,defaultv(t1.`commodity_id`,-99) AS `commodity_id`
          ,defaultv(t1.`user_id`,-99) AS `user_id`
          ,defaultv(t1.`order_status`,-99) AS `order_status_code`
          ,defaultv(t2.`dic_name`,'None') AS `order_status_name`
          ,defaultv(t1.`pay_type`,-99) AS `pay_type_code`
          ,defaultv(t2.`dic_name`,'None') AS `pay_type_name`
          ,defaultv(t1.`pay_status`,-99) AS `pay_status_code`
          ,defaultv(t1.`pay_status`,'None') AS `pay_status_name`
          ,defaultv(t1.`total_amount`,0) AS `total_amt`
          ,defaultv(t1.`total_quantity`,0) AS `total_qty`
          ,defaultv(t1.`create_id`,-99) AS `create_id`
          ,defaultv(t1.`create_time`,'1970-01-01 00:00:00') AS `create_time`
          ,defaultv(t1.`update_id`,-99) AS `update_id`
          ,defaultv(t1.`update_time`,'1970-01-01 00:00:00') AS `update_time`
          ,t1.`is_deleted`
          ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `inserted_time`
          ,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `updated_time`
    FROM `ods`.`ods_ord_t_order_df` t1
    LEFT JOIN `dim`.`dim_pub_dictionary` t2
      ON /* the dictionary key condition is missing from this example */ t2.`dt` ='${PARTITITON}'
    WHERE t1.`dt`='${PARTITITON}'
      AND t1.`update_time` >= DATE_FORMAT(${START_TIME}, 'yyyy-MM-dd')
      AND t1.`update_time` < DATE_FORMAT(${END_TIME}, 'yyyy-MM-dd')) t2
ON t1.`order_id`=t2.`order_id`;
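5. DWS Layer Code
- Pass the update time range as parameters
- No column may contain NULL; fill NULLs with the default values
- Mutable and immutable metrics must not be stored in the same table
Example:
-- ================================================
-- Author: Administrator
-- Created: 2024-03-02
-- Description: Summary of online return orders by return completion date, last 1 day
-- Target table: dws_sal_trd_complete_date_xj_retn_order_1d
-- Source table: dwd_sal_trd_retn_ord_xj_order_df
-- Change log:
-- Date:      Changed by:      Change:
-- ================================================
set mapreduce.job.queuename=hive;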
INSERT OVERWRITE TABLE `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d`
SELECT t1.`shop_id` ,t1.`member_id`,t1.`commodity_id` ,t1.`source_type_code` ,t1.`source_type_name` ,t1.`return_time` AS `return_order_date`,t1.`create_time` AS `return_order_time`,t1.`update_time` AS `return_complete_date`,t1.`order_id`,t1.`return_order_id`,t1.`return_order_no`,SUM(t1.`commodity_price` * t1.`commodity_quantity`) AS `origin_retn_sale_amt_1d_ai`,SUM(t1.`commodity_sale_amt`) AS `trade_retn_amt_1d_ai` ,SUM(t1.`commodity_real_amt`) AS `delivery_retn_amt_1d_ai` ,SUM(t1.`commodity_real_quantity` ) AS `delivery_retn_cmd_cnt_1d_ai` ,SUM(CASE WHEN t1.`order_status_code` = 6 THEN t1.`commodity_real_amt` ELSE 0 END) AS `commodity_real_amt_1d_ai`,SUM(CASE WHEN t1.`order_status_code` = 6 THEN t1.`commodity_real_quantity` ELSE 0 END) AS `commodity_real_quantity_1d_ai`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `inserted_time`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `updated_time`,DATE_FORMAT(t1.`update_time` , 'yyyy-MM-dd') AS `dt`
FROM `px_dwd`.`dwd_sal_trd_retn_ord_xj_order_df` t1
WHERE t1.`dt` ='9999-12-31' AND t1.`is_deleted`=0 AND t1.`return_order_status_code`=7 AND t1.`update_time` >= DATE_FORMAT(${START_TIME}, 'yyyy-MM-dd') AND t1.`update_time`< DATE_FORMAT(${END_TIME}, 'yyyy-MM-dd')
GROUP BY t1.`shop_id` ,t1.`member_id`,t1.`commodity_id` ,t1.`return_time` ,t1.`create_time` ,t1.`update_time` ,t1.`source_type_code` ,t1.`source_type_name`,t1.`order_id`,t1.`return_order_id`,t1.`return_order_no`,DATE_FORMAT(t1.`update_time` , 'yyyy-MM-dd');
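6. ADS Layer Code
- Pass the update time range as parameters
- No column may contain NULL; fill NULLs with the default values
- Take dimension attributes as either snapshot values or current values, depending on the requirement
Example:
-- ================================================
-- Author: Administrator
-- Created: 2024-04-16
-- Description: Online store commodity sales analysis, last 1 day
-- Target table: ads_sal_trd_pay_retn_order_shop_item_1d
-- Source tables: dws_sal_trd_complete_date_xj_retn_order_1d, dim_sal_shop, dim_pub_commodity
-- Change log:
-- Date:      Changed by:      Change:
-- ================================================
set mapreduce.job.queuename=hive;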
INSERT OVERWRITE TABLE `px_ads`.`ads_sal_trd_pay_retn_order_shop_item_1d`
SELECT t1.`dt` AS `report_date`,t1.`shop_id`,t2.`shop_no`,t2.`shop_name`,t1.`commodity_id`,t3.`commodity_no`,t3.`commodity_name`,t1.`source_type_code`,t1.`source_type_name`,SUM(t1.`trade_amt_1d_ai`) AS `trade_amt_1d_ai`,SUM(t1.`gross_cmd_cnt_1d_ai`) AS `gross_cmd_cnt_1d_ai`,SUM(t1.`pay_sale_amt_1d_ai`) AS `pay_sale_amt_1d_ai`,SUM(t1.`pay_cmd_cnt_1d_ai`) AS `pay_cmd_cnt_1d_ai`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `inserted_time`,DATE_FORMAT(CURRENT_TIMESTAMP, 'yyyy-MM-dd HH:mm:ss') AS `updated_time`,t1.`dt`
FROM `px_dws`.`dws_sal_trd_complete_date_xj_retn_order_1d` t1
LEFT JOIN `px_dim`.`dim_sal_shop` t2 ON t1.`shop_id` = t2.`shop_id` AND t1.`dt`>= DATE_FORMAT(t2.`start_time`, 'yyyy-MM-dd' ) AND t1.`dt`< DATE_FORMAT(t2.`end_time`, 'yyyy-MM-dd' )
LEFT JOIN `px_dim`.`dim_pub_commodity` t3 ON t1.`commodity_id` = t3.`commodity_id` AND t3.`is_current` =1
WHERE t1.`dt` >= DATE_FORMAT(${START_TIME}, 'yyyy-MM-dd') AND t1.`dt` < DATE_FORMAT(${END_TIME}, 'yyyy-MM-dd')
GROUP BY t1.`shop_id`,t2.`shop_no`,t2.`shop_name`,t1.`commodity_id`,t3.`commodity_no`,t3.`commodity_name`,t1.`source_type_code`,t1.`source_type_name`,t1.`dt`;
7. DAS Layer Code
- Pass the source and target databases as parameters
Example:
{"job": {"setting": {"speed": {"channel":4}},"content": [{
"reader": {"name": "hdfsreader", "parameter": {"path": "${HIVE_DB_DIR}ads.db/$TRGT_TABLE_NAME/dt=$PARTITION",
"hadoopConfig":{"dfs.client.failover.proxy.provider.mycluster": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider","dfs.ha.namenodes.mycluster": "nn1,nn2","dfs.namenode.rpc-address.mycluster.nn1": "node101:8020","dfs.namenode.rpc-address.mycluster.nn2": "node102:8020","dfs.nameservices": "mycluster"},
"defaultFS": "$HDFS_URL",
"column": [{"index": "0","type": "string"},{"index": "1","type": "long"},{"index": "2","type": "long"},{"index": "3","type": "long"},{"index": "4","type": "string"},{"index": "5","type": "long"},{"index": "6","type": "string"},{"index": "7","type": "long"},{"index": "8","type": "string"},{"index": "9","type": "double"},{"index": "10","type": "double"},{"index": "11","type": "string"},{"index": "12","type": "string"}],
"fileType": "orc", "encoding": "UTF-8", "fieldDelimiter": "\u0001" }},
"writer": {"name": "clickhousewriter", "parameter": {"username": "$USER_NAME","password": "$PASSWORD",
"column": ["report_date" ,"commodity_id" ,"user_id" ,"order_status_code" ,"order_status_name" ,"pay_type_code" ,"pay_type_name" ,"pay_status_code" ,"pay_status_name" ,"total_amt_1d_ai" ,"total_qty_1d_ai" ,"inserted_time" ,"updated_time" ],
"preSql": ["ALTER TABLE $TRGT_TABLE_NAME DROP PARTITION '$CK_PARTITION'"],
"connection": [{"jdbcUrl":"$JDBCURL", "table": ["$TRGT_TABLE_NAME"]}]}}}]}
}
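The job drops the ClickHouse partition `$CK_PARTITION` before loading, and the DAS table is partitioned by `toYYYYMMDD(report_date)`. This document does not say how `$CK_PARTITION` is produced; assuming the scheduler derives it from the Hive `dt` value, a conversion could look like the sketch below. `ck_partition` is a hypothetical helper.

```python
from datetime import datetime


def ck_partition(dt: str) -> str:
    """Convert a Hive dt value 'yyyy-MM-dd' to the 'yyyyMMdd' form that toYYYYMMDD yields."""
    return datetime.strptime(dt, "%Y-%m-%d").strftime("%Y%m%d")


if __name__ == "__main__":
    print(ck_partition("2024-06-11"))
```

Parsing the date rather than stripping hyphens means a malformed `dt` fails loudly before any partition is dropped.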