目录
函数类(Function Classes)
富函数类(Rich Function Classes)
函数类(Function Classes)
Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction等。所以用户可以自定义一个函数类,实现对应的接口。
java:
public class MapFunctionExample {  public static void main(String[] args) throws Exception {  // 创建流执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 创建一个数据源,例如从本地生成整数序列  DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);  // 使用 MapFunction 将整数翻倍  DataStream<Integer> doubledNumbers = numbers.map(new MapFunction<Integer, Integer>() {  @Override  public Integer map(Integer value) throws Exception {  return value * 2;  }  });  // 打印结果到控制台  doubledNumbers.print();  // 执行流处理  env.execute("MapFunction Example");  }  
}scala:
object MapFunctionExample {  def main(args: Array[String]): Unit = {  // 创建流处理环境  val env = StreamExecutionEnvironment.getExecutionEnvironment  // 创建输入数据源  val input = env.fromElements(1, 2, 3, 4, 5)  // 使用 MapFunction 将每个元素乘以 2  val output = input.map(new MapFunction[Int, Int] {  def map(value: Int): Int = {  value * 2  }  })  // 打印结果到控制台  output.print()  // 执行流处理作业  env.execute("MapFunction Example")  }  
}富函数类(Rich Function Classes)
“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
- RichMapFunction
- RichFlatMapFunction
- RichFilterFunction
- .................
Rich Function 有一个生命周期的概念。典型的生命周期方法有:
- open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态
// 富函数,可以获取到运行时上下文,还有一些生命周期class MyRichMap extends RichMapFunction[SensorReading, String]{override def open(parameters: Configuration): Unit = {//做一些初始化操作。比如map方法需要交互数据库,数据库连接可以在open里边做//getRuntimeContext()}override def map(value: SensorReading): String = {value.id + " temperature"}override def close(): Unit = {//map调用完之后。一般做收尾工作,比如关闭连接,或者清空状态}}java:
public class RichMapExample {  public static void main(String[] args) throws Exception {  // 设置执行环境  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  // 创建数据源  env.fromElements("Hello", "World", "Flink")  .map(new RichMapFunction<String, Tuple2<String, Integer>>() {  private int count = 0;  @Override  public Tuple2<String, Integer> map(String value) throws Exception {  count++;  System.out.println("Mapped value: " + value);  return new Tuple2<>(value, count);  }  @Override  public void open(Configuration parameters) throws Exception {  super.open(parameters);  // 在这里可以添加一些初始化代码,比如日志记录、度量等。  }  })  // 添加简单的打印操作作为例子  .print();  // 执行任务  env.execute("Rich Map Function Example");  }  
}scala:
object RichMapFunctionExample {  def main(args: Array[String]): Unit = {  // 创建流处理环境  val env = StreamExecutionEnvironment.getExecutionEnvironment  // 创建一个简单的数据源  val stream = env.fromElements(1, 2, 3, 4, 5)  // 使用 RichMapFunction 转换数据流  val transformedStream = stream.map(new RichMapFunction[Int, Int]() {  var stateValue: Option[Int] = None  override def open(config: Configuration): Unit = {  // 初始化状态描述器  val descriptor = new ValueStateDescriptor[Int]("state", classOf[Int])  stateValue = getRuntimeContext.getState(descriptor)  }  override def map(value: Int): Int = {  // 使用状态值进行转换  val transformedValue = stateValue match {  case Some(prevValue) => value + prevValue  case None => value  }  // 更新状态值  stateValue = Some(transformedValue)  transformedValue  }  })  // 打印结果到控制台  transformedStream.print()  // 执行作业  env.execute("RichMapFunction Example")  }  
}