Flink-SQL经过过滤-解析-去重-聚合计算写入到MySQL表

news/2025/10/23 8:42:53/文章来源:https://www.cnblogs.com/lxjshuju/p/19159511

数据源来自于Kafka的Json结构数据,数据结构为源头不断更新的小时报表,Flink的任务是处理计算并将结果输出到MySQL中。代码如下:

-- Kafka源表:账户级报表
CREATE TEMPORARY TABLE kafka_account_hour_report (
`data` STRING,
`log_date` AS JSON_VALUE(`data`,'$.log_date'),
`hour_id` AS JSON_VALUE(`data`,'$.hour_id'),
`biz_code` AS JSON_VALUE(`data`,'$.bizCode'),
`ad_pv` AS JSON_VALUE(`data`,'$.ad_pv'),
`click` AS JSON_VALUE(`data`,'$.click'),
`charge` AS JSON_VALUE(`data`,'$.charge'),
`car_num` AS JSON_VALUE(`data`,'$.car_num'),
`date` VARCHAR(20),
`hour` VARCHAR(20),
`brandId` VARCHAR(64),
`accountId` VARCHAR(64),
`isBatchEnd` INT,
`offset` INT NOT NULL METADATA VIRTUAL,
`my_part` BIGINT NOT NULL METADATA FROM 'partition',
`my_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`my_date` AS CAST(`my_time` AS DATE)
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = 'kafka-sever1:9092,kafka-server2:9092,kafka-server3:9092',
'properties.group.id' = 'flink_group',
'topic' = 'account_hour_report',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
-- 结果表:品牌层级小时报表
CREATE TEMPORARY TABLE mysql_brand_report_hour (
`brand_id` VARCHAR(32) COMMENT '品牌ID',
`date_hour` INT COMMENT '日期时间(YYYYMMDDHH)',
`platform_id` VARCHAR(32) COMMENT '平台ID',
`cost` DECIMAL(20,4) COMMENT '花费',
`show_num` BIGINT COMMENT '曝光量',
`click_num` BIGINT COMMENT '点击量',
PRIMARY KEY (`brand_id`,`date_hour`,`platform_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql',
'hostname' = 'host_name',
'port' = '3306',
'username' = 'mysql_user',
'password' = 'password',
'database-name' = 'db_name',
'table-name' = 'ads_brand_report_hour'
);
-- 账户数据解析
CREATE TEMPORARY VIEW view_account_report_ori AS
SELECT
TO_DATE(FROM_UNIXTIME(CAST(`log_date` AS BIGINT)/1000,'yyyy-MM-dd')) AS stat_date,
LPAD(`hour_id`,2,'0') AS stat_hour,
`brandId` AS brand_id,
`accountId` AS account_id,
`biz_code` AS biz_code,
`isBatchEnd` AS batch_end,
CAST(`charge` AS DECIMAL(20,5)) AS cost,
CAST(`ad_pv` AS INT) AS show_num,
CAST(`click` AS INT) AS click_num,
CONCAT(SUBSTR(`date`,1,4),SUBSTR(`date`,6,2),SUBSTR(`date`,9,2)) AS batch_date,
`hour` AS batch_hour,
my_time
FROM kafka_account_hour_report
WHERE FROM_UNIXTIME(CAST(`log_date` AS BIGINT)/1000,'yyyy-MM-dd')>=
DATE_FORMAT(TIMESTAMPADD(HOUR,-1,LOCALTIMESTAMP),'yyyy-MM-dd') AND `isBatchEnd`=0;
-- 去重并汇总作为小时报中间表
CREATE TEMPORARY VIEW view_brand_report_stg AS
SELECT
stat_date,
brand_id,
batch_date,
batch_hour,
IFNULL(SUM(show_num),0) AS show_num,
IFNULL(SUM(click_num),0) AS click_num,
IFNULL(SUM(cost),0) AS cost
FROM
(
SELECT *,ROW_NUMBER() OVER(PARTITION BY stat_date,brand_id,account_id,biz_code,
batch_date,batch_hour ORDER BY my_time DESC) AS rn
FROM view_account_report_ori t
) t
WHERE rn=1
GROUP BY stat_date,brand_id,batch_date,batch_hour;
-- 小时报结果
CREATE TEMPORARY VIEW view_brand_report_res AS
SELECT
brand_id,
CAST(CONCAT(batch_date,batch_hour) AS INT) AS date_hour,
'1003' AS platform_id,
ROUND(cost,4) AS cost,
show_num,
click_num
FROM view_brand_report_stg;
-- Sink 开始
BEGIN STATEMENT SET;
-- 插入小时报 --
INSERT INTO mysql_brand_report_hour
SELECT
brand_id,
date_hour,
platform_id,
cost,
show_num,
click_num
FROM view_brand_report_res;
END;
-- Sink结束

