使用MapReduce解決矩陣乘法的問題

摘要:在海量數據中淘金,已經是各大互聯網公司的既定目標,亞馬遜是數據化運營的成功典範,Google、百度投巨資用於對海量數據進行深度學習研究,阿里把數據與平臺、金融並列成爲將來三大戰略。想在海量數據中淘到金子,強大的挖掘工具是必不可少的,而諸如迴歸、聚類、主成分分析、決策樹等數據挖掘算法經常涉及大規模矩陣運算。這其中,大矩陣乘法具備較大的時間消耗,是算法的瓶頸。因此將矩陣乘法移植到分佈式系統中進行運算,可謂是基本需求,因此本文暫且從最基礎開始,簡單介紹使用MapReduce實現矩陣乘法的方式。

簡單回顧一下矩陣乘法: java

C=AB

矩陣乘法要求左矩陣的列數與右矩陣的行數相等,m×n的矩陣A,與n×p的矩陣B相乘,結果爲m×p的矩陣C。 算法

爲了方便描述,先進行假設: apache

  • 矩陣A的行數爲m,列數爲n,aij爲矩陣A第i行j列的元素。
  • 矩陣B的行數爲n,列數爲p,bij爲矩陣B第i行j列的元素。

分析

由於分佈式計算的特色,須要找到相互獨立的計算過程,以便可以在不一樣的節點上進行計算而不會彼此影響。根據矩陣乘法的公式,C中各個元素的計算都是相互獨立的,即各個cij在計算過程當中彼此不影響。這樣的話,在Map階段能夠把計算所須要的元素都集中到同一個key中,而後,在Reduce階段就能夠從中解析出各個元素來計算cij數據結構

另外,以a11爲例,它將會在c11、c12……c1p的計算中使用。也就是說,在Map階段,當咱們從HDFS取出一行記錄時,若是該記錄是A的元素,則須要存儲成p個<key, value>對,而且這p個key互不相同;若是該記錄是B的元素,則須要存儲成m個<key, value>對,一樣的,m個key也應互不相同;但同時,用於存放計算cij的ai1、ai2……ain和b1j、b2j……bnj的<key, value>對的key應該都是相同的,這樣才能被傳遞到同一個Reduce中。 app

設計

廣泛有一個共識是:數據結構+算法=程序,因此在編寫代碼以前須要先理清數據存儲結構和處理數據的算法。 分佈式

算法

map階段

在map階段,須要作的是進行數據準備。把來自矩陣A的元素aij,標識成p條<key, value>的形式,key="i,k",(其中k=1,2,...,p),value="a:j,aij";把來自矩陣B的元素bij,標識成m條<key, value>形式,key="k,j"(其中k=1,2,...,m),value="b:i,bij"。 ide

通過處理,用於計算cij須要的a、b就轉變爲有相同key("i,j")的數據對,經過value中"a:"、"b:"能區分元素是來自矩陣A仍是矩陣B,以及具體的位置(在矩陣A的第幾列,在矩陣B的第幾行)。 工具

shuffle階段

這個階段是Hadoop自動完成的階段,具備相同key的value被分到同一個Iterable中,造成<key,Iterable(value)>對,再傳遞給reduce。 oop

reduce階段

經過map數據預處理和shuffle數據分組兩個階段,reduce階段只須要知道兩件事就行: 學習

  • <key,Iterable(value)>對通過計算獲得的是矩陣C的哪一個元素?由於map階段對數據的處理,key(i,j)中的數據對,就是其在矩陣C中的位置,第i行j列。
  • Iterable中的每一個value來自於矩陣A和矩陣B的哪一個位置?這個也在map階段進行了標記,對於value(x:y,z),只須要找到y相同的來自不一樣矩陣(即x分別爲a和b)的兩個元素,取z相乘,而後加和便可。

數據結構

計算過程已經設計清楚了,就須要對數據結構進行設計。大致有兩種設計方案:

第一種:使用最原始的表示方式,相同行內不一樣列數據經過","分割,不一樣行經過換行分割;

第二種:經過行列表示法,即文件中的每行數據有三個元素經過分隔符分割,第一個元素表示行,第二個元素表示列,第三個元素表示數據。這種方式對於能夠不列出爲0的元素,便可以減小稀疏矩陣的數據量。

http://img.blog.csdn.net/20141009222508641

在上圖中,第一種方式存儲的數據量小於第二種,但這只是由於例子中的數據設計成這樣。在現實中,使用分佈式計算矩陣乘法的環境中,大部分矩陣是稀疏矩陣,且數據量極大,在這種狀況下,第二種數據結構的優點就顯現了出來。並且,由於使用分佈式計算,若是數據大於64m,在map階段將不可以逐行處理,將不能肯定數據來自於哪一行。不過,因爲現實中對於大矩陣的乘法,考慮到存儲空間和內存的狀況,須要特殊的處理方式,有一種是將矩陣進行行列轉換而後計算,這個時候第一種仍是挺實用的。

編寫代碼

第一種數據結構

