Python: merging nested dicts and summing the values of shared keys, with an application to groupby counts on a DataFrame

I ran into a requirement where the data structure is a dict of dicts:

a = {'col_1': {'v1': 200,
               'v2': 300},
     'col_2': {'v1': 75,
               'v2': 80,
               'v3': 92}}
b = {'col_1': {'v1': 200,
               'v2': 300},
     'col_2': {'v1': 75,
               'v4': 80,
               'v3': 92}}

The merge function is as follows:

def merge_two_dicts(x, y):
    # type: (dict, dict) -> dict
    z = dict()
    for key in x:
        if key in y:
            # key exists in both dicts: merge the matching values
            x_value = x[key]
            y_value = y[key]
            if isinstance(x_value, dict) and isinstance(y_value, dict):
                # both values are dicts, so merge them recursively
                z[key] = merge_two_dicts(x_value, y_value)
            else:
                # both values are numbers, so sum them
                z[key] = x_value + y_value
        else:
            # key exists only in x
            z[key] = x[key]

    for key in y:
        if key not in x:
            # key exists only in y
            z[key] = y[key]

    return z
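Calling the function on the two sample dicts prints the result shown below:

print(merge_two_dicts(a, b))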

The merged result:

{'col_2': {'v1': 150, 'v2': 80, 'v3': 184, 'v4': 80}, 'col_1': {'v1': 400, 'v2': 600}}
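As an aside, for dicts that are nested exactly one level like these, the standard library's collections.Counter can handle the summation, since adding two Counters sums the values of shared keys; a minimal sketch (merge_one_level and merged are names of my own choosing, and note that Counter addition drops non-positive totals):

from collections import Counter

def merge_one_level(x, y):
    # Counter addition sums values of keys present in both dicts and
    # keeps keys that appear in only one of them.
    return dict(Counter(x) + Counter(y))

merged = {key: merge_one_level(a.get(key, {}), b.get(key, {}))
          for key in set(a) | set(b)}
# {'col_1': {'v1': 400, 'v2': 600}, 'col_2': {'v1': 150, 'v2': 80, 'v3': 184, 'v4': 80}}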

Using this function, we can simulate the groupby-and-count behavior of a PySpark DataFrame:

from pyspark import SparkConf
from pyspark.sql import Row, SparkSession
import pandas as pd

if __name__ == "__main__":

    # merge_two_dicts is the same function defined above

    # --------------------------- Configure Spark ----------------------------
    conf = SparkConf()
    conf = conf.setMaster("local[*]")
    conf = conf.set('spark.sql.warehouse.dir', 'file:///G:/tmp')

    # ---------------------- Execute Main functionality ----------------------
    sparksn = SparkSession.builder.appName("APP").config(conf=conf).getOrCreate()
    sc = sparksn.sparkContext
    sc.setLogLevel("ERROR")
    
    df_p = pd.DataFrame({'col_1': ['a', 'a', 'a', 'c', 'c', 'b', 'b', 'b', 'b', 'd'],
                         'col_2': ['h', 'h', 'h', 'h', 'i', 'j', 'j', 'k', 'k', 'k']})
    df = sparksn.createDataFrame(df_p)
    df.show()

    def map_count(a, b):
        # reduce may pass in either a raw Row or an already merged
        # count dict; convert Rows to {column: {value: 1}} first.
        if isinstance(a, Row):
            a = {key: {value: 1} for key, value in a.asDict().items()}
        if isinstance(b, Row):
            b = {key: {value: 1} for key, value in b.asDict().items()}
        return merge_two_dicts(a, b)

    result = df.rdd.reduce(map_count)
    print(result)

    sparksn.stop()
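One caveat with rdd.reduce: on a single-row RDD the reduce function is never called, so the result would be the raw Row rather than a count dict. Mapping each Row to its count dict first and then reducing with merge_two_dicts directly avoids this; a sketch under the same setup:

counts = (df.rdd
          .map(lambda row: {key: {value: 1} for key, value in row.asDict().items()})
          .reduce(merge_two_dicts))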

Running the script above prints:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
18/06/13 10:54:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|    h|
|    a|    h|
|    a|    h|
|    c|    h|
|    c|    i|
|    b|    j|
|    b|    j|
|    b|    k|
|    b|    k|
|    d|    k|
+-----+-----+

{'col_2': {u'i': 1, u'h': 4, u'k': 3, u'j': 2}, 'col_1': {u'a': 3, u'c': 2, u'b': 4, u'd': 1}}

Process finished with exit code 0

We can see that every field within each row has been counted.
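For comparison, the same per-column tallies are available from PySpark's built-in groupBy and count (invoked before sparksn.stop()), one column per call:

df.groupBy("col_1").count().show()   # a: 3, b: 4, c: 2, d: 1
df.groupBy("col_2").count().show()   # h: 4, i: 1, j: 2, k: 3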
