Table API 和 SQL 进行基于时间的操作(比如时间窗口)时,需要定义相关的时间语义和时间数据来源的信息。因此会给表单独提供一个逻辑上的时间字段,专门用来在表处理程序中指示时间。时间属性(time attributes)其实就是每个表模式结构(schema)的一部分。它可以在创建表的 DDL 里直接定义为一个字段,也可以在 DataStream 转换成表时定义。一旦定义了时间属性,就可以作为一个普通字段引用,并且可以在基于时间的操作中使用。时间属性的数据类型为 TIMESTAMP,类似于常规时间戳,可以直接访问并且进行计算。按照时间语义的不同,可以把时间属性的定义分成事件时间(event time)和处理时间(processing time)两种。
public  class  TestTableProcessingTime  { public  static  void  main ( String [ ]  args)  throws  Exception  { StreamExecutionEnvironment  env =  StreamExecutionEnvironment . getExecutionEnvironment ( ) ; env. setParallelism ( 1 ) ; StreamTableEnvironment  tableEnv =  StreamTableEnvironment . create ( env) ; DataStream < String > =  env. readTextFile ( "./sensor.txt" ) ; DataStream < SensorReading > =  inputStream. map ( line ->  { String [ ]  fields =  line. split ( "," ) ; return  new  SensorReading ( fields[ 0 ] ,  new  Long ( fields[ 1 ] ) ,  new  Double ( fields[ 2 ] ) ) ; } ) ; Table  sensorTable =  tableEnv. fromDataStream ( dataStream,  "id, timestamp as ts, temperature, pt.proctime" ) ; tableEnv. connect ( new  FileSystem ( ) . path ( "./sensor.txt" ) )  . withFormat ( new  Csv ( ) )  . withSchema ( new  Schema ( )  . field ( "id" ,  DataTypes . STRING ( ) )  . field ( "timestamp" ,  DataTypes . BIGINT ( ) ) . field ( "temperature" ,  DataTypes . DOUBLE ( ) ) . field ( "pt" ,  DataTypes . TIMESTAMP ( 3 ) ) . proctime ( ) ) . createTemporaryTable ( "sensorTable" ) ; String  sinkDDL=  "create table sensorTable ("  + " id varchar(20) not null, "  + " ts bigint, "  + " temperature double, "  + " pt AS PROCTIME() "  + ") with ("  + " 'connector.type' = 'filesystem', "  + " 'connector.path' = '/sensor.txt', "  + " 'format.type' = 'csv')" ; tableEnv. sqlUpdate ( sinkDDL) ; env. execute ( ) ; } 
} 
public  class  TestTableEventTime  { public  static  void  main ( String [ ]  args)  throws  Exception  { StreamExecutionEnvironment  env =  StreamExecutionEnvironment . getExecutionEnvironment ( ) ; env. setParallelism ( 1 ) ; env. setStreamTimeCharacteristic ( TimeCharacteristic. EventTime ) ; StreamTableEnvironment  tableEnv =  StreamTableEnvironment . create ( env) ; DataStream < String > =  env. readTextFile ( "./sensor.txt" ) ; DataStream < SensorReading > =  inputStream. map ( line ->  { String [ ]  fields =  line. split ( "," ) ; return  new  SensorReading ( fields[ 0 ] ,  new  Long ( fields[ 1 ] ) ,  new  Double ( fields[ 2 ] ) ) ; } ) . assignTimestampsAndWatermarks ( new  BoundedOutOfOrdernessTimestampExtractor < SensorReading > ( Time . seconds ( 2 ) )  { @Override public  long  extractTimestamp ( SensorReading  element)  { return  element. getTimestamp ( )  *  1000L ; } } ) ; Table  sensorTable =  tableEnv. fromDataStream ( dataStream,  "id, timestamp as ts, temperature, rt.rowtime" ) ;  tableEnv. connect ( new  FileSystem ( ) . path ( "./sensor.txt" ) )  . withFormat ( new  Csv ( ) )  . withSchema ( new  Schema ( )  . field ( "id" ,  DataTypes . STRING ( ) )  . field ( "timestamp" ,  DataTypes . BIGINT ( ) ) . rowtime (  new  RowTime ( ) . timestampsFromField ( "timestamp" )   . watermarksPeriodicBounded ( 1000 )    ) . field ( "temperature" ,  DataTypes . DOUBLE ( ) ) ) . createTemporaryTable ( "sensorTable" ) ; String  sinkDDL =  "create table dataTable ("  + " id varchar(20) not null, "  +  " ts bigint, "  + " temperature double, "  + " rt AS TO_TIMESTAMP( FROM_UNIXTIME(ts) ), "  + " watermark for rt as rt - interval '1' second"  + ") with ("  + " 'connector.type' = 'filesystem', "  + " 'connector.path' = '/sensor.txt', "  + " 'format.type' = 'csv')" ; tableEnv. sqlUpdate ( sinkDDL) ; env. execute ( ) ; } 
}