[Spark] Spark applications (distributed estimation of Pi + loan risk prediction with Spark MLlib)

Note: this chapter does not discuss the internals of Spark or Scala; see the other posts for details.

1. Distributed estimation of Pi

Principle:
Take a square with side length x; its area is S = x². The circle inscribed in that square has area C = π × (x/2)², so the ratio of the circle's area to the square's area is C/S = π/4, which gives π = 4 × C/S.
A computer can generate a large number of random points inside the square and approximate the areas by point counts. If Ps points lie inside the square and Pc of them fall inside the circle, then as the number of random points tends to infinity, 4 × Pc/Ps converges to π.
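The estimator can be sketched without Spark first; a minimal plain-Scala version (the point count and seed here are arbitrary choices for illustration, not part of the original program):

```scala
import scala.util.Random

object PiSketch {
  // estimate Pi by sampling n random points in the square [-1, 1] x [-1, 1]
  // and counting how many fall inside the inscribed unit circle
  def estimatePi(n: Int, seed: Long = 42L): Double = {
    val rng = new Random(seed)
    var inside = 0
    for (_ <- 1 to n) {
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      if (x * x + y * y < 1) inside += 1
    }
    4.0 * inside / n
  }

  def main(args: Array[String]): Unit = {
    println("Pi is roughly " + estimatePi(100000))
  }
}
```

With 100,000 points the estimate typically lands within a few hundredths of π; the Spark version below distributes exactly this sampling loop across partitions.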

Implementation code (IntelliJ IDEA):

package com.hadoop

import scala.math.random
import org.apache.spark._ // import the Spark package
/**
 * Draw a circle inside a square with side length 2: the square's area is S1 = 4 and the
 * circle's radius is r = 1, so its area is S2 = πr² = π. Computing S2 therefore gives π.
 * Take the circle's centre as the origin, keep picking random points inside the square,
 * n points in total, and let count be the number that fall inside the circle;
 * then S2 = S1 * count / n
 * -- the Monte Carlo method
 */
object sparkPi {
  def main(args: Array[String]) {
    /* initialize the SparkContext */
    val conf = new SparkConf().setAppName("sparkPi") // create a SparkConf
    val spark = new SparkContext(conf) // create a SparkContext from the SparkConf
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = 10000 * slices // pick n points
    val count = spark.parallelize(1 to n, slices).map { i => // RDD creation: pass the range [1, n] to SparkContext's parallelize; RDD operation: map applies the function to each element and builds a new RDD
      /* take the circle's centre as the origin and keep picking random points inside the square */
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y < 1) 1 else 0 // count 1 if the point falls inside the circle, otherwise 0
    }.reduce(_ + _) // sum all the values
    println("Pi is roughly " + 4.0 * count / n)  // pi = S2 = S1 * count / n
    spark.stop()
  }
}

Distributed run test:
A distributed run means submitting a jar to the Spark cluster from the command line on a client, so the program above must first be packaged as a jar.

Steps to build the jar:

File -- Project Structure -- Artifacts -- + -- JAR -- From modules with dependencies

-- set Main Class to com.hadoop.sparkPi -- OK -- under Output Layout keep only the "compile output" entry -- OK

-- Build -- Build Artifacts -- Build


Copy the jar to the Spark installation directory:

[hadoop@hadoop01 ~]$ cp /home/hadoop/IdeaProjects/sparkapp/out/artifacts/sparkapp_jar/sparkapp.jar /home/hadoop/spark-2.4.4-bin-without-hadoop


Change to the Spark installation directory and run:

[hadoop@hadoop01 ~]$ cd spark-2.4.4-bin-without-hadoop


A. Local mode

[hadoop@hadoop01 spark-2.4.4-bin-without-hadoop]$ bin/spark-submit --master local --class com.hadoop.sparkPi sparkapp.jar 
Output:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/spark-2.4.4-bin-without-hadoop/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-3.2.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-10-04 11:12:09,551 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-10-04 11:12:09,777 INFO spark.SparkContext: Running Spark version 2.4.4
2019-10-04 11:12:09,801 INFO spark.SparkContext: Submitted application: sparkPi
2019-10-04 11:12:09,862 INFO spark.SecurityManager: Changing view acls to: hadoop
2019-10-04 11:12:09,862 INFO spark.SecurityManager: Changing modify acls to: hadoop
2019-10-04 11:12:09,862 INFO spark.SecurityManager: Changing view acls groups to:
2019-10-04 11:12:09,862 INFO spark.SecurityManager: Changing modify acls groups to:
2019-10-04 11:12:09,862 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hadoop); groups with view permissions: Set(); users  with modify permissions: Set(hadoop); groups with modify permissions: Set()
2019-10-04 11:12:10,140 INFO util.Utils: Successfully started service 'sparkDriver' on port 34911.
2019-10-04 11:12:10,168 INFO spark.SparkEnv: Registering MapOutputTracker
2019-10-04 11:12:10,190 INFO spark.SparkEnv: Registering BlockManagerMaster
2019-10-04 11:12:10,191 INFO storage.BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2019-10-04 11:12:10,192 INFO storage.BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
2019-10-04 11:12:10,204 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-f08f8fb2-3c19-4f99-b24e-df08f23cff23
2019-10-04 11:12:10,220 INFO memory.MemoryStore: MemoryStore started with capacity 1048.8 MB
2019-10-04 11:12:10,235 INFO spark.SparkEnv: Registering OutputCommitCoordinator
2019-10-04 11:12:10,300 INFO util.log: Logging initialized @2617ms
2019-10-04 11:12:10,353 INFO server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: 2018-06-06T01:11:56+08:00, git hash: 84205aa28f11a4f31f2a3b86d1bba2cc8ab69827
2019-10-04 11:12:10,371 INFO server.Server: Started @2689ms
2019-10-04 11:12:10,385 INFO server.AbstractConnector: Started ServerConnector@3c2772d1{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-10-04 11:12:10,385 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
2019-10-04 11:12:10,411 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c3c4c1c{/jobs,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,411 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@9f6e406{/jobs/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,412 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7a94b64e{/jobs/job,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,413 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@12477988{/jobs/job/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,414 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2caf6912{/stages,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,415 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@73d69c0f{/stages/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,415 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@34237b90{/stages/stage,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,416 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@d400943{/stages/stage/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,417 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@22101c80{/stages/pool,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,417 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@31ff1390{/stages/pool/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,417 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@759d81f3{/storage,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,418 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@781a9412{/storage/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,418 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5a4c638d{/storage/rdd,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,419 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@13e698c7{/storage/rdd/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,419 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@aed0151{/environment,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,419 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@267bbe1a{/environment/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,420 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1f12e153{/executors,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,420 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@389562d6{/executors/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,421 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5a101b1c{/executors/threadDump,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,422 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@2160e52a{/executors/threadDump/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,428 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@29f0802c{/static,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,429 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@779de014{/,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,431 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5c41d037{/api,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,432 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1450078a{/jobs/job/kill,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,433 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@c68a5f8{/stages/stage/kill,null,AVAILABLE,@Spark}
2019-10-04 11:12:10,436 INFO ui.SparkUI: Bound SparkUI to 0.0.0.0, and started at http://hadoop01:4040
2019-10-04 11:12:10,467 INFO spark.SparkContext: Added JAR file:/home/hadoop/spark-2.4.4-bin-without-hadoop/sparkapp.jar at spark://hadoop01:34911/jars/sparkapp.jar with timestamp 1570158730467
2019-10-04 11:12:10,513 INFO executor.Executor: Starting executor ID driver on host localhost
2019-10-04 11:12:10,583 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 37776.
2019-10-04 11:12:10,583 INFO netty.NettyBlockTransferService: Server created on hadoop01:37776
2019-10-04 11:12:10,584 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2019-10-04 11:12:10,606 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, hadoop01, 37776, None)
2019-10-04 11:12:10,611 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop01:37776 with 1048.8 MB RAM, BlockManagerId(driver, hadoop01, 37776, None)
2019-10-04 11:12:10,614 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, hadoop01, 37776, None)
2019-10-04 11:12:10,616 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, hadoop01, 37776, None)
2019-10-04 11:12:10,745 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@6aef4eb8{/metrics/json,null,AVAILABLE,@Spark}
2019-10-04 11:12:11,013 INFO spark.SparkContext: Starting job: reduce at sparkPi.scala:20
2019-10-04 11:12:11,036 INFO scheduler.DAGScheduler: Got job 0 (reduce at sparkPi.scala:20) with 2 output partitions
2019-10-04 11:12:11,036 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (reduce at sparkPi.scala:20)
2019-10-04 11:12:11,036 INFO scheduler.DAGScheduler: Parents of final stage: List()
2019-10-04 11:12:11,037 INFO scheduler.DAGScheduler: Missing parents: List()
2019-10-04 11:12:11,041 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at sparkPi.scala:16), which has no missing parents
2019-10-04 11:12:11,107 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1904.0 B, free 1048.8 MB)
2019-10-04 11:12:11,139 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1254.0 B, free 1048.8 MB)
2019-10-04 11:12:11,142 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop01:37776 (size: 1254.0 B, free: 1048.8 MB)
2019-10-04 11:12:11,144 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1161
2019-10-04 11:12:11,210 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at sparkPi.scala:16) (first 15 tasks are for partitions Vector(0, 1))
2019-10-04 11:12:11,211 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
2019-10-04 11:12:11,255 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7866 bytes)
2019-10-04 11:12:11,262 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
2019-10-04 11:12:11,271 INFO executor.Executor: Fetching spark://hadoop01:34911/jars/sparkapp.jar with timestamp 1570158730467
2019-10-04 11:12:11,346 INFO client.TransportClientFactory: Successfully created connection to hadoop01/192.168.1.100:34911 after 27 ms (0 ms spent in bootstraps)
2019-10-04 11:12:11,352 INFO util.Utils: Fetching spark://hadoop01:34911/jars/sparkapp.jar to /tmp/spark-c35e81e3-5419-4858-b25c-93fbbc73e431/userFiles-c7eb44d6-5f78-4f9e-bfdf-986881e946b4/fetchFileTemp656185270030476350.tmp
2019-10-04 11:12:11,390 INFO executor.Executor: Adding file:/tmp/spark-c35e81e3-5419-4858-b25c-93fbbc73e431/userFiles-c7eb44d6-5f78-4f9e-bfdf-986881e946b4/sparkapp.jar to class loader
2019-10-04 11:12:11,426 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 824 bytes result sent to driver
2019-10-04 11:12:11,429 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 7923 bytes)
2019-10-04 11:12:11,431 INFO executor.Executor: Running task 1.0 in stage 0.0 (TID 1)
2019-10-04 11:12:11,437 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 199 ms on localhost (executor driver) (1/2)
2019-10-04 11:12:11,438 INFO executor.Executor: Finished task 1.0 in stage 0.0 (TID 1). 824 bytes result sent to driver
2019-10-04 11:12:11,445 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 17 ms on localhost (executor driver) (2/2)
2019-10-04 11:12:11,447 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
2019-10-04 11:12:11,449 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at sparkPi.scala:20) finished in 0.392 s
2019-10-04 11:12:11,456 INFO scheduler.DAGScheduler: Job 0 finished: reduce at sparkPi.scala:20, took 0.442551 s
Pi is roughly 3.1378
2019-10-04 11:12:11,466 INFO server.AbstractConnector: Stopped Spark@3c2772d1{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-10-04 11:12:11,471 INFO ui.SparkUI: Stopped Spark web UI at http://hadoop01:4040
2019-10-04 11:12:11,481 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
2019-10-04 11:12:11,501 INFO memory.MemoryStore: MemoryStore cleared
2019-10-04 11:12:11,502 INFO storage.BlockManager: BlockManager stopped
2019-10-04 11:12:11,508 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
2019-10-04 11:12:11,509 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
2019-10-04 11:12:11,518 INFO spark.SparkContext: Successfully stopped SparkContext
2019-10-04 11:12:11,520 INFO util.ShutdownHookManager: Shutdown hook called
2019-10-04 11:12:11,522 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-2edea92d-9604-43f3-99c1-8e541a518199
2019-10-04 11:12:11,527 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c35e81e3-5419-4858-b25c-93fbbc73e431


To show only the result, run:

[hadoop@hadoop01 spark-2.4.4-bin-without-hadoop]$ bin/spark-submit --master local --class com.hadoop.sparkPi sparkapp.jar 2>&1 | grep "Pi is roughly"
Result: Pi is roughly 3.1384



B. Yarn-cluster mode (start Hadoop and Spark first; note that in Spark 2.x, --master yarn --deploy-mode cluster is the preferred spelling of this mode)

[hadoop@hadoop01 spark-2.4.4-bin-without-hadoop]$ bin/spark-submit --master yarn-cluster --class com.hadoop.sparkPi sparkapp.jar
Output:
2019-10-04 13:08:55,136 INFO yarn.Client:
     client token: N/A
     diagnostics: AM container is launched, waiting for AM container to Register with RM
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1570165734049
     final status: UNDEFINED
     tracking URL: http://hadoop01:8088/proxy/application_1570165372810_0002/
     user: hadoop

View the result in stdout under logs at the tracking URL:
b_1: open http://hadoop01:50070
b_2: open the user logs directory under logs, e.g. /logs/userlogs/
b_3: open the matching application directory, e.g. application_1570165372810_0002 (the application id from the output above)
b_4: open stdout inside it to see the result
Result:
Pi is roughly 3.1294

C. Yarn-client mode (start Hadoop and Spark first)

[hadoop@hadoop01 spark-2.4.4-bin-without-hadoop]$ bin/spark-submit --master yarn-client --class com.hadoop.sparkPi sparkapp.jar


2. Loan risk prediction with Spark MLlib

Create the project and edit the run configuration:

Edit Configuration -- Application
Name              (Credit)
Main Class        (com.hadoop.Credit)
Program arguments (/home/hadoop/IdeaProjects/Gredit)
VM options        (-Dspark.master=local -Dspark.app.name=Credit -server -XX:PermSize=128M -XX:MaxPermSize=256M)


Add the Spark dependency jars:

File -- Project Structure -- Libraries -- + -- Java -- select all jars under /home/hadoop/spark-2.4.4-bin-without-hadoop/jars -- OK


Copy the UserGredit.csv file into the /home/hadoop/IdeaProjects/Gredit/ directory.

Contents of UserGredit.csv:
1,1,18,4,2,1049,1,2,4,2,1,4,2,21,3,1,1,3,1,1,1
1,1,9,4,0,2799,1,3,2,3,1,2,1,36,3,1,2,3,2,1,1
1,2,12,2,9,841,2,4,2,2,1,4,1,23,3,1,1,2,1,1,1
1,1,12,4,0,2122,1,3,3,3,1,2,1,39,3,1,2,2,2,1,2
1,1,12,4,0,2171,1,3,4,3,1,4,2,38,1,2,2,2,1,1,2
1,1,10,4,0,2241,1,2,1,3,1,3,1,48,3,1,2,2,2,1,2

 

Note: the complete user credit data set (UserGredit)


Copy the test program into the editor (adjust the file name in it as needed):

package com.hadoop

/** import the machine-learning algorithm packages */
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{ ParamGridBuilder, CrossValidator }
import org.apache.spark.ml.{ Pipeline, PipelineStage }
import org.apache.spark.mllib.evaluation.RegressionMetrics

/* create a Gredit object */
object Gredit {
//define a Credit case class and its fields
  case class Credit(
                     creditability: Double,
                     balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
                     savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
                     residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
                     credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
                   )
//parse each line of the credit data file and store the values in a Credit instance
  def parseCredit(line: Array[Double]): Credit = {
    Credit(
      line(0),
      line(1) - 1, line(2), line(3), line(4), line(5),
      line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
      line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
      line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
    )
  }
//convert the values from String to Double
  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).map(_.map(_.toDouble))
  }
/**
  * main function
  */
  def main(args: Array[String]) {
/** load the data from UserGredit.csv as an RDD of Strings,
  * map each string through the parseRDD function into an Array[Double],
  * map each array through the parseCredit function into a Credit object,
  * then toDF() turns the RDD of Credit objects into a DataFrame of the Credit class
  */
    val conf = new SparkConf().setAppName("SparkDFebay")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext._
    import sqlContext.implicits._
    val creditDF = parseRDD(sc.textFile("UserGredit.csv")).map(parseCredit).toDF().cache()
    creditDF.registerTempTable("credit")
//print each field of the Credit schema as a tree
    creditDF.printSchema
//show the DataFrame (first 20 rows by default)
    creditDF.show
//run a SQL query through sqlContext and show the result
    sqlContext.sql("SELECT creditability, avg(balance) as avgbalance, avg(amount) as avgamt, avg(duration) as avgdur  FROM credit GROUP BY creditability ").show

    creditDF.describe("balance").show
    creditDF.groupBy("creditability").avg("balance").show
//turn each feature column of the data into part of a feature vector
    val featureCols = Array("balance", "duration", "history", "purpose", "amount",
      "savings", "employment", "instPercent", "sexMarried", "guarantors",
      "residenceDuration", "assets", "age", "concCredit", "apartment",
      "credits", "occupation", "dependents", "hasPhone", "foreign")
//set the input and output column names; VectorAssembler() transforms each feature dimension
    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
//returns a new DataFrame with an added features column
    val df2 = assembler.transform(creditDF)
    df2.show
//StringIndexer returns a DataFrame with the creditability column encoded as a label column
    val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
    val df3 = labelIndexer.fit(df2).transform(df2)
    df3.show
//to train the model, split the data set into two parts: 70% for training, 30% for testing
    val splitSeed = 5043
    val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)
//note: the algorithm details below will be expanded later
    val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(3).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
    val model = classifier.fit(trainingData)

    val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
    val predictions = model.transform(testData)
    model.toDebugString

    val accuracy = evaluator.evaluate(predictions)
    println("accuracy before pipeline fitting" + accuracy)

    val rm = new RegressionMetrics(
      predictions.select("prediction", "label").rdd.map(x =>
        (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double]))
    )
    println("MSE: " + rm.meanSquaredError)
    println("MAE: " + rm.meanAbsoluteError)
    println("RMSE Squared: " + rm.rootMeanSquaredError)
    println("R Squared: " + rm.r2)
    println("Explained Variance: " + rm.explainedVariance + "\n")

    val paramGrid = new ParamGridBuilder()
      .addGrid(classifier.maxBins, Array(25, 31))
      .addGrid(classifier.maxDepth, Array(5, 10))
      .addGrid(classifier.numTrees, Array(20, 60))
      .addGrid(classifier.impurity, Array("entropy", "gini"))
      .build()

    val steps: Array[PipelineStage] = Array(classifier)
    val pipeline = new Pipeline().setStages(steps)

    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(10)

    val pipelineFittedModel = cv.fit(trainingData)

    val predictions2 = pipelineFittedModel.transform(testData)
    val accuracy2 = evaluator.evaluate(predictions2)
    println("accuracy after pipeline fitting" + accuracy2)

    println(pipelineFittedModel.bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel].stages(0))

    pipelineFittedModel
      .bestModel.asInstanceOf[org.apache.spark.ml.PipelineModel]
      .stages(0)
      .extractParamMap

    val rm2 = new RegressionMetrics(
      predictions2.select("prediction", "label").rdd.map(x =>
        (x(0).asInstanceOf[Double], x(1).asInstanceOf[Double]))
    )

    println("MSE: " + rm2.meanSquaredError)
    println("MAE: " + rm2.meanAbsoluteError)
    println("RMSE Squared: " + rm2.rootMeanSquaredError)
    println("R Squared: " + rm2.r2)
    println("Explained Variance: " + rm2.explainedVariance + "\n")

  }
}
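The parsing step above (parseRDD followed by parseCredit) can be exercised on a single CSV line without Spark. A small plain-Scala sketch (the object and method names here are illustrative, not part of the original program), reusing the first sample row of UserGredit.csv:

```scala
object ParseSketch {
  // split a CSV line and convert every field to Double,
  // exactly what parseRDD does to each line of the RDD
  def parseLine(line: String): Array[Double] =
    line.split(",").map(_.toDouble)

  def main(args: Array[String]): Unit = {
    // first sample row of UserGredit.csv
    val fields = parseLine("1,1,18,4,2,1049,1,2,4,2,1,4,2,21,3,1,1,3,1,1,1")
    // parseCredit then keeps creditability (field 0) as-is and shifts most
    // of the 1-based categorical codes down by one, e.g. balance = fields(1) - 1
    val creditability = fields(0)
    val balance = fields(1) - 1
    println(s"creditability=$creditability balance=$balance duration=${fields(2)}")
  }
}
```

Testing the parser on one line like this is a quick way to catch malformed rows before running the full Spark job.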

First run: an error.

Exception in thread "main" java.lang.IllegalArgumentException: System memory 425197568 must be at least 471859200.
Fix: in main, append .set("spark.testing.memory","2147480000") to val conf = new SparkConf().setAppName("SparkDFebay")
After the change: val conf = new SparkConf().setAppName("SparkDFebay").set("spark.testing.memory","2147480000")


Second run (Run Project): check the output.
The output is flooded with INFO log lines, making the result hard to see, so hide them.
Fix: copy Spark's default log configuration file from the installation directory into the project's src directory and lower the log level shown on the console.

[hadoop@hadoop01 ~]$ cd spark-2.4.4-bin-without-hadoop/conf
[hadoop@hadoop01 conf]$ cp log4j.properties.template /home/hadoop/IdeaProjects/Gredit/src/
[hadoop@hadoop01 conf]$ cd /home/hadoop/IdeaProjects/Gredit/src/
[hadoop@hadoop01 src]$ mv log4j.properties.template log4j.properties
[hadoop@hadoop01 src]$ gedit log4j.properties
Change the log level in the log configuration file so that only ERROR-level logs reach the console.
Change in log4j.properties:
log4j.rootCategory=ERROR, console

Third run: check the result.

Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128M; support was removed in 8.0
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256M; support was removed in 8.0
root
 |-- creditability: double (nullable = false)
 |-- balance: double (nullable = false)
 |-- duration: double (nullable = false)
 |-- history: double (nullable = false)
 |-- purpose: double (nullable = false)
 |-- amount: double (nullable = false)
 |-- savings: double (nullable = false)
 |-- employment: double (nullable = false)
 |-- instPercent: double (nullable = false)
 |-- sexMarried: double (nullable = false)
 |-- guarantors: double (nullable = false)
 |-- residenceDuration: double (nullable = false)
 |-- assets: double (nullable = false)
 |-- age: double (nullable = false)
 |-- concCredit: double (nullable = false)
 |-- apartment: double (nullable = false)
 |-- credits: double (nullable = false)
 |-- occupation: double (nullable = false)
 |-- dependents: double (nullable = false)
 |-- hasPhone: double (nullable = false)
 |-- foreign: double (nullable = false)

+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+
|          1.0|    0.0|    18.0|    4.0|    2.0|1049.0|    0.0|       1.0|        4.0|       1.0|       0.0|              3.0|   1.0|21.0|       2.0|      0.0|    0.0|       2.0|       0.0|     0.0|    0.0|
|          1.0|    0.0|     9.0|    4.0|    0.0|2799.0|    0.0|       2.0|        2.0|       2.0|       0.0|              1.0|   0.0|36.0|       2.0|      0.0|    1.0|       2.0|       1.0|     0.0|    0.0|
|          1.0|    1.0|    12.0|    2.0|    9.0| 841.0|    1.0|       3.0|        2.0|       1.0|       0.0|              3.0|   0.0|23.0|       2.0|      0.0|    0.0|       1.0|       0.0|     0.0|    0.0|
|          1.0|    0.0|    12.0|    4.0|    0.0|2122.0|    0.0|       2.0|        3.0|       2.0|       0.0|              1.0|   0.0|39.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|
|          1.0|    0.0|    12.0|    4.0|    0.0|2171.0|    0.0|       2.0|        4.0|       2.0|       0.0|              3.0|   1.0|38.0|       0.0|      1.0|    1.0|       1.0|       0.0|     0.0|    1.0|
|          1.0|    0.0|    10.0|    4.0|    0.0|2241.0|    0.0|       1.0|        1.0|       2.0|       0.0|              2.0|   0.0|48.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+

+-------------+-------------------+------+------------------+
|creditability|         avgbalance|avgamt|            avgdur|
+-------------+-------------------+------+------------------+
|          1.0|0.16666666666666666|1870.5|12.166666666666666|
+-------------+-------------------+------+------------------+

+-------+-------------------+
|summary|            balance|
+-------+-------------------+
|  count|                  6|
|   mean|0.16666666666666666|
| stddev|  0.408248290463863|
|    min|                0.0|
|    max|                1.0|
+-------+-------------------+

+-------------+-------------------+
|creditability|       avg(balance)|
+-------------+-------------------+
|          1.0|0.16666666666666666|
+-------------+-------------------+

+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|            features|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+
|          1.0|    0.0|    18.0|    4.0|    2.0|1049.0|    0.0|       1.0|        4.0|       1.0|       0.0|              3.0|   1.0|21.0|       2.0|      0.0|    0.0|       2.0|       0.0|     0.0|    0.0|(20,[1,2,3,4,6,7,...|
|          1.0|    0.0|     9.0|    4.0|    0.0|2799.0|    0.0|       2.0|        2.0|       2.0|       0.0|              1.0|   0.0|36.0|       2.0|      0.0|    1.0|       2.0|       1.0|     0.0|    0.0|(20,[1,2,4,6,7,8,...|
|          1.0|    1.0|    12.0|    2.0|    9.0| 841.0|    1.0|       3.0|        2.0|       1.0|       0.0|              3.0|   0.0|23.0|       2.0|      0.0|    0.0|       1.0|       0.0|     0.0|    0.0|[1.0,12.0,2.0,9.0...|
|          1.0|    0.0|    12.0|    4.0|    0.0|2122.0|    0.0|       2.0|        3.0|       2.0|       0.0|              1.0|   0.0|39.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|[0.0,12.0,4.0,0.0...|
|          1.0|    0.0|    12.0|    4.0|    0.0|2171.0|    0.0|       2.0|        4.0|       2.0|       0.0|              3.0|   1.0|38.0|       0.0|      1.0|    1.0|       1.0|       0.0|     0.0|    1.0|[0.0,12.0,4.0,0.0...|
|          1.0|    0.0|    10.0|    4.0|    0.0|2241.0|    0.0|       1.0|        1.0|       2.0|       0.0|              2.0|   0.0|48.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|[0.0,10.0,4.0,0.0...|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+

+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+
|creditability|balance|duration|history|purpose|amount|savings|employment|instPercent|sexMarried|guarantors|residenceDuration|assets| age|concCredit|apartment|credits|occupation|dependents|hasPhone|foreign|            features|label|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+
|          1.0|    0.0|    18.0|    4.0|    2.0|1049.0|    0.0|       1.0|        4.0|       1.0|       0.0|              3.0|   1.0|21.0|       2.0|      0.0|    0.0|       2.0|       0.0|     0.0|    0.0|(20,[1,2,3,4,6,7,...|  0.0|
|          1.0|    0.0|     9.0|    4.0|    0.0|2799.0|    0.0|       2.0|        2.0|       2.0|       0.0|              1.0|   0.0|36.0|       2.0|      0.0|    1.0|       2.0|       1.0|     0.0|    0.0|(20,[1,2,4,6,7,8,...|  0.0|
|          1.0|    1.0|    12.0|    2.0|    9.0| 841.0|    1.0|       3.0|        2.0|       1.0|       0.0|              3.0|   0.0|23.0|       2.0|      0.0|    0.0|       1.0|       0.0|     0.0|    0.0|[1.0,12.0,2.0,9.0...|  0.0|
|          1.0|    0.0|    12.0|    4.0|    0.0|2122.0|    0.0|       2.0|        3.0|       2.0|       0.0|              1.0|   0.0|39.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|[0.0,12.0,4.0,0.0...|  0.0|
|          1.0|    0.0|    12.0|    4.0|    0.0|2171.0|    0.0|       2.0|        4.0|       2.0|       0.0|              3.0|   1.0|38.0|       0.0|      1.0|    1.0|       1.0|       0.0|     0.0|    1.0|[0.0,12.0,4.0,0.0...|  0.0|
|          1.0|    0.0|    10.0|    4.0|    0.0|2241.0|    0.0|       1.0|        1.0|       2.0|       0.0|              2.0|   0.0|48.0|       2.0|      0.0|    1.0|       1.0|       1.0|     0.0|    1.0|[0.0,10.0,4.0,0.0...|  0.0|
+-------------+-------+--------+-------+-------+------+-------+----------+-----------+----------+----------+-----------------+------+----+----------+---------+-------+----------+----------+--------+-------+--------------------+-----+