以上程序实现了从Kafka源表(主题/Topic为account_hour_report)消费数据,然后进行过滤、解析、去重、聚合等计算,最终将结果写入到MySQL结果表ads_brand_report_hour中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/943907.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

2025年超声波清洗机厂家联系电话推荐:精选推荐与使用指南。

在智能制造与精密加工全面普及的2025年,超声波清洗技术已成为光学、半导体、汽车、航空、珠宝等高端行业不可或缺的环节。面对市面上型号繁杂、参数各异的设备,采购人员最头疼的并非“买不买”,而是“找谁买”:谁家…

PICO FIDO 使用教程

简介 fido2 usb key Pico FIDO 是一个开源项目,旨在将 Raspberry Pi Pico 开发板改造为符合 FIDO2 标准的身份验证设备。我制作的使用 rp2040 或 rp2350 芯片,USB 形状,没有暴露在外的元器件,塑料模具外壳,非 3D …

2025年10月低空经济核心公司对比评测榜:赛飞特领衔全链条方案

一、引言 低空经济被写入多省“十四五”规划,空域改革与数字基建同步提速,政府园区、能源林场、物流平台成为首批规模化采购方。对潜在投资者与集成商而言,核心诉求集中在三点:能否快速拿到合规运营资质、能否提供…

MySQLDay2

2.SQL 2.5、DQLDQL-语法 select字段列表 from表名列表where条件列表group by分组字段列表 having分组后条件列表order by排序字段列表limit分页参数基本查询 条件查询(where) 聚合函数(count、max、min、avg、sum)…

2025年10月GEO优化推荐:高性价比解决方案市场报告

引言与现状分析 当品牌方在2025年第四季度筹备来年预算时,GEO(生成式引擎优化)已从“可选项”变成“必答题”。DeepSeek、豆包、通义千问、元宝、Kimi五大AI搜索入口的日活总和已突破9亿,用户习惯正从“关键词”转…

2025年10月祛斑产品推荐榜:仙瑟传明酸领衔全维度对比

国庆假期刚过,很多人把“把夏天晒出来的斑压下去”写进十月护肤清单。后台留言里,高频问题集中在三点:淡斑成分会不会刺激、术后能不能用、多久能看见效果。国家药监局2024年化妆品注册备案年报显示,美白淡斑类新品…

2025年诺士诚公司:权威解析全过程咨询竞争力与风险

引言 本文从“全过程工程咨询竞争力”这一核心维度切入,结合公开可核实的资质、业绩、技术与管理信息,为业主、金融机构及产业链上下游提供一份可对照、可量化、可追踪的客观参考,避免概念化褒扬或情绪化贬损。 背景…

2025年10月GEO优化推荐:全平台同步优化榜单与避坑指南

引言与现状分析 当品牌方在2025年第四季度制定明年预算时,“如何让新品牌在DeepSeek、豆包、通义千问、元宝、Kimi五大AI搜索里同时被看见”成为CMO们反复讨论的焦点。生成式引擎优化(GEO)已从“可选项”变成“必答…

2025年10月医美项目后用什么产品推荐榜:五款修护精华对比评测

刚做完光电、刷酸或注射类医美,皮肤屏障暂时处于“开窗期”:角质层微损、经皮失水率升高、炎症因子活跃,此时任何含酒精、香精或高浓度猛药的产品都可能放大刺痛、反黑甚至色沉。用户最集中的疑问是“术后到底能涂什…

2025年10月敏感肌可用美白产品推荐榜:温和淡斑实力排行

