<!-- elasticsearch-hadoop connector; Spark/Storm/Cascading artifacts are excluded
     because the cluster (CDH) already provides them and `provided` scope keeps
     the connector off the application classpath at runtime. -->
<dependencies>
  <dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>2.4.0</version>
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
      </exclusion>
      <exclusion>
        <groupId>cascading</groupId>
        <artifactId>cascading-hadoop</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

<!-- NOTE: repository URLs upgraded from http to https — Maven 3.8.1+ blocks
     plain-http repositories by default, so the http form fails to resolve. -->
<repositories>
  <repository>
    <id>cloudera-repos</id>
    <name>Cloudera Repos</name>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
  </repository>
  <repository>
    <id>Akka repository</id>
    <url>https://repo.akka.io/releases</url>
  </repository>
  <repository>
    <id>jboss</id>
    <url>https://repository.jboss.org/nexus/content/groups/public-jboss</url>
  </repository>
  <repository>
    <id>Sonatype snapshots</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
  </repository>
  <repository>
    <id>sonatype-oss</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    <snapshots><enabled>true</enabled></snapshots>
  </repository>
</repositories>
import org.elasticsearch.spark.sql._

/**
 * Writes a Hive query result into Elasticsearch and reads it back
 * through the es-hadoop Spark SQL data source.
 *
 * Connection settings (es.nodes / es.port) point at a single ES node;
 * es.index.auto.create lets es-hadoop create the target index on first write.
 */
def main(args: Array[String]): Unit = {
  val conf = new SparkConf()
  conf.setAppName("Spark Action ElasticSearch")
  conf.set("es.index.auto.create", "true")
  conf.set("es.nodes", "192.168.1.11")
  conf.set("es.port", "9200")

  val sc: SparkContext = new SparkContext(conf)
  val sqlContext = new HiveContext(sc)

  val df: DataFrame = sqlContext.sql("select * from info limit 50")
  // Save the DataFrame to Elasticsearch under index/type "myindex/info".
  df.saveToEs("myindex/info")

  // Read the data back from Elasticsearch.
  // (Fix: this line was bare text without "//" in the original, which
  // would not compile.)
  val esdf = sqlContext.read.format("org.elasticsearch.spark.sql").load("myindex/info")
  esdf.count
  sc.stop()
}
/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/jars/elasticsearch-hadoop-2.4.0.jar
# Launch spark-shell on YARN with the es-hadoop connection settings.
# Fix: each property needs its own --conf flag; the original passed all
# three after a single --conf, so the last two were treated as positional
# arguments instead of configuration.
spark-shell --master yarn \
  --conf spark.es.nodes=192.168.1.11 \
  --conf spark.es.port=9200 \
  --conf spark.es.index.auto.create=true
<!-- Elasticsearch client matching the 2.4.0 cluster. -->
<dependency>
  <groupId>org.elasticsearch</groupId>
  <artifactId>elasticsearch</artifactId>
  <version>2.4.0</version>
</dependency>
<!-- elasticsearch-sql plugin: SQL-over-ES with JDBC support. -->
<dependency>
  <groupId>org.nlpcn</groupId>
  <artifactId>elasticsearch-sql</artifactId>
  <version>2.4.0</version>
</dependency>
<!-- Druid connection pool, required by ElasticSearchDruidDataSourceFactory. -->
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid</artifactId>
  <version>1.0.15</version>
</dependency>
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.35</version>
</dependency>
/**
 * Runs a SQL-style query ("index/type" in the FROM clause) against
 * Elasticsearch through the elasticsearch-sql JDBC bridge and prints
 * each matching row to stdout.
 */
public static void query() {
    String sql = "select * from bigdata/student where usertype > 5 limit 5";
    // Fix: use try-with-resources so ResultSet, PreparedStatement and
    // Connection are always closed; the original leaked all three when
    // executeQuery() threw, and never closed the ResultSet at all.
    try (Connection connection = getConnection();
         PreparedStatement ps = connection.prepareStatement(sql);
         ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            System.out.println(rs.getString("_id") + " " + rs.getString("recordtime")
                    + " " + rs.getInt("area") + " " + rs.getInt("usertype")
                    + " " + rs.getInt("count"));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Obtains an Elasticsearch JDBC connection via a Druid data source.
 *
 * @return an open JDBC connection to the ES node at 192.168.1.11:9300
 *         (9300 is the transport port used by the elasticsearch-sql driver)
 * @throws Exception if the data source or connection cannot be created
 */
public static Connection getConnection() throws Exception {
    String url = "jdbc:elasticsearch://192.168.1.11:9300";
    Properties properties = new Properties();
    properties.put("url", url);
    DruidDataSource dds = (DruidDataSource) ElasticSearchDruidDataSourceFactory.createDataSource(properties);
    return dds.getConnection();
}
官網參考資料:Elasticsearch for Apache Hadoop