本文列举了 Flink Table API 的数据类型,并生成与这些类型匹配的数据。
同时比较了 DataType(或 LogicalType)的默认 conversionClass
与 Flink Table API 规定的内部类型的 conversionClass
的异同。
一、添加maven pom依赖
用于生成假数据。
<dependency><groupId>net.datafaker</groupId><artifactId>datafaker</artifactId><version>1.6.0</version><scope>test</scope></dependency>
二、生成假数据的工具类
注意:此类生成的数据,其类型均为 Flink Table API 规定的内部数据类型。
import lombok.Data;
import net.datafaker.Address;
import net.datafaker.Faker;
import net.datafaker.Internet;
import net.datafaker.Name;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.DataType;import java.math.BigDecimal;
import java.time.*;
import java.util.Locale;
import java.util.Random;
import java.util.function.Supplier;

/**
 * Collection of fake-data generators, one per Flink Table API data type.
 *
 * <p>Note: every generator produces values in the <em>internal</em> data structures of the
 * Flink Table API (for example {@link StringData}, {@link DecimalData}, {@link TimestampData},
 * or a plain {@code Integer} for DATE / TIME), and declares a {@link DataType} bridged to that
 * class where the default conversion class differs.
 */
public class FlinkInternalDataFakers {

    /** Not instantiable: this class only hosts the nested faker types. */
    private FlinkInternalDataFakers() {
    }

    /**
     * Base class of all fakers: a named {@link Supplier} that also reports the Flink
     * {@link DataType} of the values it supplies.
     *
     * @param <OUT> Java class of the generated values
     */
    @Data
    public abstract static class FLinkInternalDataFaker<OUT> implements Supplier<OUT> {
        /** Field name under which the generated values are exposed. */
        protected String name;

        public FLinkInternalDataFaker(String name) {
            this.name = name;
        }

        /** @return the data type describing the values returned by {@link #get()} */
        protected abstract DataType getProducedDataType();
    }

    /** Shared implementation for CHAR / VARCHAR: a fake full name cut to at most {@code len} chars. */
    private static abstract class CharFakerBase extends FLinkInternalDataFaker<StringData> {
        public final static int INDEX_MIN = 0;
        public final static int INDEX_MAN = 128;

        private final Faker faker = new Faker(Locale.CHINA);
        // Named "nameFaker" so it does not shadow the inherited String field "name".
        private final Name nameFaker = faker.name();
        protected final int len;

        public CharFakerBase(String name, int len) {
            super(name);
            this.len = len;
        }

        /** @return a fake full name truncated to {@code len} characters (shorter names are not padded) */
        @Override
        public StringData get() {
            String fullName = nameFaker.fullName();
            String truncated = fullName.substring(0, Math.min(len, fullName.length()));
            return StringData.fromString(truncated);
        }
    }

    /** Faker for {@code CHAR(len)} bridged to {@link StringData}. */
    public static class CharFaker extends CharFakerBase {
        public CharFaker(String name, int len) {
            super(name, len);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.CHAR(len).bridgedTo(StringData.class);
        }
    }

    /** Faker for {@code VARCHAR(len)} bridged to {@link StringData}. */
    public static class VarCharFaker extends CharFakerBase {
        public VarCharFaker(String name, int len) {
            super(name, len);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.VARCHAR(len).bridgedTo(StringData.class);
        }
    }

    /** Faker for {@code STRING}: produces {@code "<name>|<city>|<email>"} as {@link StringData}. */
    public static class StringDataFaker extends FLinkInternalDataFaker<StringData> {
        final Faker faker = new Faker(Locale.CHINA);
        // NOTE: this field hides the inherited String "name"; kept under its original identifier.
        final Name name = faker.name();
        final Address address = faker.address();
        final Internet internet = faker.internet();

        public StringDataFaker(String name) {
            super(name);
        }

        @Override
        public StringData get() {
            return StringData.fromString(name.name() + "|" + address.city() + "|" + internet.emailAddress());
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.STRING().bridgedTo(StringData.class);
        }
    }

    /** Faker for {@code BOOLEAN}. */
    public static class BooleanFaker extends FLinkInternalDataFaker<Boolean> {
        private final Random random = new Random();

        public BooleanFaker(String name) {
            super(name);
        }

        @Override
        public Boolean get() {
            return random.nextBoolean();
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.BOOLEAN();
        }
    }

