[ML] LIBSVM Data: Classification, Regression, and Multi-label

Dataset download: "LIBSVM Data: Classification, Regression, and Multi-label" (https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/)

 

1. Machine Learning Model Parameters

Some models require their input data in a particular format, such as LabeledPoint.

    def parse(lp):
        # Parse a line of the form "(label,[f1,f2,...])" into a LabeledPoint.
        label = float(lp[lp.find('(') + 1: lp.find(',')])
        vec = Vectors.dense(lp[lp.find('[') + 1: lp.find(']')].split(','))
        return LabeledPoint(label, vec)

    trainingData = ssc.textFileStream(sys.argv[1]).map(parse).cache()
    testData     = ssc.textFileStream(sys.argv[2]).map(parse)
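A quick sanity check, applying parse to a single made-up line in that format:

    # Hypothetical input line in the "(label,[features])" format:
    print(parse("(1.0,[0.5,1.2,3.4])"))  # -> label 1.0, features [0.5,1.2,3.4]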

 

Official examples: https://spark.apache.org/docs/2.4.4/mllib-data-types.html#data-types-rdd-based-api

(a) Manual construction

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# Create a labeled point with a positive label and a dense feature vector.
pos = LabeledPoint(1.0, [1.0, 0.0, 3.0])

# Create a labeled point with a negative label and a sparse feature vector.
neg = LabeledPoint(0.0, SparseVector(3, [0, 2], [1.0, 3.0]))
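
The sparse vector above is just a compact encoding of the same 3-dimensional layout (size 3, non-zeros at indices 0 and 2), which densifying makes explicit:

# SparseVector(3, [0, 2], [1.0, 3.0]) is equivalent to the dense [1.0, 0.0, 3.0].
print(neg.features.toArray())  # [1. 0. 3.]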

 

(b) Loading from a file

from pyspark.mllib.util import MLUtils

examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
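
loadLibSVMFile parses the standard LIBSVM text format: one example per line, written as "label index1:value1 index2:value2 ...", with 1-based ascending feature indices. The loaded RDD can be inspected directly:

# Each input line looks like: "0 128:51 129:159 130:253 ..."
print(examples.first())  # e.g. a LabeledPoint with a sparse feature vector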

 

 

2. Binary Classification Training

Single-machine sklearn

With traditional single-machine sklearn, setting max_iter=10000 made the run take far too long to finish, so it had to be interrupted manually; the timed run below therefore uses max_iter=200 instead.

from joblib import Memory
from sklearn.datasets import load_svmlight_file
import time

mem = Memory("./mycache")

@mem.cache
def get_data():
    # Cache the parsed LIBSVM file on disk so repeated runs skip the slow parse.
    data = load_svmlight_file("/home/hadoop/covtype.libsvm.binary")
    return data[0], data[1]

X, y = get_data()


# Build the model
time_start = time.time()

from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(random_state=0, solver='lbfgs', max_iter=200).fit(X, y)

time_end = time.time()
print('totally cost {} sec'.format(time_end - time_start))
print(clf.score(X, y))
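
Note that clf.score(X, y) above is training accuracy. For a held-out estimate one would split the data first; a minimal sketch (the split ratio and seed are arbitrary, not from the original run):

from sklearn.model_selection import train_test_split

# Hold out 20% of the examples for an unbiased accuracy estimate.
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=0)
clf_holdout = LogisticRegression(random_state=0, solver='lbfgs', max_iter=200).fit(X_tr, y_tr)
print(clf_holdout.score(X_te, y_te))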

 

Cluster Spark MLlib

Training time saved: 1 - 192/325 = 1 - 59% = 41%

Figure 01: training time on the three-node cluster

 

 

Init

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext

import datetime
import time
In [2]:
def fnGetAppName():
    # Build a timestamp-based app name, e.g. "2019-11-13_13-1-31",
    # reading the clock once so all fields are consistent.
    now = datetime.datetime.now()
    return "{}-{}-{}_{}-{}-{}".format(now.year, now.month, now.day,
                                      now.hour, now.minute, now.second)
In [3]:
def fn_timer(a_func):
    # Timing decorator: print the wall-clock time of the wrapped call.
    def wrapTheFunction(*args, **kwargs):
        time_start = time.time()
        result = a_func(*args, **kwargs)
        time_end = time.time()
        print('totally cost {} sec'.format(time_end - time_start))
        return result

    return wrapTheFunction
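
fn_timer is defined here for convenience but never used below; a hypothetical usage would look like:

@fn_timer
def slow_job():
    time.sleep(2)

slow_job()  # prints roughly "totally cost 2.0 sec"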
In [4]:
appName = fnGetAppName()
print("appName: {}".format(appName))

# conf = SparkConf().setMaster("spark://node-master:7077").setAppName(appName)
conf = SparkConf().setMaster("local").setAppName(appName)
 
appName: 2019-11-13_13-1-31
 

Spark Context

In [5]:
sc = SparkContext(conf=conf)
 

Spark Session

In [6]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
 

Spark Stream

In [7]:
ssc = StreamingContext(sc, 1)  # batch interval: 1 second
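
The second argument is the batch interval, so each micro-batch covers one second of input. This context is what the textFileStream snippet in section 1 would run on; a hedged sketch (the directory path is a placeholder, and the context is never actually started in this notebook):

lines = ssc.textFileStream("/stream/train")  # hypothetical HDFS directory
lines.pprint()                               # print a few records of each batch
# ssc.start(); ssc.awaitTermination()        # would begin consuming the stream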
 

Let's Go!

 

Load data

In [8]:
from __future__ import print_function
import sys
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.util import MLUtils


##################################################
# Load and parse the data
##################################################
def parsePoint(line):
    # Space-separated line: first value is the label, the rest are features.
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])


##################################################
# Binarize the label to {0, 1}
##################################################
def sparsePoint(lp):
    # covtype labels are 1 and 2: keep class 1 as positive, map the rest to 0.
    new_label = 1.0 if lp.label == 1.0 else 0.0
    return LabeledPoint(new_label, features=lp.features)
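
A quick check of sparsePoint on a made-up class-2 point:

print(sparsePoint(LabeledPoint(2.0, [1.0, 0.0])))  # -> label 0.0, features unchanged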
 

Load train data.

In [9]:
# (1) small dense data
# data_train = sc.textFile("/test/sample_svm_data.txt")
# parsedData_train = data_train.map(parsePoint)


# (2) large sparse data
# data_train = MLUtils.loadLibSVMFile(sc, "/dataset/a9a.txt")
data_train = MLUtils.loadLibSVMFile(sc, "/dataset/covtype.libsvm.binary")

data_train.take(10)
Out[9]:
[LabeledPoint(1.0, (54,[0,1,2,3,5,6,7,8,9,10,42],[2596.0,51.0,3.0,258.0,510.0,221.0,232.0,148.0,6279.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2590.0,56.0,2.0,212.0,-6.0,390.0,220.0,235.0,151.0,6225.0,1.0,1.0])),
 LabeledPoint(2.0, (54,[0,1,2,3,4,5,6,7,8,9,10,25],[2804.0,139.0,9.0,268.0,65.0,3180.0,234.0,238.0,135.0,6121.0,1.0,1.0])),
 LabeledPoint(2.0, (54,[0,1,2,3,4,5,6,7,8,9,10,43],[2785.0,155.0,18.0,242.0,118.0,3090.0,238.0,238.0,122.0,6211.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2595.0,45.0,2.0,153.0,-1.0,391.0,220.0,234.0,150.0,6172.0,1.0,1.0])),
 LabeledPoint(2.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2579.0,132.0,6.0,300.0,-15.0,67.0,230.0,237.0,140.0,6031.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2606.0,45.0,7.0,270.0,5.0,633.0,222.0,225.0,138.0,6256.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2605.0,49.0,4.0,234.0,7.0,573.0,222.0,230.0,144.0,6228.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2617.0,45.0,9.0,240.0,56.0,666.0,223.0,221.0,133.0,6244.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2612.0,59.0,10.0,247.0,11.0,636.0,228.0,219.0,124.0,6230.0,1.0,1.0]))]
In [10]:
parsedData_train = data_train.map(sparsePoint)

print(parsedData_train.count())
parsedData_train.take(10)
 
581012
Out[10]:
[LabeledPoint(1.0, (54,[0,1,2,3,5,6,7,8,9,10,42],[2596.0,51.0,3.0,258.0,510.0,221.0,232.0,148.0,6279.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2590.0,56.0,2.0,212.0,-6.0,390.0,220.0,235.0,151.0,6225.0,1.0,1.0])),
 LabeledPoint(0.0, (54,[0,1,2,3,4,5,6,7,8,9,10,25],[2804.0,139.0,9.0,268.0,65.0,3180.0,234.0,238.0,135.0,6121.0,1.0,1.0])),
 LabeledPoint(0.0, (54,[0,1,2,3,4,5,6,7,8,9,10,43],[2785.0,155.0,18.0,242.0,118.0,3090.0,238.0,238.0,122.0,6211.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2595.0,45.0,2.0,153.0,-1.0,391.0,220.0,234.0,150.0,6172.0,1.0,1.0])),
 LabeledPoint(0.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2579.0,132.0,6.0,300.0,-15.0,67.0,230.0,237.0,140.0,6031.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2606.0,45.0,7.0,270.0,5.0,633.0,222.0,225.0,138.0,6256.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2605.0,49.0,4.0,234.0,7.0,573.0,222.0,230.0,144.0,6228.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2617.0,45.0,9.0,240.0,56.0,666.0,223.0,221.0,133.0,6244.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2612.0,59.0,10.0,247.0,11.0,636.0,228.0,219.0,124.0,6230.0,1.0,1.0]))]
 

Load test data.

In [11]:
# (1) small dense data
# data = sc.textFile("/test/sample_svm_data.txt")
# parsedData_test = data.map(parsePoint)


# (2) large sparse data
# data_test = MLUtils.loadLibSVMFile(sc, "/dataset/a9a.t")
data_test = MLUtils.loadLibSVMFile(sc, "/dataset/covtype.libsvm.binary")

data_test.take(10)
Out[11]:
[LabeledPoint(1.0, (54,[0,1,2,3,5,6,7,8,9,10,42],[2596.0,51.0,3.0,258.0,510.0,221.0,232.0,148.0,6279.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2590.0,56.0,2.0,212.0,-6.0,390.0,220.0,235.0,151.0,6225.0,1.0,1.0])),
 LabeledPoint(2.0, (54,[0,1,2,3,4,5,6,7,8,9,10,25],[2804.0,139.0,9.0,268.0,65.0,3180.0,234.0,238.0,135.0,6121.0,1.0,1.0])),
 LabeledPoint(2.0, (54,[0,1,2,3,4,5,6,7,8,9,10,43],[2785.0,155.0,18.0,242.0,118.0,3090.0,238.0,238.0,122.0,6211.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2595.0,45.0,2.0,153.0,-1.0,391.0,220.0,234.0,150.0,6172.0,1.0,1.0])),
 LabeledPoint(2.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2579.0,132.0,6.0,300.0,-15.0,67.0,230.0,237.0,140.0,6031.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2606.0,45.0,7.0,270.0,5.0,633.0,222.0,225.0,138.0,6256.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2605.0,49.0,4.0,234.0,7.0,573.0,222.0,230.0,144.0,6228.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2617.0,45.0,9.0,240.0,56.0,666.0,223.0,221.0,133.0,6244.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2612.0,59.0,10.0,247.0,11.0,636.0,228.0,219.0,124.0,6230.0,1.0,1.0]))]
In [12]:
parsedData_test = data_test.map(sparsePoint)

print(parsedData_test.count())
parsedData_test.take(10)
 
581012
Out[12]:
[LabeledPoint(1.0, (54,[0,1,2,3,5,6,7,8,9,10,42],[2596.0,51.0,3.0,258.0,510.0,221.0,232.0,148.0,6279.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2590.0,56.0,2.0,212.0,-6.0,390.0,220.0,235.0,151.0,6225.0,1.0,1.0])),
 LabeledPoint(0.0, (54,[0,1,2,3,4,5,6,7,8,9,10,25],[2804.0,139.0,9.0,268.0,65.0,3180.0,234.0,238.0,135.0,6121.0,1.0,1.0])),
 LabeledPoint(0.0, (54,[0,1,2,3,4,5,6,7,8,9,10,43],[2785.0,155.0,18.0,242.0,118.0,3090.0,238.0,238.0,122.0,6211.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2595.0,45.0,2.0,153.0,-1.0,391.0,220.0,234.0,150.0,6172.0,1.0,1.0])),
 LabeledPoint(0.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2579.0,132.0,6.0,300.0,-15.0,67.0,230.0,237.0,140.0,6031.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2606.0,45.0,7.0,270.0,5.0,633.0,222.0,225.0,138.0,6256.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2605.0,49.0,4.0,234.0,7.0,573.0,222.0,230.0,144.0,6228.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2617.0,45.0,9.0,240.0,56.0,666.0,223.0,221.0,133.0,6244.0,1.0,1.0])),
 LabeledPoint(1.0, (54,[0,1,2,3,4,5,6,7,8,9,10,42],[2612.0,59.0,10.0,247.0,11.0,636.0,228.0,219.0,124.0,6230.0,1.0,1.0]))]
 

Train data

In [16]:
# Build the model
time_start = time.time()

model = LogisticRegressionWithLBFGS.train(parsedData_train)

time_end = time.time()
print('totally cost {} sec'.format(time_end - time_start))
 
totally cost 325.9337613582611 sec
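
The run above uses the defaults of LogisticRegressionWithLBFGS.train. The method also accepts tuning arguments; the values below are purely illustrative, not what produced the timing above:

# Illustrative settings only; the 325-second run used the defaults.
model_tuned = LogisticRegressionWithLBFGS.train(
    parsedData_train,
    iterations=100,   # maximum number of L-BFGS iterations
    regParam=0.01,    # regularization strength
    regType='l2',     # L2 penalty
    intercept=True)   # also fit an intercept term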
 

Predict data

In [17]:
# Evaluating the model on training data
labelsAndPreds = parsedData_train.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData_train.count())
print("Training Error = " + str(trainErr))
 
Training Error = 0.24482798978334355
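
Beyond the raw error rate, pyspark.mllib.evaluation can compute richer metrics; a hedged sketch, not part of the original run:

from pyspark.mllib.evaluation import BinaryClassificationMetrics

# (score, label) pairs; predict() returns a hard 0/1 label here, so the
# resulting ROC curve is coarse, but it shows the API call pattern.
scoreAndLabels = parsedData_train.map(
    lambda p: (float(model.predict(p.features)), p.label))
metrics = BinaryClassificationMetrics(scoreAndLabels)
print("AUC = {}".format(metrics.areaUnderROC))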
In [18]:
# Evaluating the model on the test data
labelsAndPreds = parsedData_test.map(lambda p: (p.label, model.predict(p.features)))
testErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData_test.count())
print("Prediction Error = " + str(testErr))
 
Prediction Error = 0.24482798978334355

(Identical to the training error because the "test" set here is the same covtype.libsvm.binary file as the training set.)

End.
