Ref: [Feature] Preprocessing tutorial
Mainly the material before the "non-dimensionalization" (feature scaling) part.
http://archive.ics.uci.edu/ml/
http://aws.amazon.com/publicdatasets/
http://www.kaggle.com/
http://www.kdnuggets.com/datasets/index.html
Swipejobs is all about matching Jobs to Workers. Your challenge is to analyse the data provided and answer the questions below. You can access the data by opening the following S3 bucket: /* somewhere */ Please note that each Worker (worker parquet files) has one or more job tickets (jobticket parquet files) associated with it. Using these parquet files:
1. Correlation: is there a correlation between the jobticket.jobTicketState, jobticket.clickedCalloff and jobticket.assignedBySwipeJobs values across workers?
2. Prediction: looking at the Worker.profileLastUpdatedDate values, estimate which workers will update their profile in the next two weeks.
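No solution code comes with the challenge; purely as a rough starting point, the sketch below uses hypothetical S3 paths (the real bucket is not shown above), an assumed workerId join key, and a simple cross-tabulation as a first look at question 1.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("swipejobs-eda").getOrCreate()

# Hypothetical paths: the actual S3 bucket is not given in the challenge text.
workers    = spark.read.parquet("s3a://<bucket>/worker/")
jobtickets = spark.read.parquet("s3a://<bucket>/jobticket/")

# Each worker has one or more job tickets, so attach tickets to their worker.
# "workerId" is an assumed join key; use whatever key the real schema exposes.
joined = jobtickets.join(workers, on="workerId", how="left")

# First look at question 1: how jobTicketState co-occurs with clickedCalloff.
joined.stat.crosstab("jobTicketState", "clickedCalloff").show()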
head -5 <file>
less <file>
When the raw data has no structure, you have to split it into fields yourself; it is still advisable to convert it to a DataFrame afterwards, which is easier to work with.
PATH = "/home/ubuntu/work/rajdeepd-spark-ml/spark-ml/data" user_data = sc.textFile("%s/ml-100k/u.user" % PATH) user_fields = user_data.map(lambda line: line.split("|")) print(user_fields) user_fields.take(5)
PythonRDD[29] at RDD at PythonRDD.scala:53

Out[19]:
[['1', '24', 'M', 'technician', '85711'],
 ['2', '53', 'F', 'other', '94043'],
 ['3', '23', 'M', 'writer', '32067'],
 ['4', '24', 'M', 'technician', '43537'],
 ['5', '33', 'F', 'other', '15213']]
Spark SQL is still the tool of first choice; see: [Spark] 03 - Spark SQL
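For instance, the user_fields RDD above can be promoted to a DataFrame. This is only a sketch: the column names are my own guesses based on the MovieLens u.user layout, and an active SparkSession is assumed so that toDF is available.

# Assumed column names for the MovieLens u.user fields shown above.
user_df = user_fields.map(
    lambda f: (int(f[0]), int(f[1]), f[2], f[3], f[4])
).toDF(["user_id", "age", "gender", "occupation", "zip_code"])

user_df.show(5)
user_df.printSchema()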
Ref: Several ways to read and write Parquet files
That article introduces the common ways to read and write Parquet files.
Ref: How to read parquet data from S3 to spark dataframe Python?
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .master("local")
         .appName("app name")
         .config("spark.some.config.option", True)
         .getOrCreate())

df = spark.read.parquet("s3://path/to/parquet/file.parquet")
from pyspark.sql.types import StructType, StructField, FloatType

# Define the schema, corresponding to a line in the csv data file.
schema = StructType([
    StructField("long",     FloatType(), nullable=True),
    StructField("lat",      FloatType(), nullable=True),
    StructField("medage",   FloatType(), nullable=True),
    StructField("totrooms", FloatType(), nullable=True),
    StructField("totbdrms", FloatType(), nullable=True),
    StructField("pop",      FloatType(), nullable=True),
    StructField("houshlds", FloatType(), nullable=True),
    StructField("medinc",   FloatType(), nullable=True),
    StructField("medhv",    FloatType(), nullable=True),
])
# The schema argument carries the column definitions
housing_df = spark.read.csv(path=HOUSING_DATA, schema=schema).cache()
# User-friendly tabular display
housing_df.show(5)
# Includes the type of each column
housing_df.printSchema()
Ref: Maxwell, a MySQL binlog parsing tool, explained in detail
(1) MySQL to HBase
(2) HBase to Parquet
Ref: How to move HBase tables to HDFS in Parquet format?
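The reference above has the details; purely as an illustration, a minimal sketch might read the table through the hbase-spark connector (the format string, table name and column mapping below are assumptions and depend on the connector version) and then write Parquet, which is plain Spark.

# Assumption: the hbase-spark connector is on the classpath; names are illustrative only.
hbase_df = (spark.read
    .format("org.apache.hadoop.hbase.spark")
    .option("hbase.table", "my_table")
    .option("hbase.columns.mapping",
            "id STRING :key, value STRING cf:value")
    .load())

# The Parquet write itself is standard Spark.
hbase_df.write.mode("overwrite").parquet("hdfs:///data/my_table_parquet")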
Ref: Which is faster for Spark: reading HBase or reading Parquet?
When Spark reads from HBase, the number of tasks generated is bounded by the number of regions in the queried table, so the task count is small: querying 40 GB of data at 10 GB per region most likely gives only 4-6 regions, hence only about 4-6 initial tasks (the RDD can be repartitioned later to raise the task count). When Spark reads Parquet, the number of tasks defaults to the number of blocks; at, say, 128 MB per block that is roughly 300+ tasks, so the initial load is already faster than HBase. In addition, Parquet files are loaded straight into Spark's memory, whereas HBase still has to go through the RegionServers to ship the data into Spark's memory, which costs extra time.
整體來講,讀parquet更快。
—— The RDD approach, as well as the more orthodox higher-level approach: [Spark] 03 - Spark SQL
Find missing data and invalid data early on.
# Can be used to check for "empty" and "invalid" data
def convert_year(x):
    try:
        return int(x[-4:])
    except:
        # there is a 'bad' data point with a blank year,
        # which we set to 1900 and will filter out later
        return 1900

movie_fields = movie_data.map(lambda lines: lines.split("|"))
years = movie_fields.map(lambda fields: fields[2]).map(lambda x: convert_year(x))
num_genders     = user_fields.map(lambda fields: fields[2]).distinct().count()
num_occupations = user_fields.map(lambda fields: fields[3]).distinct().count()
num_zipcodes    = user_fields.map(lambda fields: fields[4]).distinct().count()
In other words, these are the raw values behind the bins argument of hist in the figure below.
Check whether the data follows a normal distribution, and use the visualization to pick out "outliers".
If the data is skewed, it can be log-transformed.
The simplest way is to call hist directly to get the histogram; if the dataset is too large, sample first and then plot.
import matplotlib.pyplot as plt

ages = user_fields.map(lambda x: int(x[1])).collect()

plt.hist(ages, bins=30, color='gray', density=True)   # 'normed' is deprecated; use density
fig = plt.gcf()
fig.set_size_inches(8, 5)
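If the data is too large to collect, as noted above, a variant of the same plot (a sketch, reusing the user_fields RDD) samples first and log-transforms skewed values.

import numpy as np
import matplotlib.pyplot as plt

# Sample roughly 10% of the values without replacement instead of collecting them all.
sampled_ages = user_fields.map(lambda x: int(x[1])).sample(False, 0.1, 42).collect()

# For a skewed feature, np.log1p makes the shape easier to inspect.
plt.hist(np.log1p(sampled_ages), bins=30, color='gray', density=True)
plt.gcf().set_size_inches(8, 5)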
Plot the histogram of the feature column "medage".
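result_df is not built in this excerpt; a plausible construction (my assumption, using the housing_df loaded earlier, not necessarily what the original tutorial used) is a per-value count of the column.

# Assumed definition of result_df: the count of rows for each distinct "medage" value.
result_df = housing_df.groupBy("medage").count().orderBy("medage")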
result_df.toPandas().plot.bar(x='medage',figsize=(14, 6))
import numpy as np

count_by_occupation = user_fields.map(lambda fields: (fields[3], 1)).reduceByKey(lambda x, y: x + y).collect()
# count_by_occupation2 = user_fields.map(lambda fields: fields[3]).countByValue()
#######################################################
# Why does the code below use np, a tool meant for small in-memory data?
#######################################################
x_axis1 = np.array([c[0] for c in count_by_occupation])
y_axis1 = np.array([c[1] for c in count_by_occupation])
# sort by y_axis1
x_axis = x_axis1[np.argsort(y_axis1)]
y_axis = y_axis1[np.argsort(y_axis1)]
pos = np.arange(len(x_axis))
width = 1.0

ax = plt.axes()
ax.set_xticks(pos + (width / 2))
ax.set_xticklabels(x_axis)

plt.bar(pos, y_axis, width, color='lightblue')
plt.xticks(rotation=30)

fig = plt.gcf()
fig.set_size_inches(16, 5)
Extracting a single column from an RDD
rating_data = rating_data_raw.map(lambda line: line.split("\t"))
ratings = rating_data.map(lambda fields: int(fields[2]))

num_ratings = ratings.count()   # needed below; not shown in the original snippet

max_rating  = ratings.reduce(lambda x, y: max(x, y))
min_rating  = ratings.reduce(lambda x, y: min(x, y))
mean_rating = ratings.reduce(lambda x, y: x + y) / float(num_ratings)
median_rating = np.median(ratings.collect())
We can also use the stats function to get similar summary information.
ratings.stats()

Out[11]: (count: 100000, mean: 3.52986, stdev: 1.12566797076, max: 5.0, min: 1.0)
from pyspark.sql import functions as F

(housing_df.describe().select(
    "summary",
    F.round("medage",   4).alias("medage"),
    F.round("totrooms", 4).alias("totrooms"),
    F.round("totbdrms", 4).alias("totbdrms"),
    F.round("pop",      4).alias("pop"),
    F.round("houshlds", 4).alias("houshlds"),
    F.round("medinc",   4).alias("medinc"),
    F.round("medhv",    4).alias("medhv"))
 .show())
+-------+-------+---------+--------+---------+--------+-------+-----------+
|summary| medage| totrooms|totbdrms|      pop|houshlds| medinc|      medhv|
+-------+-------+---------+--------+---------+--------+-------+-----------+
|  count|20640.0|  20640.0| 20640.0|  20640.0| 20640.0|20640.0|    20640.0|
|   mean|28.6395|2635.7631| 537.898|1425.4767|499.5397| 3.8707|206855.8169|
| stddev|12.5856|2181.6153|421.2479|1132.4621|382.3298| 1.8998|115395.6159|
|    min|    1.0|      2.0|     1.0|      3.0|     1.0| 0.4999|    14999.0|
|    max|   52.0|  39320.0|  6445.0|  35682.0|  6082.0|15.0001|   500001.0|
+-------+-------+---------+--------+---------+--------+-------+-----------+
—— Spark SQL's DataFrame is the main workhorse; see: [Spark] 03 - Spark SQL
Ref: https://github.com/drabastomek/learningPySpark/blob/master/Chapter04/LearningPySpark_Chapter04.ipynb
A DataFrame can also be created from an RDD.
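The df used in the next few snippets is not defined in this excerpt; a minimal stand-in built from an RDD (toy data of my own, with deliberate duplicates so the dropDuplicates calls below have something to remove) could be:

# Hypothetical toy data: one exact duplicate row, and one row duplicated except for its id.
rdd = sc.parallelize([
    (1, 144.5, 5.9, 33, 'M'),
    (2, 167.2, 5.4, 45, 'M'),
    (3, 124.1, 5.2, 23, 'F'),
    (4, 144.5, 5.9, 33, 'M'),   # same as row 1 apart from id
    (3, 124.1, 5.2, 23, 'F'),   # exact duplicate of row 3
])

df = spark.createDataFrame(rdd, ['id', 'weight', 'height', 'age', 'gender'])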
print('Count of rows: {0}'.format(df.count()))
print('Count of distinct rows: {0}'.format(df.distinct().count()))   # distinct over all columns
print('Count of distinct ids: {0}'.format(df.select([c for c in df.columns if c != 'id']).distinct().count()))   # distinct over a chosen subset of columns
df = df.dropDuplicates()
df.show()
df = df.dropDuplicates(subset=[c for c in df.columns if c != 'id'])
df.show()
Construct a typical "problematic" table.
df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28,   'M',  100000),
    (2, 167.2, 5.4, 45,   'M',  None),
    (3, None , 5.2, None, None, None),
    (4, 144.5, 5.9, 33,   'M',  None),
    (5, 133.2, 5.7, 54,   'F',  None),
    (6, 124.1, 5.2, None, 'F',  None),
    (7, 129.2, 5.3, 42,   'M',  76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])
(1) Which rows have missing values?
df_miss.rdd.map(
    lambda row: (row['id'], sum([c == None for c in row]))
).collect()
(2) Look at the details
df_miss.where('id == 3').show()
(3) What fraction of each column is missing?
import pyspark.sql.functions as fn

df_miss.agg(*[
    (1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing')
    for c in df_miss.columns
]).show()
(4) Drop a feature that has too many missing values
df_miss_no_income = df_miss.select([c for c in df_miss.columns if c != 'income'])
df_miss_no_income.show()
(5) Drop a row that has too many missing values
df_miss_no_income.dropna(thresh=3).show()
(6) Fill in the missing values
means = df_miss_no_income.agg(
    *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']
).toPandas().to_dict('records')[0]

means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()
df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
cols = ['weight', 'height', 'age']
bounds = {}

for col in cols:
    quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
    IQR = quantiles[1] - quantiles[0]
    bounds[col] = [quantiles[0] - 1.5 * IQR, quantiles[1] + 1.5 * IQR]

bounds
{'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}
outliers = df_outliers.select(*['id'] + [
    (
        (df_outliers[c] < bounds[c][0]) |
        (df_outliers[c] > bounds[c][1])
    ).alias(c + '_o') for c in cols
])
outliers.show()
+---+--------+--------+-----+
| id|weight_o|height_o|age_o|
+---+--------+--------+-----+
|  1|   false|   false|false|
|  2|   false|   false|false|
|  3|    true|   false| true|
|  4|   false|   false|false|
|  5|   false|   false|false|
|  6|   false|   false|false|
|  7|   false|   false|false|
+---+--------+--------+-----+
Then look at the details, as below.
df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
df_outliers.filter('age_o').select('id', 'age').show()
+---+------+
| id|weight|
+---+------+
|  3| 342.3|
+---+------+

+---+---+
| id|age|
+---+---+
|  3| 99|
+---+---+