Flink SQL支持对动态表进行复杂而灵活的连接操作,本文为您介绍如何使用双流JOIN语句。
背景信息
实时计算的JOIN和传统批处理JOIN的语义一致,都用于将两张表关联起来。区别为实时计算关联的是两张动态表,关联的结果也会动态更新,以保证最终结果和批处理结果一致。
双流JOIN语法
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpressionjoinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'-  tableReference:表名称。 
-  tableExpression:表达式。 
-  joinCondition:JOIN条件。 
双流JOIN hints
从实时计算引擎VVR 8.0.1 开始,您可以通过提示(Hints)单独为双流JOIN的左右流状态设置不同生命周期 (TTL)来减少维护的状态大小。
-  语法 -- VVR 8.0.1 开始 SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...-- VVR 8.0.7 开始,您也可以使用社区的Join State TTL Hint语法 SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
-  注意事项 -  JOIN STATE TTL HINT仅支持在双流JOIN场景使用,不支持维表JOIN、Interval Join或Window Join。 
-  若双流JOIN时JOIN STATE TTL HINT仅指定某一条流的在JOIN节点的状态生命周期,则另外一条流的状态生命周期使用Flink SQL作业级别的状态生命周期,由table.exec.state.ttl控制(参见基本配置),默认值为1.5天。 
-  tableReference支持表名,视图名和别名,一旦为表名指定别名时,则需使用别名。 
-  这是一个实验性质的特性,HINT语法未来可能会发生变化。 
 
-  
-  示例 -- HINT使用别名 SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitpriceFROM Orders AS oJOIN Products AS p ON o.productid = p.productid; -- VVR 8.0.7及以上版本也可以使用新语法 SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitpriceFROM Orders AS oJOIN Products AS p ON o.productid = p.productid;-- HINT使用表名 SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *FROM OrdersJOIN Products ON Orders.productid = Products.productid; -- VVR 8.0.7及以上版本也可以使用新语法 SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *FROM OrdersJOIN Products ON Orders.productid = Products.productid;-- HINT使用视图名 CREATE TEMPORARY VIEW v AS SELECT id, ...FROM (SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rnFROM src1WHERE ...) tmp WHERE rn = 1;SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id; -- VVR 8.0.7及以上版本也可以使用新语法 SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id;
Orders JOIN Products表的数据示例
-  测试数据 表 1. Orders rowtime productid orderid units 10:17:0030 5 4 10:17:0510 6 1 10:18:0520 7 2 10:18:0730 8 20 11:02:0010 9 6 11:04:0010 10 1 11:09:3040 11 12 11:24:1110 12 4 表 2. Products productid name unitprice 30 Cheese 1710 Beer 0.2520 Wine 630 Cheese 1710 Beer 0.2510 Beer 0.2540 Bread 10010 Beer 0.25
-  测试语句 SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitpriceFROM Orders AS oJOIN Products AS p ON o.productid = p.productid;
-  测试结果 o.rowtime o.productid o.orderid o.units p.name p.unitprice 10:17:00 30 5 4 Cheese 17.00 10:17:00 30 5 4 Cheese 17.00 10:17:05 10 6 1 Beer 0.25 10:17:05 10 6 1 Beer 0.25 10:17:05 10 6 1 Beer 0.25 10:17:05 10 6 1 Beer 0.25 10:18:05 20 7 2 Wine 6.00 10:18:07 30 8 20 Cheese 17.00 10:18:07 30 8 20 Cheese 17.00 11:02:00 10 9 6 Beer 0.25 11:02:00 10 9 6 Beer 0.25 11:02:00 10 9 6 Beer 0.25 11:02:00 10 9 6 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:04:00 10 10 1 Beer 0.25 11:09:30 40 11 12 Bread 100.00 11:24:11 10 12 4 Beer 0.25 11:24:11 10 12 4 Beer 0.25 11:24:11 10 12 4 Beer 0.25 11:24:11 10 12 4 Beer 0.25 
datahub_stream1 JOIN datahub_stream2表的数据示例
-  测试数据 表 3. datahub_stream1 a(BIGINT) b(BIGINT) c(VARCHAR) 0 10 test11 1 10 test21 表 4. datahub_stream2 a(BIGINT) b(BIGINT) c(VARCHAR) 0 10 test11 1 10 test21 0 10 test31 1 10 test41 
-  测试语句 SELECT s1.c,s2.c FROM datahub_stream1 AS s1 JOIN datahub_stream2 AS s2 ON s1.a = s2.a WHERE s1.a = 0;
-  测试结果 s1.c(VARCHAR) s2.c(VARCHAR) test11 test11 test11 test31