Pilosa文檔翻譯(三)示例

原文地址:https://www.pilosa.com/docs/latest/examples/
位圖數據庫:Pilosa查詢十億級出租車搭乘數據案例python

簡單說明 Introduction

紐約市發佈了一個很是詳細的包含了超過10億條出租車搭乘數據的集合。該數據已經成爲科技博客分析的熱門目標,而且已經獲得了很好的研究。出於這個緣由,咱們認爲將這些數據導入Pilosa,以便肯定同一數據集狀況下與其餘數據存儲和技術進行比較。linux

通常來講,傳輸(Transportation)是Pilosa的值得關注的用例,由於它一般涉及多個不一樣的數據源,以及高速率,實時和極大量的數據(特別是若是想得出合理的結論)。git

咱們編寫了一個工具來幫助將NYC(紐約市)出租車數據導入Pilosa這個工具是PDK(Pilosa開發工具包)的一部分,並利用了許多可重複使用的模塊,這些模塊也能夠幫助您導入其餘數據。 接下來,咱們將逐步解釋整個過程。github

初始設置以後,PDK導入工具會執行咱們定義Pilosa架構所需的一切,相應地將數據映射到位圖,而後將其導入Pilosa數據庫

數據模型 Data Model

紐約出租車數據由如下列出的許多csv文件組成:http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml。 這些數據文件大約有20列,其中大約一半與咱們正在研究的基準查詢相關:json

  • 距離(Distance): miles(英里), floating point(浮點值)
  • 車費(Fare): dollars(美圓), floating point(浮點值)
  • 乘客人數(Number of passengers): integer(整數值)
  • 下車位置(Dropoff location): latitude and longitude(經緯度), floating point(浮點值)
  • 上車位置(Pickup location): latitude and longitude(經緯度), floating point(浮點值)
  • 下車時間(Dropoff time): timestamp(時間戳)
  • 上車時間(Pickup time): timestamp(時間戳)

注意:下面表格中的row ID是指記錄的取值範圍,不要理解成MySQL等數據庫的rowID。
咱們導入這些字段,從每一個字段建立一個或多個Pilosa字段:架構

字段(Field) 映射(Mapping)
cab_type(出租車類型) 直接映射整數枚舉值 → row ID
dist_miles(距離) 四捨五入round(dist) → row ID
total_amount_dollars(總金額) 四捨五入round(dist) → row ID
passenger_count(乘車人數) 直接映射整數值 → row ID
drop_grid_id(下車位置網格ID) (lat, lon) → 100x100矩形分割網格 → cell(格子) ID
drop_year 年份year(timestamp) → row ID
drop_month 月份month(timestamp) → row ID
drop_day 該月第幾天day(timestamp) → row ID
drop_time(下車時間) 該天中的時間映射到48個半小時組成的桶中
pickup_grid_id(下車位置網格ID) (lat, lon) → 100x100矩形分割網格 → cell ID
pickup_year year(timestamp) → row ID
pickup_month month(timestamp) → row ID
pickup_day day(timestamp) → row ID
pickup_time(上車時間) time of day mapped to one of 48 half-hour buckets → row ID

咱們還建立了兩個附加字段表示持續時間和每一次乘坐的平均速度:app

字段(Field) 映射(Mapping)
duration_minutes(持續時間) round(drop_timestamp - pickup_timestamp) → row ID
speed_mph() round(dist_miles ÷ (drop_timestamp - pickup_timestamp)) → row ID

映射Mapping

咱們要使用的每一個列(column)都必須根據某些規則映射到字段(fields)row ID的組合。 有不少方法能夠實現這個映射,出租車數據集爲咱們提供了一個可能性的很好的描述。函數

0列(colums) --> 1字段(field)

