[ML] PySpark ML tutorial for beginners

Ref: Spark與Python結合:PySpark初學者指南

Ref: Predicting House Prices with Apache Spark

 

Although Scala has Spark MLlib, it does not have enough libraries and tools for machine learning and NLP purposes. In addition, Scala has weak support for data visualization.

 

1. Warm-up Example

# get data from file
raw_data = sc.textFile(logFile)

# parse into key-value pairs
key_csv_data = raw_data.map(parse_interaction)

# filter normal key interactions
normal_key_interactions = key_csv_data.filter(lambda x: x[0] == "normal.")

# collect all
t0 = time()
all_normal = normal_key_interactions.collect()
tt = time() - t0
normal_count = len(all_normal)

print("Data collected in {} seconds".format(round(tt, 3)))
print("There are {} 'normal' interactions".format(normal_count))

 

 

 

2. The Standard Routine


  

1. Understanding the Data Set and Initialization
2. Creating the Spark Session, Context
3. Load The Data From a File Into a Dataframe

 

4. Data Exploration
  4.1 Distribution of the median age of the houses in the area
  4.2 Summary Statistics
5. Data Preprocessing
  /* missing value */
  /* outlier */
  5.1 Preprocessing The Target Values [not necessary here]

 

6. Feature Engineering
  6.1 Feature Extraction
  6.2 Standardization
  /* Feature Selection */

 

7. Building A Machine Learning Model With Spark ML

 

8. Evaluating the Model
  8.1 Inspect the Model Coefficients
  8.2 Generating Predictions
  8.3 Inspect the Metrics
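Taken together, the routine above can be sketched as a compact PySpark skeleton (the file path, column names and label below are placeholders, not values from this notebook):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession.builder.appName("ml-routine").getOrCreate()                  # 2. session

df = spark.read.csv("/path/to/data.csv", header=True, inferSchema=True)           # 3. load
df.describe().show()                                                              # 4. explore

assembler = VectorAssembler(inputCols=["feat1", "feat2"], outputCol="features")   # 6.1 extract
scaler = StandardScaler(inputCol="features", outputCol="features_scaled")         # 6.2 standardize
assembled = assembler.transform(df)
scaled = scaler.fit(assembled).transform(assembled)

train, test = scaled.randomSplit([0.8, 0.2], seed=42)
lr = LinearRegression(featuresCol="features_scaled", labelCol="label")            # 7. model
model = lr.fit(train)

predictions = model.transform(test)                                               # 8. evaluate
rmse = RegressionEvaluator(labelCol="label", metricName="rmse").evaluate(predictions)
print("RMSE: {}".format(rmse))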

 

 

 

3. Predicting House Prices

 

 

Predicting House Prices with Apache Spark

LINEAR REGRESSION

In this section, we'll make use of the California Housing data set. Note, of course, that this is actually 'small' data and that using Spark in this context might be overkill; this notebook is for educational purposes only and is meant to give us an idea of how we can use PySpark to build a machine learning model.

Kaggle: https://www.kaggle.com/camnugent/california-housing-prices

 

1. Understanding the Data Set and Initialization

The California Housing data set appeared in a 1997 paper titled Sparse Spatial Autoregressions, written by R. Kelley Pace and Ronald Barry and published in the Statistics and Probability Letters journal. The researchers built this data set using the 1990 California census data.

The data contains one row per census block group. A block group is the smallest geographical unit for which the U.S. Census Bureau publishes sample data (a block group typically has a population of 600 to 3,000 people). In this sample a block group on average includes 1425.5 individuals living in a geographically compact area.

These spatial data contain 20,640 observations on housing prices with 9 economic variables:

 

Longitude: the angular distance of a block group's location east or west of the prime meridian
Latitude: the angular distance of a block group's location north or south of the earth's equator
Housing Median Age: the median age of the houses in a block group. Note that the median is the value that lies at the midpoint of a frequency distribution of observed values
Total Rooms: the total number of rooms in the houses per block group
Total Bedrooms: the total number of bedrooms in the houses per block group
Population: the number of inhabitants of a block group
Households: the number of housing units and their occupants per block group
Median Income: the median income of the people that belong to a block group
Median House Value: the dependent variable; the median house value per block group

What's more, we also learn that all block groups with zero entries for the independent and dependent variables have been excluded from the data.

The Median house value is the dependent variable and will be assigned the role of the target variable in our ML model.

In [1]:
# !pip install pyspark
In [2]:
import os
import pandas as pd
import numpy as np

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.functions import udf, col

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator
In [3]:
import seaborn as sns
import matplotlib.pyplot as plt
In [4]:
# Visualization
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'
In [5]:
# setting random seed for notebook reproducibility
rnd_seed = 23
np.random.seed(rnd_seed)
 

2. Creating the Spark Session, Context

In [6]:
spark = SparkSession.builder.master("spark://node-master:7077").appName("Linear-Regression-California-Housing").getOrCreate()
In [7]:
spark
Out[7]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v2.4.4
Master
spark://node-master:7077
AppName
Linear-Regression-California-Housing
In [8]:
sc = spark.sparkContext
sc
Out[8]:

SparkContext

Spark UI

Version
v2.4.4
Master
spark://node-master:7077
AppName
Linear-Regression-California-Housing
In [9]:
# sqlContext = SQLContext(spark.sparkContext)
# sqlContext
 

3. Load The Data From a File Into a Dataframe

In [10]:
HOUSING_DATA = '/dataset/cal_housing.data'
 

Specifying the schema when loading data into a DataFrame will give better performance than schema inference.
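For comparison, the same file could also be loaded with schema inference; this is more convenient but forces Spark to scan the data once just to guess the types, and the columns get generic names because the file has no header row (a quick sketch):

# schema inference: convenient, but requires an extra pass over the data
inferred_df = spark.read.csv(path=HOUSING_DATA, inferSchema=True)
inferred_df.printSchema()   # columns come out as _c0, _c1, ... since there is no header row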

In [11]:
# define the schema, corresponding to a line in the csv data file.
schema = StructType([
    StructField("long", FloatType(), nullable=True),
    StructField("lat", FloatType(), nullable=True),
    StructField("medage", FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)
In [12]:
# Load housing data
housing_df = spark.read.csv(path=HOUSING_DATA, schema=schema).cache()
In [13]:
# Inspect first five rows
housing_df.take(5)
Out[13]:
[Row(long=-122.2300033569336, lat=37.880001068115234, medage=41.0, totrooms=880.0, totbdrms=129.0, pop=322.0, houshlds=126.0, medinc=8.325200080871582, medhv=452600.0),
 Row(long=-122.22000122070312, lat=37.86000061035156, medage=21.0, totrooms=7099.0, totbdrms=1106.0, pop=2401.0, houshlds=1138.0, medinc=8.301400184631348, medhv=358500.0),
 Row(long=-122.23999786376953, lat=37.849998474121094, medage=52.0, totrooms=1467.0, totbdrms=190.0, pop=496.0, houshlds=177.0, medinc=7.257400035858154, medhv=352100.0),
 Row(long=-122.25, lat=37.849998474121094, medage=52.0, totrooms=1274.0, totbdrms=235.0, pop=558.0, houshlds=219.0, medinc=5.643099784851074, medhv=341300.0),
 Row(long=-122.25, lat=37.849998474121094, medage=52.0, totrooms=1627.0, totbdrms=280.0, pop=565.0, houshlds=259.0, medinc=3.8461999893188477, medhv=342200.0)]
In [14]:
# Show first five rows
housing_df.show(5)
 
+-------+-----+------+--------+--------+------+--------+------+--------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|   medhv|
+-------+-----+------+--------+--------+------+--------+------+--------+
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|452600.0|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|358500.0|
|-122.24|37.85|  52.0|  1467.0|   190.0| 496.0|   177.0|7.2574|352100.0|
|-122.25|37.85|  52.0|  1274.0|   235.0| 558.0|   219.0|5.6431|341300.0|
|-122.25|37.85|  52.0|  1627.0|   280.0| 565.0|   259.0|3.8462|342200.0|
+-------+-----+------+--------+--------+------+--------+------+--------+
only showing top 5 rows

In [15]:
# show the dataframe columns
housing_df.columns
Out[15]:
['long',
 'lat',
 'medage',
 'totrooms',
 'totbdrms',
 'pop',
 'houshlds',
 'medinc',
 'medhv']
In [16]:
# show the schema of the dataframe
housing_df.printSchema()
 
root
 |-- long: float (nullable = true)
 |-- lat: float (nullable = true)
 |-- medage: float (nullable = true)
 |-- totrooms: float (nullable = true)
 |-- totbdrms: float (nullable = true)
 |-- pop: float (nullable = true)
 |-- houshlds: float (nullable = true)
 |-- medinc: float (nullable = true)
 |-- medhv: float (nullable = true)

 

4. Data Exploration

In [17]:
# run a sample selection
housing_df.select('pop','totbdrms').show(10)
 
+------+--------+
|   pop|totbdrms|
+------+--------+
| 322.0|   129.0|
|2401.0|  1106.0|
| 496.0|   190.0|
| 558.0|   235.0|
| 565.0|   280.0|
| 413.0|   213.0|
|1094.0|   489.0|
|1157.0|   687.0|
|1206.0|   665.0|
|1551.0|   707.0|
+------+--------+
only showing top 10 rows

 

4.1 Distribution of the median age of the houses in the area:

In [18]:
# group by medage and see the distribution
result_df = housing_df.groupBy("medage").count().sort("medage", ascending=False)
In [19]:
result_df.show(10)
 
+------+-----+
|medage|count|
+------+-----+
|  52.0| 1273|
|  51.0|   48|
|  50.0|  136|
|  49.0|  134|
|  48.0|  177|
|  47.0|  198|
|  46.0|  245|
|  45.0|  294|
|  44.0|  356|
|  43.0|  353|
+------+-----+
only showing top 10 rows

In [20]:
result_df.toPandas().plot.bar(x='medage',figsize=(14, 6))
Out[20]:
<matplotlib.axes._subplots.AxesSubplot at 0x7f294c8ce668>
 
 

Most block groups cluster at a median house age of 52, which is the cap applied in this census-derived data set; the smaller counts at low ages correspond to more recently built neighborhoods.

 

4.2 Summary Statistics:

Spark DataFrames include some built-in functions for statistical processing. The describe() function performs summary statistics calculations on all numeric columns and returns them as a DataFrame.

In [21]:
(housing_df.describe().select(
                    "summary",
                    F.round("medage", 4).alias("medage"),
                    F.round("totrooms", 4).alias("totrooms"),
                    F.round("totbdrms", 4).alias("totbdrms"),
                    F.round("pop", 4).alias("pop"),
                    F.round("houshlds", 4).alias("houshlds"),
                    F.round("medinc", 4).alias("medinc"),
                    F.round("medhv", 4).alias("medhv"))
                    .show())
 
+-------+-------+---------+--------+---------+--------+-------+-----------+
|summary| medage| totrooms|totbdrms|      pop|houshlds| medinc|      medhv|
+-------+-------+---------+--------+---------+--------+-------+-----------+
|  count|20640.0|  20640.0| 20640.0|  20640.0| 20640.0|20640.0|    20640.0|
|   mean|28.6395|2635.7631| 537.898|1425.4767|499.5397| 3.8707|206855.8169|
| stddev|12.5856|2181.6153|421.2479|1132.4621|382.3298| 1.8998|115395.6159|
|    min|    1.0|      2.0|     1.0|      3.0|     1.0| 0.4999|    14999.0|
|    max|   52.0|  39320.0|  6445.0|  35682.0|  6082.0|15.0001|   500001.0|
+-------+-------+---------+--------+---------+--------+-------+-----------+

 

Look at the minimum and maximum values of all the (numerical) attributes. We see that multiple attributes have a wide range of values: we will need to normalize our data set.

 

5. Data Preprocessing

With all this information that we gathered from our small exploratory data analysis, we know enough to preprocess our data to feed it to the model.

  • We don't need to worry about missing values: block groups with zero entries for the variables have already been excluded from the data set (a quick check is sketched after this list).
  • We should probably standardize our data, as we have seen that the range of minimum and maximum values is quite big.
  • There are possibly some additional attributes that we could add, such as a feature that registers the number of bedrooms per room or the rooms per household.
  • The values of our dependent variable are also quite large; to make our life easier, we'll rescale them.
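A quick way to confirm the claim about missing values is to count the null entries per column (a minimal sketch using the pyspark.sql.functions module already imported as F):

# count null entries in every column; every count should be 0 for this data set
housing_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c)
                   for c in housing_df.columns]).show()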
 

5.1 Preprocessing The Target Values

First, let's start with the medianHouseValue, our dependent variable. To facilitate our working with the target values, we will express the house values in units of 100,000. That means that a target such as 452600.000000 should become 4.526:

In [22]:
# Adjust the values of `medianHouseValue`
housing_df = housing_df.withColumn("medhv", col("medhv")/100000)
In [23]:
# Show the first 2 lines of `df`
housing_df.show(2)
 
+-------+-----+------+--------+--------+------+--------+------+-----+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|medhv|
+-------+-----+------+--------+--------+------+--------+------+-----+
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|4.526|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|3.585|
+-------+-----+------+--------+--------+------+--------+------+-----+
only showing top 2 rows

 

We can clearly see that the values have been adjusted correctly when we look at the result of the show() method:

 

6. Feature Engineering

Now that we have adjusted the values in medianHouseValue, we will add the following columns to the data set:

  • Rooms per household which refers to the number of rooms in households per block group;
  • Population per household, which basically gives us an indication of how many people live in households per block group; And
  • Bedrooms per room which will give us an idea about how many rooms are bedrooms per block group;

As we're working with DataFrames, we add these new columns with the withColumn() method, deriving them from totalRooms, households, and population. Additionally, we have to indicate that we're working with columns by wrapping them in the col() function; otherwise, we won't be able to do element-wise operations such as the divisions that we have in mind for these three variables:

In [24]:
housing_df.columns
Out[24]:
['long',
 'lat',
 'medage',
 'totrooms',
 'totbdrms',
 'pop',
 'houshlds',
 'medinc',
 'medhv']
In [25]:
# Add the new columns to `df`
housing_df = (housing_df.withColumn("rmsperhh", F.round(col("totrooms")/col("houshlds"), 2))
                       .withColumn("popperhh", F.round(col("pop")/col("houshlds"), 2))
                       .withColumn("bdrmsperrm", F.round(col("totbdrms")/col("totrooms"), 2)))
In [26]:
# Inspect the result
housing_df.show(5)
 
+-------+-----+------+--------+--------+------+--------+------+-----+--------+--------+----------+
|   long|  lat|medage|totrooms|totbdrms|   pop|houshlds|medinc|medhv|rmsperhh|popperhh|bdrmsperrm|
+-------+-----+------+--------+--------+------+--------+------+-----+--------+--------+----------+
|-122.23|37.88|  41.0|   880.0|   129.0| 322.0|   126.0|8.3252|4.526|    6.98|    2.56|      0.15|
|-122.22|37.86|  21.0|  7099.0|  1106.0|2401.0|  1138.0|8.3014|3.585|    6.24|    2.11|      0.16|
|-122.24|37.85|  52.0|  1467.0|   190.0| 496.0|   177.0|7.2574|3.521|    8.29|     2.8|      0.13|
|-122.25|37.85|  52.0|  1274.0|   235.0| 558.0|   219.0|5.6431|3.413|    5.82|    2.55|      0.18|
|-122.25|37.85|  52.0|  1627.0|   280.0| 565.0|   259.0|3.8462|3.422|    6.28|    2.18|      0.17|
+-------+-----+------+--------+--------+------+--------+------+-----+--------+--------+----------+
only showing top 5 rows

 

We can see that, for the first row, there are about 6.98 rooms per household, the households in the block group consist of about 2.56 people, and the share of bedrooms is quite low at 0.15:

 

Since we don't necessarily want to standardize our target values, we'll want to make sure to isolate them in our data set. This is also the time to leave out variables that we might not want to consider in our analysis; in this case, let's leave out variables such as longitude, latitude, housingMedianAge and totalRooms.

We will use the select() method and pass the column names in a more appropriate order. The target variable medianHouseValue is put first, so that it won't be affected by the standardization.

In [27]:
# Re-order and select columns
housing_df = housing_df.select("medhv", 
                              "totbdrms", 
                              "pop", 
                              "houshlds", 
                              "medinc", 
                              "rmsperhh", 
                              "popperhh", 
                              "bdrmsperrm")
 

6.1 Feature Extraction

Now that we have re-ordered the columns, we're ready to normalize the data. First, we choose the features to be normalized.

In [28]:
featureCols = ["totbdrms", "pop", "houshlds", "medinc", "rmsperhh", "popperhh", "bdrmsperrm"]
 

Use a VectorAssembler to put features into a feature vector column:

In [29]:
# put features into a feature vector column
assembler = VectorAssembler(inputCols=featureCols, outputCol="features") 
In [30]:
assembled_df = assembler.transform(housing_df)
In [31]:
assembled_df.show(10, truncate=False)
 
+-----+--------+------+--------+------+--------+--------+----------+-------------------------------------------------------+
|medhv|totbdrms|pop   |houshlds|medinc|rmsperhh|popperhh|bdrmsperrm|features                                               |
+-----+--------+------+--------+------+--------+--------+----------+-------------------------------------------------------+
|4.526|129.0   |322.0 |126.0   |8.3252|6.98    |2.56    |0.15      |[129.0,322.0,126.0,8.325200080871582,6.98,2.56,0.15]   |
|3.585|1106.0  |2401.0|1138.0  |8.3014|6.24    |2.11    |0.16      |[1106.0,2401.0,1138.0,8.301400184631348,6.24,2.11,0.16]|
|3.521|190.0   |496.0 |177.0   |7.2574|8.29    |2.8     |0.13      |[190.0,496.0,177.0,7.257400035858154,8.29,2.8,0.13]    |
|3.413|235.0   |558.0 |219.0   |5.6431|5.82    |2.55    |0.18      |[235.0,558.0,219.0,5.643099784851074,5.82,2.55,0.18]   |
|3.422|280.0   |565.0 |259.0   |3.8462|6.28    |2.18    |0.17      |[280.0,565.0,259.0,3.8461999893188477,6.28,2.18,0.17]  |
|2.697|213.0   |413.0 |193.0   |4.0368|4.76    |2.14    |0.23      |[213.0,413.0,193.0,4.036799907684326,4.76,2.14,0.23]   |
|2.992|489.0   |1094.0|514.0   |3.6591|4.93    |2.13    |0.19      |[489.0,1094.0,514.0,3.65910005569458,4.93,2.13,0.19]   |
|2.414|687.0   |1157.0|647.0   |3.12  |4.8     |1.79    |0.22      |[687.0,1157.0,647.0,3.119999885559082,4.8,1.79,0.22]   |
|2.267|665.0   |1206.0|595.0   |2.0804|4.29    |2.03    |0.26      |[665.0,1206.0,595.0,2.080399990081787,4.29,2.03,0.26]  |
|2.611|707.0   |1551.0|714.0   |3.6912|4.97    |2.17    |0.2       |[707.0,1551.0,714.0,3.691200017929077,4.97,2.17,0.2]   |
+-----+--------+------+--------+------+--------+--------+----------+-------------------------------------------------------+
only showing top 10 rows

 

All the features have been combined into a single dense feature vector.

 

6.2 Standardization

Next, we can finally scale the data using StandardScaler. The input column is features, and the output column holding the rescaled features, which will be included in scaled_df, will be named "features_scaled":

In [32]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
In [33]:
# Fit the DataFrame to the scaler
scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)
In [34]:
# Inspect the result
scaled_df.select("features", "features_scaled").show(10, truncate=False)
 
+-------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+
|features                                               |features_scaled                                                                                                                       |
+-------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+
|[129.0,322.0,126.0,8.325200080871582,6.98,2.56,0.15]   |[0.30623297630686513,0.2843362208866199,0.3295584480852433,4.38209543579743,2.8211223886115664,0.24648542140099877,2.5828740130262697]|
|[1106.0,2401.0,1138.0,8.301400184631348,6.24,2.11,0.16]|[2.6255323394991694,2.1201592122632746,2.9764882057222772,4.36956799913841,2.522034914747303,0.20315790592035446,2.755065613894688]   |
|[190.0,496.0,177.0,7.257400035858154,8.29,2.8,0.13]    |[0.451040817816313,0.4379837439744208,0.4629511532626037,3.820042673324032,3.3505880518037077,0.2695934296573424,2.238490811289434]   |
|[235.0,558.0,219.0,5.643099784851074,5.82,2.55,0.18]   |[0.557866274667545,0.4927317119712234,0.5728039692910182,2.970331231769803,2.3522825647162344,0.2455225877236511,3.099448815631524]   |
|[280.0,565.0,259.0,3.8461999893188477,6.28,2.18,0.17]  |[0.664691731518777,0.4989129341644108,0.6774256988418891,2.024505748166202,2.538201805226452,0.20989774166178804,2.9272572147631064]  |
|[213.0,413.0,193.0,4.036799907684326,4.76,2.14,0.23]   |[0.5056404957624983,0.364692109398056,0.5047998450829521,2.124830908428931,1.9238599670187757,0.20604640695239743,3.960406819973614]  |
|[489.0,1094.0,514.0,3.65910005569458,4.93,2.13,0.19]   |[1.1608366311167213,0.9660367256210006,1.344389224728691,1.9260228580003875,1.9925692515551605,0.20508357327504975,3.271640416499942] |
|[687.0,1157.0,647.0,3.119999885559082,4.8,1.79,0.22]   |[1.6308686412621423,1.021667725359687,1.6922564754853369,1.6422593001231023,1.9400268574979251,0.1723472282452296,3.788215219105196]  |
|[665.0,1206.0,595.0,2.080399990081787,4.29,2.03,0.26]  |[1.5786428623570954,1.0649362807119989,1.5562482270692046,1.0950501144251168,1.7338990038887707,0.19545523650157323,4.476981622578868]|
|[707.0,1551.0,714.0,3.691200017929077,4.97,2.17,0.2]   |[1.678346622084912,1.3695822316619488,1.8674978724830456,1.9429191603871925,2.00873614203431,0.20893490798444037,3.44383201736836]    |
+-------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------+
only showing top 10 rows

 

7. Building A Machine Learning Model With Spark ML

With all the preprocessing done, it's finally time to start building our Linear Regression model! Just like always, we first need to split the data into training and test sets. Luckily, this is no issue with the randomSplit() method:

In [35]:
# Split the data into train and test sets
train_data, test_data = scaled_df.randomSplit([.8,.2], seed=rnd_seed)
 

We pass in a list with two numbers that represent the sizes that we want our training and test sets to have, and a seed, which is needed for reproducibility reasons.

Note that the argument elasticNetParam corresponds to $\alpha$, the ElasticNet mixing parameter, and that regParam, the regularization parameter, corresponds to $\lambda$.

In [36]:
train_data.columns
Out[36]:
['medhv',
 'totbdrms',
 'pop',
 'houshlds',
 'medinc',
 'rmsperhh',
 'popperhh',
 'bdrmsperrm',
 'features',
 'features_scaled']
 

Create an ElasticNet model:

ElasticNet is a linear regression model trained with both L1 and L2 priors as regularizer. This combination allows learning a sparse model where few of the weights are non-zero, like Lasso, while still maintaining the regularization properties of Ridge. We control the convex combination of L1 and L2 with the elasticNetParam parameter (the counterpart of scikit-learn's l1_ratio).

Elastic-net is useful when there are multiple features which are correlated with one another. Lasso is likely to pick one of these at random, while elastic-net is likely to pick both.

A practical advantage of trading-off between Lasso and Ridge is it allows Elastic-Net to inherit some of Ridge’s stability under rotation.

The objective function to minimize in this case, written with Spark's parameters ($\lambda$ = regParam, $\alpha$ = elasticNetParam), is: \begin{align} \min_w \frac{1}{2 n_{samples}} \lVert Xw - y \rVert_2^2 + \lambda \left( \alpha \lVert w \rVert_1 + \frac{1 - \alpha}{2} \lVert w \rVert_2^2 \right) \end{align}

http://scikit-learn.org/stable/modules/linear_model.html#elastic-net
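With this parameterization, the concrete values used below (regParam=0.3, elasticNetParam=0.8) translate into the penalty term added to the least-squares loss: \begin{align} 0.3 \left( 0.8\,\lVert w \rVert_1 + \frac{1 - 0.8}{2}\,\lVert w \rVert_2^2 \right) = 0.24\,\lVert w \rVert_1 + 0.03\,\lVert w \rVert_2^2 \end{align}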

In [37]:
# Initialize `lr`
lr = (LinearRegression(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv', 
                               maxIter=10, regParam=0.3, elasticNetParam=0.8, standardization=False))
In [38]:
# Fit the data to the model
linearModel = lr.fit(train_data)
 

8. Evaluating the Model

With our model in place, we can generate predictions for our test data: we use the transform() method to predict the labels for test_data. Then we can extract the predictions as well as the true labels from the resulting DataFrame.

 

8.1 Inspect the Model Co-efficients

In [39]:
# Coefficients for the model
linearModel.coefficients
Out[39]:
DenseVector([0.0, 0.0, 0.0, 0.526, 0.0, 0.0, 0.0])
In [40]:
featureCols
Out[40]:
['totbdrms', 'pop', 'houshlds', 'medinc', 'rmsperhh', 'popperhh', 'bdrmsperrm']
In [41]:
# Intercept for the model
linearModel.intercept
Out[41]:
0.989875772139301
In [42]:
coeff_df = pd.DataFrame({"Feature": ["Intercept"] + featureCols, "Co-efficients": np.insert(linearModel.coefficients.toArray(), 0, linearModel.intercept)})
coeff_df = coeff_df[["Feature", "Co-efficients"]]
In [43]:
coeff_df
Out[43]:
 
      Feature  Co-efficients
0   Intercept       0.989876
1    totbdrms       0.000000
2         pop       0.000000
3    houshlds       0.000000
4      medinc       0.526024
5    rmsperhh       0.000000
6    popperhh       0.000000
7  bdrmsperrm       0.000000
 

8.2 Generating Predictions

In [44]:
# Generate predictions
predictions = linearModel.transform(test_data)
In [45]:
# Extract the predictions and the "known" correct labels
predandlabels = predictions.select("predmedhv", "medhv")
In [46]:
predandlabels.show()
 
+------------------+-----+
|         predmedhv|medhv|
+------------------+-----+
|1.5977678077735522|0.269|
|1.3402962575651638|0.275|
|1.7478926681617617|0.283|
|1.5026315463850333|0.325|
|1.5840068859455108|0.344|
|1.4744173855604754|0.379|
|1.5274954532293994|0.388|
|1.3578228236744827|0.394|
|1.6929041021688493|  0.4|
| 2.010874171848204|  0.4|
|1.3656308740705367| 0.41|
|1.4496919091430263|0.421|
| 1.380970081002033|0.425|
|1.3394379493101451| 0.43|
| 1.722973408950696|0.435|
|1.5529131147882111|0.439|
| 1.323489602290725| 0.44|
|1.4030651812673915|0.444|
|1.5111871672959283|0.446|
|1.5996783060975408| 0.45|
+------------------+-----+
only showing top 20 rows

 

8.3 Inspect the Metrics

Looking at predicted values is one thing, but examining some metrics gives a better idea of how good our model actually is.

Using the LinearRegressionModel.summary attribute:

Next, we can also use the summary attribute to pull up the rootMeanSquaredError and the r2.

In [47]:
# Get the RMSE
print("RMSE: {0}".format(linearModel.summary.rootMeanSquaredError))
 
RMSE: 0.8729980899366503
In [48]:
print("MAE: {0}".format(linearModel.summary.meanAbsoluteError))
 
MAE: 0.6714989215155925
In [49]:
# Get the R2
print("R2: {0}".format(linearModel.summary.r2))
 
R2: 0.42213332730120356
 
  • The RMSE measures the error between predicted values and observed (known) values. The smaller the RMSE, the closer the predicted values are to the observed ones.

  • The R2 ("R squared"), or the coefficient of determination, measures how close the data are to the fitted regression line. This score will always be between 0% and 100% (or 0 and 1 in this case), where 0% indicates that the model explains none of the variability of the response data around its mean, and 100% indicates that it explains all of it. In general, the higher the R-squared, the better the model fits our data (both metrics are recomputed by hand in the sketch below).
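As a sanity check on these definitions, both metrics can also be recomputed directly from the predictions DataFrame (a small sketch using the F functions imported earlier):

# recompute RMSE and R2 by hand from the (prediction, label) pairs
mean_medhv = predandlabels.agg(F.avg("medhv")).first()[0]

manual_metrics = predandlabels.select(
    F.sqrt(F.avg((F.col("predmedhv") - F.col("medhv")) ** 2)).alias("rmse"),
    (1 - F.sum((F.col("predmedhv") - F.col("medhv")) ** 2)
       / F.sum((F.col("medhv") - F.lit(mean_medhv)) ** 2)).alias("r2"))

manual_metrics.show()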

 

Using the RegressionEvaluator from pyspark.ml package:

In [50]:
evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse')
print("RMSE: {0}".format(evaluator.evaluate(predandlabels)))
 
RMSE: 0.9033627063798556
In [51]:
evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='mae')
print("MAE: {0}".format(evaluator.evaluate(predandlabels)))
 
MAE: 0.6888437385796472
In [52]:
evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='r2')
print("R2: {0}".format(evaluator.evaluate(predandlabels)))
 
R2: 0.40877519027090536
 

Using the RegressionMetrics from pyspark.mllib package:

In [53]:
# RegressionMetrics belongs to the older RDD-based mllib API, so it takes an RDD of (prediction, label) pairs
metrics = RegressionMetrics(predandlabels.rdd)
In [54]:
print("RMSE: {0}".format(metrics.rootMeanSquaredError))
 
RMSE: 0.9033627063798556
In [55]:
print("MAE: {0}".format(metrics.meanAbsoluteError))
 
MAE: 0.6888437385796472
In [56]:
print("R2: {0}".format(metrics.r2))
 
R2: 0.40877519027090536
 

There are definitely some improvements needed for our model! If we want to continue with this model, we can play around with the parameters that we passed to the model and with the variables that we included in the original DataFrame.
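For example, the ParamGridBuilder and CrossValidator classes imported at the top of the notebook (but not used above) can tune regParam and elasticNetParam; a minimal sketch, reusing the lr estimator and train_data defined earlier:

# small grid search over the regularization parameters with 3-fold cross-validation
param_grid = (ParamGridBuilder()
              .addGrid(lr.regParam, [0.01, 0.1, 0.3])
              .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
              .build())

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=param_grid,
                    evaluator=RegressionEvaluator(predictionCol="predmedhv",
                                                  labelCol="medhv",
                                                  metricName="rmse"),
                    numFolds=3)

cv_model = cv.fit(train_data)
print("Best average RMSE across folds: {}".format(min(cv_model.avgMetrics)))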

In [57]:
spark.stop()