Method 1: Use the Flink DataSet API
points.map(new SelectNearestCenter).withBroadcastSet(currentCentroids, "centroids") // register currentCentroids as a broadcast set for this map operation
import scala.collection.JavaConverters._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

final class SelectNearestCenter extends RichMapFunction[DenseVector, (Int, DenseVector)] with Serializable {
  // Filled in open() from the broadcast set registered under the name "centroids"
  private var centroids: Traversable[DenseVector] = null

  override def open(parameters: Configuration): Unit = {
    centroids = getRuntimeContext.getBroadcastVariable[DenseVector]("centroids").asScala
  }

  def map(p: DenseVector): (Int, DenseVector) = {
    // use centroids
    ...
  }
}
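The body of map is left out in Method 1. As a rough illustration of what "use centroids" could look like, here is a minimal sketch in plain Scala with no Flink dependency. It rests on several assumptions that are not in the original: vectors are represented as Array[Double] instead of DenseVector, the distance is squared Euclidean, and the Int in the result is the index of the closest centroid. The names NearestCenterSketch, squaredDistance and selectNearest are hypothetical.

```scala
object NearestCenterSketch {
  // Squared Euclidean distance between two vectors of equal length.
  def squaredDistance(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "vectors must have the same dimension")
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      val d = a(i) - b(i)
      sum += d * d
      i += 1
    }
    sum
  }

  // Returns (index of the closest centroid, the point itself).
  // Assumption: this mirrors the (Int, DenseVector) result type of SelectNearestCenter.map.
  def selectNearest(p: Array[Double], centroids: Seq[Array[Double]]): (Int, Array[Double]) = {
    require(centroids.nonEmpty, "at least one centroid is required")
    val nearestIndex = centroids.indices.minBy(i => squaredDistance(p, centroids(i)))
    (nearestIndex, p)
  }
}
```

Inside map, the same logic would run over the centroids field that open fills from the broadcast set.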
Method 2: Use the FlinkML mapWithBcVariable method
import org.apache.flink.ml._ // brings mapWithBcVariable into scope

points.mapWithBcVariable(currentCentroids) { (point, center) =>
  // use the broadcast variable center directly
}