數據清洗很要命?那是由於你沒看到這份攻略!

對數據挖掘和分析的人員來講,數據清洗和轉化是一項很是繁瑣和複雜的工做,佔用了很大的工做量。html

目前,數據的挖掘和分析,基本都是採用pandas,numpy或者R語言,這種處理過程複雜,並且沒有一個統一的規範。本文將給你們介紹一項技術,使用FEA-spk技術,能夠快速實現數據的清洗和轉化工做,並且任何人都能看懂。mysql

FEA-spk技術,它的底層基於最流行的大數據開發框架spark,並且能夠和不少流行大數據開發框架結合,好比Hadoop,hbase,mongodb等。使用FEA-spk來作交互分析,不但很是簡單易懂,並且幾乎和spark的功能同樣強大,更重要的一點,它能夠實現可視化,處理的數據規模更大,下面就實際的項目爲例進行說明。正則表達式

1. 要想使用FEA-spk技術,首先要建立一個spk的鏈接,全部的操做都是以它爲上下文進行的。sql

在fea界面運行如下命令:mongodb

2. DataFrame的轉換數據庫

FEA-spk技術操做有2種dataframe,一種是pandas的dataframe,能夠直接在fea裏面運行dump查看。另一種是spark的dataframe,它可以進行各類各樣的spark算子操做,好比group,agg等json

spark dataframe須要轉換爲pandas的dataframe才能運行dump命令查看,轉換的原語以下oracle

pd= @udf df by spk.to_DF  #spark dataframe df轉換爲pandas dataframe pd框架

dump pd   #能夠直接使用dump命令查看oop

sdf= @udf spk,pd by spk.to_SDF #將pandas dataframe pd轉換爲spark dataframe sdf,以便進行spark的各類操做

3. 導入數據源

FEA-spk技術支持各類各樣的數據源,hive,mongodb,text,avro , json, csv , parquet,mysql,oracle, postgresql以及具備特定格式的文件

下面舉其中幾個爲例進行說明

(1) 加載csv數據源。

csv數據源分爲2種,第一種是帶header的(即有字段名的),另一種是沒有header字段名的,格式稍有區別

a.csv文件格式以下

id,hash

1,ssss

2,333

3,5567

下面進行數據加載的命令。

原語以下

df= @udf spk by spk.load_csv with (header,/data/a.csv)

#header爲具備字段名的,/data/a.csv爲hdfs上的文件路徑,若是沒有heade字段,原語爲df= @udf spk by spk.load_csv with (/data/a.csv)

  

(2)  關係型數據源的加載,好比mysql,oracle,postgresql等

首先須要定義一個json鏈接串,用來指定鏈接的地址,數據庫名,用戶名,密碼。

格式以下

define mysql1  as ({"url":"jdbc:mysql://bigdata-89:3306","database":"test",

"user":"root","passwd":"123456"})

在mysql的test數據庫裏面有一張student_infos表,下面進行加載

df= @udf spk by spk.load_db with (mysql1,student_infos)

#加載student_infos表

(3)hive數據源的加載

在hive的mydb數據庫裏面有一張student表,下面來加載它

df= @udf spk by spk.load_hive with (mydb.student)

 

4. 對數據進行切割,提取對於日誌分析數據來講,最重要的一步就是對數據進行切割,提取,這樣才能進行下一步的分析。下面以美國宇航局肯尼迪航天中心WEB日誌爲例進行說明。

數據的下載地址爲

http://ita.ee.lbl.gov/html/contrib/NASA-HTTP.html

下面就到了相當重要的一步了,對數據進行正則化提取,提取出其中的主機名,時間戳,路徑,訪問狀態,返回的字節數這5個字段,原語命令以下

df1= @udf df by spk.reg_extract with (regexp_extract('value', r'^([^\s]+\s)', 1).alias("host"),

regexp_extract('value', r'^.*\[(\d\d/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]', 1).alias('timestamp'),

regexp_extract('value', r'^.*"\w+\s+([^\s]+)\s+HTTP.*"', 1).alias('path'),

regexp_extract('value', r'^.*"\s+([^\s]+)', 1).cast('integer').alias('status'),

regexp_extract('value', r'^.*\s+(\d+)$', 1).cast('integer').alias('content_size'))

#將df表的value字段進行正則表達式提取出第一個匹配的主機名,將其重命名爲host列

將df表的value字段進行正則表達式提取出第一個匹配的時間,將其重命名爲timestamp列

將df表的value字段進行正則表達式提取出第一個匹配的路徑,將其重命名爲path列

將df表的value字段進行正則表達式提取出第一個匹配的狀態碼,將它的類型轉化爲int類型並將其重命名爲status列

將df表的value字段進行正則表達式提取出第一個匹配的狀態碼,將它的類型轉化爲int類型並將其重命名爲status列

將df表的value字段進行正則表達式提取出第一個匹配的字節數,將它的類型轉化爲int類型並將其重命名爲content_size列

能夠看到數據已經被切割成5列了

5. 清除無效語句

根據分析目標進行清洗獲得所須要的數據,下面以fea經典的cd_esql爲例進行說明

日誌的格式以下:

下面過濾掉日誌中的錯誤日誌

正常的日誌都包含有」-mylogger-」這個字段內容,根據這個特徵過濾掉錯誤日誌。

 df1= @udf df by spk.filter with (instr('value', '- mylogger -')<> 0)

# instr('value', '- mylogger -'),value字段若是不包含- mylogger -,返回0,不然返回它所在的索引。<>表示不爲0,這樣就過濾掉了錯誤日誌。

6. 分割有效字段

通過無效語句清洗,保留有效語句,可是仍是不能知足咱們基礎DF表的要求,下面進行有效字段的分割,提取。

有效的一條語句完整結構以下:

時間(精確到毫秒)/分割符(-mylogger-)/字符串(info-)/語句(事件)

2016-03-29 13:56:13,748 /- mylogger -/ INFO -/ select * from people_trail01_dest where KSSJ>=2001-02-28T01:05:24.000Z

整條語句中就是時間與事件是分析統計有用的,要從整條語句中分割出來,

原語以下所示。

df2= @udf df1 by spk.opfield with (split(value,'- mylogger - ')[0] as d1:split(value,'- mylogger - ')[1] as event)

#將df1表的value字段按照- mylogger –分割,第一個字段並存儲到d1列中、提取第二個字段存儲到event列中

能夠看到event列仍是不能知足要求,再進行分割

7. 提取時間,日期字段

對上面的數據提取天數

還有不少數據清洗攻略,咱們將在下一篇繼續介紹,敬請期待!

 

FEA-spk簡單,強大,可視化

不懂Java,Python一樣玩轉Spark

專門爲數據分析師打造!

相關文章
相關標籤/搜索