Pyspark 最近使用的一些有趣姿式的梳理

以前對 SQL 仍是不是很是熟悉的,可是如今或多或少仍是會寫一些計算任務。好比最近在推送將全部天級的耗時任務都從傳統關係型數據庫遷移至 Spark 集羣當中進行計算,中間遇到一些有趣的小問題在這裏記錄一下。sql

 

Q: 我想按照某個字段分組而且把一組查詢字段連起來獲得一個 json 而後把結果做爲一個字段應該怎麼弄?數據庫

A: 這裏個人思路是將咱們須要 dumps 的字段給拼接起來,而後使用列表將同一個分組裏面的是數據組合起來。而後過一個 udf 把列表中的記錄處理成數組最後 json.dumps 一下便可。來看個栗子json

# 先查詢出要操做的目標信息 這一步能夠和下面的操做合併,我這裏爲了方便看拆開了
df = ss.sql("""
                        SELECT 
                            t1.pay_id, 
                            t1.pay_money, 
                            t1.user_id
                        FROM
                            analytics_db.hd_day_order_record t1 
                    """)

# 拼接目標字符串而且組合
df = df.select(
               df.pay_id,
               df.pay_money,
               df.pay_user_id,
               f.concat_ws('\001', df.pay_id,  df.pay_user_id, df.pay_money).alias('sku_buys'))
)

# 拼接一個重複 user_id 的 list
df = df.groupBy(df.pay_user_id).agg(f.collect_list('sku_buys').alias('sku_buys'))

# 將 sku_buys 丟給一個 jsondump 的 udf 就能夠獲得結果了
df = df.select(df.pay_user_id, sb_json(df.sku_buys).alias('sku_buys'))

 

Q: 若是我想對目標進行分組,而且讓他在組內有序應該怎麼作?數組

A: 這一般被稱爲進行組內排序。其實我以前一直嘗試用相似的語法來達到這種效果函數

df = ss.sql("""
        SELECT
            first(t1.sku_mode) AS sku_mode,
            first(t1.exchange_type_t01) AS exchange_type_t01,
            first(t1.user_id) AS user_id,
            first(t1.pay_id) AS pay_id,
            first(t1.charge_time) AS charge_time,
            first(t2.has_yxs_payment) AS has_yxs_payment,
            first(t2.has_sxy_payment) AS has_sxy_payment,
            first(t2.has_cxy_payment) AS has_cxy_payment,
            first(t2.has_sxy19_payment) AS has_sxy19_payment,
            first(t2.sxy19_join_time) AS sxy19_join_time,
            first(t2.yxs_join_time) AS yxs_join_time
        FROM
            d_exchange_info t1
        JOIN
            analytics_db.md_day_dump_users t2
        ON
            t2.the_day = '{}'
            AND t1.user_id = t2.user_id
        GROUP BY
            t1.user_id
        ORDER BY
            charge_time
""".format(st))

其實這是錯的,這裏的 order by 並不能達到一個組內排序的效果,而是一個外部排序。因此這裏取 first 是一個不穩定的結果。有時候你拿到的是一個結果,也許有時候你拿到的是另一個結果。要進行組內排序,咱們能夠先用這樣的思路,先對須要 order by 字段的表進行組內排序,而後再讓他與其餘表 join 得到更多的信息,這樣就能保證是有序的。spa

這裏我引用一個窗口函數來達到這樣的效果。code

        _df = ss.sql("""
                        SELECT 
                            t1.user_id,
                            t1.pay_id,
                            t1.sku_mode,
                            t1.charge_time,
                            t1.exchange_type_t01,
                            ROW_NUMBER() OVER(PARTITION BY t1.user_id ORDER BY t1.charge_time) as rid
                        FROM 
                            {} t1 
                        WHERE 
                            t1.refund_state = 0
                    """.format(exchange_info_table))
    _df = _df.filter(_df.rid==1)

我先使用窗口函數 ROW_NUMBER 以 user_id 分組而且根據 charge_time 對錶一進行組內排序。獲得結果以後,使用 filter 過濾一下 rid =1 的結果。再與另一張表 join 獲得補充信息就能達到想要的效果。orm

 

Q: 我想對結果進行轉列應該怎麼作?blog

A: 行轉列 列轉行多是 SQL 計算裏面會常用到的方法,可是對於 SQL 並不熟悉的同窗(好比我)就不知道該怎麼整來看個例子排序

df = ss.sql("""
    SELECT
        user_id,
        sku_mode,
        credit_score
    FROM
        analytics_db.hd_day_user_credit
    WHERE
        gain_time >= '{}'
        AND gain_time < '{}'
        AND the_day = '{}'
""".format(start_time, end_time, st))
# df.show(10)

展現的數據相似於

