簡單回顧一下矩陣乘法: java
矩陣乘法要求左矩陣的列數與右矩陣的行數相等,m×n的矩陣A,與n×p的矩陣B相乘,結果爲m×p的矩陣C。 算法
爲了方便描述,先進行假設: apache
由於分佈式計算的特色,須要找到相互獨立的計算過程,以便可以在不一樣的節點上進行計算而不會彼此影響。根據矩陣乘法的公式,C中各個元素的計算都是相互獨立的,即各個cij在計算過程當中彼此不影響。這樣的話,在Map階段能夠把計算所須要的元素都集中到同一個key中,而後,在Reduce階段就能夠從中解析出各個元素來計算cij。 數據結構
另外,以a11爲例,它將會在c11、c12……c1p的計算中使用。也就是說,在Map階段,當咱們從HDFS取出一行記錄時,若是該記錄是A的元素,則須要存儲成p個<key, value>對,而且這p個key互不相同;若是該記錄是B的元素,則須要存儲成m個<key, value>對,一樣的,m個key也應互不相同;但同時,用於存放計算cij的ai1、ai2……ain和b1j、b2j……bnj的<key, value>對的key應該都是相同的,這樣才能被傳遞到同一個Reduce中。 app
廣泛有一個共識是:數據結構+算法=程序,因此在編寫代碼以前須要先理清數據存儲結構和處理數據的算法。 分佈式
在map階段,須要作的是進行數據準備。把來自矩陣A的元素aij,標識成p條<key, value>的形式,key="i,k",(其中k=1,2,...,p),value="a:j,aij";把來自矩陣B的元素bij,標識成m條<key, value>形式,key="k,j"(其中k=1,2,...,m),value="b:i,bij"。 ide
通過處理,用於計算cij須要的a、b就轉變爲有相同key("i,j")的數據對,經過value中"a:"、"b:"能區分元素是來自矩陣A仍是矩陣B,以及具體的位置(在矩陣A的第幾列,在矩陣B的第幾行)。 工具
這個階段是Hadoop自動完成的階段,具備相同key的value被分到同一個Iterable中,造成<key,Iterable(value)>對,再傳遞給reduce。 oop
經過map數據預處理和shuffle數據分組兩個階段,reduce階段只須要知道兩件事就行: 學習
計算過程已經設計清楚了,就須要對數據結構進行設計。大致有兩種設計方案:
第一種:使用最原始的表示方式,相同行內不一樣列數據經過","分割,不一樣行經過換行分割;
第二種:經過行列表示法,即文件中的每行數據有三個元素經過分隔符分割,第一個元素表示行,第二個元素表示列,第三個元素表示數據。這種方式對於能夠不列出爲0的元素,便可以減小稀疏矩陣的數據量。
在上圖中,第一種方式存儲的數據量小於第二種,但這只是由於例子中的數據設計成這樣。在現實中,使用分佈式計算矩陣乘法的環境中,大部分矩陣是稀疏矩陣,且數據量極大,在這種狀況下,第二種數據結構的優點就顯現了出來。並且,由於使用分佈式計算,若是數據大於64m,在map階段將不可以逐行處理,將不能肯定數據來自於哪一行。不過,因爲現實中對於大矩陣的乘法,考慮到存儲空間和內存的狀況,須要特殊的處理方式,有一種是將矩陣進行行列轉換而後計算,這個時候第一種仍是挺實用的。
代碼爲:
import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class MatrixMultiply { public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> { private String flag = null;// 數據集名稱 private int rowNum = 4;// 矩陣A的行數 private int colNum = 2;// 矩陣B的列數 private int rowIndexA = 1; // 矩陣A,當前在第幾行 private int rowIndexB = 1; // 矩陣B,當前在第幾行 @Override protected void setup(Context context) throws IOException, InterruptedException { flag = ((FileSplit) context.getInputSplit()).getPath().getName();// 獲取文件名稱 } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] tokens = value.toString().split(","); if ("ma".equals(flag)) { for (int i = 1; i <= colNum; i++) { Text k = new Text(rowIndexA + "," + i); for (int j = 0; j < tokens.length; j++) { Text v = new Text("a," + (j + 1) + "," + tokens[j]); context.write(k, v); } } rowIndexA++;// 每執行一次map方法,矩陣向下移動一行 } else if ("mb".equals(flag)) { for (int i = 1; i <= rowNum; i++) { for (int j = 0; j < tokens.length; j++) { Text k = new Text(i + "," + (j + 1)); Text v = new Text("b," + rowIndexB + "," + tokens[j]); context.write(k, v); } } rowIndexB++;// 每執行一次map方法,矩陣向下移動一行 } } } public static class MatrixReducer extends Reducer<Text, Text, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String, String> mapA = new HashMap<String, String>(); Map<String, String> mapB = new HashMap<String, String>(); for (Text value : values) { String[] val = value.toString().split(","); if ("a".equals(val[0])) { mapA.put(val[1], val[2]); } else if ("b".equals(val[0])) { mapB.put(val[1], val[2]); } } int result = 0; Iterator<String> mKeys = mapA.keySet().iterator(); while (mKeys.hasNext()) { String mkey = mKeys.next(); if (mapB.get(mkey) == null) {// 由於mkey取的是mapA的key集合,因此只須要判斷mapB是否存在便可。 continue; } result += Integer.parseInt(mapA.get(mkey)) * Integer.parseInt(mapB.get(mkey)); } context.write(key, new IntWritable(result)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/ma"; String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/mb"; String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out"; Configuration conf = new Configuration(); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); conf.addResource("classpath:/hadoop/yarn-site.xml"); Job job = Job.getInstance(conf, "MatrixMultiply"); job.setJarByClass(MatrixMultiply.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(MatrixMapper.class); // job.setReducerClass(MatrixReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加載2個輸入數據集 Path outputPath = new Path(output); outputPath.getFileSystem(conf).delete(outputPath, true); FileOutputFormat.setOutputPath(job, outputPath); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
繪圖演示效果:
代碼爲:
import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class SparseMatrixMultiply { public static class SMMapper extends Mapper<LongWritable, Text, Text, Text> { private String flag = null; private int m = 4;// 矩陣A的行數 private int p = 2;// 矩陣B的列數 @Override protected void setup(Context context) throws IOException, InterruptedException { FileSplit split = (FileSplit) context.getInputSplit(); flag = split.getPath().getName(); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] val = value.toString().split(","); if ("t1".equals(flag)) { for (int i = 1; i <= p; i++) { context.write(new Text(val[0] + "," + i), new Text("a," + val[1] + "," + val[2])); } } else if ("t2".equals(flag)) { for (int i = 1; i <= m; i++) { context.write(new Text(i + "," + val[1]), new Text("b," + val[0] + "," + val[2])); } } } } public static class SMReducer extends Reducer<Text, Text, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { Map<String, String> mapA = new HashMap<String, String>(); Map<String, String> mapB = new HashMap<String, String>(); for (Text value : values) { String[] val = value.toString().split(","); if ("a".equals(val[0])) { mapA.put(val[1], val[2]); } else if ("b".equals(val[0])) { mapB.put(val[1], val[2]); } } int result = 0; // 可能在mapA中存在在mapB中不存在的key,或相反狀況 // 由於,數據定義的時候使用的是稀疏矩陣的定義 // 因此,這種只存在於一個map中的key,說明其對應元素爲0,不影響結果 Iterator<String> mKeys = mapA.keySet().iterator(); while (mKeys.hasNext()) { String mkey = mKeys.next(); if (mapB.get(mkey) == null) {// 由於mkey取的是mapA的key集合,因此只須要判斷mapB是否存在便可。 continue; } result += Integer.parseInt(mapA.get(mkey)) * Integer.parseInt(mapB.get(mkey)); } context.write(key, new IntWritable(result)); } } public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { String input1 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t1"; String input2 = "hdfs://192.168.1.128:9000/user/lxh/matrix/t2"; String output = "hdfs://192.168.1.128:9000/user/lxh/matrix/out"; Configuration conf = new Configuration(); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); conf.addResource("classpath:/hadoop/yarn-site.xml"); Job job = Job.getInstance(conf, "SparseMatrixMultiply"); job.setJarByClass(SparseMatrixMultiply.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(SMMapper.class); job.setReducerClass(SMReducer.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加載2個輸入數據集 Path outputPath = new Path(output); outputPath.getFileSystem(conf).delete(outputPath, true); FileOutputFormat.setOutputPath(job, outputPath); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
繪圖演示效果:
比較兩種代碼,能夠很清楚的看出,兩種實現只是在map階段有些區別,reduce階段基本相同。對於其中關於行i、列j定義不是從0計數(雖然我傾向於從0開始計數,不用寫等號,簡單),是爲了更直觀的觀察數據處理過程是否符合設計。
在第一種實現中,須要記錄當前是讀取的哪一行數據,因此,這種僅適用於不須要分塊的小文件中進行的矩陣乘法運算。第二種實現中,每行數據記錄了所在行所在列,不會有這方面的限制。
在第二種實現中,遍歷兩個HashMap時,取mapA的key做爲循環標準,是由於在通常狀況下,mapA和mapB的key是相同的(如第一種實現),由於使用稀疏矩陣,兩個不相同的key說明是0,能夠捨棄不參與計算,因此只使用mapA的key,並判斷mapB是否存在該key對應的值。
兩種實現的reduce階段,計算最後結果時,都是直接使用內存存儲數據、計算結果,因此當數據量很大的時候(一般都會很大,不然不會用分佈式處理),極易形成內存溢出,因此,對於大矩陣的運算,還須要其餘的轉換方式,好比行列相乘運算、分塊矩陣運算、基於最小粒度相乘的算法等方式。另外,由於這兩份代碼都是demo,因此代碼中缺乏過濾錯誤數據的部分。