Rich接口可以获取运行环境的上下文,拥有一些生命周期方法;
1、默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次, 而且先被调用
2、默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次, 而且是最后被调用
3、getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态
需要注意的是,这里的生命周期方法,对于一个并行子任务来说只会调用一次;而对应的, 实际工作方法,例如RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。
public class CounterApp {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource
DataSet
LongCounter counter = new LongCounter();
/**
* 默认生命周期方法:初始化方法,在每个并行度上,只会调用一次
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open方法...");
super.open(parameters);
getRuntimeContext().addAccumulator("element-java", counter);
}
/**
*
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("close方法执行一次...");
super.close();
}
@Override
public String map(String s) throws Exception {
counter.add(1);
return s;
}
}).setParallelism(2);
info.writeAsText("d://a.txt", FileSystem.WriteMode.OVERWRITE);
JobExecutionResult jobResult = env.execute("counterApp");
long result = jobResult.getAccumulatorResult("element-java");
System.out.println("result:" + result);
}
}
并行度计算数量
import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode
object CounterApp {
def main(args: Array[String]): Unit = {
val env = ExecutionEnvironment.getExecutionEnvironment
val data = env.fromElements("hadoop", "spark", "java", "flink")
val info = data.map(new RichMapFunction[String, String] {
val counter = new LongCounter()
override def open(parameters: Configuration): Unit = {
getRuntimeContext.addAccumulator("element-scala-counter", counter)
}
override def map(in: String): String = {
counter.add(1)
in
}
})
info.setParallelism(3).writeAsText("d://a.text", WriteMode.OVERWRITE)
val result = env.execute("CounterApp")
val num = result.getAccumulatorResult[Long]("element-scala-counter")
println("num:" + num)
}
}
-----------------------------------
num:4