    /** Shared implementation for BINARY / VARBINARY: exactly {@code len} bytes taken from a fake name. */
    private static abstract class BinaryFakerBase extends FLinkInternalDataFaker<byte[]> {
        private final Faker faker = new Faker(Locale.CHINA);
        protected final int len;

        public BinaryFakerBase(String name, int len) {
            super(name);
            this.len = len;
        }

        /**
         * @return an array of exactly {@code len} bytes: the UTF-8 bytes of a fake full name,
         *     truncated when longer and zero-padded when shorter
         */
        @Override
        public byte[] get() {
            // Explicit charset: the platform default would make the bytes machine-dependent.
            byte[] nameBytes = faker.name().fullName().getBytes(java.nio.charset.StandardCharsets.UTF_8);
            byte[] output = new byte[len];
            System.arraycopy(nameBytes, 0, output, 0, Math.min(len, nameBytes.length));
            return output;
        }
    }

    /** Faker for {@code BINARY(len)}. */
    public static class BinaryFaker extends BinaryFakerBase {
        public BinaryFaker(String name, int len) {
            super(name, len);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.BINARY(len);
        }
    }

    /** Faker for {@code VARBINARY(len)}. */
    public static class VarBinaryFaker extends BinaryFakerBase {
        public VarBinaryFaker(String name, int len) {
            super(name, len);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.VARBINARY(len);
        }
    }

    /** Faker for {@code BYTES}: the UTF-8 bytes of a fake full name, of whatever length results. */
    public static class BytesFaker extends FLinkInternalDataFaker<byte[]> {
        private final Faker faker = new Faker(Locale.CHINA);
        private final Name nameFaker = faker.name();

        public BytesFaker(String name) {
            super(name);
        }

        @Override
        public byte[] get() {
            // Explicit charset: the platform default would make the bytes machine-dependent.
            return nameFaker.fullName().getBytes(java.nio.charset.StandardCharsets.UTF_8);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.BYTES();
        }
    }

    /** Faker for {@code DECIMAL(precision, scale)} bridged to {@link DecimalData}. */
    public static class DecimalDataFaker extends FLinkInternalDataFaker<DecimalData> {
        private final Random random = new Random();
        private final int precision;
        private final int scale;

        public DecimalDataFaker(String name, int precision, int scale) {
            super(name);
            this.precision = precision;
            this.scale = scale;
        }

        /**
         * Builds a non-negative decimal from {@code precision} random digits, the last
         * {@code scale} of which are the fraction.
         *
         * <p>The value is assembled digit by digit instead of formatting a {@code double}:
         * locale-sensitive formatting can emit a decimal comma that {@link BigDecimal} cannot
         * parse, rounding a {@code double} can carry into one more integer digit than the
         * declared precision allows, and an {@code int}-bounded integer part cannot cover
         * large precisions.
         *
         * @return a decimal with exactly {@code scale} fraction digits and at most
         *     {@code precision} digits in total
         */
        @Override
        public DecimalData get() {
            StringBuilder digits = new StringBuilder(Math.max(precision, 0));
            for (int i = 0; i < precision; i++) {
                digits.append((char) ('0' + random.nextInt(10)));
            }
            // movePointLeft turns the integer digit string into a value with the requested scale.
            BigDecimal value = new BigDecimal(digits.toString()).movePointLeft(scale);
            return DecimalData.fromBigDecimal(value, precision, scale);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.DECIMAL(precision, scale).bridgedTo(DecimalData.class);
        }
    }

    /** Faker for {@code TINYINT}: values in {@code [0, 127]}. */
    public static class TinyIntFaker extends FLinkInternalDataFaker<Byte> {
        private final Random random = new Random();

        public TinyIntFaker(String name) {
            super(name);
        }

        @Override
        public Byte get() {
            return (byte) random.nextInt(128);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.TINYINT();
        }
    }

    /** Faker for {@code SMALLINT}: a random int narrowed to {@code short}. */
    public static class SmallIntFaker extends FLinkInternalDataFaker<Short> {
        private final Random random = new Random();

        public SmallIntFaker(String name) {
            super(name);
        }

        @Override
        public Short get() {
            return (short) random.nextInt();
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.SMALLINT();
        }
    }

    /** Faker for {@code INT}. */
    public static class IntFaker extends FLinkInternalDataFaker<Integer> {
        private final Random random = new Random();