cab_type: 每一行表示一個出租車的類型,在一行數據中有一些bit位來表示一次乘車中使用的出租車類型。 這是一個簡單的枚舉映射,例如黃色yellow=0,綠色green=1等。這個字段值的位寬(bits)由數據源肯定。也就是說,咱們有幾個數據來源(NYC出租車-黃色, NYC出租車-綠色,Uber汽車),對於每個數據來源,要設置不一樣的cab_type常量值。

1列(colums) --> 1字段(field)

如下三個字段以簡單的直接方式從原始數據的單個列進行映射。
dist_miles:每一行值(row ID)表示乘車的距離,這個映射關係很簡單:例如行值1(row 1)表示乘車距離在[0.5,1.5]這個區間內。也就是說,咱們將距離這個浮點值舍入爲整數並將其直接用做row ID。一般,從浮點值到row ID的映射能夠是任意的。
舍入映射實現簡潔,簡化了導入和分析。而且,它是人類可讀的。 咱們會看到這種模式屢次使用。

PDK使用中,咱們定義了一個Mapper,它是一個只返回整數row ID的函數。 PDK具備許多預約義的映射器,可使用一些參數進行描述。 其中之一是LinearFloatMapper。在下面代碼中它將線性函數應用於輸入,並將其轉換爲整數,所以隱式處理舍入。

lfm := pdk.LinearFloatMapper{
    Min: -0.5,
    Max: 3600.5,
    Res: 3601,
}

MinMax定義線性函數,Res肯定輸出row ID的最大容許值。咱們選擇這些值以產生一個舍入到最接近的整數的行爲。其餘預約義的映射器也有本身的特定參數,一般是兩個或三個。

這個映射函數(mapping function)是核心操做,但咱們須要一些其餘部分來定義整個過程,它封裝在BitMapper對象中。此對象定義要使用的輸入數據源的哪些字段(Field),如何解析它們(Parsers),要使用的映射(Mapper)以及要使用的字段的名稱(Frame)。TODO更新,這是合理的。

pdk.BitMapper{
    Frame:   "dist_miles",
    Mapper:  lfm,
    Parsers: []pdk.Parser{pdk.FloatParser{}},
    Fields:  []int{fields["trip_distance"]},
},

這些相同的對象在JSON定義文件中表示:

{
    "Fields": {
        "Trip_distance": 10
    },
    "Mappers": [
        {
            "Name": "lfm0",
            "Min": -0.5,
            "Max": 3600.5,
            "Res": 3600
        }
    ],
    "BitMappers": [
        {
            "Frame": "dist_miles",
            "Mapper": {
                "Name": "lfm0"
            },
            "Parsers": [
                {"Name": "FloatParser"}
            ],
            "Fields": "Trip_distance"
        }
    ]
}

在這裏,咱們定義一個Mappers列表,每一個Mappers都包含一個名稱,咱們將在稍後的BitMappers列表中用它來引用mapper。咱們也可使用Parsers進行此操做,但默認狀況下可使用一些不須要配置的簡單解析器。 咱們還有一個Fields列表,它只是一個字段名稱(在源數據中)到列索引(在Pilosa中)的映射。 咱們使用的BitMapper定義這些名字讓人可讀。

total_amount_dollars:在這裏,咱們再次使用舍入映射,所以每行表明一次行程的總成本,該成本將舍入映射爲row ID。 BitMapper定義與前一個定義很是類似。

passenger_count:此列包含小整數,所以咱們使用最簡單的映射之一:列值是就是row ID。

1列(colums) --> 多字段(multiple field)

使用複合數據類型(如時間戳timestamp)時,有不少映射選項。 在這種狀況下,咱們但願看到有趣的週期性趨勢。所以,咱們但願以一種容許咱們在分析過程當中獨立查看它們的方式對時間的循環份量進行編碼。

咱們經過將時間數據存儲在每一個時間戳的四個單獨字段中來實現此目的:對於年year``月month``日day時間time各一個。 前三個是直接映射的。例如,乘坐的日期2015/06/24將在字段year的第2015行,字段month的第6行和字段day的第24行中設置。

