[ML] Load and preview large scale data

Ref: [Feature] Preprocessing tutorialhtml

主要是 「無量綱化」 以前的部分。python

 

 

  

加載數據

1、大數據源

http://archive.ics.uci.edu/ml/
http://aws.amazon.com/publicdatasets/
http://www.kaggle.com/
http://www.kdnuggets.com/datasets/index.htmlmysql

 

 

2、初步查看

瞭解需求

Swipejobs is all about matching Jobs to Workers. Your challenge is to analyse the data provided and answer the questions below.

You can access the data by opening the following S3 bucket:

/* somewhere */

Please note that Worker (worker parquet files) has one or more job tickets (jobticket parquet files) associated with it.

Using these parquet files:

求相關性
1. Is there a co-relation between jobticket.jobTicketState, jobticket.clickedCalloff and jobticket.assignedBySwipeJobs values across workers.

預測
2. Looking at Worker.profileLastUpdatedDate values, calculate an estimation for workers who will update their profile in the next two weeks.

requirement
Requirement

 

粗看數據

head -5 <file>
less <file>

  

 

3、數據讀取

python讀取txt文件

沒有格式,就要split出格式,仍是建議以後轉到df格式,操做方便些。git

PATH = "/home/ubuntu/work/rajdeepd-spark-ml/spark-ml/data"
user_data = sc.textFile("%s/ml-100k/u.user" % PATH)

user_fields = user_data.map(lambda line: line.split("|"))
print(user_fields)
user_fields.take(5)

PythonRDD[
29] at RDD at PythonRDD.scala:53 Out[19]: [['1', '24', 'M', 'technician', '85711'], ['2', '53', 'F', 'other', '94043'], ['3', '23', 'M', 'writer', '32067'], ['4', '24', 'M', 'technician', '43537'], ['5', '33', 'F', 'other', '15213']]

 

python讀取parquet文件

Spark SQL仍是做爲首選工具,參見:[Spark] 03 - Spark SQLgithub

Ref: 讀寫parquet格式文件的幾種方式sql

本文將介紹經常使用parquet文件讀寫的幾種方式數據庫

1. 用spark的hadoopFile api 讀取 hive中的  parquet格式。
2. 用  sparkSql 讀寫hive中的 parquet
3. 用新舊MapReduce讀寫 parquet格式文件。
4. 用  SparkSql 讀寫s3中的 parquet

Ref: How to read parquet data from S3 to spark dataframe Python?ubuntu

spark = SparkSession.builder
                        .master("local")             
                        .appName("app name")             
                        .config("spark.some.config.option", true).getOrCreate()

df = spark.read.parquet("s3://path/to/parquet/file.parquet")

 

python讀取csv文件

# define the schema, corresponding to a line in the csv data file.
schema = StructType([
    StructField("long", FloatType(), nullable=True),
    StructField("lat", FloatType(), nullable=True),
    StructField("medage", FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop", FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc", FloatType(), nullable=True),
    StructField("medhv", FloatType(), nullable=True)]
)
schema
# 參數中包含了column的定義
housing_df = spark.read.csv(path=HOUSING_DATA, schema=schema).cache()
# User-friendly的表格顯示 housing_df.show(
5)
# 包括了列的性質 housing_df.printSchema()

 

 

4、數據庫到HBase

MySQL (binlog) --> Maxwell --> Kafka --> HBase --> Parquet.

拋出問題

咱們的業務數據存放在 mysql中,可是在大數據系統中咱們 須要拿hbase的數據進行業務處理,好比推薦系統。
那麼怎麼能夠將mysql中的業務數據實時同步到hbase中,從而能夠在大數據系統中進行實時流式計算。
 

對應方案

(1) MySQL到HBaseapp

本次推薦工具 maxwell,方式是:咱們打開mysql的binlog模式,數據以row爲單位輸出,而後經過maxwell發送給kafka,再由另外一個程序將kafka數據錄入到hbase中。
 

(2) HBase到Parquet

Ref: How to move HBase tables to HDFS in Parquet format?

Ref: spark 讀 hbase parquet 哪一個快

Spark讀hbase,生成task受所查詢table的region個數限制,任務數有限,例如查詢的40G數據,10G一個region,極可能就4~6個region,初始的task數就只有4~6個左右,RDD後續能夠partition設置task數;spark讀parquet按默認的bolck個數生成task個數,例如128M一個bolck,差很少就是300多個task,初始載入狀況就比hbase快,並且直接載入parquet文件到spark的內存,而hbase還須要同regionserver交互把數據傳到spark的內存也是須要消耗時間的。

