如何创建自己公司的网站公司企业网站开发
如何创建自己公司的网站,公司企业网站开发,百度一下网页入口,大气网络公司网站源码Flink系列之#xff1a;Joins 一、Joins二、Regular Joins三、INNER Equi-JOIN四、OUTER Equi-JOIN五、Interval Joins六、Temporal Joins七、事件时间 Temporal Join八、处理时间 Temporal Join九、时态表函数连接十、Lookup Join十一、数组展开十二、表功能 一、Joins
适用… Flink系列之Joins 一、Joins二、Regular Joins三、INNER Equi-JOIN四、OUTER Equi-JOIN五、Interval Joins六、Temporal Joins七、事件时间 Temporal Join八、处理时间 Temporal Join九、时态表函数连接十、Lookup Join十一、数组展开十二、表功能 一、Joins
适用于流、批一体
Flink SQL支持对动态表进行复杂而灵活的连接操作。 为了处理不同的场景需要多种查询语义因此有几种不同类型的 Join。
默认情况下joins 的顺序是没有优化的。表的 join 顺序是在 FROM 从句指定的。可以通过把更新频率最低的表放在第一个、频率最高的放在最后这种方式来微调 join 查询的性能。需要确保表的顺序不会产生笛卡尔积因为不支持这样的操作并且会导致查询失败。
二、Regular Joins
Regular join 是最通用的 join 类型。在这种 join 下join 两侧表的任何新记录或变更都是可见的并会影响整个 join 的结果。 例如如果左边有一条新纪录在 Product.id 相等的情况下它将和右边表的之前和之后的所有记录进行 join。
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId Product.id对于流式查询regular join 的语法是最灵活的允许任何类型的更新插入、更新、删除输入表。 然而这种操作具有重要的操作意义Flink 需要将 Join 输入的两边数据永远保持在状态中。 因此计算查询结果所需的状态可能会无限增长这取决于所有输入表的输入数据量。你可以提供一个合适的状态 time-to-live (TTL) 配置来防止状态过大。注意这样做可能会影响查询的正确性。
三、INNER Equi-JOIN
根据 join 限制条件返回一个简单的笛卡尔积。目前只支持 equi-joins即至少有一个等值条件。不支持任意的 cross join 和 theta join。(cross join 指的是类似 SELECT * FROM table_a CROSS JOIN table_btheta join 指的是类似 SELECT * FROM table_a, table_b
SELECT *
FROM Orders
INNER JOIN Product
ON Orders.product_id Product.id四、OUTER Equi-JOIN
返回所有符合条件的笛卡尔积即所有通过 join 条件连接的行加上所有外表没有匹配到的行。Flink 支持 LEFT、RIGHT 和 FULL outer joins。目前只支持 equi-joins即至少有一个等值条件。不支持任意的 cross join 和 theta join。
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id Product.idSELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id Product.idSELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id Product.id这个Flink SQL查询的目标是通过将Orders表与Product表进行FULL OUTER JOIN获取所有订单和产品的信息。
使用SELECT *来选择所有列。使用FULL OUTER JOIN将Orders表和Product表连接起来连接条件是Orders表的product_id列与Product表的id列相等。FULL OUTER JOIN操作将返回所有匹配和不匹配的行因此查询结果将包括Orders表和Product表中的所有数据。这个查询的结果将包含所有订单和产品的信息。
五、Interval Joins
返回一个符合 join 条件和时间限制的简单笛卡尔积。Interval join 需要至少一个 equi-join 条件和一个 join 两边都包含的时间限定 join 条件。范围判断可以定义成就像一个条件, , , 也可以是一个 BETWEEN 条件或者两边表的一个相同类型即处理时间 或 事件时间的时间属性 的等式判断。
例如如果订单是在被接收到4小时后发货这个查询会把所有订单和它们相应的 shipments join 起来。
SELECT *
FROM Orders o, Shipments s
WHERE o.id s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL 4 HOUR AND s.ship_time这个SQL查询的目标是从订单数据表Orders和发货数据表Shipments中选择符合条件的所有列。
首先我们使用FROM关键字将Orders表和Shipments表进行连接使用逗号表示进行内连接。然后我们使用WHERE子句来指定连接条件即订单Orders和发货Shipments的订单ID必须相等。除此之外我们还使用AND子句来指定一个条件即订单时间o.order_time必须在发货时间s.ship_time之前的4小时内。最后使用SELECT *语句选择所有的列。这个查询的结果将返回满足连接条件和时间范围条件的所有订单和发货数据。
下面列举了一些有效的 interval join 时间条件
ltime rtimeltime rtime AND ltime rtime INTERVAL ‘10’ MINUTEltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime INTERVAL ‘5’ SECOND
对于流式查询对比 regular joininterval join 只支持有时间属性的非更新表。 由于时间属性是递增的Flink 从状态中移除旧值也不会影响结果的正确性。
六、Temporal Joins
时态表Temporal table是一个随时间变化的表在 Flink 中被称为动态表。时态表中的行与一个或多个时间段相关联所有 Flink 中的表都是时态的Temporal。 时态表包含一个或多个版本的表快照它可以是一个变化的历史表跟踪变化例如数据库变化日志包含所有快照或一个变化的维度表也可以是一个将变更物化的维表例如存放最终快照的数据表。
七、事件时间 Temporal Join
基于事件时间的 Temporal join 允许对版本表进行 join。 这意味着一个表可以使用变化的元数据来丰富并在某个时间点检索其具体值。
Temporal Joins 使用任意表左侧输入/探测端的每一行与版本表中对应的行进行关联右侧输入/构建端。 Flink 使用 SQL:2011标准 中的 FOR SYSTEM_TIME AS OF 语法去执行操作。 Temporal join 的语法如下
SELECT [column_list]
FROM table1 [AS alias1]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS alias2]
ON table1.column-name1 table2.column-name1有了事件时间属性即rowtime 属性就能检索到过去某个时间点的值。 这允许在一个共同的时间点上连接这两个表。 版本表将存储自最后一个 watermark 以来的所有版本按时间标识。
例如假设我们有一个订单表每个订单都有不同货币的价格。 为了正确地将该表统一为单一货币如美元,每个订单都需要与下单时相应的汇率相关联。
-- 创建订单表。这是一个标准
-- 仅追加动态表。
CREATE TABLE orders (order_id STRING,price DECIMAL(32,2),currency STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time - INTERVAL 15 SECOND
) WITH (/* ... */);-- 定义版本化的货币汇率表。
-- 这可能来自变更数据捕获
-- 例如 Debezium、压缩的 Kafka 主题或任何其他
-- 定义版本化表的方法。
CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM values.source.timestamp VIRTUAL,WATERMARK FOR update_time AS update_time - INTERVAL 15 SECOND,PRIMARY KEY(currency) NOT ENFORCED
) WITH (connector kafka,value.format debezium-json,/* ... */
);SELECT order_id,price,orders.currency,conversion_rate,order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency currency_rates.currency;order_id price currency conversion_rate order_timeo_001 11.11 EUR 1.14 12:00:00
o_002 12.51 EUR 1.10 12:06:00这个Flink SQL查询的目标是将订单数据和货币汇率数据进行连接并在连接后的结果中选择特定的列。
首先我们使用CREATE TABLE语句创建了一个名为orders的表。该表包含了订单的相关信息如订单ID、价格、货币类型和订单时间。在定义该表时我们还为订单时间添加了一个WATERMARK。WATERMARK用于指定事件时间以便Flink能够按照事件时间进行处理。接下来我们使用CREATE TABLE语句创建了一个名为currency_rates的表。该表用于存储货币汇率的相关信息包括货币类型、兑换率和更新时间。这个表是一个版本化的表可以通过变更数据捕获如Debezium、压缩的Kafka主题或其他方式创建。最后我们使用SELECT语句从orders表中选择了订单ID、价格、订单货币类型、兑换率和订单时间这几个列。在选择这些列时我们还使用了LEFT JOIN关键字将orders表和currency_rates表进行连接。连接条件是订单的货币类型orders.currency和货币汇率表中的货币类型currency_rates.currency必须相等。此外我们还使用了FOR SYSTEM_TIME AS OF orders.order_time指定使用订单时间来选择货币汇率表中的数据。这样查询的结果将返回订单数据和相应的货币汇率信息。查询结果中的order_id、price、currency、conversion_rate和order_time列分别表示订单ID、价格、货币类型、兑换率和订单时间。每一行表示一个订单及其相应的货币汇率。
注意 事件时间 temporal join 是通过左和右两侧的 watermark 触发的 这里的 INTERVAL 时间减法用于等待后续事件以确保 join 满足预期。 请确保 join 两边设置了正确的 watermark 。
注意 事件时间 temporal join 需要包含主键相等的条件即currency_rates 表的主键 currency_rates.currency 包含在条件 orders.currency currency_rates.currency 中。
与 regular joins 相比就算 build side例子中的 currency_rates 表发生变更了之前的 temporal table 的结果也不会被影响。 与 interval joins 对比temporal join没有定义join的时间窗口。 Probe side 例子中的 orders 表的记录总是在 time 属性指定的时间与 build side 的版本行进行连接。因此build side 表的行可能已经过时了。 随着时间的推移不再被需要的记录版本对于给定的主键将从状态中删除。
八、处理时间 Temporal Join
基于处理时间的 temporal join 使用处理时间属性将数据与外部版本表例如 mysql、hbase的最新版本相关联。
通过定义一个处理时间属性这个 join 总是返回最新的值。可以将 build side 中被查找的表想象成一个存储所有记录简单的 HashMapK,V。 这种 join 的强大之处在于当无法在 Flink 中将表具体化为动态表时它允许 Flink 直接针对外部系统工作。
下面这个处理时间 temporal join 示例展示了一个追加表 orders 与 LatestRates 表进行 join。 LatestRates 是一个最新汇率的维表比如 HBase 表在 10:1510:3010:52这些时间LatestRates 表的数据看起来是这样的
10:15 SELECT * FROM LatestRates;currency rateUS Dollar 102
Euro 114
Yen 110:30 SELECT * FROM LatestRates;currency rateUS Dollar 102
Euro 114
Yen 110:52 SELECT * FROM LatestRates;currency rateUS Dollar 102
Euro 116 changed from 114 to 116
Yen 1LastestRates 表的数据在 10:15 和 10:30 是相同的。 欧元Euro的汇率rate在 10:52 从 114 变更为 116。
Orders 表示支付金额的 amount 和currency的追加表。 例如在 10:15 有一个金额为 2 Euro 的 order。
SELECT * FROM Orders;amount currency2 Euro arrived at time 10:151 US Dollar arrived at time 10:302 Euro arrived at time 10:52给出下面这些表我们希望所有 Orders 表的记录转换为一个统一的货币。
amount currency rate amount*rate2 Euro 114 228 arrived at time 10:151 US Dollar 102 102 arrived at time 10:302 Euro 116 232 arrived at time 10:52目前temporal join 还不支持与任意 view/table 的最新版本 join 时使用 FOR SYSTEM_TIME AS OF 语法。可以像下面这样使用 temporal table function 语法来实现时态表函数:
SELECTo_amount, r_rate
FROMOrders,LATERAL TABLE (Rates(o_proctime))
WHEREr_currency o_currency这个Flink SQL查询的目标是将订单数据和货币汇率数据进行连接并在连接后的结果中选择特定的列。
首先通过FROM子句指定了两个输入表即Orders和LATERAL TABLE (Rates(o_proctime))。Orders表示订单数据表而LATERAL TABLE (Rates(o_proctime))表示根据订单处理时间获取的货币汇率表。接下来在WHERE子句中指定了连接条件即货币汇率表中的货币类型(r_currency)必须与订单表中的货币类型(o_currency)相等。最后在SELECT子句中选择了o_amount和r_rate这两个列分别表示订单金额和货币汇率。查询结果将返回订单金额和对应的货币汇率。
注意 Temporal join 不支持与 table/view 的最新版本进行 join 时使用 FOR SYSTEM_TIME AS OF 语法是出于语义考虑因为左流的连接处理不会等待 temporal table 的完整快照这可能会误导生产环境中的用户。处理时间 temporal join 使用 temporal table function 也存在相同的语义问题但它已经存在了很长时间因此我们从兼容性的角度支持它。
processing-time 的结果是不确定的。 processing-time temporal join 常常用在使用外部系统来丰富流的数据。例如维表
与 regular joins 的差异就算 build side例子中的 currency_rates 表发生变更了之前的 temporal table 结果也不会被影响。 与 interval joins 的差异temporal join 没有定义数据连接的时间窗口。即旧数据没存储在状态中。
九、时态表函数连接
使用 temporal table function 去 join 表的语法和 Table Function 相同。
注意目前只支持 inner join 和 left outer join。
假设Rates是一个 temporal table function这个 join 在 SQL 中可以被表达为
SELECTo_amount, r_rate
FROMOrders,LATERAL TABLE (Rates(o_proctime))
WHEREr_currency o_currency上述 temporal table DDL 和 temporal table function 的主要区别在于
SQL 中可以定义 temporal table DDL但不能定义 temporal table 函数;temporal table DDL 和 temporal table function 都支持 temporal join 版本表但只有 temporal table function 可以 temporal join 任何表/视图的最新版本即处理时间 Temporal Join。
十、Lookup Join
lookup join 通常用于使用从外部系统查询的数据来丰富表。join 要求一个表具有处理时间属性另一个表由查找源连接器lookup source connnector支持。
lookup join 和上面的 处理时间 Temporal Join 语法相同右表使用查找源连接器支持。
下面的例子展示了 lookup join 的语法。
-- Customers 由 JDBC 连接器支持可用于查找连接
CREATE TEMPORARY TABLE Customers (id INT,name STRING,country STRING,zip STRING
) WITH (connector jdbc,url jdbc:mysql://mysqlhost:3306/customerdb,table-name customers
);-- 用客户信息丰富每个订单
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS oJOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS cON o.customer_id c.id;在上面的示例中Orders 表由保存在 MySQL 数据库中的 Customers 表数据来丰富。带有后续 process time 属性的 FOR SYSTEM_TIME AS OF 子句确保在联接运算符处理 Orders 行时Orders 的每一行都与 join 条件匹配的 Customer 行连接。它还防止连接的 Customer 表在未来发生更新时变更连接结果。lookup join 还需要一个强制的相等连接条件在上面的示例中是 o.customer_id c.id。
十一、数组展开
返回给定数组中每个元素的新行。尚不支持 WITH ORDINALITY会额外生成一个标识顺序的整数列展开。
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)十二、表功能
将表与表函数的结果联接。左侧外部表的每一行都与表函数的相应调用产生的所有行相连接。用户自定义表函数 必须在使用前注册。
INNER JOIN
如果表函数调用返回一个空结果那么左表的这行数据将不会输出。
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)这个Flink SQL查询的目标是从订单数据表中获取订单ID和通过自定义表函数根据订单ID计算得出的结果。
首先通过FROM子句指定了输入表Orders。然后通过LATERAL TABLE子句调用了自定义的表函数table_func并将订单ID作为参数传递给函数。表函数的作用是根据订单ID计算得出一个结果。接下来在SELECT子句中选择了order_id和res这两个列分别表示订单ID和通过表函数计算得到的结果。查询结果将返回订单ID和对应的表函数计算结果。
LEFT OUTER JOIN
如果表函数调用返回了一个空结果则保留相应的行并用空值填充未关联到的结果。当前针对 lateral table 的 left outer join 需要 ON 子句中有一个固定的 TRUE 连接条件。
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)ON TRUE这个Flink SQL查询的目标是从订单数据表中获取订单ID和通过自定义表函数根据订单ID计算得出的结果并使用左外连接将计算结果与订单数据进行关联。
首先通过FROM子句指定了输入表Orders。然后通过LATERAL TABLE子句调用了自定义的表函数table_func并将订单ID作为参数传递给函数。表函数的作用是根据订单ID计算得出一个结果。接下来使用LEFT OUTER JOIN关键字将计算结果与订单数据进行连接。使用ON TRUE来指定连接条件这意味着对所有的订单都执行连接操作。最后在SELECT子句中选择了order_id和res这两个列分别表示订单ID和通过表函数计算得到的结果。查询结果将返回订单ID和对应的表函数计算结果如果没有计算结果的话会返回NULL值。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/91977.shtml
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!