咱們可能會在幾小時、幾分鐘和幾秒鐘內繼續使用這種模式,可是咱們在這裏沒有太多使用這種精度,因此咱們使用bucketing方法。 也就是說,咱們選擇一個分辨率(30分鐘),將日期劃分爲該大小的桶,併爲每一個桶建立一行。 所以,6:45 AM時間在time_of_day字段的第13行中設置了bit位(位圖中)。

咱們針對每一個感興趣的時間戳執行全部這些操做,一個用於上車時間,一個用於下車時間。 這爲咱們提供了兩個時間戳的總字段:pickup_yearpickup_monthpickup_daypickup_timedrop_yeardrop_monthdrop_daydrop_time

多列(multiple colums) --> 1字段(field)

乘坐數據還包含地理定位數據:上車和下車的緯度和經度。 咱們只是但願可以生成乘坐位置的粗略概述熱圖,所以咱們使用網格映射。 咱們將感興趣的區域劃分爲經緯度空間中的100x100矩形網格,使用單個整數標記此網格中的每一個單元格,並使用該整數做爲row ID。

咱們爲每一個感興趣的位置作了全部這些,一個用於上車,一個用於下車。 這爲兩個位置提供了兩個字段:pickup_grid_iddrop_grid_id

一樣,位置數據有許多映射選項。 例如,咱們可能會轉換爲不一樣的座標系,應用投影或將位置聚合到實際區域(如鄰域neighborhoods)。 這裏,簡單的方法就足夠了。

複合映射(Complex mappings)

咱們還指望尋找行程持續時間和速度的趨勢,所以咱們但願在導入過程當中捕獲此信息。
對於字段duration_minutes咱們使用round((drop_timestamp - pickup_timestamp).minutes)計算row ID
對於字段speed_mph咱們使用round(dist_miles / (drop_timestamp - pickup_timestamp).minutes)計算row ID`。
這些映射計算很簡單,但因爲它們須要對多列進行算術運算,所以在PDK中提供的基本映射器中捕獲它們有點過於複雜。 相反,咱們定義自定義映射器來完成工做:

durm := pdk.CustomMapper{
    Func: func(fields ...interface{}) interface{} {
        start := fields[0].(time.Time)
        end := fields[1].(time.Time)
        return end.Sub(start).Minutes()
    },
    Mapper: lfm,
}

導入處理

設計這個架構和映射後,咱們可使用PDK導入工具讀取的JSON定義文件捕獲它。
運行pdk taxi會根據此文件中的信息運行導入。 有關更多詳細信息,請參閱PDK部分,或查看代碼自己。

查詢

如今咱們能夠運行一些示例查詢。

能夠在一個PQL調用對cab類型進行檢索、排序、計數。

TopN(cab_type)
{"results":[[{"id":1,"count":1992943},{"id":0,"count":7057}]]}

可使用相似的調用檢索高流量位置ID。 這些ID對應於經緯度,能夠從生成ID的映射中反算。

TopN(pickup_grid_id)
{"results":[[{"id":5060,"count":40620},{"id":4861,"count":38145},{"id":4962,"count":35268},...]]}

每一個passenger_count(乘客計數)的total_amount(總金額)的平均值能夠經過一些後處理來計算。咱們使用少許的TopN調用來檢索passenger_count的行程計數, 而後使用這些計數來計算平均值。

queries = ''
pcounts = range(10)
for i in pcounts:
    queries += "TopN(Row(passenger_count=%d), total_amount_dollars)" % i
resp = requests.post(qurl, data=queries)

average_amounts = []
for pcount, topn in zip(pcounts, resp.json()['results']):
    wsum = sum([r['count'] * r['key'] for r in topn])
    count = sum([r['count'] for r in topn])
    average_amounts.append(float(wsum)/count)

注意,BSI-powered Sum查詢如今提供了這種查詢的替代方法。

有關更多示例和詳細信息,請參閱此ipython notebook

相關文章
相關標籤/搜索