整體來講,讀parquet更快。

 

 

 

瞭解數據

—— RDD方式,以及正統的高階方法:[Spark] 03 - Spark SQL

 

1、初步清理數據

前期發現缺失數據、不合格的數據。

 

# 可用於檢查「空數據」、「不合格的數據」
def
convert_year(x): try: return int(x[-4:]) except: return 1900 # there is a 'bad' data point with a blank year, which we set to 1900 and will filter out later movie_fields = movie_data.map(lambda lines: lines.split("|")) years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x)) 

 

 

2、特徵內部類別數

num_genders     = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes    = user_fields.map(lambda fields: fields[4]).distinct().count()

也就是下圖中慘素hist中的bins的原始值。

 

 

3、某個特徵可視化

是否符合正態分佈,可視化後甄別「異常值」。

數據若是有偏,能夠經過log轉換。

 

plt.hist 方法 

簡單地,使用hist直接獲得柱狀圖;若是數據量太大,能夠先抽樣,再顯示。

import matplotlib.pyplot as plt

ages = user_fields.map(lambda x: int(x[1])).collect()
plt.hist(ages, bins=30, color='gray', normed=True)
fig = matplotlib.pyplot.gcf()
fig.set_size_inches(8, 5)

 

* Pandas.plot 方法

顯示特徵列 「medage" 的直方圖。

result_df.toPandas().plot.bar(x='medage',figsize=(14, 6))

 

reduceByKey 方法

import numpy as np

count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
# count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()


#######################################################
# 如下怎麼用了 np 這個處理小數據的東東。
#
######################################################
x_axis1 = np.array([c[0] for c in count_by_occupation]) 
y_axis1
= np.array([c[1] for c in count_by_occupation])

#
sort by y_axis1
x_axis = x_axis1[np.argsort(y_axis1)] y_axis = y_axis1[np.argsort(y_axis1)]

pos = np.arange(len(x_axis)) width = 1.0 ax = plt.axes() ax.set_xticks(pos + (width / 2)) ax.set_xticklabels(x_axis) plt.bar(pos, y_axis, width, color='lightblue') plt.xticks(rotation=30) fig = matplotlib.pyplot.gcf() fig.set_size_inches(16, 5)

 

 

4、特徵統計量

RDD 獲取一列

rating_data   = rating_data_raw.map(lambda line: line.split("\t"))
ratings       = rating_data.map(lambda fields: int(fields[2]))

max_rating    = ratings.reduce(lambda x, y: max(x, y))
min_rating    = ratings.reduce(lambda x, y: min(x, y))

mean_rating   = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
median_rating = np.median(ratings.collect())

We can also use the stats function to get some similar information to the above.

ratings.stats()

Out[11]:
(count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0, min: 1.0) 

 

* Summary Statistics

(housing_df.describe().select(
                    "summary",
                    F.round("medage", 4).alias("medage"),
                    F.round("totrooms", 4).alias("totrooms"),
                    F.round("totbdrms", 4).alias("totbdrms"),
                    F.round("pop", 4).alias("pop"),
                    F.round("houshlds", 4).alias("houshlds"),
                    F.round("medinc", 4).alias("medinc"),
                    F.round("medhv", 4).alias("medhv"))
                    .show())

+-------+-------+---------+--------+---------+--------+-------+-----------+
|summary| medage| totrooms|totbdrms|      pop|houshlds| medinc|      medhv|
+-------+-------+---------+--------+---------+--------+-------+-----------+
|  count|20640.0|  20640.0| 20640.0|  20640.0| 20640.0|20640.0|    20640.0|
|   mean|28.6395|2635.7631| 537.898|1425.4767|499.5397| 3.8707|206855.8169|
| stddev|12.5856|2181.6153|421.2479|1132.4621|382.3298| 1.8998|115395.6159|
|    min|    1.0|      2.0|     1.0|      3.0|     1.0| 0.4999|    14999.0|
|    max|   52.0|  39320.0|  6445.0|  35682.0|  6082.0|15.0001|   500001.0|
+-------+-------+---------+--------+---------+--------+-------+-----------+

 

 

 

清洗數據

—— Spark SQL's DataFrame爲主力工具,參考: [Spark] 03 - Spark SQL

 

1、重複數據

Ref: https://github.com/drabastomek/learningPySpark/blob/master/Chapter04/LearningPySpark_Chapter04.ipynb

df能夠經過rdd轉變而來。

1. 找重複的行

print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))  # 全部列的集合 print('Count of distinct ids: {0}'.format(df.select([c for c in df.columns if c != 'id']).distinct().count()))  # 自定義某些列的集合

 

