
/** Record with a string field `f1` and a nested [[Inner]] value `f2`. */
case class outer(f1: String, f2: Inner)

/** Same fields as [[outer]] plus an additional integer field `f3`. */
case class outerV1(f1: String, f2: Inner, f3: Int)

/** Nested record holding a string field `f3` and an integer field `f4`. */
case class Inner(f3: String, f4: Int)
// 测试代码 (test code)
package com.yy.table.convert

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.types.DataType

/**
 * Demo (written against Flink 1.13, per the original comment) that registers a
 * DataStream of nested case-class records as a table/view, queries the nested
 * fields with SQL, and converts a query result back into a DataStream.
 *
 * The block comments inside `main` are outputs recorded by the original author;
 * they are kept for reference and are not produced by this file as written
 * unless the corresponding commented-out `print()` calls are re-enabled.
 */
object streamPOJO2table {

  /** Input record: a string field `f1` and a nested [[Inner]] value `f2`. */
  case class outer(f1: String, f2: Inner)

  /** Target type for the query result: the fields of [[outer]] plus integer `f3`. */
  case class outerV1(f1: String, f2: Inner, f3: Int)

  /** Nested record holding a string field `f3` and an integer field `f4`. */
  case class Inner(f3: String, f4: Int)

  /**
   * Runs the demo pipeline.
   *
   * @param args command-line arguments; not read by this method
   */
  def main(args: Array[String]): Unit = {
    // Flink 1.13 stream-processing environment initialisation.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    // Kept at its original position inside main. Needed for the `DataStream`
    // type used below; presumably it also supplies the implicit type
    // information required by `fromElements` (confirm against the Flink docs).
    import org.apache.flink.streaming.api.scala._

    val ds1: DataStream[outer] = env.fromElements(
      outer("a", Inner("b", 2)),
      outer("d", Inner("e", 4))
    )
    val table1: Table = tEnv.fromDataStream(ds1)
    //    table1
    //      .execute()
    //      .print()
    /*
    +----+--------------------------------+--------------------------------+
    | op |                             f1 |                             f2 |
    +----+--------------------------------+--------------------------------+
    | +I |                              a |                   (f3=b, f4=2) |
    | +I |                              d |                   (f3=e, f4=4) |
    +----+--------------------------------+--------------------------------+
    */

    //    table1
    //      .print()
    /*
    5> +I[d, Inner(e,4)]
    4> +I[a, Inner(b,2)]
    */

    // Register the stream as a view so it can be referenced from SQL.
    tEnv.createTemporaryView("view1", ds1)

    val tableResult1: TableResult =
      tEnv.executeSql("select f1,f2,(f2.f4 + 100) as f3 from view1")
    tableResult1.print()
    /*
    +----+--------------------------------+--------------------------------+-------------+
    | op |                             f1 |                             f2 |          f3 |
    +----+--------------------------------+--------------------------------+-------------+
    | +I |                              a |                   (f3=b, f4=2) |         102 |
    | +I |                              d |                   (f3=e, f4=4) |         104 |
    +----+--------------------------------+--------------------------------+-------------+
    */

    // `t1` must be defined: it is passed to `toDataStream` below. In the
    // collapsed original this definition had ended up behind a `//` marker.
    val t1: Table = tEnv.sqlQuery("select f1,f2,(f2.f4 + 100) as f3 from view1")
    //    t1.print()
    //    println(t1.getResolvedSchema)
    /*
    +----+--------------------------------+--------------------------------+-------------+
    | op |                             f1 |                             f2 |          f3 |
    +----+--------------------------------+--------------------------------+-------------+
    | +I |                              a |                   (f3=b, f4=2) |         102 |
    | +I |                              d |                   (f3=e, f4=4) |         104 |
    +----+--------------------------------+--------------------------------+-------------+
    2 rows in set
    (`f1` STRING,`f2` *com.yy.table.convert.streamPOJO2table$Inner<`f3` STRING, `f4` INT NOT NULL>* NOT NULL,`f3` INT NOT NULL
    )
    */

    println("---- 1 -------")
    // Convert the query result table back into a DataStream of outerV1.
    val o1: DataStream[outerV1] = tEnv.toDataStream[outerV1](t1, classOf[outerV1])
    //    o1.print()

    println("---- 2 -------")
    // Select the nested fields as separate top-level columns. The margin-stripped
    // literal needs one clause per line; collapsed onto a single line the `|`
    // margin characters would become part of the SQL text.
    tEnv.executeSql(
      """
        |select
        |f1
        |,f2.f3
        |,f2.f4
        |from view1
        |""".stripMargin)
    //      .print()
    /*
    +----+--------------------------------+--------------------------------+--------------------------------+
    | op |                             f1 |                             f3 |                             f4 |
    +----+--------------------------------+--------------------------------+--------------------------------+
    | +I |                              a |                              b |                              c |
    | +I |                              d |                              e |                              f |
    +----+--------------------------------+--------------------------------+--------------------------------+
    */
    // NOTE(review): the recorded output above shows `c` / `f` in column f4, but
    // the input built in this method has f4 = 2 and 4 (Int). It was presumably
    // captured from a run with different data - confirm before relying on it.

    // Select the two nested fields re-grouped as a single row-valued column.
    tEnv.executeSql(
      """
        |select
        |f1
        |,(f2.f3,f2.f4)
        |from view1
        |""".stripMargin)
    //      .print()

    env.execute("jobName1")
  }
}