This blog series summarizes and shares cases drawn from real production environments and offers practical guidance for Spark business applications; please keep following the series. Copyright notice: this Spark business application practice series belongs to the author (秦凱新). Reproduction is prohibited; you are welcome to learn from it.
Launching in YARN mode (the Spark jar packages must be copied to the cluster): in YARN mode, how do you determine the number of executors? Simply specify the --num-executors parameter.
As we know, when YARN is used as the cluster manager and a Spark application is submitted with spark-submit (taking client mode as an example, or run interactively with spark-shell) without any resource parameters, the following default settings are used to request container resources from YARN's ResourceManager:
With those default values, YARN creates 3 containers. The first, container0, runs the ApplicationMaster; the other two are allocated to the Spark application's CoarseGrainedExecutorBackend processes. Adding up the defaults, we would expect the job to occupy 3 vcores and 3.5 GB of memory in the cluster. Here comes the first question: why does the cluster actually report 5 GB of memory in use rather than the expected 3.5 GB?
It turns out that YARN applies two parameters that affect how much memory an application is actually granted. The first is yarn.scheduler.minimum-allocation-mb, the minimum memory that can be allocated, which defaults to 1024 MB. The second is the normalization (rounding) increment: with the FIFO and Capacity Schedulers it equals the minimum allocation and cannot be configured separately, while with the Fair Scheduler it is set via yarn.scheduler.increment-allocation-mb (default 1024 MB). If the memory an application requests is not a multiple of this increment, the request is rounded up to the nearest multiple. Because every container also needs some extra overhead memory, each CoarseGrainedExecutorBackend container actually needs more than the 1 GB it nominally requests, so after rounding each executor container is granted 2 GB; the ApplicationMaster container requests less than 1 GB, so under the minimum-allocation rule it is granted 1 GB. That gives 2 x 2 GB + 1 GB = 5 GB.
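The rounding rule above can be captured in a few lines. The following is a minimal Scala sketch of that arithmetic (not YARN API code); the 384 MB overhead and the 512 MB ApplicationMaster request are illustrative figures roughly matching Spark's defaults, used only to show how the granted sizes add up to 5 GB:

// minimal sketch of YARN's memory normalization, assuming the defaults described above
val minAllocationMb = 1024L   // yarn.scheduler.minimum-allocation-mb (default)
val incrementMb = 1024L       // normalization increment (FIFO/Capacity: equal to the minimum)

def grantedMb(requestedMb: Long): Long = {
  val atLeastMin = math.max(requestedMb, minAllocationMb)       // never below the minimum
  ((atLeastMin + incrementMb - 1) / incrementMb) * incrementMb  // round up to a multiple
}

val executorContainer = grantedMb(1024 + 384) // 1 GB executor heap + assumed 384 MB overhead -> 2048 MB
val amContainer = grantedMb(512 + 384)        // assumed 512 MB AM request + overhead -> 1024 MB
val totalMb = 2 * executorContainer + amContainer // 2 executors + 1 AM -> 5120 MB = 5 GB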
$SPARK_HOME/bin/spark-submit --master yarn --deploy-mode cluster --class you.class.Name --executor-memory 1g --executor-cores 1 --num-executors 8 --driver-memory 2g /you/jar.jar
yarn-site.xml (use the Capacity Scheduler as the ResourceManager scheduler):
<property>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
capacity-scheduler.xml (switch the resource calculator to DominantResourceCalculator so that vcores, not just memory, are taken into account when allocating containers):
<property>
<name>yarn.scheduler.capacity.resource-calculator</name>
<value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
spark-shell --master yarn --executor-memory 512m --num-executors 4 --executor-cores 2
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn --executor-memory 512m --num-executors 3 ./examples/jars/spark-examples_2.11-2.3.0.jar 1000
In standalone mode, how do you determine the amount of memory and the number of CPUs? Formula: executorNum = spark.cores.max / spark.executor.cores, where spark.cores.max is the total number of CPU cores the application may use across the whole cluster; both parameters are specified when the application is launched.
Together they determine how many executors the application starts, so by setting --total-executor-cores you can control the number of executors (a worked example follows the commands below).
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://bd-master:7077 --executor-memory 512m --num-executors 3 ./examples/jars/spark-examples_2.11-2.3.0.jar 1000
spark-shell --master spark://bd-master:7077 --total-executor-cores 40 --executor-memory 4096m --executor-cores 4
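For instance, the spark-shell invocation above asks for --total-executor-cores 40 with --executor-cores 4, so the cluster starts 40 / 4 = 10 executors, each with 4 cores and 4096 MB of memory.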
sc.textFile("hdfs://bd-master:9000/user/root/input").flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1)).collect.foreach(println)
sc.textFile("hdfs://bd-master:9000/waflog").flatMap(_.split("|")).collect.take(10).foreach(println)
$remote_addr | $time_local | $request | $status | $body_bytes_sent | $bytes_sent | $gzip_ratio
| $http_referer | $http_user_agent | $http_x_forwarded_for | $upstream_addr
| $upstream_response_time | $upstream_status | $request_time | $host;
import java.text.SimpleDateFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import java.util.{Calendar, Date}
val DATE_FORMAT = new SimpleDateFormat("yyyyMMdd")
val DATE_FORMAT_ = new SimpleDateFormat("yyyy-MM-dd")
val lines = sc.textFile("/opendir/opendir/access.log-20180620")
val formatedLog = lines.map(log => {
  // split the raw nginx log line on the "|" separator defined by the log format above
  val logSplited = log.split("\\|")
  // field 1 is $time_local; rewrite it as "yyyy-MM-dd HH:mm:ss" using today's date
  val eventTime = logSplited(1)
  val todayDate = DATE_FORMAT_.format(new Date().getTime)
  val cutTime = eventTime.substring(13, eventTime.length - 7)
  val dataTime = todayDate + " " + cutTime
  logSplited(1) = dataTime
  // trim whitespace around every field
  for (i <- 0 to (logSplited.length - 1)) {
    logSplited(i) = logSplited(i).trim
  }
  // re-join the cleaned fields with "@@" so later steps can split on it unambiguously
  logSplited.mkString("@@")
})
val outpath = "hdfs://bd-master:9000/waflog/access.log-20180620"
formatedLog.saveAsTextFile(outpath)
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
val DATE_FORMAT = new SimpleDateFormat("yyyyMMdd")
val DATE_FORMAT_ = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
private val cal: Calendar = Calendar.getInstance
// Log model -- with hour and minute flags added
case class wafLogModel(remote_addr:String, time_local:Timestamp, request:String,
status:String, body_bytes_sent:String, bytes_sent:Long,
gzip_ratio:String, http_referer:String, http_user_agent:String,
http_x_forwarded_for:String, upstream_addr:String, upstream_response_time:String,
upstream_status:String, request_time:String, host:String, hour_flag:String, minute_flag:String)
val fileRDD = sc.textFile("hdfs://bd-master:9000/waflog/access.log-20180620")
import spark.implicits._
// drop records containing escaped garbage bytes (written into the log as "\xFAW\xF7"),
// then split each cleaned record on the "@@" separator added by the preprocessing step
val wafLogRDD = fileRDD.filter(x => !x.contains("\\xFAW\\xF7")).map(line => line.split("@@")).map(x => {
val ip = x(0).trim
// keep only the first few path segments of the $request field for grouping
val urlDetails = x(2).split("/")
var url = ""
if (urlDetails.length == 1) url = urlDetails(0).trim
else if (urlDetails.length == 2) url = urlDetails(0)+" "+urlDetails(1).trim
else if (urlDetails.length == 3) url = urlDetails(0)+" "+urlDetails(1) + "/" + urlDetails(2).trim
else if (urlDetails.length >= 4) url = urlDetails(0)+" "+urlDetails(1) + "/" + urlDetails(2) + "/" + urlDetails(3).trim
val eventTime = Timestamp.valueOf(x(1))
val format_date = DATE_FORMAT_.format(eventTime)
// "yyyy-MM-dd HH:mm:ss": characters 11-12 hold the hour, 14-15 the minutes
val hourflag = format_date.substring(11, 13)
val minuteflag = hourflag + ":" + format_date.substring(14, 16)
var bytesSent = ""
var host=""
if(x(5).trim.equals("/error= HTTP/1.1")){
bytesSent=x(8).trim
host = x(17).trim
url ="GET = ReportProductSn&LoginCode=LoginID&ProductSn=ZJSN/error= HTTP/1.1 (Exception Log)"
}else{
bytesSent = x(5).trim
host = x(14).trim
}
val bytesSentMb: Long = bytesSent.toLong / 1024 / 1024L // convert bytes to megabytes
wafLogModel(x(0),eventTime, url, x(3), x(4),bytesSentMb, x(6), x(7), x(8), x(9), x(10), x(11), x(12), x(13),host,hourflag, minuteflag)
})
val wafLogDs = wafLogRDD.toDS()
wafLogDs.createOrReplaceTempView("wafLog")
val urlStat = spark.sql("SELECT host, remote_addr, count(*) as total_count FROM wafLog group by host, remote_addr order by total_count desc limit 10")
urlStat.show
+--------------------+--------------+-----------+
| host| remote_addr|total_count|
+--------------------+--------------+-----------+
| hcdt.dataserver.cn|192.168.100.61| 73642|
|resource.dataserv...| 58.60.228.148| 61113|
| hcdt.dataserver.cn| 58.60.228.148| 45858|
|testuweb.dataserv...| 58.60.228.148| 44042|
|hcautotestkyj.dat...| 58.60.228.148| 42827|
|gdlmdt.dataserver.cn| 14.157.120.63| 36587|
|resource.dataserv...| 14.157.120.63| 26947|
| cbs.dataserver.cn|192.168.100.62| 26726|
| cbs.dataserver.cn|192.168.100.61| 26503|
|message.dataserve...| 58.60.228.148| 25739|
+--------------------+--------------+-----------+
val urlStatStore = urlStat.map(row => row(0)+"|"+row(1)+"|"+row(2)).rdd
urlStatStore.saveAsTextFile("/wafResult/20180620");
hcdt.dataserver.cn|192.168.100.61|73642
resource.dataserver.cn|58.60.228.148|61113
hcdt.dataserver.cn|58.60.228.148|45858
testuweb.dataserver.cn|58.60.228.148|44042
hcautotestkyj.dataserver.cn|58.60.228.148|42827
gdlmdt.dataserver.cn|14.157.120.63|36587
resource.dataserver.cn|14.157.120.63|26947
cbs.dataserver.cn|192.168.100.62|26726
cbs.dataserver.cn|192.168.100.61|26503
message.dataserver.cn|58.60.228.148|25739
case class urlStatModel(host:String,remote_addr:String,total_count:String)
urlStat.as[urlStatModel].map(urlStat => urlStat.host+"|"+urlStat.remote_addr+"|"+urlStat.total_count).rdd.saveAsTextFile("/wafResult2/20180620");
hcdt.dataserver.cn|192.168.100.61|73642
resource.dataserver.cn|58.60.228.148|61113
hcdt.dataserver.cn|58.60.228.148|45858
testuweb.dataserver.cn|58.60.228.148|44042
hcautotestkyj.dataserver.cn|58.60.228.148|42827
gdlmdt.dataserver.cn|14.157.120.63|36587
resource.dataserver.cn|14.157.120.63|26947
cbs.dataserver.cn|192.168.100.62|26726
cbs.dataserver.cn|192.168.100.61|26503
message.dataserver.cn|58.60.228.148|25739
create table accesslog(
host string,
remote_addr string,
total_count bigint
)row format delimited fields terminated by '|';
Load the results from HDFS into Hive:
load data inpath '/wafResult/20180620' overwrite into table accesslog;
val bytesStat = spark.sql("SELECT host, remote_addr, request, max(bytes_sent) as max_byte FROM wafLog group by host,remote_addr,request order by max_byte desc limit 10")
bytesStat.show
+--------------------+--------------+--------------------+--------+
| host| remote_addr| request|max_byte|
+--------------------+--------------+--------------------+--------+
|resource.dataserv...|27.123.214.103|GET download/bro...| 42|
| hcdt.dataserver.cn|61.178.233.112|GET 1.1/componen...| 40|
|qdakfhdt.dataserv...| 58.56.156.190|GET 1.1/componen...| 40|
|westdt.dataserver.cn|222.179.116.10|GET 1.1/componen...| 40|
|security.dataserv...| 119.23.123.17|GET iosDeploy/el...| 28|
|bestlink.dataserv...|180.97.106.135|GET /uploadfile/APP| 22|
|security.dataserv...| 112.17.244.69|GET iosDeploy/uw...| 17|
|greatdt.dataserve...| 58.210.39.230|GET monitor/webs...| 16|
| rdts.dataserver.cn| 61.130.49.162|GET rdts?ip=192....| 15|
|security.dataserv...| 119.23.123.25|GET iosDeploy/ca...| 13|
+--------------------+--------------+--------------------+--------+
val urlStat = spark.sql("SELECT hour_flag, host, remote_addr, count(*) as total_count FROM wafLog group by hour_flag,host,remote_addr order by total_count desc limit 50")
urlStat.show
+---------+--------------------+--------------+-----------+
|hour_flag| host| remote_addr|total_count|
+---------+--------------------+--------------+-----------+
| 13| hcdt.dataserver.cn| 58.60.228.148| 8650|
| 08| hcdt.dataserver.cn| 58.60.228.148| 8606|
| 21|sdryer2.dataserve...|171.213.124.37| 8324|
| 04| hcdt.dataserver.cn|192.168.100.61| 7162|
| 05| hcdt.dataserver.cn|192.168.100.61| 7144|
| 12| hcdt.dataserver.cn|192.168.100.61| 7131|
| 13| hcdt.dataserver.cn|192.168.100.61| 7108|
| 20| hcdt.dataserver.cn|192.168.100.61| 7106|
| 21| hcdt.dataserver.cn|192.168.100.61| 7083|
| 11| hcdt.dataserver.cn|192.168.100.61| 6068|
| 03| hcdt.dataserver.cn|192.168.100.61| 6064|
| 19| hcdt.dataserver.cn|192.168.100.61| 6029|
| 09|gdlmdt.dataserver.cn| 14.157.120.63| 5557|
| 10|gdlmdt.dataserver.cn| 14.157.120.63| 5297|
| 14|gdlmdt.dataserver.cn| 14.157.120.63| 4148|
| 13|gdlmdt.dataserver.cn| 14.157.120.63| 4140|
| 14| hcdt.dataserver.cn|192.168.100.61| 3867|
| 12|gdlmdt.dataserver.cn| 14.157.120.63| 3789|
| 11|gdlmdt.dataserver.cn| 14.157.120.63| 3771|
| 15|gdlmdt.dataserver.cn| 14.157.120.63| 3756|
+---------+--------------------+--------------+-----------+
val bytesStat = spark.sql("SELECT hour_flag,host,remote_addr,request,max(bytes_sent) as max_byte FROM wafLog group by hour_flag, host, remote_addr, request order by max_byte desc limit 50")
bytesStat.show
+---------+--------------------+---------------+--------------------+--------+
|hour_flag| host| remote_addr| request|max_byte|
+---------+--------------------+---------------+--------------------+--------+
| 15|resource.dataserv...| 27.123.214.103|GET download/bro...| 42|
| 09|westdt.dataserver.cn| 222.179.116.10|GET 1.1/componen...| 40|
| 15|qdakfhdt.dataserv...| 58.56.156.190|GET 1.1/componen...| 40|
| 09| hcdt.dataserver.cn| 61.178.233.112|GET 1.1/componen...| 40|
| 11|security.dataserv...| 119.23.123.17|GET iosDeploy/el...| 28|
| 09|security.dataserv...| 119.23.123.17|GET iosDeploy/el...| 28|
| 11|westdt.dataserver.cn| 222.179.116.10|GET 1.1/componen...| 27|
| 23|bestlink.dataserv...| 180.97.106.135|GET /uploadfile/APP| 22|
| 11|security.dataserv...| 112.17.244.69|GET iosDeploy/uw...| 17|
| 07|greatdt.dataserve...| 58.210.39.230|GET monitor/webs...| 16|
| 16| rdts.dataserver.cn| 61.130.49.162|GET rdts?ip=192....| 15|
| 16|security.dataserv...| 119.23.123.25|GET iosDeploy/ca...| 13|
| 16| rdts.dataserver.cn| 61.130.49.162|GET rdts?ip=192....| 11|
| 23| rdts.dataserver.cn| 183.33.59.157|GET rdts?ip=192....| 11|
| 14| rdts.dataserver.cn| 61.130.49.162|GET rdts?ip=192....| 11|
| 21|bestlink.dataserv...| 123.125.71.74|GET uploadfile/A...| 9|
| 13|hcuweb.dataserver.cn| 27.123.214.107|GET uploadfile/A...| 9|
| 18| hnks.dataserver.cn| 122.192.15.137|GET uploadfile/s...| 9|
| 16|hcuweb.dataserver.cn| 122.192.13.2|GET /uploadfile/...| 9|
| 07|bestlink.dataserv...|211.138.116.246|GET /uploadfile/...| 8|
+---------+--------------------+---------------+--------------------+--------+
val urlStat = spark.sql("SELECT minute_flag,host,remote_addr,request, count(*) as total_count FROM wafLog group by minute_flag, host, remote_addr, request order by total_count desc limit 50")
urlStat.show
+-----------+--------------------+--------------+--------------------+-----------+
|minute_flag| host| remote_addr| request|total_count|
+-----------+--------------------+--------------+--------------------+-----------+
| 21:33|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 304|
| 21:37|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 302|
| 21:35|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 302|
| 21:34|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 302|
| 22:00|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 299|
| 22:01|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 298|
| 21:36|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 296|
| 22:02|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 293|
| 21:39|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 293|
| 21:40|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 292|
| 21:55|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 292|
| 21:53|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 292|
| 21:52|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 289|
| 21:31|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 288|
| 21:58|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 288|
| 21:38|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 286|
| 21:42|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 286|
| 21:48|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 284|
| 21:59|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 282|
| 21:54|sdryer2.dataserve...|171.213.124.37|GET itas-app/web...| 280|
+-----------+--------------------+--------------+--------------------+-----------+
val bytesStat = spark.sql("SELECT minute_flag,host,remote_addr,request,max(bytes_sent) as max_byte FROM wafLog group by minute_flag, host, remote_addr, request order by max_byte desc limit 50")
bytesStat.show
+-----------+--------------------+---------------+--------------------+--------+
|minute_flag| host| remote_addr| request|max_byte|
+-----------+--------------------+---------------+--------------------+--------+
| 15:21|resource.dataserv...| 27.123.214.103|GET download/bro...| 42|
| 09:29| hcdt.dataserver.cn| 61.178.233.112|GET 1.1/componen...| 40|
| 09:42|westdt.dataserver.cn| 222.179.116.10|GET 1.1/componen...| 40|
| 15:58|qdakfhdt.dataserv...| 58.56.156.190|GET 1.1/componen...| 40|
| 11:49|security.dataserv...| 119.23.123.17|GET iosDeploy/el...| 28|
| 09:21|security.dataserv...| 119.23.123.17|GET iosDeploy/el...| 28|
| 11:03|westdt.dataserver.cn| 222.179.116.10|GET 1.1/componen...| 27|
| 23:31|bestlink.dataserv...| 180.97.106.135|GET /uploadfile/APP| 22|
| 11:06|security.dataserv...| 112.17.244.69|GET iosDeploy/uw...| 17|
| 07:51|greatdt.dataserve...| 58.210.39.230|GET monitor/webs...| 16|
| 16:35| rdts.dataserver.cn| 61.130.49.162|GET rdts?ip=192....| 15|
| 16:41|security.dataserv...| 119.23.123.25|GET iosDeploy/ca...| 13|
| 14:01| rdts.dataserver.cn| 61.130.49.162|GET rdts?ip=192....| 11|
| 23:00| rdts.dataserver.cn| 183.33.59.157|GET rdts?ip=192....| 11|
| 16:35| rdts.dataserver.cn| 61.130.49.162|GET rdts?ip=192....| 11|
| 18:37| hnks.dataserver.cn| 122.192.15.137|GET uploadfile/s...| 9|
| 21:40|bestlink.dataserv...| 123.125.71.74|GET uploadfile/A...| 9|
| 16:12|hcuweb.dataserver.cn| 122.192.13.2|GET /uploadfile/...| 9|
| 13:02|hcuweb.dataserver.cn| 27.123.214.107|GET uploadfile/A...| 9|
| 07:56|bestlink.dataserv...|211.138.116.246|GET /uploadfile/...| 8|
+-----------+--------------------+---------------+--------------------+--------+
秦凱新, Shenzhen