2. 去除 "徹底相同的 row",包括 index

df = df.dropDuplicates()
df.show()

 

3. 去除 "相同的 row",不包括 index

df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df.show()

 

 

2、缺失值

構造一個典型的 「問題數據表」。

df_miss = spark.createDataFrame([
        (1, 143.5, 5.6, 28,   'M',  100000),
        (2, 167.2, 5.4, 45,   'M',  None),
        (3, None , 5.2, None, None, None),
        (4, 144.5, 5.9, 33,   'M',  None),
        (5, 133.2, 5.7, 54,   'F',  None),
        (6, 124.1, 5.2, None, 'F',  None),
        (7, 129.2, 5.3, 42,   'M',  76000),
    ], ['id', 'weight', 'height', 'age', 'gender', 'income'])

  

(1) 哪些行有缺失值?

df_miss.rdd.map( lambda row: (row['id'], sum([c == None for c in row])) ).collect() 
 
[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
 

(2) 瞧瞧細節

df_miss.where('id == 3').show() 
 
+---+------+------+----+------+------+
| id|weight|height| age|gender|income|
+---+------+------+----+------+------+
|  3|  null|   5.2|null|  null|  null|
+---+------+------+----+------+------+

 

(3) 每列的缺失率如何?

df_miss.agg(*[ (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns ]).show() 
 
+----------+------------------+--------------+------------------+------------------+------------------+
|id_missing|    weight_missing|height_missing|       age_missing|    gender_missing|    income_missing|
+----------+------------------+--------------+------------------+------------------+------------------+
|       0.0|0.1428571428571429|           0.0|0.2857142857142857|0.1428571428571429|0.7142857142857143|
+----------+------------------+--------------+------------------+------------------+------------------+

 

(4) 缺失太多的特徵,則「廢」

df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income']) df_miss_no_income.show() 
 
+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  3|  null|   5.2|null|  null|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+

 

(5) 缺失太多的行,則「廢」

df_miss_no_income.dropna(thresh=3).show() 
 
+---+------+------+----+------+
| id|weight|height| age|gender|
+---+------+------+----+------+
|  1| 143.5|   5.6|  28|     M|
|  2| 167.2|   5.4|  45|     M|
|  4| 144.5|   5.9|  33|     M|
|  5| 133.2|   5.7|  54|     F|
|  6| 124.1|   5.2|null|     F|
|  7| 129.2|   5.3|  42|     M|
+---+------+------+----+------+

 

(6) 填補缺失值

means = df_miss_no_income.agg( *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender'] ).toPandas().to_dict('records')[0] means['gender'] = 'missing' df_miss_no_income.fillna(means).show() 
 
+---+------------------+------+---+-------+
| id|            weight|height|age| gender|
+---+------------------+------+---+-------+
|  1|             143.5|   5.6| 28|      M|
|  2|             167.2|   5.4| 45|      M|
|  3|140.28333333333333|   5.2| 40|missing|
|  4|             144.5|   5.9| 33|      M|
|  5|             133.2|   5.7| 54|      F|
|  6|             124.1|   5.2| 40|      F|
|  7|             129.2|   5.3| 42|      M|
+---+------------------+------+---+-------+

 

或者,經過 Imputer 填補缺失值,以下。

from pyspark.ml.feature import Imputer df = spark.createDataFrame([ (1.0, float("nan")), (2.0, float("nan")), (float("nan"), 3.0), (4.0, 4.0), (5.0, 5.0) ], ["a", "b"]) imputer = Imputer(inputCols=["a", "b"], outputCols=["out_a", "out_b"]) model = imputer.fit(df) model.transform(df).show()

  

3、異常值

1. 基本策略

  1. 斷定爲「outlier」,首先要經過統計描述可視化數據。
  2. 常識之外的數據點也能夠直接祛除,好比:age = 300
df_outliers = spark.createDataFrame([
        (1, 143.5, 5.3, 28),
        (2, 154.2, 5.5, 45),
        (3, 342.3, 5.1, 99),
        (4, 144.5, 5.5, 33),
        (5, 133.2, 5.4, 54),
        (6, 124.1, 5.1, 21),
        (7, 129.2, 5.3, 42),
    ], ['id', 'weight', 'height', 'age'])

 

2. 定義有效區間

cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]

bounds
{'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}
 

3. filter有效區間

outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) | 
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()
+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+

 

 並查看細節,以下。

df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter(
'weight_o').select('id', 'weight').show() df_outliers.filter('age_o').select('id', 'age').show()
+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+
 
End.
相關文章
相關標籤/搜索