+--------------------+--------+------------+
|             user_id|sku_mode|credit_score|
+--------------------+--------+------------+
|d394899919216bc10...|     yxs|           3|
|625002ad625bc9a69...|     yxs|           3|
|8dd11e29bf50cb8c8...|     cxy|           3|
|0f0b88ff589cb46cd...|     yxs|           3|
|eeb8e839139876971...|     yxs|           1|
|f63b2b9c8340d3c80...|     cxy|           1|
|806c9f0349e7e8389...|     cxy|           1|
|bf312181eaaa0ec9e...|     yxs|           1|
|ee4a7984dc40cabbf...|     yxs|           3|
|7a6b15f16c1f782de...|   sxy19|           3|
+--------------------+--------+------------+
only showing top 10 rows

咱們能夠基於此將 sku_mode 同樣的類型合併進行行轉列變換

df = df.groupby('user_id').pivot(
    'sku_mode', ['yxs', 'cxy', 'sxy', 'sxy19']
).agg(
    f.sum('credit_score')
).fillna(0)

這句話的意思是根據 user_id 進行分組,而且將 sku_mode 的行轉列,須要轉列的類型須要在後面的 list 中添加,而且列裏記錄 各sku_mode credit_score 彙總的量。

+--------------------+---+---+---+-----+
|             user_id|yxs|cxy|sxy|sxy19|
+--------------------+---+---+---+-----+
|5ec336994e7b5d73f...|  0|  0|  0|    2|
|06b1120a4544b1b8a...|  0|  0|  0|    2|
|6fe19e193ad43bafc...|  0|  0|  0|    3|
|3e5f9fc4550ae7cba...|  1|  0|  0|    0|
|b1d1d856e28908f5a...|  1|  0|  0|    3|
|7a065e02ed1693cf4...|  2|  0|  0|    0|
|651f9f0b11de08003...|  0|  0|  0|    3|
|d02491502946aba2f...|  0|  0|  0|    2|
|e24b58cb87762b2da...|  0|  6|  0|   15|
|925f6a832b1a95c45...|  1|  0|  0|    0|
+--------------------+---+---+---+-----+
only showing top 10 rows

 

Q: 我想對結果進行列轉行應該怎麼作?

A: 咱們接着上面的例子來說 unpivot 行轉列的逆操做。仍是接着剛纔那個栗子。

df2 = df
df2 = df2.selectExpr("user_id", 
                     "stack(4, 'yxs', yxs, 'cxy', cxy, 'sxy', sxy, 'sxy19', sxy19) AS (sku_mode, credit_score)")

df.where(df.user_id=='e24b58cb87762b2da9fa118316e9c91a').show(10, False)
df2.filter(df2.user_id=='e24b58cb87762b2da9fa118316e9c91a').show(10, False)


+--------------------------------+---+---+---+-----+
|user_id                         |yxs|cxy|sxy|sxy19|
+--------------------------------+---+---+---+-----+
|e24b58cb87762b2da9fa118316e9c91a|0  |6  |0  |15   
+--------------------------------+---+---+---+-----+

+--------------------------------+--------+------------+
|user_id                         |sku_mode|credit_score|
+--------------------------------+--------+------------+
|e24b58cb87762b2da9fa118316e9c91a|yxs     |0           |
|e24b58cb87762b2da9fa118316e9c91a|cxy     |6           |
|e24b58cb87762b2da9fa118316e9c91a|sxy     |0           |
|e24b58cb87762b2da9fa118316e9c91a|sxy19   |15          |
+--------------------------------+--------+------------+

能夠看到咱們經過這種辦法將列從新組合成行記錄。這裏須要多延伸一下,這裏使用的 selectExpr 語句的語意是將裏面的參數直接解析成 select 裏面的內容。

stack 函數是 spark 中的 func.他接收無數個參數,第一個參數 n 的意義是轉換的行數,對二個開始到後面的參數都是內容。

stack 的做用是將第二個開始的到後面的參數 塞進 n 行中。

舉個栗子來講哦,就是上文使用的

stack(4, 'yxs', yxs, 'cxy', cxy, 'sxy', sxy, 'sxy19', sxy19) AS (sku_mode, credit_score)

這裏的語意就是切分紅 4 行。從第二個字段開始 字符串部分表達的是匹配的 sku_mode 分辨是('yxs', 'cxy', 'sxy', 'sxy19')而後跟在他們後面的分別是credit_score 的值  而後展示成兩列兩個字段。有點繞須要多理解一下。最好是在 spark 終端中試一試比較有感受。

 

以後還有有意思的姿式會繼續補充在這裏。

 

 

Reference:

https://sparkbyexamples.com/how-to-pivot-table-and-unpivot-a-spark-dataframe/   How to Pivot and Unpivot a Spark SQL DataFrame

https://stackoverflow.com/questions/56371391/in-group-sort-table-join-a-another-table-use-first-func/56371513#56371513

相關文章
相關標籤/搜索