查看 longAccumulator() 方法的源码,可以看到它先执行 val acc = new LongAccumulator,
然后调用 register(acc),
在 Spark 中注册该累加器。按住 Ctrl 并单击鼠标左键进入
LongAccumulator 类,可以看到它继承了 AccumulatorV2[jl.Long, jl.Long](jl 是 Spark 源码中 java.lang 的别名)。参照 LongAccumulator 的写法,就可以实现自定义累加器。
实现类如下(只收集包含字母 "h" 的单词):
/**
 * Custom accumulator that collects every added word containing the letter "h".
 *
 * Modeled on Spark's built-in `LongAccumulator`: the input type is `String`
 * and the output type is a `java.util.ArrayList[String]` of collected words.
 */
class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {

  /** Backing buffer holding the words collected so far. */
  val list = new util.ArrayList[String]()

  /** True when no word has been collected yet. */
  override def isZero: Boolean = list.isEmpty

  /**
   * Returns a new accumulator that holds the same words as this one.
   *
   * `AccumulatorV2.copy()` must duplicate the current value (as
   * `LongAccumulator.copy` does with its sum and count). Returning a fresh,
   * empty instance would silently drop everything collected so far.
   */
  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    val newAcc = new WordAccumulator
    newAcc.list.addAll(list)
    newAcc
  }

  /** Clears all collected words, returning the accumulator to its zero value. */
  override def reset(): Unit = list.clear()

  /** Collects `v` only if it contains the letter "h"; other words are ignored. */
  override def add(v: String): Unit = {
    if (v.contains("h")) {
      list.add(v)
    }
  }

  /** Appends every word collected by `other` to this accumulator. */
  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  /** The words collected so far (the live backing list, not a defensive copy). */
  override def value: util.ArrayList[String] = list
}
然后是 main 函数:
/**
 * Demo driver: registers a WordAccumulator, feeds it every word of a
 * two-partition RDD inside a `foreach` action, then prints the collected words.
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")
  val sc = new SparkContext(conf)
  try {
    val dataRDD: RDD[String] = sc.makeRDD(List("chun1", "chun2", "chun3", "chun4"), 2)

    // The accumulator must be registered with the SparkContext before a job uses it.
    val wordAccumulator = new WordAccumulator()
    sc.register(wordAccumulator)

    // Each task adds to its own copy; Spark merges the copies back on the driver.
    dataRDD.foreach(word => wordAccumulator.add(word))

    println(wordAccumulator.value)
  } finally {
    // Release the SparkContext even if the job fails.
    sc.stop()
  }
}
运行结果:[chun1, chun2, chun3, chun4]
(四个单词都包含字母 h,因此全部被收集。)
完整代码
package date_9_23

import java.util

import org.apache.spark.rdd.RDD
import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}
/** Entry point demonstrating a custom `AccumulatorV2` (WordAccumulator). */
object Spark4_LongAccumulator {

  /**
   * Registers a WordAccumulator, feeds it every word of a two-partition RDD
   * inside a `foreach` action, then prints the collected words on the driver.
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("CheckPoint").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val dataRDD: RDD[String] = sc.makeRDD(List("chun1", "chun2", "chun3", "chun4"), 2)

      // The accumulator must be registered with the SparkContext before a job uses it.
      val wordAccumulator = new WordAccumulator()
      sc.register(wordAccumulator)

      // Each task adds to its own copy; Spark merges the copies back on the driver.
      dataRDD.foreach(word => wordAccumulator.add(word))

      println(wordAccumulator.value)
    } finally {
      // Release the SparkContext even if the job fails.
      sc.stop()
    }
  }
}
/**
 * Custom accumulator that collects every added word containing the letter "h".
 *
 * Modeled on Spark's built-in `LongAccumulator`: the input type is `String`
 * and the output type is a `java.util.ArrayList[String]` of collected words.
 */
class WordAccumulator extends AccumulatorV2[String, util.ArrayList[String]] {

  /** Backing buffer holding the words collected so far. */
  val list = new util.ArrayList[String]()

  /** True when no word has been collected yet. */
  override def isZero: Boolean = list.isEmpty

  /**
   * Returns a new accumulator that holds the same words as this one.
   *
   * `AccumulatorV2.copy()` must duplicate the current value (as
   * `LongAccumulator.copy` does with its sum and count). Returning a fresh,
   * empty instance would silently drop everything collected so far.
   */
  override def copy(): AccumulatorV2[String, util.ArrayList[String]] = {
    val newAcc = new WordAccumulator
    newAcc.list.addAll(list)
    newAcc
  }

  /** Clears all collected words, returning the accumulator to its zero value. */
  override def reset(): Unit = list.clear()

  /** Collects `v` only if it contains the letter "h"; other words are ignored. */
  override def add(v: String): Unit = {
    if (v.contains("h")) {
      list.add(v)
    }
  }

  /** Appends every word collected by `other` to this accumulator. */
  override def merge(other: AccumulatorV2[String, util.ArrayList[String]]): Unit = {
    list.addAll(other.value)
  }

  /** The words collected so far (the live backing list, not a defensive copy). */
  override def value: util.ArrayList[String] = list
}