        public IntFaker(String name) {
            super(name);
        }

        @Override
        public Integer get() {
            return random.nextInt();
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.INT();
        }
    }

    /** Faker for {@code BIGINT}. */
    public static class BigIntFaker extends FLinkInternalDataFaker<Long> {
        private final Random random = new Random();

        public BigIntFaker(String name) {
            super(name);
        }

        @Override
        public Long get() {
            return random.nextLong();
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.BIGINT();
        }
    }

    /** Faker for {@code FLOAT}: a value in {@code [0, 1)} scaled by a random power of ten up to 10^8. */
    public static class FloatFaker extends FLinkInternalDataFaker<Float> {
        private final Random random = new Random();

        public FloatFaker(String name) {
            super(name);
        }

        @Override
        public Float get() {
            return random.nextFloat() * (float) Math.pow(10, random.nextInt(9));
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.FLOAT();
        }
    }

    /** Faker for {@code DOUBLE}: a value in {@code [0, 1)} scaled by a random power of ten up to 10^8. */
    public static class DoubleFaker extends FLinkInternalDataFaker<Double> {
        private final Random random = new Random();

        public DoubleFaker(String name) {
            super(name);
        }

        @Override
        public Double get() {
            return random.nextDouble() * Math.pow(10, random.nextInt(9));
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.DOUBLE();
        }
    }

    /** Faker for {@code DATE} bridged to {@code Integer}: days since epoch, before 2099-12-31. */
    public static class DateFaker extends FLinkInternalDataFaker<Integer> {
        private final Random random = new Random();
        // Exclusive upper bound of the generated epoch-day values.
        int MAX_DATE = (int) LocalDate.of(2099, 12, 31).toEpochDay();

        public DateFaker(String name) {
            super(name);
        }

        @Override
        public Integer get() {
            return random.nextInt(MAX_DATE);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.DATE().bridgedTo(Integer.class);
        }
    }

    /** Faker for {@code TIME(scale)} bridged to {@code Integer}: milliseconds of the day. */
    public static class TimeFaker extends FLinkInternalDataFaker<Integer> {
        public static final int SECOND_OF_DAY = 24 * 60 * 60;
        private final Random random = new Random();
        private final int scale;

        public TimeFaker(String name, int scale) {
            super(name);
            this.scale = scale;
        }

        /**
         * @return a millisecond-of-day value whose sub-second part carries no more fractional
         *     digits than {@code scale} (so TIME(0) yields whole seconds), mirroring how
         *     {@code TimestampFaker} honours its scale
         */
        @Override
        public Integer get() {
            int secondOfDay = random.nextInt(SECOND_OF_DAY);
            // An int of milliseconds can hold at most three fractional digits.
            int fractionDigits = Math.max(0, Math.min(scale, 3));
            int step = (int) Math.pow(10, 3 - fractionDigits);
            int millis = random.nextInt(1_000 / step) * step;
            return secondOfDay * 1_000 + millis;
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.TIME(scale).bridgedTo(Integer.class);
        }
    }

    /** Faker for {@code TIMESTAMP(scale)} bridged to {@link TimestampData}. */
    public static class TimestampFaker extends FLinkInternalDataFaker<TimestampData> {
        private final Random random = new Random();
        private final int scale;
        /** Exclusive upper bound of the generated epoch seconds (end of 2099-12-31 at offset +8). */
        public static final long MAX_SECONDS =
                LocalDateTime.of(LocalDate.of(2099, 12, 31), LocalTime.MAX).toEpochSecond(ZoneOffset.of("+8"));
        public static final int MAX_NANO_SECONDS = 999_999_999;

        public TimestampFaker(String name, int scale) {
            super(name);
            this.scale = scale;
        }

        /**
         * @return a timestamp between the epoch and {@link #MAX_SECONDS} whose nanosecond part
         *     has at most {@code scale} significant fractional digits
         */
        @Override
        public TimestampData get() {
            // floorMod is never negative, whereas Math.abs(Long.MIN_VALUE) is still negative.
            long secs = Math.floorMod(random.nextLong(), MAX_SECONDS);
            int nanos = random.nextInt((int) Math.pow(10, scale)) * (int) Math.pow(10, (9 - scale));
            LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(secs, nanos, ZoneOffset.UTC);
            return TimestampData.fromLocalDateTime(localDateTime);
        }

