Livy探究(四) -- 從es讀取數據

在前面的全部例子中,咱們只是運行了livy官方給的兩個例子。這篇咱們要嘗試運行一些有意義的代碼。node

如沒有特殊說明,之後全部的實驗都是在 yarn-cluster模式下運行的。

咱們打算嘗試運行下面代碼:python

sparkSession.read.format("org.elasticsearch.spark.sql")
.options(Map(
    "es.nodes" -> "192.168.21.41:9200", 
    "es.resource" -> "xxxxxxxxxxxxx")
)
.load()
.show()

這段代碼用spark sql加載了elasticsearch的某個index,並使用show()打印幾行數據。web

爲了完成這個實驗,有兩個問題必須解決:sql

  1. 你們知道spark sql能夠擴展DataSource,elasticsearch官方爲spark開發的DataSource在elasticsearch-spark-20_2.11-x.x.x.jar裏面。因此要運行上面的代碼,必須保證這個jar包被正確加載到。
  2. 在以前的例子中,咱們用sc表示當前的SparkContext對象,而這裏咱們須要的是SparkSession對象。如今咱們還不知道應該如何引用「當前SparkSession」對象。

這兩個問題,livy的文檔沒有涉及。可是不要緊,從源碼裏面找答案。apache

首先,種種跡象代表livy會自動將LIVY_HOME/rsc-jars目錄下的jar包上傳。因而咱們先把elasticsearch-spark-20_2.11-x.x.x.jar傳到LIVY_HOME/rsc-jars目錄下。json

而後,從源碼org/apache/livy/repl/AbstractSparkInterpreter.scala中能夠找到SparkSession對象的bindelasticsearch

...
bind("spark",
 sparkEntries.sparkSession().getClass.getCanonicalName,
 sparkEntries.sparkSession(),
 List("""@transient"""))
bind("sc", "org.apache.spark.SparkContext", sparkEntries.sc().sc, List("""@transient"""))
execute("import org.apache.spark.SparkContext._")
execute("import spark.implicits._")
execute("import spark.sql")
execute("import org.apache.spark.sql.functions._")
...

能夠看到,這裏將SparkSession對象bind到spark變量上,而把SparkContext對象bind到sc變量上。post

因而咱們的代碼應該寫成:ui

spark.read.format("org.elasticsearch.spark.sql")
.options(Map(
    "es.nodes" -> "192.168.21.41:9200", 
    "es.resource" -> "xxxxxxxxxxxxx")
)
.load()
.show()

接下來,仍是用python來提交代碼運行:url

data = {'code': 'sc.read.format("org.elasticsearch.spark.sql").options(Map("es.nodes" -> "192.168.21.41:9200", "es.resource" -> "777_zabbix_item2020_09_23_09_50_41")).load().show()'}

r = requests.post(statements_url, data=json.dumps(data), headers=headers)

從webui上查看運行結果:

image.png

能夠看到show()成果打印告終果

從spark-web-ui上找到環境頁面,查看spark.yarn.dist.jars,能夠看到,elasticsearch-spark-20_2.11-x.x.x.jar被加了進來:

image.png

總結

從這個實驗,咱們掌握了自定義的jar包應該如何利用livy上傳到集羣上;還知道了SparkSession對象bind的變量是spark

相關文章
相關標籤/搜索