在前面的全部例子中,咱們只是運行了livy
官方給的兩個例子。這篇咱們要嘗試運行一些有意義的代碼。node
如沒有特殊說明,之後全部的實驗都是在
yarn-cluster
模式下運行的。
咱們打算嘗試運行下面代碼:python
sparkSession.read.format("org.elasticsearch.spark.sql") .options(Map( "es.nodes" -> "192.168.21.41:9200", "es.resource" -> "xxxxxxxxxxxxx") ) .load() .show()
這段代碼用spark sql加載了elasticsearch
的某個index,並使用show()
打印幾行數據。web
爲了完成這個實驗,有兩個問題必須解決:sql
elasticsearch-spark-20_2.11-x.x.x.jar
裏面。因此要運行上面的代碼,必須保證這個jar包被正確加載到。sc
表示當前的SparkContext
對象,而這裏咱們須要的是SparkSession
對象。如今咱們還不知道應該如何引用「當前SparkSession」對象。這兩個問題,livy的文檔沒有涉及。可是不要緊,從源碼裏面找答案。apache
首先,種種跡象代表livy會自動將LIVY_HOME/rsc-jars
目錄下的jar包上傳。因而咱們先把elasticsearch-spark-20_2.11-x.x.x.jar
傳到LIVY_HOME/rsc-jars
目錄下。json
而後,從源碼org/apache/livy/repl/AbstractSparkInterpreter.scala
中能夠找到SparkSession
對象的bind
:elasticsearch
... bind("spark", sparkEntries.sparkSession().getClass.getCanonicalName, sparkEntries.sparkSession(), List("""@transient""")) bind("sc", "org.apache.spark.SparkContext", sparkEntries.sc().sc, List("""@transient""")) execute("import org.apache.spark.SparkContext._") execute("import spark.implicits._") execute("import spark.sql") execute("import org.apache.spark.sql.functions._") ...
能夠看到,這裏將SparkSession對象bind到spark
變量上,而把SparkContext對象bind到sc
變量上。post
因而咱們的代碼應該寫成:ui
spark.read.format("org.elasticsearch.spark.sql") .options(Map( "es.nodes" -> "192.168.21.41:9200", "es.resource" -> "xxxxxxxxxxxxx") ) .load() .show()
接下來,仍是用python來提交代碼運行:url
data = {'code': 'sc.read.format("org.elasticsearch.spark.sql").options(Map("es.nodes" -> "192.168.21.41:9200", "es.resource" -> "777_zabbix_item2020_09_23_09_50_41")).load().show()'} r = requests.post(statements_url, data=json.dumps(data), headers=headers)
從webui上查看運行結果:
能夠看到show()
成果打印告終果
從spark-web-ui上找到環境頁面,查看spark.yarn.dist.jars
,能夠看到,elasticsearch-spark-20_2.11-x.x.x.jar
被加了進來:
從這個實驗,咱們掌握了自定義的jar包應該如何利用livy上傳到集羣上;還知道了SparkSession對象bind的變量是spark
。