哈嘍~各位小夥伴們中秋快樂,很久沒更新新的文章啦,今天分享如何使用mapreduce進行join操做。sql
在離線計算中,咱們經常不僅是會對單一一個文件進行操做,進行須要進行兩個或多個文件關聯出更多數據,相似與sql中的join操做。
今天就跟你們分享一下如何在MapReduce中實現join操做數據庫
現有兩張,一張是產品信息表,一張是訂單表。訂單表中只表存了產品ID,若是想要查出訂單以及產品的相關信息就必須使用關聯。
根據MapReduce特性,你們都知道在reduce端,相同key的key,value對會被放到同一個reduce方法中(不設置partition的話)。 利用這個特色咱們能夠輕鬆實現join操做,請看下面示例。
ID | brand | model |
---|---|---|
p0001 | 蘋果 | iphone11 pro max |
p0002 | 華爲 | p30 |
p0003 | 小米 | mate10 |
id | name | address | produceID | num |
---|---|---|---|---|
00001 | kris | 深圳市福田區 | p0001 | 1 |
00002 | pony | 深圳市南山區 | p0001 | 2 |
00003 | jack | 深圳市阪田區 | p0001 | 3 |
假如數據量巨大,兩表的數據是以文件的形式存儲在HDFS中,須要用mapreduce程序來實現一下SQL查詢運算:緩存
select a.id,a.name,a.address,a.num from t_orders a join t_products on a.productID=b.ID
經過將關聯的條件(prodcueID)做爲map輸出的key,將兩表知足join條件的數據並攜帶數據所來源的文件信息,發往同一個 reduce task,在reduce中進行數據的串聯
public class RJoinInfo implements Writable{ private String customerName=""; private String customerAddr=""; private String orderID=""; private int orderNum; private String productID=""; private String productBrand=""; private String productModel=""; // 0是產品,1是訂單 private int flag; setter/getter
public class RJoinMapper extends Mapper<LongWritable,Text,Text,RJoinInfo> { private static Logger logger = LogManager.getLogger(RJoinMapper.class); private RJoinInfo rJoinInfo = new RJoinInfo(); private Text k = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 輸入方式支持不少中包括數據庫等等。這裏用的是文件,所以能夠直接強轉爲文件切片 FileSplit fileSplit = (FileSplit) context.getInputSplit(); // 獲取文件名稱 String name = fileSplit.getPath().getName(); logger.info("splitPathName:"+name); String line = value.toString(); String[] split = line.split("\t"); String productID = ""; if(name.contains("product")){ productID = split[0]; String setProductBrand = split[1]; String productModel = split[2]; rJoinInfo.setProductID(productID); rJoinInfo.setProductBrand(setProductBrand); rJoinInfo.setProductModel(productModel); rJoinInfo.setFlag(0); }else if(name.contains("orders")){ String orderID = split[0]; String customerName = split[1]; String cutsomerAddr = split[2]; productID = split[3]; String orderNum = split[4]; rJoinInfo.setProductID(productID); rJoinInfo.setCustomerName(customerName); rJoinInfo.setCustomerAddr(cutsomerAddr); rJoinInfo.setOrderID(orderID); rJoinInfo.setOrderNum(Integer.parseInt(orderNum)); rJoinInfo.setFlag(1); } k.set(productID); context.write(k,rJoinInfo); } }
代碼解釋,這裏根據split的文件名,判斷是products仍是orders,
而後根據是product仍是orders獲取不一樣的數據,最用都以productID爲Key發送給Reduce端併發
public class RJoinReducer extends Reducer<Text,RJoinInfo,RJoinInfo,NullWritable> { private static Logger logger = LogManager.getLogger(RJoinReducer.class); @Override protected void reduce(Text key, Iterable<RJoinInfo> values, Context context) throws IOException, InterruptedException { List<RJoinInfo> orders = new ArrayList<>(); String productID = key.toString(); logger.info("productID:"+productID); RJoinInfo rJoinInfo = new RJoinInfo(); for (RJoinInfo value : values) { int flag = value.getFlag(); if (flag == 0) { // 產品 try { BeanUtils.copyProperties(rJoinInfo,value); } catch (IllegalAccessException e) { logger.error(e.getMessage()); } catch (InvocationTargetException e) { logger.error(e.getMessage()); } }else { // 訂單 RJoinInfo orderInfo = new RJoinInfo(); try { BeanUtils.copyProperties(orderInfo,value); } catch (IllegalAccessException e) { logger.error(e.getMessage()); } catch (InvocationTargetException e) { logger.error(e.getMessage()); } orders.add(orderInfo); } } for (RJoinInfo order : orders) { rJoinInfo.setOrderNum(order.getOrderNum()); rJoinInfo.setOrderID(order.getOrderID()); rJoinInfo.setCustomerName(order.getCustomerName()); rJoinInfo.setCustomerAddr(order.getCustomerAddr()); // 只輸出key便可,value可使用nullwritable context.write(rJoinInfo,NullWritable.get()); } } }
代碼解釋:根據productID會分爲不一樣的組發到reduce端,reduce端拿到後一組數據後,其中有一個產品對象和多個訂單對象。
遍歷每個對象,根據flag區分產品和訂單。保存產品對象,獲取每一個訂單對象到一個集合中。當咱們對每一個對象都分好
類後,遍歷訂單集合將訂單和產品信息集合,而後輸出。app
注意:咱們這裏效率雖然不是最高的,主要是想說明join的思路。iphone
public class RJoinDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); // conf.set("mapreduce.framework.name","yarn"); // conf.set("yarn.resourcemanager.hostname","server1"); // conf.set("fs.defaultFS","hdfs://server1:9000"); conf.set("mapreduce.framework.name","local"); conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); // 若是是本地運行,能夠不用設置jar包的路徑,由於不用拷貝jar到其餘地方 job.setJarByClass(RJoinDriver.class); // job.setJar("/Users/kris/IdeaProjects/bigdatahdfs/target/rjoin.jar"); job.setMapperClass(RJoinMapper.class); job.setReducerClass(RJoinReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(RJoinInfo.class); job.setOutputKeyClass(RJoinInfo.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input")); FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output")); boolean waitForCompletion = job.waitForCompletion(true); System.out.println(waitForCompletion); } }
==上面實現的這種方式有個缺點,就是join操做是在reduce階段完成的,reduce端的處理壓力太大,map節點的運算負載則很低,資源利用率不高,且在reduce階段極易產生數據傾斜==ide
這種方式適用於關聯表中有小表的情形: 能夠將小表分發到全部的map節點,這樣,map節點就能夠在本地對本身所讀到的大表數據進行join操做並輸出結果, 能夠大大提升join操做的併發度,加快處理速度。
在Mapper端咱們一次性加載數據或者用Distributedbache將文件拷貝到每個運行的maptask的節點上加載 這裏咱們使用第二種,在mapper類中定義好小表進行join
static class RjoinMapper extends Mapper<LongWritable,Text,RJoinInfo,NullWritable>{ private static Map<String, RJoinInfo> productMap = new HashMap<>(); // 在循環調用map方法以前會先調用setup方法。所以咱們能夠在setup方法中,先對文件進行處理 @Override protected void setup(Context context) throws IOException, InterruptedException { //經過這幾句代碼能夠獲取到cache file的本地絕對路徑,測試驗證用 URI[] cacheFiles = context.getCacheFiles(); System.out.println(Arrays.toString(new URI[]{cacheFiles[0]})); // 直接指定名字,默認在工做文件夾的目錄下查找 1⃣ try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("products.txt")))){ String line; while ((line = bufferedReader.readLine())!=null){ String[] split = line.split("\t"); String productID = split[0]; String setProductBrand = split[1]; String productModel = split[2]; RJoinInfo rJoinInfo = new RJoinInfo(); rJoinInfo.setProductID(productID); rJoinInfo.setProductBrand(setProductBrand); rJoinInfo.setProductModel(productModel); rJoinInfo.setFlag(0); productMap.put(productID, rJoinInfo); } } super.setup(context); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit)context.getInputSplit(); String name = fileSplit.getPath().getName(); if (name.contains("orders")) { String line = value.toString(); String[] split = line.split("\t"); String orderID = split[0]; String customerName = split[1]; String cutsomerAddr = split[2]; String productID = split[3]; String orderNum = split[4]; RJoinInfo rJoinInfo = productMap.get(productID); rJoinInfo.setProductID(productID); rJoinInfo.setCustomerName(customerName); rJoinInfo.setCustomerAddr(cutsomerAddr); rJoinInfo.setOrderID(orderID); rJoinInfo.setOrderNum(Integer.parseInt(orderNum)); rJoinInfo.setFlag(1); context.write(rJoinInfo, NullWritable.get()); } } }
代碼解釋:這裏咱們又重寫了一個setup()方法,這個方法會在執行map()方法前先執行,所以咱們能夠在這個方法中事先加載好數據。
在上述代碼中,咱們直接指定名字就拿到了product.txt文件,這個究竟這個文件是怎麼複製在maptask的節點上的呢,還要看下面的driver測試
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException { Configuration conf = new Configuration(); conf.set("mapreduce.framework.name","local"); conf.set("fs.defaultFS","file:///"); Job job = Job.getInstance(conf); job.setJarByClass(RJoinDemoInMapDriver.class); job.setMapperClass(RjoinMapper.class); job.setOutputKeyClass(RJoinInfo.class); job.setOutputValueClass(NullWritable.class); FileInputFormat.setInputPaths(job,new Path("/Users/kris/Downloads/rjoin/input")); FileOutputFormat.setOutputPath(job,new Path("/Users/kris/Downloads/rjoin/output2")); // 指定須要緩存一個文件到全部的maptask運行節點工做目錄 // job.addFileToClassPath(); 將普通文件緩存到task運行節點的classpath下 // job.addArchiveToClassPath();緩存jar包到task運行節點的classpath下 // job.addCacheArchive();緩存壓縮包文件到task運行節點的工做目錄 // job.addCacheFile();將普通文件 1⃣ job.addCacheFile(new URI("/Users/kris/Downloads/rjoin/products.txt")); // 設置reduce的數量爲0 job.setNumReduceTasks(0); boolean waitForCompletion = job.waitForCompletion(true); System.out.println(waitForCompletion); }
代碼解釋:上述Driver中,咱們經過job.addCacheFile()指定了一個URI本地地址,運行時mapreduce就會將這個文件拷貝到maptask的運行工做目錄中。spa
好啦~本期分享代碼量偏多,主要是想分享如何使用mapreduce進行join操做的思路。下一篇我會再講一下 計算共同好友的思路以及代碼~code
公衆號搜索:喜訊XiCent 獲取更多福利資源~~~~
本文由博客一文多發平臺 OpenWrite 發佈!