Since I have been using ElasticSearch for data search and recently started looking into Spark ML, let me first demonstrate a simple example of Spark reading from and writing to ElasticSearch (Spark reading data from ElasticSearch).
Environment: IDEA 2016, JDK 8, Windows 10, with ElasticSearch 6.3.2 and spark-2.3.1-bin-hadoop2.7 installed. The program is packaged into a jar with mvn package and submitted to Spark with spark-submit.
First, create an index in ElasticSearch for the demonstration. Since the data is text, we use the ik tokenizer. See: elasticsearch-ik.
Create the index: PUT /index_ik_test
Set the mapping and the corresponding analyzer. Here the content field is declared as ElasticSearch's text type, using the ik_max_word analysis mode:
POST index_ik_test/fulltext/_mapping
{
  "properties": {
    "content": {
      "type": "text",
      "analyzer": "ik_max_word",
      "search_analyzer": "ik_max_word"
    }
  }
}
Store a few documents into ElasticSearch:
POST index_ik_test/fulltext/1
{"content":"其中有兩個人受傷了"}
The ik tokenizer has two analysis modes: ik_max_word and ik_smart. You can compare the two as follows:
GET index_ik_test/_analyze
{
  "text": ["其中國家投資了500萬"],
  "tokenizer": "ik_smart"
}
Tokenization result: 其中、國家、投資、了、500萬
GET index_ik_test/_analyze
{
  "text": ["其中國家投資了500萬"],
  "tokenizer": "ik_max_word"
}
Tokenization result: 其中、中國、國家、投資、了、500、萬
Use GET index_ik_test/_mapping to view the index configuration:
{
  "index_ik_test": {
    "mappings": {
      "fulltext": {
        "properties": {
          "content": {
            "type": "text",
            "analyzer": "ik_max_word"
          }
        }
      }
    }
  }
}
Good. Now that there is data in ElasticSearch, let's see how to read it from Spark.
Create a new Maven project in IDEA 2016. A Spring Boot project would work too, of course, but here it's a plain Maven project.
ElasticSearch officially provides elasticsearch-hadoop for Spark to access ElasticSearch. For details, see the official documentation: es for spark.
The official elasticsearch-hadoop Maven dependency bundles ElasticSearch for Hadoop MR, ElasticSearch for Hadoop Hive, and ElasticSearch for Hadoop Spark. If you only use Spark, you can add just the ElasticSearch for Spark dependency instead. See: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/install.html
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.10</artifactId>
    <version>6.3.2</version>
</dependency>
Creating the Spark runtime context requires the spark-sql_2.11 dependency; see the quick start in the official Spark documentation:
To build the program, we also write a Maven pom.xml file that lists Spark as a dependency. Note that Spark artifacts are tagged with a Scala version.
In this article's example, the following three Maven dependencies are added:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>6.3.2</version>
</dependency>
<!-- Spark dependency -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.3.1</version>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>22.0</version>
</dependency>
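For context, here is a minimal pom.xml sketch showing where these dependencies sit. The groupId is an illustrative assumption; the artifactId esdemo just matches the jar name used later in this post:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- groupId is hypothetical; artifactId/version match esdemo-1.0-SNAPSHOT.jar below -->
    <groupId>com.example</groupId>
    <artifactId>esdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <!-- JDK 8, matching the environment described above -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-hadoop</artifactId>
            <version>6.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>22.0</version>
        </dependency>
    </dependencies>
</project>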
Now let's look at the example code directly.
Configure Spark to connect to ElasticSearch. See: elasticsearch-hadoop-master; we follow "Configure the connector to run in WAN mode":
SparkConf sparkConf = new SparkConf()
        .setAppName("writeEs")
        .setMaster("local[*]")
        .set("es.index.auto.create", "true")
        .set("es.nodes", "ELASTIC_SEARCH_IP")
        .set("es.port", "9200")
        .set("es.nodes.wan.only", "true");
Write data into ElasticSearch:
// numbers and airports are the two Maps defined in the full example below
JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
JavaEsSpark.saveToEs(javaRDD, elasticIndex);
Read data from ElasticSearch:
JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中國").values();
for (Map<String, Object> item : searchRdd.collect()) {
    item.forEach((key, value) -> {
        System.out.println("search key:" + key + ", search value:" + value);
    });
}
Here ?q=中國 is used as the query condition. The complete example code is as follows:
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;

import java.util.Map;

import static org.elasticsearch.spark.rdd.api.java.JavaEsSpark.esRDD;

/**
 * Created by Administrator on 2018/8/28.
 */
public class EsSparkTest {

    public void writeEs() {
        String elasticIndex = "spark/docs";
        // https://www.elastic.co/guide/en/elasticsearch/hadoop/current/spark.html#spark-native
        SparkConf sparkConf = new SparkConf()
                .setAppName("writeEs")
                .setMaster("local[*]")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "ELASTIC_SEARCH_IP")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true");
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());// adapter
        Map<String, ?> numbers = ImmutableMap.of("one", 1, "two", 2);
        Map<String, ?> airports = ImmutableMap.of("OTP", "Otopeni", "SFO", "San Fran");
        JavaRDD<Map<String, ?>> javaRDD = jsc.parallelize(ImmutableList.of(numbers, airports));
        JavaEsSpark.saveToEs(javaRDD, elasticIndex);
    }

    public void readEs() {
        SparkConf sparkConf = new SparkConf()
                .setAppName("readEs")
                .setMaster("local[*]")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "ELASTIC_SEARCH_IP")
                .set("es.port", "9200")
                .set("es.nodes.wan.only", "true");
        SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());// adapter
        JavaRDD<Map<String, Object>> searchRdd = esRDD(jsc, "index_ik_test/fulltext", "?q=中國").values();
        for (Map<String, Object> item : searchRdd.collect()) {
            item.forEach((key, value) -> {
                System.out.println("search key:" + key + ", search value:" + value);
            });
        }
        sparkSession.stop();
    }
}
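As a side note, the third argument to esRDD does not have to be a URI query; elasticsearch-hadoop also accepts a full query DSL string (any string starting with "{"). A minimal sketch of an equivalent match query, dropped into readEs() in place of the "?q=中國" call:

// Equivalent read using the query DSL instead of the URI query "?q=中國"
String queryDsl = "{\"query\": {\"match\": {\"content\": \"中國\"}}}";
JavaRDD<Map<String, Object>> dslRdd = esRDD(jsc, "index_ik_test/fulltext", queryDsl).values();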
DemoApplication.java, the main entry class:
public class DemoApplication {
    public static void main(String[] args) {
        new EsSparkTest().readEs();
    }
}
In the IDEA menu bar, View ---> Tool Windows ---> Maven Projects opens the Maven sidebar; double-click package to build the jar.
Upload the built jar to the server where Spark is deployed ($ rz -bey esdemo-1.0-SNAPSHOT.jar), then submit it with the following command:
~/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class DemoApplication esdemo-1.0-SNAPSHOT.jar
--class takes the fully qualified class name. If a ClassNotFoundException is thrown during execution, check whether the dependencies declared in pom.xml are present in the jars/ directory of the Spark installation (for example, upload the Guava jar and elasticsearch-hadoop-6.3.2.jar to jars/ beforehand). Finally, running the readEs() method prints the document matched by the query.
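Alternatively (a sketch not tested in this post), spark-submit can ship the extra jars with the application via its standard --jars option instead of copying them into jars/; the jar file names here assume they sit in the current directory:

~/spark-2.3.1-bin-hadoop2.7/bin/spark-submit --class DemoApplication \
    --jars elasticsearch-hadoop-6.3.2.jar,guava-22.0.jar \
    esdemo-1.0-SNAPSHOT.jar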
Because the content field uses the ik_max_word analysis mode, the tokenization of the text 其中國家投資了500萬 contains 中國, which is why this document was matched by the query.
Follow-up note:
When querying ElasticSearch data from Spark, the index user defined a date field as follows:
"created": { "type": "date", "format": "yyyy-MM-dd HH:mm:ss" }
which caused the following Spark query:
JavaRDD<Map<String, Object>> searchRdd = JavaEsSpark.esRDD(jsc, "user/profile", "?q=test").values();
for (Map<String, Object> item : searchRdd.collect()) {
    item.forEach((key, value) -> {
        System.out.println("search key:" + key + ", search value:" + value);
    });
}
to fail while deserializing the results into date objects:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: Cannot invoke method public org.joda.time.DateTime org.joda.time.format.DateTimeFormatter.parseDateTime(java.lang.String)
at org.elasticsearch.hadoop.util.ReflectionUtils.invoke(ReflectionUtils.java:93)
at org.elasticsearch.hadoop.util.DateUtils$JodaTime.parseDate(DateUtils.java:105)
at org.elasticsearch.hadoop.util.DateUtils.parseDate(DateUtils.java:122)
at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.parseDate(JdkValueReader.java:424)
at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.date(JdkValueReader.java:412)
at org.elasticsearch.hadoop.serialization.builder.JdkValueReader.readValue(JdkValueReader.java:88)
at org.elasticsearch.hadoop.serialization.ScrollReader.parseValue(ScrollReader.java:789)
at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:739)
... 31 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.elasticsearch.hadoop.util.ReflectionUtils.invoke(ReflectionUtils.java:91)
... 38 more
Caused by: java.lang.IllegalArgumentException: Invalid format: "2018-10-08 19:00:41" is malformed at " 19:00:41"
at org.joda.time.format.DateTimeFormatter.parseDateTime(DateTimeFormatter.java:945)
... 43 more
This is presumably because the date format defined in my index is yyyy-MM-dd HH:mm:ss while org.joda.time.format.DateTimeFormatter uses a different default format; I don't know where to specify the format for parsing, though, so this really is another pitfall…
As the following test shows, Joda does support a date string in this format:
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

// Joda parses the same string without error when given the pattern explicitly
String pattern = "yyyy-MM-dd HH:mm:ss";
String aTime = "2018-10-08 19:00:41";
DateTimeFormatter format = DateTimeFormat.forPattern(pattern);
DateTime dateTime = format.parseDateTime(aTime); // no error
The Joda version that Spark 2.3 depends on is:
~/spark-2.3.1-bin-hadoop2.7/jars$ ls | grep joda
joda-time-2.9.3.jar
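One workaround worth trying (an assumption based on elasticsearch-hadoop's documented es.mapping.date.rich setting, not something verified in this post) is to disable rich date conversion, so date fields come back as plain strings instead of being parsed through Joda:

// Sketch: return date fields as raw primitives, skipping the Joda parsing that failed above
SparkConf sparkConf = new SparkConf()
        .setAppName("readEs")
        .setMaster("local[*]")
        .set("es.nodes", "ELASTIC_SEARCH_IP")
        .set("es.port", "9200")
        .set("es.nodes.wan.only", "true")
        .set("es.mapping.date.rich", "false"); // dates returned as String/long, per the es-hadoop config docs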