PyODPS DataFrame 處理笛卡爾積的幾種方式

PyODPS 提供了 DataFrame API 來用相似 pandas 的接口進行大規模數據分析以及預處理,本文主要介紹如何使用 PyODPS 執行笛卡爾積的操做。html

笛卡爾積最常出現的場景是兩兩之間須要比較或者運算。以計算地理位置距離爲例,假設大表 Coordinates1 存儲目標點經緯度座標,共有 M 行數據,小表 Coordinates2 存儲出發點經緯度座標,共有 N 行數據,如今須要計算全部離目標點最近的出發點座標。對於一個目標點來講,咱們須要計算全部的出發點到目標點的距離,而後找到最小距離,因此整個中間過程須要產生 M * N 條數據,也就是一個笛卡爾積問題。python

haversine 公式

首先簡單介紹一下背景知識,已知兩個地理位置的座標點的經緯度,求解兩點之間的距離可使用 haversine 公式,使用 Python 的表達以下:git

def  haversine(lat1,  lon1,  lat2,  lon2):
        # lat1, lon1 爲位置 1 的經緯度座標
        # lat2, lon2 爲位置 2 的經緯度座標
        import  numpy  as  np

        dlon  =  np.radians(lon2  -  lon1)
        dlat  =  np.radians(lat2  -  lat1)
        a  =  np.sin(  dlat  /2  )  **2  +  np.cos(np.radians(lat1))  *  np.cos(np.radians(lat2))  *  np.sin(  dlon  /2  )  **2
        c  =  2  *  np.arcsin(np.sqrt(a))
        r  =  6371  # 地球平均半徑,單位爲千米
        return  c  *  r複製代碼

MapJoin

目前最推薦的方法就是使用 mapjoin,PyODPS 中使用 mapjoin 的方式十分簡單,只須要兩個 dataframe join 時指定 mapjoin=True,執行時會對右表作 mapjoin 操做。bash

In  [3]:  df1  =  o.get_table('coordinates1').to_df()                                                                                                                                                                                        

In  [4]:  df2  =  o.get_table('coordinates2').to_df()                                                                                                                                                                                        

In  [5]:  df3  =  df1.join(df2,  mapjoin=True)                                                                                                                                                                                                        

In  [6]:  df1.schema                                                                                                                                                                                                                                                      
Out[6]:  
odps.Schema  {
    latitude                    float64              
    longitude                  float64              
    id                                string                
}

In  [7]:  df2.schema                                                                                                                                                                                                                                                      
Out[7]:  
odps.Schema  {
    latitude                    float64              
    longitude                  float64              
    id                                string                
}

In  [8]:  df3.schema                                                                                                                                                                                                                                                      
Out[8]:  
odps.Schema  {
    latitude_x                        float64              
    longitude_x                      float64              
    id_x                                    string                
    latitude_y                        float64              
    longitude_y                      float64              
    id_y                                    string                
}
複製代碼

能夠看到在執行 join 時默認會將重名列加上 _x 和 _y 後綴,可經過在 suffixes 參數中傳入一個二元 tuple 來自定義後綴,當有了 join 以後的表後,經過 PyODPS 中 DataFrame 的自建函數就能夠計算出距離,十分簡潔明瞭,而且效率很高。app

In  [9]:  r  =  6371  
      ...:  dis1  =  (df3.latitude_y  -  df3.latitude_x).radians()  
      ...:  dis2  =  (df3.longitude_y  -  df3.longitude_x).radians()  
      ...:  a  =  (dis1  /  2).sin()  **  2  +  df3.latitude_x.radians().cos()  *  df3.latitude_y.radians().cos()  *  (dis2  /  2).sin()  **  2  
      ...:  df3['dis']  =  2  *  a.sqrt().arcsin()  *  r                                                                                                                                                                                              
                                                                                                                                                                                                        
In [12]: df3.head(10)                                                                                                                        
Out[12]: 
    latitude_x  longitude_x id_x  latitude_y   longitude_y id_y       dis
0   76.252432    59.628253    0   84.045210     6.517522    0  1246.864981
1   76.252432    59.628253    0   59.061796     0.794939    1  2925.953147
2   76.252432    59.628253    0   42.368304    30.119837    2  4020.604942
3   76.252432    59.628253    0   81.290936    51.682749    3   584.779748
4   76.252432    59.628253    0   34.665222   147.167070    4  6213.944942
5   76.252432    59.628253    0   58.058854   165.471565    5  4205.219179
6   76.252432    59.628253    0   79.150677    58.661890    6   323.070785
7   76.252432    59.628253    0   72.622352   123.195778    7  1839.380760
8   76.252432    59.628253    0   80.063614   138.845193    8  1703.782421
9   76.252432    59.628253    0   36.231584    90.774527    9  4717.284949

