RichFunction & RichMapFunction

The Rich function interfaces provide access to the runtime context and expose lifecycle methods:

1. open(): the default initialization lifecycle method; it is called exactly once per parallel subtask, before any data is processed.

2. close(): the default teardown lifecycle method for cleanup work; it is called exactly once per parallel subtask, after all data has been processed.

3. getRuntimeContext() exposes information from the function's RuntimeContext, such as the parallelism the function runs with, the task name, and managed state.

Note that these lifecycle methods are called only once per parallel subtask, whereas the actual working method, e.g. map() in RichMapFunction, is invoked once for every incoming record.
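The call pattern above can be sketched in plain Java, with no Flink dependency (the LifecycleSketch class and its counters are a hypothetical illustration, not Flink API): each parallel subtask calls open() once, map() once per record, and close() once.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class LifecycleSketch {

    // Counters to observe how often each lifecycle method runs.
    static final AtomicInteger openCalls = new AtomicInteger();
    static final AtomicInteger mapCalls = new AtomicInteger();
    static final AtomicInteger closeCalls = new AtomicInteger();

    // One simulated "parallel subtask": open() once, map() per record, close() once.
    static void runSubtask(List<String> records) {
        openCalls.incrementAndGet();        // open(): called first, once per subtask
        for (String r : records) {
            mapCalls.incrementAndGet();     // map(): called once per record
        }
        closeCalls.incrementAndGet();       // close(): called last, once per subtask
    }

    public static void main(String[] args) {
        // Parallelism 2: four records split across two subtasks.
        runSubtask(List.of("hadoop", "java"));
        runSubtask(List.of("flink", "hive"));

        // open/close ran once per subtask (2 each), map once per record (4).
        System.out.println("open=" + openCalls + " map=" + mapCalls + " close=" + closeCalls);
    }
}
```

With parallelism 2 and four elements, open() and close() each run twice in total, while map() runs four times, which is exactly what the printed "open方法..." lines in the example below demonstrate.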

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;

public class CounterApp {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<String> data = env.fromElements("hadoop", "java", "flink", "hive");

        DataSet<String> info = data.map(new RichMapFunction<String, String>() {

            LongCounter counter = new LongCounter();

            /**
             * Default lifecycle method: initialization, called once per parallel subtask.
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                System.out.println("open() called...");
                super.open(parameters);
                getRuntimeContext().addAccumulator("element-java", counter);
            }

            /**
             * Default lifecycle method: cleanup, called once per parallel subtask.
             */
            @Override
            public void close() throws Exception {
                System.out.println("close() called once...");
                super.close();
            }

            @Override
            public String map(String s) throws Exception {
                counter.add(1);
                return s;
            }
        }).setParallelism(2);

        info.writeAsText("d://a.txt", FileSystem.WriteMode.OVERWRITE);

        JobExecutionResult jobResult = env.execute("counterApp");

        long result = jobResult.getAccumulatorResult("element-java");
        System.out.println("result:" + result);
    }
}

Counting elements across parallel subtasks (Scala version)

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode

object CounterApp {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val data = env.fromElements("hadoop", "spark", "java", "flink")

    val info = data.map(new RichMapFunction[String, String] {

      val counter = new LongCounter()

      // Called once per parallel subtask; register the accumulator here.
      override def open(parameters: Configuration): Unit = {
        getRuntimeContext.addAccumulator("element-scala-counter", counter)
      }

      override def map(in: String): String = {
        counter.add(1)
        in
      }
    })

    info.setParallelism(3).writeAsText("d://a.text", WriteMode.OVERWRITE)

    val result = env.execute("CounterApp")
    val num = result.getAccumulatorResult[Long]("element-scala-counter")
    println("num:" + num)
  }
}

Program output:

num:4
