I ran into a requirement where the data structure is a dict nested inside a dict:
```python
a = {'col_1': {'v1': 200, 'v2': 300}, 'col_2': {'v1': 75, 'v2': 80, 'v3': 92}}
b = {'col_1': {'v1': 200, 'v2': 300}, 'col_2': {'v1': 75, 'v4': 80, 'v3': 92}}
```
The merge function is as follows:
```python
def merge_two_dicts(x, y):
    # type: (dict, dict) -> dict
    z = dict()
    for key in x.keys():
        if key in y.keys():
            # key exists in both dicts: merge the two values
            x_value = x[key]
            y_value = y[key]
            if isinstance(x_value, dict) and isinstance(y_value, dict):
                # both values are dicts: merge them recursively
                result_x_y = merge_two_dicts(x_value, y_value)
            else:
                # otherwise add the values (e.g. numbers)
                result_x_y = x_value + y_value
            z[key] = result_x_y
        else:
            # key only exists in x
            z[key] = x[key]
    for key in y.keys():
        if key in x.keys():
            # key exists in both dicts: recomputed here, same result as the first loop
            x_value = x[key]
            y_value = y[key]
            if isinstance(x_value, dict) and isinstance(y_value, dict):
                result_x_y = merge_two_dicts(x_value, y_value)
            else:
                result_x_y = x_value + y_value
            z[key] = result_x_y
        else:
            # key only exists in y
            z[key] = y[key]
    return z
```
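The result shown below comes from calling the function on the two dicts above, for example:

```python
result = merge_two_dicts(a, b)
print(result)
```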
The merged result:
```python
{'col_2': {'v1': 150, 'v2': 80, 'v3': 184, 'v4': 80}, 'col_1': {'v1': 400, 'v2': 600}}
```
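As an aside, when the nesting is exactly one level deep and the leaf values are positive numbers, the same per-key addition could also be sketched with `collections.Counter`; the helper `merge_with_counter` below is hypothetical and not part of the original code:

```python
from collections import Counter

def merge_with_counter(x, y):
    # Assumes one level of nesting and positive numeric leaves,
    # since Counter addition drops non-positive results.
    keys = set(x) | set(y)
    return {k: dict(Counter(x.get(k, {})) + Counter(y.get(k, {}))) for k in keys}
```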
Using this function, we can simulate the groupBy-and-sum behaviour of a PySpark DataFrame:
```python
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import Row, SparkSession

if __name__ == "__main__":
    # merge_two_dicts is the function defined above

    # --------------------------- Configure Spark ----------------------------
    conf = SparkConf()
    conf = conf.setMaster("local[*]")
    conf = conf.set('spark.sql.warehouse.dir', 'file:///G:/tmp')

    # ---------------------- Execute Main functionality ----------------------
    sparksn = SparkSession.builder.appName("APP").config(conf=conf).getOrCreate()
    sc = sparksn.sparkContext
    sc.setLogLevel("ERROR")

    df_p = pd.DataFrame({'col_1': ['a', 'a', 'a', 'c', 'c', 'b', 'b', 'b', 'b', 'd'],
                         'col_2': ['h', 'h', 'h', 'h', 'i', 'j', 'j', 'k', 'k', 'k']})
    df = sparksn.createDataFrame(df_p)
    df.show()

    def map_count(a, b):
        # Convert each Row into {column: {value: 1}}, then merge so the 1s add up to counts.
        if isinstance(a, Row):
            a = a.asDict()
            aa = dict()
            for key in a.keys():
                v = a[key]
                aa[key] = {v: 1}
            a = aa
        if isinstance(b, Row):
            b = b.asDict()
            bb = dict()
            for key in b.keys():
                v = b[key]
                bb[key] = {v: 1}
            b = bb
        return merge_two_dicts(a, b)

    result = df.rdd.reduce(lambda a, b: map_count(a, b))
    print(result)
    sparksn.stop()
```
The final printed output is:
```text
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
18/06/13 10:54:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
+-----+-----+
|col_1|col_2|
+-----+-----+
|    a|    h|
|    a|    h|
|    a|    h|
|    c|    h|
|    c|    i|
|    b|    j|
|    b|    j|
|    b|    k|
|    b|    k|
|    d|    k|
+-----+-----+

{'col_2': {u'i': 1, u'h': 4, u'k': 3, u'j': 2}, 'col_1': {u'a': 3, u'c': 2, u'b': 4, u'd': 1}}

Process finished with exit code 0
```
We can see that every field within the same row has been counted.
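The same idea can be written a bit more compactly by mapping each Row to its nested-dict form first and then reducing with merge_two_dicts directly; this is only a sketch under the same assumptions as the program above, and the df.groupBy call is just a convenient way to cross-check one column's counts:

```python
# Sketch: map each Row to {column: {value: 1}}, then fold with merge_two_dicts.
counts = (df.rdd
            .map(lambda row: {k: {v: 1} for k, v in row.asDict().items()})
            .reduce(merge_two_dicts))
print(counts)

# Cross-check a single column with the built-in DataFrame aggregation.
df.groupBy('col_1').count().show()
```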