Spark & Hive: developing a Spark job in Scala that reads from Hive, and working with the YARN ResourceManager.

  • Background:

The task: in a Hive table holding roughly 46 billion records per day, select the records whose host matches certain values, and only for those parse the longitude/latitude out of the http_content field:

The parsing rules, for example:

Host to parse: api.map.baidu.com
Pattern to extract: "result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},
"confidence":25
http_content to parse: renderReverse&&renderReverse({"status":0,"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},"formatted_address":"???????????????????????????????????????","business":"","addressComponent":{"country":"??????","country_code":0,"province":"?????????","city":"?????????","district":"?????????","adcode":"330104","street":"????????????","street_number":"","direction":"","distance":""},"pois":[{"addr":"????????????5277???","cp":" ","direction":"???","distance":"68","name":"????????????????????????????????????","poiType":"????????????","point":{"x":120.25084961536486,"y":30.3112150
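The rule amounts to pulling three fields (lng, lat, confidence) out of the raw body. A minimal self-contained sketch of that extraction, using regexes instead of the split-based scan the full job uses below (the object and method names here are illustrative only; field names come from the sample Baidu reverse-geocoding response):

```scala
// Sketch: extract lng/lat/confidence from a Baidu reverse-geocoding body.
object LngLatSketch {
  // Unanchored so the patterns can match anywhere inside the body.
  private val LngLat = """"lng":([-0-9.]+),"lat":([-0-9.]+)""".r.unanchored
  private val Conf   = """"confidence":([0-9]+)""".r.unanchored

  // Returns Some((lng, lat, radius)) when the body carries a location, else None.
  def extract(body: String): Option[(String, String, String)] = body match {
    case LngLat(lng, lat) =>
      val radius = body match { case Conf(c) => c; case _ => "" }
      Some((lng, lat, radius))
    case _ => None
  }
}
```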
  • Scala implementation of the Spark job "read from Hive and save the result back to a Hive table":

The IDE is IntelliJ IDEA 16 and the language is Scala. The project depends on the jar packages matching the cluster's Spark version and the cluster's Hadoop version; import those jars:

Scala code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * Holds one parsed http_content record.
  **/
case class Temp_Http_Content_ParserResult(success: String, lnglatType: String, longitude: String, latitude: String, radius: String)

/**
  * Created by Administrator on 2016/11/15.
  */
object ParserMain {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf() 
    //.setAppName("XXX_ParserHttp").setMaster("local[1]").setMaster("spark://172.21.7.10:7077").setJars(List("xxx.jar"))
        //.set("spark.executor.memory", "10g")
    val sc = new SparkContext(conf)
    val hiveContext = new HiveContext(sc)

    // use abc_hive_db;
    hiveContext.sql("use abc_hive_db")
    // note: the hour field uses the format 20161115, not 2016-11-15
    val rdd = hiveContext.sql("select host,http_content from default.http where hour>='20161115' and hour<'20161116'")

    // toDF() below needs these implicits
    import hiveContext.implicits._

    // (success, lnglatType, longitude, latitude, radius)
    val rdd2 = rdd.map(s => parse_http_context(s.getAs[String]("host"), s.getAs[String]("http_content")))
      .filter(s => s._1)
      .map(s => Temp_Http_Content_ParserResult(s._1.toString(), s._2, s._3, s._4, s._5))
      .toDF()
    rdd2.registerTempTable("Temp_Http_Content_ParserResult_20161115")
    hiveContext.sql("create table Temp_Http_Content_ParserResult20161115 as select * from Temp_Http_Content_ParserResult_20161115")

    sc.stop()
  }

  /**
    * Parse the longitude/latitude information out of the http_context field.
    * @param host host field of the record
    * @param http_context raw http content of the record
    * @return 5-tuple: (matched successfully?, coordinate format type, longitude, latitude, radius)
    **/
  def parse_http_context(host: String, http_context: String): (Boolean, String, String, String, String) = {
    if (host == null || http_context == null) {
      return (false, "", "", "", "")
    }

    //    val result2 = parse_http_context(「api.map.baidu.com」,"renderReverse&&renderReverse({\"status\":0,\"result\":{\"location\":{\"lng\":120.25088311933617,\"lat\":30.310684375444877},\"formatted_address\":\"???????????????????????????????????????\",\"business\":\"\",\"addressComponent\":{\"country\":\"??????\",\"country_code\":0,\"province\":\"?????????\",\"city\":\"?????????\",\"district\":\"?????????\",\"adcode\":\"330104\",\"street\":\"????????????\",\"street_number\":\"\",\"direction\":\"\",\"distance\":\"\"},\"pois\":[{\"addr\":\"????????????5277???\",\"cp\":\" \",\"direction\":\"???\",\"distance\":\"68\",\"name\":\"????????????????????????????????????\",\"poiType\":\"????????????\",\"point\":{\"x\":120.25084961536486,\"y\":30.3112150")
    //    println(result2._1 + ":" + result2._2 + ":" + result2._3 + ":" + result2._4 + ":" + result2._5)
   
    var success = false
    var lnglatType = ""
    var longitude = ""
    var latitude = ""
    var radius = ""
    val lowerCaseHost = host.toLowerCase().trim()
    val lowerCaseHttp_Content = http_context.toLowerCase()
    //    api.map.baidu.com
    //    "result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},
    //    "confidence":25
    //     --renderReverse&&renderReverse({"status":0,"result":{"location":{"lng":120.25088311933617,"lat":30.310684375444877},"formatted_address":"???????????????????????????????????????","business":"","addressComponent":{"country":"??????","country_code":0,"province":"?????????","city":"?????????","district":"?????????","adcode":"330104","street":"????????????","street_number":"","direction":"","distance":""},"pois":[{"addr":"????????????5277???","cp":" ","direction":"???","distance":"68","name":"????????????????????????????????????","poiType":"????????????","point":{"x":120.25084961536486,"y":30.3112150
    if (lowerCaseHost.equals("api.map.baidu.com")) {
      val indexLng = lowerCaseHttp_Content.indexOf("\"lng\"")
      val indexLat = lowerCaseHttp_Content.indexOf("\"lat\"")
      if (lowerCaseHttp_Content.indexOf("\"location\"") != -1 && indexLng != -1 && indexLat != -1) {
        var splitstr: String = "\\,|\\{|\\}"
        var uriItems: Array[String] = lowerCaseHttp_Content.split(splitstr)
        var tempItem: String = ""
        lnglatType = "BD"
        success = true
        for (uriItem <- uriItems) {
          tempItem = uriItem.trim()
          if (tempItem.startsWith("\"lng\":")) {
            longitude = tempItem.replace("\"lng\":", "").trim()
          } else if (tempItem.startsWith("\"lat\":")) {
            latitude = tempItem.replace("\"lat\":", "").trim()
          } else if (tempItem.startsWith("\"confidence\":")) {
            radius = tempItem.replace("\"confidence\":", "").trim()
          }
        }
      }
    } else if (lowerCaseHost.equals("loc.map.baidu.com")) {
      // ... (handling for loc.map.baidu.com elided in the original)
    }

    longitude = longitude.replace("\"", "")
    latitude = latitude.replace("\"", "")
    radius = radius.replace("\"", "")

    (success, lnglatType, longitude, latitude, radius)
  }
}

Packaging: note that because our cluster runs hadoop & hive & spark on yarn, we do not need to bundle spark-hadoop-xx.jar into the assembly when executing spark-submit, as we would with a standalone spark & hadoop setup, nor list the cluster jars in the spark-submit .sh script's jars parameter; YARN works out which cluster system jars are needed automatically. However, if the application uses a third-party jar, say ab.jar, you can either package it into your own jar or pass it via the spark-submit jars parameter.
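The packaging setup described above can be sketched in sbt with sbt-assembly, marking the cluster-provided jars "provided" so they stay out of the fat jar (the project name, group ids, and versions below are assumptions; match them to your cluster):

```scala
// build.sbt (sketch -- names and versions are assumptions, match your cluster)
name := "http-content-parser"
scalaVersion := "2.10.6"

libraryDependencies ++= Seq(
  // "provided": already on the YARN cluster, so excluded from the assembly jar
  "org.apache.spark" %% "spark-core" % "1.6.0" % "provided",
  "org.apache.spark" %% "spark-hive" % "1.6.0" % "provided"
)
```

A third-party jar such as ab.jar would be listed without "provided" so sbt-assembly bundles it, or left out and handed to spark-submit instead.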

  • Writing the spark-submit .sh script:
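A minimal version of such a script might look like the following (the class name and jar come from the code above; the executor counts and memory sizes are assumptions to tune per cluster):

```shell
#!/bin/sh
# submit.sh -- submit the parser job in yarn-cluster mode (sketch; adjust sizes)
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class ParserMain \
  --num-executors 50 \
  --executor-memory 8g \
  --executor-cores 4 \
  --driver-memory 4g \
  xxx.jar
```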

  • What to do when the spark-submit script fails?

Note that this is Spark on YARN, not standalone Spark: when we submit in yarn-cluster mode, no driver logs appear in the console. We have to use the YARN management tools to view the logs:

1. Fetch the historical logs by the returned application id:
yarn logs -applicationId  application_1475071482566_3329402
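The aggregated log for a large job can run to millions of lines; a quick way to locate the failure is to filter it (a sketch, using the application id from the example above):

```shell
# keep only error and exception lines from the aggregated YARN log (sketch)
yarn logs -applicationId application_1475071482566_3329402 \
  | grep -iE "error|exception" | head -n 50
```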

2. View logs in the YARN web UI

https://xx.xx.xx.xx:xxxxx/Yarn/ResourceManager/xxxx/cluster
Username / password: user/password
 
 
3. Killing an application on YARN:
The YARN ResourceManager UI shows each applicationId; kill a job with the command:
More commands are listed at: http://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/YarnCommands.html
yarn application -kill application_1475071482566_3807023

Alternatively, drill from the ResourceManager UI into the Spark job page to check detailed execution progress; the application can also be killed from there.

References:
http://blog.csdn.net/sparkexpert/article/details/50964732

Memory allocation for Spark on YARN: http://blog.javachen.com/2015/06/09/memory-in-spark-on-yarn.html?utm_source=tuicool
