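What is a dimension table join?
Each streaming record can be joined against an external dimension table data source, which lets Flink SQL real-time jobs look up related data.
Note: a dimension table keeps changing, so a dimension table JOIN must specify the point in time of the dimension table snapshot that each record is joined against. Currently, a dimension table JOIN can only join against the snapshot at the current moment. Joining against the snapshot that corresponds to the left table's proctime or rowtime is planned for the future.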
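The "current snapshot" behaviour can be illustrated with a minimal, stdlib-only Java sketch. It is a hypothetical model, not Flink's implementation. A `HashMap` stands in for the external dimension table, and `LookupJoinSketch`, `Joined` and `lookup` are illustrative names. The keys and values are borrowed from the test output at the end of this post. The sketch also contrasts the optional `LEFT` keyword in the syntax: an inner join drops a stream record that has no match, while a LEFT JOIN keeps it with a NULL dimension column.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, stdlib-only model of a processing-time lookup join (not Flink code).
// A HashMap stands in for the external dimension table (e.g. MySQL); each stream
// record probes whatever the table holds at the moment the record is processed.
public class LookupJoinSketch {

    // One joined output row: the stream key plus the looked-up dimension value.
    record Joined(int itemId, String provinceName) {}

    // Joins one stream record against the current dimension snapshot.
    // Inner join: returns null when nothing matches, i.e. the record is dropped.
    // LEFT JOIN: keeps the record and pads the missing dimension column with null.
    static Joined lookup(Map<Integer, String> dim, int itemId, boolean leftJoin) {
        String name = dim.get(itemId); // read the table "as of now"
        if (name == null && !leftJoin) {
            return null;
        }
        return new Joined(itemId, name);
    }

    public static void main(String[] args) {
        Map<Integer, String> dim = new HashMap<>();
        dim.put(2, "222");
        dim.put(3, "333");

        Joined first = lookup(dim, 2, false);  // joined with "222"
        dim.put(2, "222222");                  // the dimension row changes afterwards
        Joined second = lookup(dim, 2, false); // a later record sees "222222"

        // `first` still holds "222": rows already emitted are not revised
        // when the dimension table changes.
        System.out.println(first);
        System.out.println(second);
        System.out.println(lookup(dim, 9, false)); // no match, inner join: null (dropped)
        System.out.println(lookup(dim, 9, true));  // no match, LEFT JOIN: kept with null name
    }
}
```

Because each record reads the table at the moment it is processed, the same key can join to different values over time, while earlier output stays as it was.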
Dimension table join syntax:
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1

Test statements:

Kafka fact table:
        // 2. Define the input table, consuming data from Kafka
        tableEnv.executeSql("CREATE TABLE sourceTable (\n" +
                "  `user_id` STRING,\n" +
                "  `item_id` INTEGER,\n" +
                "  `behavior` STRING,\n" +
                "  `ts` STRING,\n" +
                "  `body` ROW<id STRING, name STRING, code STRING>,\n" +
                "  `proctime` AS PROCTIME()\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'test-data',\n" +
                "  'properties.bootstrap.servers' = '127.0.0.1:9092',\n" +
                "  'properties.group.id' = 'test-data',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                // fail the job when a field is missing from a JSON record
                "  'json.fail-on-missing-field' = 'true',\n" +
                // do not skip records that fail to parse (a parse error fails the job)
                "  'json.ignore-parse-errors' = 'false'\n" +
                ")");

MySQL dimension table:
        tableEnv.executeSql("CREATE TABLE dim_province (\n" +
                "  province_id BIGINT,\n" +
                "  province_name VARCHAR,\n" +
                "  region_name VARCHAR\n" +
                ") WITH (\n" +
                "  'connector.type' = 'jdbc',\n" +
                "  'connector.url' = 'jdbc:mysql://localhost:3306/sms?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai',\n" +
                "  'connector.table' = 'dim_province',\n" +
                "  'connector.username' = 'root',\n" +
                "  'connector.password' = 'root'\n" +
                // Optional lookup cache: uncomment both lines to enable it (note the leading comma)
                // "  ,'connector.lookup.cache.max-rows' = '1',\n" +
                // "  'connector.lookup.cache.ttl' = '5s'\n" +
                ")");

Run the query:
        tableEnv.executeSql("SELECT sourceTable.item_id, sourceTable.ts, dim_province.province_name, sourceTable.proctime\n" +
                "FROM sourceTable\n" +
                "JOIN dim_province FOR SYSTEM_TIME AS OF sourceTable.proctime\n" +
                "ON sourceTable.item_id = dim_province.province_id").print();

The result is:
+----+-------------+--------------------------------+--------------------------------+-------------------------+
| op |     item_id |                             ts |                  province_name |                proctime |
+----+-------------+--------------------------------+--------------------------------+-------------------------+
| +I |           2 |                  1690786451861 |                            222 | 2023-07-31 15:07:49.673 |
| +I |           2 |                  1690786451861 |                            222 | 2023-07-31 15:08:33.763 |
| +I |           3 |                  1690786451861 |                            333 | 2023-07-31 15:09:04.121 |
| +I |           2 |                  1690786451861 |                         222222 | 2023-07-31 15:09:45.225 |
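In this test the lookup-cache options are commented out, so each lookup should go to MySQL directly. That fits the output above: item_id 2 joins to 222 in the earlier rows and to 222222 in a later one, which is consistent with the MySQL row having been updated in between. Enabling the cache trades that freshness for fewer database queries. The sketch below is a hypothetical model of a cache bounded by a row count and a TTL, in the spirit of `connector.lookup.cache.max-rows` and `connector.lookup.cache.ttl`. It is not Flink's implementation, and `TtlLookupCacheSketch`, `CachedRow` and the explicit `nowMillis` clock parameter are illustrative assumptions.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of a lookup cache bounded by max rows and a TTL.
// Not Flink's implementation; the clock is passed in explicitly for determinism.
public class TtlLookupCacheSketch {

    // A cached dimension value together with the time it was loaded.
    record CachedRow(String value, long loadedAtMillis) {}

    private final long ttlMillis;
    private final Function<Integer, String> source; // stands in for a JDBC query
    private final LinkedHashMap<Integer, CachedRow> cache;

    TtlLookupCacheSketch(int maxRows, long ttlMillis, Function<Integer, String> source) {
        this.ttlMillis = ttlMillis;
        this.source = source;
        // accessOrder = true keeps least-recently-used order, so removeEldestEntry
        // evicts the LRU row once the cache holds more than maxRows entries.
        this.cache = new LinkedHashMap<Integer, CachedRow>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Integer, CachedRow> eldest) {
                return size() > maxRows;
            }
        };
    }

    // Returns the dimension value for key as seen at time nowMillis.
    String get(int key, long nowMillis) {
        CachedRow row = cache.get(key);
        if (row != null && nowMillis - row.loadedAtMillis() < ttlMillis) {
            return row.value();                 // still fresh: no database round trip
        }
        String value = source.apply(key);       // miss or expired: reload from the source
        cache.put(key, new CachedRow(value, nowMillis));
        return value;
    }

    public static void main(String[] args) {
        Map<Integer, String> mysql = new HashMap<>(); // pretend MySQL table
        mysql.put(2, "222");
        TtlLookupCacheSketch cache = new TtlLookupCacheSketch(1, 5_000, mysql::get);

        System.out.println(cache.get(2, 0));     // loads "222" from the source
        mysql.put(2, "222222");                  // row updated in the database
        System.out.println(cache.get(2, 1_000)); // still "222": cached copy within TTL
        System.out.println(cache.get(2, 6_000)); // TTL expired: reloads "222222"
    }
}
```

With a 5 s TTL, an update made in MySQL can stay invisible to the join for up to 5 s per cached key. A smaller `max-rows` value also causes more reloads because rows are evicted sooner.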