In [13]: df1.count()                                                                                                                         
Out[13]: 2000

In [14]: df2.count()                                                                                                                         
Out[14]: 100

In [15]: df3.count()                                                                                                                         
Out[15]: 200000複製代碼

df3 已是有 M * N 條數據了,接下來若是須要知道最小距離,直接對 df3 調用 groupby 接上 min 聚合函數就能夠獲得每一個目標點的最小距離。函數

In [16]: df3.groupby('id_x').dis.min().head(10)                                                                                              
Out[16]: 
       dis_min
0   323.070785
1    64.755493
2  1249.283169
3   309.818288
4  1790.484748
5   385.107739
6   498.816157
7   615.987467
8   437.765432
9   272.589621
複製代碼

DataFrame 自定義函數

若是咱們須要知道對應最小距離的點的城市,也就是表中對應的 id ,能夠在 mapjoin 以後調用 MapReduce,不過咱們還有另外一種方式是使用 DataFrame 的 apply 方法。要對一行數據使用自定義函數,可使用 apply 方法,axis 參數必須爲 1,表示在行上操做。性能

表資源

要注意 apply 是在服務端執行的 UDF,因此不能在函數內使用相似於df=o.get_table('table_name').to_df() 的表達式去得到表數據,具體原理能夠參考PyODPS DataFrame 的代碼在哪裏跑。以本文中的狀況爲例,要想將表 1 與表 2 中全部的記錄計算,那麼須要將表 2 做爲一個資源表,而後在自定義中引用該表資源。PyODPS 中使用表資源也十分方便,只須要將一個 collection 傳入 resources 參數便可。collection 是個可迭代對象,不是一個 DataFrame 對象,不能夠直接調用 DataFrame 的接口,每一個迭代值是一個 namedtuple,能夠經過字段名或者偏移來取對應的值。ui

## use dataframe udf

df1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()

def func(collections):
    import pandas as pd
    
    collection = collections[0]
    
    ids = []
    latitudes = []
    longitudes = []
    for r in collection:
        ids.append(r.id)
        latitudes.append(r.latitude)
        longitudes.append(r.longitude)

    df = pd.DataFrame({'id': ids, 'latitude':latitudes, 'longitude':longitudes})
    def h(x):        
        df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)
        return df.iloc[df['dis'].idxmin()]['id']
    return h

df1[df1.id, df1.apply(func, resources=[df2], axis=1, reduce=True, types='string').rename('min_id')].execute(
    libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])
複製代碼

在自定義函數中,將表資源經過循環讀成 pandas DataFrame,利用 pandas 的 loc 能夠很方便的找到最小值對應的行,從而獲得距離最近的出發點 id。另外,若是在自定義函數中須要使用到三方包(例如本例中的 pandas)能夠參考這篇文章url

全局變量

當小表的數據量十分小的時候,咱們甚至能夠將小表數據做爲全局變量在自定義函數中使用。spa

df1 = o.get_table('coordinates1').to_df()
df2 = o.get_table('coordinates2').to_df()
df = df2.to_pandas()

def func(x):
    df['dis'] = haversine(x.latitude, x.longitude, df.latitude, df.longitude)
    return df.iloc[df['dis'].idxmin()]['id']

df1[df1.id, df1.apply(func, axis=1, reduce=True, types='string').rename('min_id')].execute(
    libraries=['pandas.zip', 'python-dateutil.zip', 'pytz.zip', 'six.tar.gz'])複製代碼

在上傳函數的時候,會將函數內使用到的全局變量(上面代碼中的 df) pickle 到 UDF 中。可是注意這種方式使用場景很侷限,由於 ODPS 的上傳的文件資源大小是有限制的,因此數據量太大會致使 UDF 生成的資源太大從而沒法上傳,並且這種方式最好保證三方包的客戶端與服務端的版本一致,不然頗有可能出現序列化的問題,因此建議只在數據量很是小的時候使用。

總結

使用 PyODPS 解決笛卡爾積的問題主要分爲兩種方式,一種是 mapjoin,比較直觀,性能好,通常能用 mapjoin 解決的咱們都推薦使用 mapjoin,而且最好使用內建函數計算,能到達最高的效率,可是它不夠靈活。另外一種是使用 DataFrame 自定義函數,比較靈活,性能相對差一點(可使用 pandas 或者 numpy 得到性能上的提高),經過使用表資源,將小表做爲表資源傳入 DataFrame 自定義函數中,從而完成笛卡爾積的操做。


本文做者:繼盛

原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索