大数据-玩转数据-Flink SQL编程实战 (热门商品TOP N)

一、需求描述

每隔30min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中。

二、需求分析

  • 1.统计每个商品的点击量, 开窗
  • 2.分组窗口分组
  • 3.over窗口

三、需求实现

3.1、创建数据源示例

input/UserBehavior.csv

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000

3.2、创建目标表

CREATE DATABASE flink_sql; //创建flink_sql库
USE flink_sql;
DROP TABLE IF EXISTS `hot_item`;
CREATE TABLE `hot_item` (`w_end` timestamp NOT NULL,`item_id` bigint(20) NOT NULL,`item_count` bigint(20) NOT NULL,`rk` bigint(20) NOT NULL,PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.3、导入JDBC Connector依赖

<!-- 导入JDBC Connector依赖 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>

3.4、代码实现

package com.atguigu.flink.java.chapter_12;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;/*** @Author lizhenchao@atguigu.cn* @Date 2021/1/31 9:11*/
public class Flink01_HotItem_TopN {public static void main(String[] args) {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);StreamTableEnvironment tenv = StreamTableEnvironment.create(env);// 使用sql从文件读取数据tenv.executeSql("create table user_behavior(" +"   user_id bigint, " +"   item_id bigint, " +"   category_id int, " +"   behavior string, " +"   ts bigint, " +"   event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " +"   watermark for event_time as  event_time - interval '5' second " +")with(" +"   'connector'='filesystem', " +"   'path'='input/UserBehavior.csv', " +"   'format'='csv')");// 每隔 10m 统计一次最近 1h 的热门商品 top// 1. 计算每每个窗口内每个商品的点击量Table t1 = tenv.sqlQuery("select " +"   item_id, " +"   hop_end(event_time, interval '10' minute, interval '1' hour) w_end," +"   count(*) item_count " +"from user_behavior " +"where behavior='pv' " +"group by hop(event_time, interval '10' minute, interval '1' hour), item_id");tenv.createTemporaryView("t1", t1);// 2. 按照窗口开窗, 对商品点击量进行排名Table t2 = tenv.sqlQuery("select " +"   *," +"   row_number() over(partition by w_end order by item_count desc) rk " +"from t1");tenv.createTemporaryView("t2", t2);// 3. 取 top3Table t3 = tenv.sqlQuery("select " +"   item_id, w_end, item_count, rk " +"from t2 " +"where rk<=3");// 4. 数据写入到mysql// 4.1 创建输出表tenv.executeSql("create table hot_item(" +"   item_id bigint, " +"   w_end timestamp(3), " +"   item_count bigint, " +"   rk bigint, " +"   PRIMARY KEY (w_end, rk) NOT ENFORCED)" +"with(" +"   'connector' = 'jdbc', " +"   'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " +"   'table-name' = 'hot_item', " +"   'username' = 'root', " +"   'password' = 'aaaaaa' " +")");// 4.2 写入到输出表t3.executeInsert("hot_item");}
}

四、总结

Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。
流处理模式需注意: TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/92759.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MySQL进阶_3.性能分析工具的使用

文章目录 第一节、数据库服务器的优化步骤第二节、查看系统性能参数第三节、 慢查询日志第四节、 查看 SQL 执行成本第五节、 分析查询语句&#xff1a;EXPLAIN 第一节、数据库服务器的优化步骤 当我们遇到数据库调优问题的时候&#xff0c;可以按照以下流程进行分析。整个流程…

Stm32_标准库_5_呼吸灯_按键控制

Stm32按键和输出差不多 PA1为LED供给正电&#xff0c;PB5放置按键&#xff0c;按键一端接PB5,另一端接负极 void Key_Init(void){RCC_APB2PeriphClockCmd(RCC_APB2Periph_GPIOB, ENABLE); //APB2总线连接着GPIOBGPIO_InitStructur.GPIO_Mode GPIO_Mode_IPU;GPIO_InitStructur.…

[SpringBoot] 8. aop 获取 request response

最近开发有一个需求需要在 aop 中获取request response &#xff0c;搜索许久没有答案&#xff0c;故此记录&#x1f4dd;&#xff5e; aop 获取 package com.example.easy_im.aop;import com.example.easy_im.Context; import jakarta.servlet.http.HttpServletRequest; impo…

STL学习笔记之容器

首先我们要学习的是容器 第一个是容器的初始化&#xff08;构造方式&#xff09;有三种方式 分别是 第一种 int arr[]{1,2,3} vector<int> v1(arr,arr3) 即容器存放的种类和从另外一个数组去拷贝一段数据。 第二种 vector<int> v2(3,10); 第一个3是指存放…

【Linux进行时】进程地址空间

进程地址空间 例子引入&#xff1a; 我们在讲C语言的时候&#xff0c;老师给大家画过这样的空间布局图&#xff0c;但是我们对它不了解 我们写一个代码来验证Linux进程地址空间 #include<stdio.h> #include<assert.h> #include<unistd.h> int g_value100; …

MATLAB在逐渐被Python淘汰吗?

MATLAB和Python都是非常强大的编程工具&#xff0c;各有优势。以下是两段代码的对比&#xff1a; MATLAB代码&#xff1a; import numpy as np from scipy.linalg import eig# 定义一个矩阵A A np.array([[1, 2], [3, 4]])# 计算矩阵A的特征值和特征向量 D, V eig(A)# 输出…

Java基础---第十二篇

系列文章目录 文章目录 系列文章目录一、获取一个类Class对象的方式有哪些?二、ArrayList 和 LinkedList 的区别有哪些?三、用过 ArrayList 吗?说一下它有什么特点?一、获取一个类Class对象的方式有哪些? 搞清楚类对象和实例对象,但都是对象。 第一种:通过类对象的 get…

[机缘参悟-107] :一个IT人关于顺势而为的思考:顺势而为思想在股-市中的体现与应用

目录 前言&#xff1a; 一、顺势而为的核心思想 1.1 道家“顺势而为”的思想 1.2 佛家“顺势而为”的思想 1.3 儒家“顺势而为”的思想 1.4 无处不在的“势” 1.5 逆势而为的代价 二、顺势而为在股市中的应用 2.1 顺势而为的策略 2.2 在别人疯狂的时候悲观&#xff0…

【C++】多线程的学习笔记——白话文版(bushi

目录 为什么要使用多线程 例子 代码 结果 首先要先学的库——thread库 thread的简介 thread的具体使用方法 基本变量的定义 注意&#xff08;小重点&#xff09; join函数的解读&#xff08;重点&#xff09; detach函数的解读 注意 关于vector和thread是联合使用 …

Mac程序坞美化工具 uBar

uBar是一款为Mac用户设计的任务栏增强软件&#xff0c;它可以为您提供更高效和更个性化的任务管理体验。 以下是uBar的一些主要特点和功能&#xff1a; 更直观的任务管理&#xff1a;uBar改变了Mac上传统的任务栏设计&#xff0c;将所有打开的应用程序以类似于Windows任务栏的方…

学习开发一个RISC-V上的操作系统(汪辰老师) — 环境配置

前言 &#xff08;1&#xff09;此系列文章是跟着汪辰老师的RISC-V课程所记录的学习笔记。 &#xff08;2&#xff09;该课程相关代码gitee链接&#xff1b; &#xff08;3&#xff09;PLCT实验室实习生长期招聘&#xff1a;招聘信息链接 &#xff08;4&#xff09;在学习汪辰老…

计算机竞赛 深度学习疲劳检测 驾驶行为检测 - python opencv cnn

文章目录 0 前言1 课题背景2 相关技术2.1 Dlib人脸识别库2.2 疲劳检测算法2.3 YOLOV5算法 3 效果展示3.1 眨眼3.2 打哈欠3.3 使用手机检测3.4 抽烟检测3.5 喝水检测 4 最后 0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; **基于深度学习加…

Guava限流器原理浅析

文章目录 基本知识限流器的类图使用示例 原理解析限流整体流程问题驱动1、限流器创建的时候会初始化令牌吗&#xff1f;2、令牌是如何放到桶里的&#xff1f;3、如果要获取的令牌数大于桶里的令牌数会怎么样4、令牌数量的更新会有并发问题吗 总结 实际工作中难免有限流的场景。…

c++模板小例子

需要注意的是&#xff0c;模板中函数或方法&#xff0c;要在类或头文件中实现。关键字typename 和class基本等同。构造类模板时&#xff0c;要指明模板参数类型&#xff0c;而函数模板则不用指明参数类型。 #pragma once#include <string.h>#include <windows.h>us…

2023/9/27 -- ARM

【汇编语言相关语法】 1.汇编语言的组成部分 1.伪操作&#xff1a;不参与程序的执行&#xff0c;但是用于告诉编译器程序该怎么编译 .text .global .end .if .else .endif .data2.汇编指令 编译器将一条汇编指令编译成一条机器码&#xff0c;在内存里一条指令占4字节内…

JavaWeb 学习笔记 10:Element

JavaWeb 学习笔记 10&#xff1a;Element Element 是一个基于 Vue 的前端组件框架&#xff0c;使用它可以快速构建美观的前端页面。 1.快速开始 创建一个简单的 JavaWeb 应用。 添加一个 Html 页面&#xff0c;并在<head>标签中加入 Element 和 Vue 的相关 js 引用&a…

C++学习笔记一: 变量和基本类型

本章讲解C内置的数据类型&#xff08;如&#xff1a;字符、整型、浮点数等&#xff09;和自定义数据类型的机制。下一章讲解C标准库里面定义的更加复杂的数据类型&#xff0c;比如可变长字符串和向量等。 1.基本内置类型 C内置的基本类型包括&#xff1a;算术类型和空类型。算…

进阶指针(四)—— 加强对指针,数组名,sizeof,strlen的理解

✨博客主页&#xff1a;小钱编程成长记 &#x1f388;博客专栏&#xff1a;进阶C语言 &#x1f388;推荐相关博文&#xff1a;进阶C语言&#xff08;一&#xff09;、进阶C语言&#xff08;二&#xff09;、进阶C语言&#xff08;三&#xff09; 进阶指针&#xff08;四&#x…

QT:绘图

widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QPaintEvent> //绘图事件class Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent 0);~Widget();void paintEvent(QPaintEvent *event); //重写绘图事件void timerEve…

GD32F10X ----RTC

1. RTC的简介 STM32 的实时时钟&#xff08;RTC&#xff09;是一个独立的定时器。STM32 的 RTC 模块拥有一组连续计数的计数器&#xff0c;在相应软件配置下&#xff0c;可提供时钟日历的功能。修改计数器的值可以重新设置系统当前的时间和日期。 RTC 模块和时钟配置…