        @Override
        public DataType getProducedDataType() {
            return DataTypes.TIMESTAMP(scale).bridgedTo(TimestampData.class);
        }
    }
}
三、编写 SourceFunction 生成数据。
内含 main 函数启动入口,演示了
DataStream<RowData> --> DataStream<Row> --> Table
的转换流程,最终打印模拟数据。
其中的 RowUtils
工具类,请参考《Flink RowData 与 Row 相互转化工具类》一文。
import com.h3c.it_bigdata.module.transfer.util.RowUtils;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.*;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;public class FlinkFakeTypeSourceFunc implements ParallelSourceFunction<RowData> {private final int interval;public FlinkFakeTypeSourceFunc(int interval) {this.interval = interval;}public static final FlinkInternalDataFakers.FLinkInternalDataFaker<?>[] typeFakerArr;public final static DataType rowDataType;public final RowKind[] ROW_KINDS = new RowKind[]{RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.UPDATE_BEFORE, RowKind.DELETE};static {List<FlinkInternalDataFakers.FLinkInternalDataFaker<?>> fakers = new ArrayList<>();FlinkInternalDataFakers.CharFaker char10Faker = new FlinkInternalDataFakers.CharFaker("char10",10);fakers.add(char10Faker);FlinkInternalDataFakers.VarCharFaker varChar10Faker = new FlinkInternalDataFakers.VarCharFaker("varchar10", 10);fakers.add(varChar10Faker);FlinkInternalDataFakers.StringDataFaker stringDataFaker = new FlinkInternalDataFakers.StringDataFaker("string");fakers.add(stringDataFaker);FlinkInternalDataFakers.BooleanFaker booleanFaker = new FlinkInternalDataFakers.BooleanFaker("boolean");fakers.add(booleanFaker);FlinkInternalDataFakers.DecimalDataFaker decimalData103Faker = new FlinkInternalDataFakers.DecimalDataFaker("decimal103",10, 3);fakers.add(decimalData103Faker);FlinkInternalDataFakers.TinyIntFaker tinyIntFaker = new FlinkInternalDataFakers.TinyIntFaker("tinyint");fakers.add(tinyIntFaker);FlinkInternalDataFakers.SmallIntFaker smallIntFaker = new FlinkInternalDataFakers.SmallIntFaker("smallint");fakers.add(smallIntFaker);FlinkInternalDataFakers.IntFaker intFaker = new FlinkInternalDataFakers.IntFaker("integer");fakers.add(intFaker);FlinkInternalDataFakers.BigIntFaker bigIntFaker = new FlinkInternalDataFakers.BigIntFaker("bigint");fakers.add(bigIntFaker);FlinkInternalDataFakers.FloatFaker floatFaker = new FlinkInternalDataFakers.FloatFaker("float");fakers.add(floatFaker);FlinkInternalDataFakers.DoubleFaker doubleFaker = new 
FlinkInternalDataFakers.DoubleFaker("double");fakers.add(doubleFaker);FlinkInternalDataFakers.DateFaker dateFaker = new FlinkInternalDataFakers.DateFaker("date");fakers.add(dateFaker);FlinkInternalDataFakers.TimeFaker time0Faker = new FlinkInternalDataFakers.TimeFaker("time0",0);fakers.add(time0Faker);FlinkInternalDataFakers.TimeFaker time3Faker = new FlinkInternalDataFakers.TimeFaker("time3",3);fakers.add(time3Faker);FlinkInternalDataFakers.TimestampFaker timestamp0Faker = new FlinkInternalDataFakers.TimestampFaker("timestamp0",0);fakers.add(timestamp0Faker);FlinkInternalDataFakers.TimestampFaker timestamp3Faker = new FlinkInternalDataFakers.TimestampFaker("timestamp3",3);fakers.add(timestamp3Faker);FlinkInternalDataFakers.TimestampFaker timestamp6Faker = new FlinkInternalDataFakers.TimestampFaker("timestamp6", 6);fakers.add(timestamp6Faker);FlinkInternalDataFakers.BinaryFaker binary5Faker = new FlinkInternalDataFakers.BinaryFaker("binary5",5);fakers.add(binary5Faker);FlinkInternalDataFakers.VarBinaryFaker varBinary5Faker = new FlinkInternalDataFakers.VarBinaryFaker("varbinary5", 5);fakers.add(varBinary5Faker);FlinkInternalDataFakers.BytesFaker bytesFaker = new FlinkInternalDataFakers.BytesFaker("bytes");fakers.add(bytesFaker);rowDataType = DataTypes.ROW(fakers.stream().map(f->DataTypes.FIELD(f.getName(),f.getProducedDataType())).toArray(DataTypes.Field[]::new));typeFakerArr = fakers.toArray(new FlinkInternalDataFakers.FLinkInternalDataFaker<?>[0]);}public DataType getRowDataType() {return rowDataType;}public void printFLinkInternalType(){System.out.println("------------------------------------------------------");for (int i = 0; i < typeFakerArr.length; i++) {String fieldName = typeFakerArr[i].getName();DataType dataType = typeFakerArr[i].getProducedDataType();// 因为typeFakerArr已经将所有的类型BridgeTo Flink内不类型了。这里再次 toInternalDataType 意在告知Flink Table API Internal Type的来源。// DataType internalDataType = 
DataTypeUtils.toInternalDataType(dataType);System.out.printf("FieldName: %s --> TypeName: %s --> DefaultConversionClass: %s --> TypeConversionClass: %s%n",fieldName,dataType.toString(),dataType.getLogicalType().getDefaultConversion().getSimpleName(),dataType.getConversionClass().getSimpleName());}System.out.println("------------------------------------------------------");}@Overridepublic void run(SourceContext<RowData> sourceContext) throws Exception {Random random = new Random();while (true) {GenericRowData rowData = new GenericRowData(ROW_KINDS[random.nextInt(ROW_KINDS.length)], typeFakerArr.length);for (int i = 0; i < typeFakerArr.length; i++) {rowData.setField(i, typeFakerArr[i].get());}sourceContext.collect(rowData);Thread.sleep(interval * 1000L);}}@Overridepublic void cancel() {}public static void main(String[] args) throws Exception {// FlinkEnvProvider provider = new FlinkEnvProvider();// StreamExecutionEnvironment senv = provider.getSenv();// StreamTableEnvironment tenv = provider.getTenv();StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv = StreamTableEnvironment.create(senv);FlinkFakeTypeSourceFunc fakeSourceFunc = new FlinkFakeTypeSourceFunc(3);fakeSourceFunc.printFLinkInternalType();DataType dataType = fakeSourceFunc.getRowDataType();InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of((RowType) dataType.getLogicalType());DataStreamSource<RowData> rowDataDs = senv.addSource(fakeSourceFunc, "flinkTypeFakeSource", typeInfo);// rowDataDs.print();RowUtils.TypedMapFunc<RowData, Row> mapFunc = RowUtils.getRowDataToRowMapFunc(dataType);SingleOutputStreamOperator<Row> rowDs = rowDataDs.map(mapFunc).returns(mapFunc.getProducedType());DataType producedDataType = mapFunc.getProducedDataType();Schema schema = Schema.newBuilder().fromRowDataType(producedDataType).build();Table tbl = tenv.fromChangelogStream(rowDs, schema);DataType dt = 
tbl.getResolvedSchema().toPhysicalRowDataType();tbl.execute().print();senv.execute();}
}
三、LogicalType 默认承载类型和 Flink Table 内部类型比较
printFLinkInternalType
方法将数据类型打印出来。
注意:有些 DataType 没有使用 LogicalType 的 DefaultConversionClass
(例如下面输出中的 StringData、DecimalData、TimestampData,以及用 Integer 承载的 DATE / TIME)。
FieldName: char10 --> TypeName: CHAR(10) --> DefaultConversionClass: String --> TypeConversionClass: StringData
FieldName: varchar10 --> TypeName: VARCHAR(10) --> DefaultConversionClass: String --> TypeConversionClass: StringData
FieldName: string --> TypeName: STRING --> DefaultConversionClass: String --> TypeConversionClass: StringData
FieldName: boolean --> TypeName: BOOLEAN --> DefaultConversionClass: Boolean --> TypeConversionClass: Boolean
FieldName: decimal103 --> TypeName: DECIMAL(10, 3) --> DefaultConversionClass: BigDecimal --> TypeConversionClass: DecimalData
FieldName: tinyint --> TypeName: TINYINT --> DefaultConversionClass: Byte --> TypeConversionClass: Byte
FieldName: smallint --> TypeName: SMALLINT --> DefaultConversionClass: Short --> TypeConversionClass: Short
FieldName: integer --> TypeName: INT --> DefaultConversionClass: Integer --> TypeConversionClass: Integer
FieldName: bigint --> TypeName: BIGINT --> DefaultConversionClass: Long --> TypeConversionClass: Long
FieldName: float --> TypeName: FLOAT --> DefaultConversionClass: Float --> TypeConversionClass: Float
FieldName: double --> TypeName: DOUBLE --> DefaultConversionClass: Double --> TypeConversionClass: Double
FieldName: date --> TypeName: DATE --> DefaultConversionClass: LocalDate --> TypeConversionClass: Integer
FieldName: time0 --> TypeName: TIME(0) --> DefaultConversionClass: LocalTime --> TypeConversionClass: Integer
FieldName: time3 --> TypeName: TIME(3) --> DefaultConversionClass: LocalTime --> TypeConversionClass: Integer
FieldName: timestamp0 --> TypeName: TIMESTAMP(0) --> DefaultConversionClass: LocalDateTime --> TypeConversionClass: TimestampData
FieldName: timestamp3 --> TypeName: TIMESTAMP(3) --> DefaultConversionClass: LocalDateTime --> TypeConversionClass: TimestampData
FieldName: timestamp6 --> TypeName: TIMESTAMP(6) --> DefaultConversionClass: LocalDateTime --> TypeConversionClass: TimestampData
FieldName: binary5 --> TypeName: BINARY(5) --> DefaultConversionClass: byte[] --> TypeConversionClass: byte[]
FieldName: varbinary5 --> TypeName: VARBINARY(5) --> DefaultConversionClass: byte[] --> TypeConversionClass: byte[]
FieldName: bytes --> TypeName: BYTES --> DefaultConversionClass: byte[] --> TypeConversionClass: byte[]
四、样例数据
+----+--------------------------------+--------------------------------+--------------------------------+---------+--------------+---------+----------+-------------+----------------------+--------------------------------+--------------------------------+------------+----------+--------------+---------------------+-------------------------+----------------------------+--------------------------------+--------------------------------+--------------------------------+
| op | char10 | varchar10 | string | boolean | decimal103 | tinyint | smallint | integer | bigint | float | double | date | time0 | time3 | timestamp0 | timestamp3 | timestamp6 | binary5 | varbinary5 | bytes |
+----+--------------------------------+--------------------------------+--------------------------------+---------+--------------+---------+----------+-------------+----------------------+--------------------------------+--------------------------------+------------+----------+--------------+---------------------+-------------------------+----------------------------+--------------------------------+--------------------------------+--------------------------------+
| -D | 林远航 | 吕建辉 | 莫炫明|中山|擎宇.魏@yahoo.com | false | 3050752.529 | 113 | -9916 | -642000144 | 4156969259139129690 | 1673004.6 | 5.866031031768757 | 1988-07-13 | 19:17:05 | 05:02:40 | 2017-02-24 08:49:28 | 1980-03-02 11:28:27.601 | 2007-10-24 19:41:31.074622 | [-26, -78, -120, -25, -125] | [-24, -75, -75, -26, -103] | [-24, -82, -72, -26, -104, ... |
| -D | 沈伟祺 | 段浩然 | 阎擎宇|阳江|越彬.范@hotmail... | true | 1353447.631 | 84 | 3766 | -1944128024 | 5163149151613555000 | 294841.06 | 404.7775779582528 | 2038-11-15 | 22:25:22 | 12:45:26 | 2055-04-12 12:48:41 | 2012-07-20 19:19:59.805 | 1989-05-24 16:32:30.562509 | [-24, -117, -113, -27, -83] | [-25, -122, -118, -23, -72] | [-26, -101, -66, -23, -71, ... |
| +I | 谭弘文 | 丁弘文 | 苏睿渊|韶关|擎苍.陈@yahoo.com | false | 8423218.732 | 9 | 10515 | 524074331 | -2197205037599287672 | 27.323235 | 9053.888002920687 | 2066-11-04 | 12:05:57 | 03:09:11 | 1978-11-22 23:30:41 | 1975-07-05 12:16:57.301 | 2001-10-28 06:15:39.835157 | [-23, -126, -79, -25, -125] | [-25, -88, -117, -27, -121] | [-28, -67, -107, -27, -83, ... |