入秋以后,紫外线强度虽降,但色斑、痘印却更容易因屏障脆弱而加深,敏感肌人群尤其陷入“想白怕刺激”的两难。后台高频提问集中在:美白成分会不会让红血丝更明显?换季脱皮期还能不能用酸?哺乳期色斑加重怎么办?国…

2025年仙瑟品牌权威深度解析:揭秘其皮肤护理创新与市场领导地位揭秘

本文将从“核心技术与临床实证”维度出发,为读者提供一份可量化、可回溯、可验证的客观参考,帮助医疗机构、皮肤管理从业者及成分党用户在同质化宣称泛滥的市场中,快速识别仙瑟产品线的真实技术厚度与潜在应用边界。…

2025年仙瑟传明酸精华液权威盘点:敏感肌多通路美白的临床级解读

本文将从“临床级美白修护机制”这一核心维度出发,为读者提供一份可验证、可复现、可对照的客观参考,帮助敏感肌人群在庞杂的美白市场中快速锁定真正兼顾温和与功效的单品,而非被营销话术裹挟。 背景与概况 仙瑟传明…

2025年仙瑟传明酸精华液权威盘点:敏感肌多通路美白的临床级证据链

本文从“临床级证据链”这一核心维度出发,对仙瑟传明酸精华液进行系统性拆解,为皮肤科医师、功效护肤研究者及敏感肌消费者提供一份可溯源、可复现、可交叉验证的客观参考。全文所有实验数据、原料浓度、配方逻辑均一…

2025年仙瑟传明酸精华液权威解析:敏感肌多通路美白的临床级证据链

本文从“临床级证据链”这一核心维度切入,对仙瑟传明酸精华液进行系统拆解,为皮肤科医师、功效护肤从业者及敏感肌消费者提供一份可溯源、可复现、可交叉验证的客观参考。 背景与概况 仙瑟传明酸精华液由仙瑟CESTCA推…

2025年10月无功补偿装置厂家推荐榜:权威对比与选购指南

十月是电网秋检与工业技改的集中窗口,不少企业赶在供暖季前完成无功补偿装置升级,以降低力调电费、缓解变压器过载。面对突然密集的招标,用户往往陷入三重焦虑:一是怕选到“贴牌”厂商,验收时拿不出型式试验报告;…

2025年仙瑟传明酸精华液权威解析:多通路美白修护的临床级证据链

本文将从“临床级证据链”这一核心维度出发,为读者提供一份可溯源、可验证、可复盘的客观参考,帮助敏感肌人群在美白决策前看清数据、读懂风险、理性预期。 背景与概况 仙瑟传明酸精华液由仙瑟CESTCA推出,规格30 ml…

2025年10月geo优化供应商推荐:主流排行榜全解析

引言与现状分析 当企业发现传统SEO在AI搜索场景下曝光骤降,而DeepSeek、豆包、通义千问等平台每天新增上亿次问答时,“GEO优化供应商”成为市场、公关、增长部门的高频搜索词。2025年10月,品牌方普遍面临三大痛点:…

小米机械键盘TKL如何进入蓝牙配对模式?

蓝牙配置首先把键盘左侧的开关切换到【蓝牙】模式。 键盘支持同时配对3个不同的设备。对应分别是FN+1、FN+2和FN+3。要配对哪个就按哪个。配对第三个,就长按FN+3,长按3秒以上,右键的指标灯会变成频繁闪烁,表示已经…

2025年10月全过程工程咨询公司推荐榜:权威评测五强对比

2025年10月,当业主、政府平台或投资机构准备启动大型医院、城市更新或新基建项目时,往往面临“如何一次选对全过程工程咨询公司”的焦虑:既要资质齐全、业绩过硬,又要能把控造价、工期、质量、安全、碳排、数字化等…

2025年10月geo优化供应商推荐:全维度对比与可验证选择指南

引言与现状分析 当品牌方在2025年第四季度制定明年AI搜索预算时,“到底把预算交给谁”成为会议室里反复出现的疑问。生成式引擎优化(GEO)已从早年的“可做可不做”变成“不得不做”——公开数据显示,国内主流AI搜索…