This test was run against a local single-node Elasticsearch and Spark.
Environment: Spark 2.2.0, Elasticsearch 1.7.2 (cluster) or Elasticsearch 6.6.1 (single node), Scala SDK 2.11.1
POM dependencies:
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<!-- elasticsearch-hadoop -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.1.1</version>
</dependency>
Note: the build environment must be consistent everywhere; Scala versions are not backward compatible.
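A mismatch usually only shows up at runtime (for example as a binary-compatibility error), so a quick sanity check, not part of the original example, is to print the Scala version the job actually runs on and compare it with the _2.11 artifacts in the POM:

println(scala.util.Properties.versionNumberString) // should print a 2.11.x version matching the scala-library dependency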
Example 1: reading from Elasticsearch as an RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // brings sc.esRDD into scope

val conf = new SparkConf().setMaster("local").setAppName("ScalaSparkElasticSearch")
/**
 * According to the Elasticsearch documentation, the integration needs:
 *   es.index.auto.create ---> true
 * To connect to an ES cluster we also have to tell Spark where it lives (host, port).
 */
conf.set("es.index.auto.create", "true")
conf.set("es.nodes", "master") // ----> must be set when connecting to a remote ES node
conf.set("es.port", "9200")
val sc = new SparkContext(conf)

val esrdd: RDD[(String, collection.Map[String, AnyRef])] = sc.esRDD("kc22k2_detail/cz") // fetch the whole index
esrdd.collect().foreach(s => print(s._2))
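esRDD also accepts a query as a second argument, so Elasticsearch filters documents before they are shipped to Spark. A minimal sketch, reusing sc and the imports from above and borrowing the name_depa field from the later SQL example (the value "財務部" is made up):

val filtered: RDD[(String, collection.Map[String, AnyRef])] =
  sc.esRDD("kc22k2_detail/cz", "?q=name_depa:財務部") // URI-style query evaluated on the ES side
filtered.take(5).foreach(println)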
Example 2: reading as a DataFrame
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("spark_es").getOrCreate()
val options = Map(
  "es.index.auto.create" -> "true",
  "pushdown" -> "true",
  "es.nodes" -> "10.111.121.115",
  "es.port" -> "9200")
val sparkDF = spark.sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("kc22k2_detail/cz")
sparkDF.show(10)
sparkDF.createOrReplaceTempView("t_sn_gongshi")
spark.sql("select count(distinct name_depa) from t_sn_gongshi").show()
Reading ES through Spark SQL makes it very convenient to work with the data in SQL, but the extra round trips cost a lot of time. For everyday queries it is usually better to call the ES API directly, and to use Spark for the interactive processing when ES data has to be combined with Hadoop.
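Because the options above set "pushdown" -> "true", ordinary DataFrame predicates are translated into Elasticsearch queries, which already reduces what travels over the network. A small sketch along those lines, reusing sparkDF from example 2 (the filter value is invented):

val filteredDF = sparkDF.filter(sparkDF("name_depa") === "財務部") // predicate pushed down to ES as a query
filteredDF.show(10)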
Example 3: reading with a query filter to reduce the data volume and network transfer time.
Query DSL reference: https://www.yiibai.com/elasticsearch/elasticsearch_query_dsl.html
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark.sql.EsSparkSQL

val conf = new SparkConf()
conf.set("es.index.auto.create", "true")
conf.set("pushdown", "true")
conf.set("es.nodes", "192.168.x.xxx")
conf.set("es.port", "9200")
val spark = SparkSession.builder().master("local[*]").appName("spark_es").config(conf).getOrCreate()
val query: String =
  s"""{
    "query" : {
      "terms": {
        "name": ["北京", "成都"]
      }
    }
  }"""
val df = EsSparkSQL.esDF(spark.sqlContext, "area_map_0302_3/cz", query)
df.show()
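The same DSL can also be supplied through the reader's "es.query" setting instead of calling EsSparkSQL.esDF; a sketch under that assumption, reusing spark and query from above:

val dfViaOption = spark.sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.query", query) // the DSL is applied on the ES side before results are returned
  .load("area_map_0302_3/cz")
dfViaOption.show()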
Example 4: processing data with Spark and writing it to Elasticsearch
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.elasticsearch.spark.rdd.EsSpark
import org.elasticsearch.spark.sql.EsSparkSQL

// Simple three-field record written to ES (the field names here are only illustrative).
case class Trip(f1: String, f2: String, f3: String)

def sparkWriteEs(): Unit = {
  val conf = new SparkConf()
  conf.set("es.index.auto.create", "true")
  conf.set("pushdown", "true")
  conf.set("es.nodes", "10.111.121.115")
  conf.set("es.port", "9200")
  // Several configurations can be attached to one SparkSession.
  val spark = SparkSession.builder()
    .appName("Spark-ES")
    .master("local[*]")
    .config(conf)
    .enableHiveSupport()
    .getOrCreate()
  val sc = spark.sparkContext

  /**
   * Write RDD data.
   */
  val data: RDD[String] = sc.textFile("file:///C:\\Users\\91BGJK2\\Desktop\\es測試數據.txt")
  val rdds: RDD[Trip] = data.map { line =>
    val s: Array[String] = line.split("\t")
    Trip(s(0), s(1), s(2))
  }
  EsSpark.saveToEs(rdds, "kc22k2_test/cz")
  // val sqlContext = new SQLContext(sc)

  /**
   * Read from the data source --- write the DataFrame.
   */
  val gongshi: DataFrame = spark.read.format("jdbc")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("url", "jdbc:mysql://10.111.121.111:3306/test?useUnicode=true&characterEncoding=UTF-8")
    .option("dbtable", "t_test_es")
    .option("user", "root")
    .option("password", "root")
    .load()
  // DataFrame data is written with EsSparkSQL; RDDs use EsSpark.
  EsSparkSQL.saveToEs(gongshi, "kc22k2_detail/cz")
}
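If the written documents need a stable _id (for example so that re-running the job updates documents instead of duplicating them), elasticsearch-hadoop can take the id from a field via the "es.mapping.id" setting. A minimal sketch, assuming a hypothetical id field on Trip:

EsSpark.saveToEs(rdds, "kc22k2_test/cz", Map("es.mapping.id" -> "id")) // "id" is a made-up field name here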