在pyspark中調用scala代碼

在pyspark中調用scala代碼

情境說明

問題

咱們這邊是要使用Spark去並行一個天然語言處理的算法,其中使用到了LDA主題模型。因爲使用的是天河二號,Spark版本是1.5.1,pyspark一樣,因此獲取主題時還不能使用describeTopics(在spark1.6中才開放對python的接口),只能使用topicsMatrix的方法。java

原本湊合用topicsMatrix也行,但咱們發現,這一個用來獲取主題模型的函數,竟然比Lda的訓練還要慢!不管在咱們本身的集羣仍是在天河二號的分區上,都是這一個狀況。觀察topicsMatrix的源代碼,好像也沒有什麼複雜操做,只是把數據彙總collect而已:python

@Since("1.3.0")
override lazy val topicsMatrix: Matrix = {
  // Collect row-major topics
  val termTopicCounts: Array[(Int, TopicCounts)] =
    graph.vertices.filter(_._1 < 0).map { case (termIndex, cnts) =>
    (index2term(termIndex), cnts)}.collect()
  // Convert to Matrix
  val brzTopics = BDM.zeros[Double](vocabSize, k)
  termTopicCounts.foreach { case (term, cnts) =>
    var j = 0
    while (j < k) {
      brzTopics(term, j) = cnts(j)
      j += 1
    }
  }
  Matrices.fromBreeze(brzTopics)
}

因爲並非算法中有一些複雜運算致使較慢,咱們天然不但願在程序中有這樣的狀況。發現到在Spark1.5.1中,mllib中LdaModel已經實現了describeTopics,只是未在Python中開放,咱們天然但願嘗試使用describeTopics看看效果。算法

describeTopics的源代碼探索

已知LDA.train()返回的是LdaModel的實例,因而乎,參考上篇博客,用如下方式去調用:編程

model = LDA.train(rdd_data, k=num_topics, maxIterations=20)
topics = model.call('describeTopics', _py2java(sc, 10))

執行速度特別快,然而返回的結果卻不盡如人意,僅返回了一個長度k的列表,每一個元素是一個key爲'class',value爲'scala.Tuple2'的單元素字典。從結果來看,scala的代碼應該是被成功執行了,然而返回結果卻出了問題。查看callJavaFunc的內容,能夠判斷出,是describeTopics的返回結果沒有被_java2py函數正常的轉換。bash

比對Spark1.5和Spark1.6的代碼,LdaModel.describeTopics函數的內容是一致的,那麼問題在哪兒呢?再去查看pyspark的LDA.train()調用的PythonMLLibAPI.trainLdaModel,發如今1.6中返回的再也不是LdaModel而是它的子類LdaModelWrapper。查看這個類的方法,發現它重載了describeTopics來方便_java2py進行數據轉換:app

private[python] class LDAModelWrapper(model: LDAModel) {

  def topicsMatrix(): Matrix = model.topicsMatrix

  def vocabSize(): Int = model.vocabSize

  def describeTopics(): Array[Byte] = describeTopics(this.model.vocabSize)

  def describeTopics(maxTermsPerTopic: Int): Array[Byte] = {
    val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) =>
      val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
      val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
      Array[Any](jTerms, jTermWeights)
    }
    SerDe.dumps(JavaConverters.seqAsJavaListConverter(topics).asJava)
  }

  def save(sc: SparkContext, path: String): Unit = model.save(sc, path)
}

找到這裏,解決方法就油然而生了。只要咱們把這一段scala代碼在python中調用,並將describeTopics的Java對象傳入,不就萬事大吉了嗎?jvm

在pyspark中調用scala代碼

也許還有別的方法,不過這裏使用的方法也足夠簡單。將.scala文件打包成jar後,啓動spark時加入參數--driver-class-path /path/to/xxx.jar,即可以將你的scala代碼放入Spark運行的虛擬機JVM中,從而讓python代碼在運行中經過反射機制在SparkContext._jvm裏動態獲取到你的類與方法:ide

func = sc._jvm.com.example.YourObject.func

打包scala代碼

那麼,如今的問題就是如何把scala代碼打包成jar了。scala雖然也是基於JVM運行的語言,與java很是類似,可是其編譯選項中並無提供將其打包成jar的參數。這裏咱們用sbt打包它,sbt的下載與安裝請自行查閱其餘教程,這裏就不提供了,官方網站函數

首先編寫好你的scala代碼,確認沒有bug,並在文件開頭用package關鍵字將其封裝至包中。接着,請手動創建你的項目目錄,並建立以下結構:
圖片描述網站

在build.sbt中,請至少進行如下設置

//項目名
name := "Project"

//項目版本
version := "0.1"

//scala版本
scalaVersion := "2.10.5"

//jdk版本
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

//主函數
mainClass in Compile := Some("YourClass.func")

在plugins.sbt中,請加上這一句話,告訴sbt須要這個第三方插件,這是用來打包的

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

這些都準備完成後,在terminal裏進入你的項目根目錄下,輸入

sbt package

等待打包完成,會有相應提示。
更多的打包選項,以及sbt的更多用法,感興趣能夠自行查閱。

解決咱們的問題

回到咱們這裏的問題,咱們但願能在python中對describeTopics的返回值進行轉換,那麼我麼只須要打包那一個重載的describeTopics就行了,這樣能夠避免打包Spark的第三方包。更改一下函數的返回值,並註釋掉調用Spark的SerDe進行序列化的語句,最終的代碼以下:

package com.sysu.sparkhelper

import java.util.List
import scala.collection.JavaConverters

object LdaHelper {
    def convert(topics: Array[(Array[Int], Array[Double])]): List[Array[Any]] = {
        val result = topics.map { case (terms, termWeights) =>
          val jTerms = JavaConverters.seqAsJavaListConverter(terms).asJava
          val jTermWeights = JavaConverters.seqAsJavaListConverter(termWeights).asJava
          Array[Any](jTerms, jTermWeights)
        }
        return JavaConverters.seqAsJavaListConverter(result).asJava
        // SerDe.dumps(JavaConverters.seqAsJavaListConverter(result).asJava)
    }
}

用sbt打包完成後,使用--driver-class-path添加jar包,在python中相應代碼爲:

lda_java_model = model._java_model
func = getattr(model._java_model, 'describeTopics')
result = func(_py2java(sc, 10))
topics = _java2py(sc, sc._jvm.com.sysu.sparkhelper.LdaHelper.convert(result))

總結

這算是閱讀源碼的一次應用,能夠說仍是解決了遇到的問題,同時也加深了對Spark的瞭解。原本作並行化就是但願效率更高,pyspark卻在調用scala代碼,同時進行了不少數據轉換。想要更好的使用Spark的話,使用scala去編程應該纔是最好的。

相關文章
相關標籤/搜索