代碼爲:

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MatrixMultiply {
    public static class MatrixMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        private String flag = null;// 數據集名稱
        private int rowNum = 4;// 矩陣A的行數
        private int colNum = 2;// 矩陣B的列數
        private int rowIndexA = 1; // 矩陣A,當前在第幾行
        private int rowIndexB = 1; // 矩陣B,當前在第幾行

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            flag = ((FileSplit) context.getInputSplit()).getPath().getName();// 獲取文件名稱
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] tokens = value.toString().split(",");
            if ("ma".equals(flag)) {
                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 0; j < tokens.length; j++) {
                        Text v = new Text("a," + (j + 1) + "," + tokens[j]);
                        context.write(k, v);
                    }
                }
                rowIndexA++;// 每執行一次map方法,矩陣向下移動一行
            } else if ("mb".equals(flag)) {
                for (int i = 1; i <= rowNum; i++) {
                    for (int j = 0; j < tokens.length; j++) {
                        Text k = new Text(i + "," + (j + 1));
                        Text v = new Text("b," + rowIndexB + "," + tokens[j]);
                        context.write(k, v);
                    }
                }
                rowIndexB++;// 每執行一次map方法,矩陣向下移動一行
            }
        }
    }

    public static class MatrixReducer extends
            Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            for (Text value : values) {
                String[] val = value.toString().split(",");
                if ("a".equals(val[0])) {
                    mapA.put(val[1], val[2]);
                } else if ("b".equals(val[0])) {
                    mapB.put(val[1], val[2]);
                }
            }

            int result = 0;
            Iterator<String> mKeys = mapA.keySet().iterator();
            while (mKeys.hasNext()) {
                String mkey = mKeys.next();
                if (mapB.get(mkey) == null) {// 由於mkey取的是mapA的key集合,因此只須要判斷mapB是否存在便可。
                    continue;
                }
                result += Integer.parseInt(mapA.get(mkey))
                        * Integer.parseInt(mapB.get(mkey));
            }
            context.write(key, new IntWritable(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/ma";
        String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/mb";
        String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out";

        Configuration conf = new Configuration();
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");

        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(MatrixMultiply.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        // job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加載2個輸入數據集
        Path outputPath = new Path(output);
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

繪圖演示效果:

http://img.blog.csdn.net/20141010105520586

第二種數據結構

代碼爲:

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  public class SparseMatrixMultiply {
    public static class SMMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag = null;
        private int m = 4;// 矩陣A的行數
        private int p = 2;// 矩陣B的列數

        @Override
        protected void setup(Context context) throws IOException,
                InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] val = value.toString().split(",");
            if ("t1".equals(flag)) {
                for (int i = 1; i <= p; i++) {
                    context.write(new Text(val[0] + "," + i), new Text("a,"
                            + val[1] + "," + val[2]));
                }
            } else if ("t2".equals(flag)) {
                for (int i = 1; i <= m; i++) {
                    context.write(new Text(i + "," + val[1]), new Text("b,"
                            + val[0] + "," + val[2]));
                }
            }
        }
    }

    public static class SMReducer extends
            Reducer<Text, Text, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            for (Text value : values) {
                String[] val = value.toString().split(",");
                if ("a".equals(val[0])) {
                    mapA.put(val[1], val[2]);
                } else if ("b".equals(val[0])) {
                    mapB.put(val[1], val[2]);
                }
            }

            int result = 0;
            // 可能在mapA中存在在mapB中不存在的key,或相反狀況
            // 由於,數據定義的時候使用的是稀疏矩陣的定義
            // 因此,這種只存在於一個map中的key,說明其對應元素爲0,不影響結果
            Iterator<String> mKeys = mapA.keySet().iterator();
            while (mKeys.hasNext()) {
                String mkey = mKeys.next();
                if (mapB.get(mkey) == null) {// 由於mkey取的是mapA的key集合,因此只須要判斷mapB是否存在便可。
                    continue;
                }
                result += Integer.parseInt(mapA.get(mkey))
                        * Integer.parseInt(mapB.get(mkey));
            }
            context.write(key, new IntWritable(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t1";
        String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t2";
        String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out";

        Configuration conf = new Configuration();
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.addResource("classpath:/hadoop/yarn-site.xml");

        Job job = Job.getInstance(conf, "SparseMatrixMultiply");
        job.setJarByClass(SparseMatrixMultiply.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SMMapper.class);
        job.setReducerClass(SMReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加載2個輸入數據集
        Path outputPath = new Path(output);
        outputPath.getFileSystem(conf).delete(outputPath, true);
        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

繪圖演示效果:

http://img.blog.csdn.net/20141010101823682

代碼分析

比較兩種代碼,能夠很清楚的看出,兩種實現只是在map階段有些區別,reduce階段基本相同。對於其中關於行i、列j定義不是從0計數(雖然我傾向於從0開始計數,不用寫等號,簡單),是爲了更直觀的觀察數據處理過程是否符合設計。

在第一種實現中,須要記錄當前是讀取的哪一行數據,因此,這種僅適用於不須要分塊的小文件中進行的矩陣乘法運算。第二種實現中,每行數據記錄了所在行所在列,不會有這方面的限制。

在第二種實現中,遍歷兩個HashMap時,取mapA的key做爲循環標準,是由於在通常狀況下,mapA和mapB的key是相同的(如第一種實現),由於使用稀疏矩陣,兩個不相同的key說明是0,能夠捨棄不參與計算,因此只使用mapA的key,並判斷mapB是否存在該key對應的值。

兩種實現的reduce階段,計算最後結果時,都是直接使用內存存儲數據、計算結果,因此當數據量很大的時候(一般都會很大,不然不會用分佈式處理),極易形成內存溢出,因此,對於大矩陣的運算,還須要其餘的轉換方式,好比行列相乘運算、分塊矩陣運算、基於最小粒度相乘的算法等方式。另外,由於這兩份代碼都是demo,因此代碼中缺乏過濾錯誤數據的部分。

相關文章
相關標籤/搜索