Determining Memory and CPU for Spark Resource Requests: A Hands-On Case Study - Spark Commercial Applications in Practice

This series of posts distills and shares cases drawn from real commercial environments and offers hands-on guidance for commercial Spark applications; please stay tuned. Copyright notice: this Spark commercial-applications series belongs to the author (秦凱新). Reproduction is prohibited; you are welcome to learn from it.

1 Requesting Resources with Spark in YARN Mode

1.1 Determining the Number of Executors

To launch in yarn mode (the spark jar packages must be copied to the cluster first): in yarn mode, the number of executors is determined simply by specifying the --num-executors parameter.

As we know, when yarn is the cluster manager and spark (taking client mode as the example) is submitted via spark-submit (or run interactively via spark-shell) without any resource parameters, the following defaults are used to request container resources from yarn's resourcemanager (a programmatic sketch of the same defaults follows the list):

  • spark.executor.memory     1g
  • spark.executor.cores      1
  • spark.executor.instances  2
  • spark.yarn.am.memory      512m
  • spark.yarn.am.cores       1
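
These defaults can be overridden per application. A minimal programmatic sketch restating them is shown below; in practice they are usually supplied as spark-submit flags or --conf entries, as in the command further down, and in client mode the spark.yarn.am.* values should be set before launch rather than in application code:

import org.apache.spark.SparkConf

// Sketch only: restating the defaults listed above via SparkConf.
val conf = new SparkConf()
  .set("spark.executor.memory", "1g")
  .set("spark.executor.cores", "1")
  .set("spark.executor.instances", "2")
  .set("spark.yarn.am.memory", "512m")
  .set("spark.yarn.am.cores", "1")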

Going by these defaults, yarn will create 3 containers: the first, container0, runs the applicationmaster, and the other two are allocated to the spark program's CoarseGrainedExecutorBackend processes. Adding up the defaults, we would expect the job to occupy 3 vcores and 3.5G of memory in the cluster. And here comes the first question: why does the memory actually allocated come to 5G rather than the expected 3.5G?

It turns out that yarn applies two parameters that affect how much memory an application is actually granted. The first is yarn.scheduler.minimum-allocation-mb: the minimum allocatable memory, default 1024. The second is the normalization factor (under the FIFO and Capacity Schedulers it equals the minimum allocatable resource and cannot be configured separately; under the Fair Scheduler it is set via yarn.scheduler.increment-allocation-mb, default 1024): if the resources an application requests are not an integer multiple of this factor, the request is rounded up to the nearest integer multiple. Because every container also carries some extra memory overhead, each CoarseGrainedExecutorBackend container actually needs more than the 1G it nominally requests, and after normalization each of those containers is granted 2G; the applicationmaster container requests less than 1G, so the minimum-allocation rule bumps it up to 1G. Hence the total: 2 executors × 2G + 1G for the AM = 5G.
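
To make the arithmetic concrete, here is a minimal sketch of the rounding rule in plain Scala (not a YARN API; it assumes the default 1024 MB minimum and increment, plus the default executor memory overhead of max(384 MB, 10% of the heap)):

// Sketch of YARN's memory normalization, under the assumptions above.
val minAllocMb = 1024    // yarn.scheduler.minimum-allocation-mb
val incrementMb = 1024   // normalization (rounding) factor

// Round a container request up to the nearest multiple of incrementMb, never below minAllocMb.
def grantedMb(requestMb: Int): Int = {
  val floored = math.max(requestMb, minAllocMb)
  ((floored + incrementMb - 1) / incrementMb) * incrementMb
}

println(grantedMb(1024 + 384))   // executor: 1g heap + 384m overhead -> 2048
println(grantedMb(512 + 384))    // AM: 512m + 384m overhead -> 1024
// total: 2 executors * 2048 MB + 1024 MB = 5120 MB = 5G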

$SPARK_HOME/bin/spark-submit --master yarn  --deploy-mode cluster --class you.class.Name --executor-memory 1g --executor-cores 1 --num-executors 8 --driver-memory 2g  /you/jar.jar

1.2 Scheduling CPU Cores and Memory Together

By default the Capacity Scheduler hands out containers using DefaultResourceCalculator, which accounts for memory only; to have vcores scheduled alongside memory, enable the Capacity Scheduler and switch its resource calculator to DominantResourceCalculator:

yarn-site.xml
<property>
    <name>yarn.resourcemanager.scheduler.class</name>   
    <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

capacity-scheduler.xml
<property>
    <name>yarn.scheduler.capacity.resource-calculator</name>
    <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>

1.3 Submission Scripts for YARN Mode

spark-shell --master yarn --executor-memory 512m --num-executors 4 --executor-cores 2

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --executor-memory 512m --num-executors 3 ./examples/jars/spark-examples_2.11-2.3.0.jar 1000

2 Requesting Resources with Spark in Standalone Mode

2.1 Determining the Number of Executors

In standalone mode, memory and cpu counts are determined by the formula executorNum = spark.cores.max / spark.executor.cores, where spark.cores.max is the maximum number of cpu cores the application may claim across the entire cluster (not the cluster's total core count). Both values are given when launching the application:

  • --total-executor-cores
  • --executor-cores

Together they determine how many executors the application launches, so setting total-executor-cores controls the executor count. For example, the spark-shell command below requests --total-executor-cores 40 with --executor-cores 4, which yields 40 / 4 = 10 executors.

./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://bd-master:7077 --executor-memory 512m --num-executors 3 ./examples/jars/spark-examples_2.11-2.3.0.jar 1000

spark-shell --master spark://bd-master:7077 --total-executor-cores 40 --executor-memory 4096m --executor-cores 4
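
The same caps can be pinned programmatically when building the application's configuration; a sketch mirroring the spark-shell line above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("spark://bd-master:7077")
  .set("spark.cores.max", "40")          // total cores across the cluster for this app
  .set("spark.executor.cores", "4")      // cores per executor
  .set("spark.executor.memory", "4096m") // memory per executor
// 40 / 4 = 10 executors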

2.2 Spark Shell Tests

sc.textFile("hdfs://bd-master:9000/user/root/input").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).collect.foreach(println)

sc.textFile("hdfs://bd-master:9000/waflog").flatMap(_.split("|")).collect.take(10).foreach(println)

2.3 The Log Model for the Case Study

$remote_addr | $time_local | $request | $status | $body_bytes_sent | $bytes_sent | $gzip_ratio 
| $http_referer | $http_user_agent | $http_x_forwarded_for | $upstream_addr 
| $upstream_response_time | $upstream_status | $request_time | $host;
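For reference when reading the ETL and correction code below, the field indices after splitting a line on '|' map to the variables above as follows:

// index -> nginx variable (after line.split("\\|"))
//  0: $remote_addr              1: $time_local             2: $request
//  3: $status                   4: $body_bytes_sent        5: $bytes_sent
//  6: $gzip_ratio               7: $http_referer           8: $http_user_agent
//  9: $http_x_forwarded_for    10: $upstream_addr         11: $upstream_response_time
// 12: $upstream_status         13: $request_time          14: $host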

2.4 Offline ETL

import java.text.SimpleDateFormat
import java.util.Date

val DATE_FORMAT_ = new SimpleDateFormat("yyyy-MM-dd")

val lines = sc.textFile("/opendir/opendir/access.log-20180620")

// Rewrite $time_local as "yyyy-MM-dd HH:mm:ss", trim every field, and
// re-join the fields with "@@" so later stages can split unambiguously.
val formattedLog = lines.map(log => {
  val logSplited = log.split("\\|")
  val eventTime = logSplited(1)
  val todayDate = DATE_FORMAT_.format(new Date().getTime)
  // keep HH:mm:ss: drop the "dd/MMM/yyyy:" prefix and the " +0800" timezone suffix
  val cutTime = eventTime.substring(13, eventTime.length - 7)
  val dataTime = todayDate + " " + cutTime
  logSplited(1) = dataTime

  for (i <- 0 to (logSplited.length - 1)) {
    logSplited(i) = logSplited(i).trim
  }
  logSplited.mkString("@@")
})

val outpath = "hdfs://bd-master:9000/waflog/access.log-20180620"
formattedLog.saveAsTextFile(outpath)
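An optional sanity check before moving on: read back a couple of the rewritten lines to confirm the "@@" join took effect:

sc.textFile(outpath).take(2).foreach(println)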

2.5 Data Correction: Finer Granularity

import java.sql.Timestamp
import java.text.SimpleDateFormat

val DATE_FORMAT_ = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

// Log model, extended with hour and minute flags for finer-grained analysis
case class wafLogModel(remote_addr: String, time_local: Timestamp, request: String,
                       status: String, body_bytes_sent: String, bytes_sent: Long,
                       gzip_ratio: String, http_referer: String, http_user_agent: String,
                       http_x_forwarded_for: String, upstream_addr: String, upstream_response_time: String,
                       upstream_status: String, request_time: String, host: String, hour_flag: String, minute_flag: String)

val fileRDD = sc.textFile("hdfs://bd-master:9000/waflog/access.log-20180620")
import spark.implicits._
val wafLogRDD = fileRDD
  .filter(x => !x.contains("\\xFAW\\xF7"))   // drop lines carrying this garbled byte sequence
  .map(line => line.split("@@"))
  .map(x => {
    // keep at most the first three path segments of the request as the url
    val urlDetails = x(2).split("/")
    var url = ""
    if (urlDetails.length == 1) url = urlDetails(0).trim
    else if (urlDetails.length == 2) url = urlDetails(0) + " " + urlDetails(1).trim
    else if (urlDetails.length == 3) url = urlDetails(0) + " " + urlDetails(1) + "/" + urlDetails(2).trim
    else if (urlDetails.length >= 4) url = urlDetails(0) + " " + urlDetails(1) + "/" + urlDetails(2) + "/" + urlDetails(3).trim

    val eventTime = Timestamp.valueOf(x(1))
    val format_date = DATE_FORMAT_.format(eventTime)
    val hourflag = format_date.substring(11, 13)
    val minuteflag = hourflag + ":" + format_date.substring(14, 16)

    // Some malformed requests contain extra '|' characters, which shifts the
    // field positions; detect them and read bytes_sent/host from the shifted slots.
    var bytesSent = ""
    var host = ""
    if (x(5).trim.equals("/error= HTTP/1.1")) {
      bytesSent = x(8).trim
      host = x(17).trim
      url = "GET = ReportProductSn&LoginCode=LoginID&ProductSn=ZJSN/error= HTTP/1.1 (Exception Log)"
    } else {
      bytesSent = x(5).trim
      host = x(14).trim
    }
    val bytesSentMb: Long = bytesSent.toLong / 1024 / 1024L   // bytes -> MB

    wafLogModel(x(0), eventTime, url, x(3), x(4), bytesSentMb, x(6), x(7), x(8), x(9), x(10), x(11), x(12), x(13), host, hourflag, minuteflag)
  })

2.6 Data Analysis: Access Counts per Domain

val wafLogDs = wafLogRDD.toDS()
wafLogDs.createOrReplaceTempView("wafLog")
val urlStat = spark.sql("SELECT host, remote_addr, count(*) as total_count FROM wafLog group by host, remote_addr order by total_count desc limit 10")
urlStat.show

+--------------------+--------------+-----------+                               
|                host|   remote_addr|total_count|
+--------------------+--------------+-----------+
|  hcdt.dataserver.cn|192.168.100.61|      73642|
|resource.dataserv...| 58.60.228.148|      61113|
|  hcdt.dataserver.cn| 58.60.228.148|      45858|
|testuweb.dataserv...| 58.60.228.148|      44042|
|hcautotestkyj.dat...| 58.60.228.148|      42827|
|gdlmdt.dataserver.cn| 14.157.120.63|      36587|
|resource.dataserv...| 14.157.120.63|      26947|
|   cbs.dataserver.cn|192.168.100.62|      26726|
|   cbs.dataserver.cn|192.168.100.61|      26503|
|message.dataserve...| 58.60.228.148|      25739|
+--------------------+--------------+-----------+
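
The same top-10 aggregation can also be written with the untyped DataFrame API instead of SQL; an equivalent sketch:

import org.apache.spark.sql.functions.desc

wafLogDs.groupBy("host", "remote_addr")
  .count()
  .withColumnRenamed("count", "total_count")
  .orderBy(desc("total_count"))
  .limit(10)
  .show()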

// save as '|'-delimited text, ready for the Hive load in section 2.7
val urlStatStore = urlStat.map(row => row(0) + "|" + row(1) + "|" + row(2)).rdd
urlStatStore.saveAsTextFile("/wafResult/20180620")

hcdt.dataserver.cn|192.168.100.61|73642
resource.dataserver.cn|58.60.228.148|61113
hcdt.dataserver.cn|58.60.228.148|45858
testuweb.dataserver.cn|58.60.228.148|44042
hcautotestkyj.dataserver.cn|58.60.228.148|42827
gdlmdt.dataserver.cn|14.157.120.63|36587
resource.dataserver.cn|14.157.120.63|26947
cbs.dataserver.cn|192.168.100.62|26726
cbs.dataserver.cn|192.168.100.61|26503
message.dataserver.cn|58.60.228.148|25739

The same result via a typed Dataset:

case class urlStatModel(host: String, remote_addr: String, total_count: String)
urlStat.as[urlStatModel].map(urlStat => urlStat.host + "|" + urlStat.remote_addr + "|" + urlStat.total_count).rdd.saveAsTextFile("/wafResult2/20180620")

hcdt.dataserver.cn|192.168.100.61|73642
resource.dataserver.cn|58.60.228.148|61113
hcdt.dataserver.cn|58.60.228.148|45858
testuweb.dataserver.cn|58.60.228.148|44042
hcautotestkyj.dataserver.cn|58.60.228.148|42827
gdlmdt.dataserver.cn|14.157.120.63|36587
resource.dataserver.cn|14.157.120.63|26947
cbs.dataserver.cn|192.168.100.62|26726
cbs.dataserver.cn|192.168.100.61|26503
message.dataserver.cn|58.60.228.148|25739

2.7 Hive Data Modeling

create table accesslog(
host string, 
remote_addr string,
total_count bigint
)row format delimited fields terminated by '|';

Load the data from HDFS into Hive:
load data inpath '/wafResult/20180620' overwrite into table accesslog;
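Once loaded, the table can be queried from the Hive CLI or, assuming the SparkSession was built with enableHiveSupport, directly from Spark; a hypothetical check:

spark.sql("SELECT host, sum(total_count) AS cnt FROM accesslog GROUP BY host ORDER BY cnt DESC LIMIT 10").show()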

2.8 Data Analysis: Traffic Statistics

val bytesStat = spark.sql("SELECT host, remote_addr, request, max(bytes_sent) as max_byte FROM wafLog group by host,remote_addr,request order by max_byte desc limit 10")
bytesStat.show
+--------------------+--------------+--------------------+--------+             
|                host|   remote_addr|             request|max_byte|
+--------------------+--------------+--------------------+--------+
|resource.dataserv...|27.123.214.103|GET  download/bro...|      42|
|  hcdt.dataserver.cn|61.178.233.112|GET  1.1/componen...|      40|
|qdakfhdt.dataserv...| 58.56.156.190|GET  1.1/componen...|      40|
|westdt.dataserver.cn|222.179.116.10|GET  1.1/componen...|      40|
|security.dataserv...| 119.23.123.17|GET  iosDeploy/el...|      28|
|bestlink.dataserv...|180.97.106.135|GET  /uploadfile/APP|      22|
|security.dataserv...| 112.17.244.69|GET  iosDeploy/uw...|      17|
|greatdt.dataserve...| 58.210.39.230|GET  monitor/webs...|      16|
|  rdts.dataserver.cn| 61.130.49.162|GET  rdts?ip=192....|      15|
|security.dataserv...| 119.23.123.25|GET  iosDeploy/ca...|      13|
+--------------------+--------------+--------------------+--------+

2.9 Data Analysis: Access Counts by Hour

val urlStat = spark.sql("SELECT hour_flag, host, remote_addr, count(*) as total_count FROM wafLog group by hour_flag,host,remote_addr order by total_count desc limit 50")
urlStat.show
+---------+--------------------+--------------+-----------+                     
|hour_flag|                host|   remote_addr|total_count|
+---------+--------------------+--------------+-----------+
|       13|  hcdt.dataserver.cn| 58.60.228.148|       8650|
|       08|  hcdt.dataserver.cn| 58.60.228.148|       8606|
|       21|sdryer2.dataserve...|171.213.124.37|       8324|
|       04|  hcdt.dataserver.cn|192.168.100.61|       7162|
|       05|  hcdt.dataserver.cn|192.168.100.61|       7144|
|       12|  hcdt.dataserver.cn|192.168.100.61|       7131|
|       13|  hcdt.dataserver.cn|192.168.100.61|       7108|
|       20|  hcdt.dataserver.cn|192.168.100.61|       7106|
|       21|  hcdt.dataserver.cn|192.168.100.61|       7083|
|       11|  hcdt.dataserver.cn|192.168.100.61|       6068|
|       03|  hcdt.dataserver.cn|192.168.100.61|       6064|
|       19|  hcdt.dataserver.cn|192.168.100.61|       6029|
|       09|gdlmdt.dataserver.cn| 14.157.120.63|       5557|
|       10|gdlmdt.dataserver.cn| 14.157.120.63|       5297|
|       14|gdlmdt.dataserver.cn| 14.157.120.63|       4148|
|       13|gdlmdt.dataserver.cn| 14.157.120.63|       4140|
|       14|  hcdt.dataserver.cn|192.168.100.61|       3867|
|       12|gdlmdt.dataserver.cn| 14.157.120.63|       3789|
|       11|gdlmdt.dataserver.cn| 14.157.120.63|       3771|
|       15|gdlmdt.dataserver.cn| 14.157.120.63|       3756|
+---------+--------------------+--------------+-----------+

2.10 Data Analysis: Endpoint Traffic by Hour

val bytesStat = spark.sql("SELECT hour_flag,host,remote_addr,request,max(bytes_sent) as max_byte FROM wafLog group by hour_flag, host, remote_addr, request order by max_byte desc limit 50")
 
 bytesStat.show
+---------+--------------------+---------------+--------------------+--------+  
|hour_flag|                host|    remote_addr|             request|max_byte|
+---------+--------------------+---------------+--------------------+--------+
|       15|resource.dataserv...| 27.123.214.103|GET  download/bro...|      42|
|       09|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      40|
|       15|qdakfhdt.dataserv...|  58.56.156.190|GET  1.1/componen...|      40|
|       09|  hcdt.dataserver.cn| 61.178.233.112|GET  1.1/componen...|      40|
|       11|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|       09|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|       11|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      27|
|       23|bestlink.dataserv...| 180.97.106.135|GET  /uploadfile/APP|      22|
|       11|security.dataserv...|  112.17.244.69|GET  iosDeploy/uw...|      17|
|       07|greatdt.dataserve...|  58.210.39.230|GET  monitor/webs...|      16|
|       16|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      15|
|       16|security.dataserv...|  119.23.123.25|GET  iosDeploy/ca...|      13|
|       16|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|       23|  rdts.dataserver.cn|  183.33.59.157|GET  rdts?ip=192....|      11|
|       14|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|       21|bestlink.dataserv...|  123.125.71.74|GET  uploadfile/A...|       9|
|       13|hcuweb.dataserver.cn| 27.123.214.107|GET  uploadfile/A...|       9|
|       18|  hnks.dataserver.cn| 122.192.15.137|GET  uploadfile/s...|       9|
|       16|hcuweb.dataserver.cn|   122.192.13.2|GET  /uploadfile/...|       9|
|       07|bestlink.dataserv...|211.138.116.246|GET  /uploadfile/...|       8|
+---------+--------------------+---------------+--------------------+--------+

2.11 Data Analysis: Access Counts by Minute

val urlStat = spark.sql("SELECT minute_flag,host,remote_addr,request, count(*) as total_count FROM wafLog group by minute_flag, host, remote_addr, request order by total_count desc limit 50")

urlStat.show
+-----------+--------------------+--------------+--------------------+-----------+
|minute_flag|                host|   remote_addr|             request|total_count|
+-----------+--------------------+--------------+--------------------+-----------+
|      21:33|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        304|
|      21:37|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        302|
|      21:35|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        302|
|      21:34|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        302|
|      22:00|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        299|
|      22:01|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        298|
|      21:36|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        296|
|      22:02|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        293|
|      21:39|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        293|
|      21:40|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        292|
|      21:55|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        292|
|      21:53|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        292|
|      21:52|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        289|
|      21:31|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        288|
|      21:58|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        288|
|      21:38|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        286|
|      21:42|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        286|
|      21:48|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        284|
|      21:59|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        282|
|      21:54|sdryer2.dataserve...|171.213.124.37|GET  itas-app/web...|        280|
+-----------+--------------------+--------------+--------------------+-----------+

2.12 Data Analysis: Endpoint Traffic by Minute

val bytesStat = spark.sql("SELECT minute_flag,host,remote_addr,request,max(bytes_sent) as max_byte FROM wafLog group by minute_flag, host, remote_addr, request order by max_byte desc limit 50")

bytesStat.show
+-----------+--------------------+---------------+--------------------+--------+
|minute_flag|                host|    remote_addr|             request|max_byte|
+-----------+--------------------+---------------+--------------------+--------+
|      15:21|resource.dataserv...| 27.123.214.103|GET  download/bro...|      42|
|      09:29|  hcdt.dataserver.cn| 61.178.233.112|GET  1.1/componen...|      40|
|      09:42|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      40|
|      15:58|qdakfhdt.dataserv...|  58.56.156.190|GET  1.1/componen...|      40|
|      11:49|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|      09:21|security.dataserv...|  119.23.123.17|GET  iosDeploy/el...|      28|
|      11:03|westdt.dataserver.cn| 222.179.116.10|GET  1.1/componen...|      27|
|      23:31|bestlink.dataserv...| 180.97.106.135|GET  /uploadfile/APP|      22|
|      11:06|security.dataserv...|  112.17.244.69|GET  iosDeploy/uw...|      17|
|      07:51|greatdt.dataserve...|  58.210.39.230|GET  monitor/webs...|      16|
|      16:35|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      15|
|      16:41|security.dataserv...|  119.23.123.25|GET  iosDeploy/ca...|      13|
|      14:01|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|      23:00|  rdts.dataserver.cn|  183.33.59.157|GET  rdts?ip=192....|      11|
|      16:35|  rdts.dataserver.cn|  61.130.49.162|GET  rdts?ip=192....|      11|
|      18:37|  hnks.dataserver.cn| 122.192.15.137|GET  uploadfile/s...|       9|
|      21:40|bestlink.dataserv...|  123.125.71.74|GET  uploadfile/A...|       9|
|      16:12|hcuweb.dataserver.cn|   122.192.13.2|GET  /uploadfile/...|       9|
|      13:02|hcuweb.dataserver.cn| 27.123.214.107|GET  uploadfile/A...|       9|
|      07:56|bestlink.dataserv...|211.138.116.246|GET  /uploadfile/...|       8|
+-----------+--------------------+---------------+--------------------+--------+

3 Summary

秦凱新 in Shenzhen
