The previous post explained how to run and submit MR programs with `java -jar`, and how to modify the YarnRunner source code so that MR jobs can be submitted to the cluster directly from a Windows development environment. In this post, the blogger shows how to implement the common SQL join operation in MapReduce.
1. Requirement
Order table t_order:
id      date        pid     amount
1001    20150710    P0001   2
1002    20150710    P0001   3
1002    20150710    P0002   3
Product table t_product:
id      pname     category_id   price
P0001   小米5      1000          2
P0002   錘子T1     1000          3
Assume the data volume is huge and the two tables are stored as files in HDFS. We need a MapReduce program to implement the following SQL query:
select a.id, a.date, b.pname, b.category_id, b.price from t_order a join t_product b on a.pid = b.id
Implementation approach: use the join condition (the product id) as the map output key, so that the records from both tables that satisfy the join condition, each tagged with the file it came from, are sent to the same reduce task, where the two sides are stitched together.
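Worked out by hand from the sample rows above (not part of the original post), the query should return:

id      date        pname     category_id   price
1001    20150710    小米5      1000          2
1002    20150710    小米5      1000          2
1002    20150710    錘子T1     1000          3

The MapReduce job below produces exactly this kind of joined record, carried in a single bean per output line. (The actual run in sections 4 and 5 uses slightly larger sample files, including a P0003 row, so its output differs from this hand-worked result.)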
2. Implementation Code
The bean class that holds a joined record:
package com.empire.hadoop.mr.rjoin;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * InfoBean: the bean that carries one order record or one product record through the join.
 *
 * @author arron 2018-12-10 23:51:27
 */
public class InfoBean implements Writable {

    private int    order_id;
    private String dateString;
    private String p_id;
    private int    amount;
    private String pname;
    private int    category_id;
    private float  price;

    // flag = "0" means this object wraps an order record
    // flag = "1" means this object wraps a product record
    private String flag;

    public InfoBean() {
    }

    public void set(int order_id, String dateString, String p_id, int amount, String pname, int category_id,
                    float price, String flag) {
        this.order_id = order_id;
        this.dateString = dateString;
        this.p_id = p_id;
        this.amount = amount;
        this.pname = pname;
        this.category_id = category_id;
        this.price = price;
        this.flag = flag;
    }

    public int getOrder_id() { return order_id; }
    public void setOrder_id(int order_id) { this.order_id = order_id; }

    public String getDateString() { return dateString; }
    public void setDateString(String dateString) { this.dateString = dateString; }

    public String getP_id() { return p_id; }
    public void setP_id(String p_id) { this.p_id = p_id; }

    public int getAmount() { return amount; }
    public void setAmount(int amount) { this.amount = amount; }

    public String getPname() { return pname; }
    public void setPname(String pname) { this.pname = pname; }

    public int getCategory_id() { return category_id; }
    public void setCategory_id(int category_id) { this.category_id = category_id; }

    public float getPrice() { return price; }
    public void setPrice(float price) { this.price = price; }

    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    /**
     * Hadoop serialization: the field order written here must match readFields() exactly.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(order_id);
        out.writeUTF(dateString);
        out.writeUTF(p_id);
        out.writeInt(amount);
        out.writeUTF(pname);
        out.writeInt(category_id);
        out.writeFloat(price);
        out.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.order_id = in.readInt();
        this.dateString = in.readUTF();
        this.p_id = in.readUTF();
        this.amount = in.readInt();
        this.pname = in.readUTF();
        this.category_id = in.readInt();
        this.price = in.readFloat();
        this.flag = in.readUTF();
    }

    @Override
    public String toString() {
        return "order_id=" + order_id + ", dateString=" + dateString + ", p_id=" + p_id + ", amount=" + amount
               + ", pname=" + pname + ", category_id=" + category_id + ", price=" + price + ", flag=" + flag;
    }
}
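A quick way to sanity-check the bean (not from the original post; the class name InfoBeanRoundTrip is made up for illustration) is to serialize it to a byte array and read it back, which is essentially what Hadoop does between the map and reduce phases. If write() and readFields() ever drift out of sync, this round trip fails or scrambles the fields:

package com.empire.hadoop.mr.rjoin;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class InfoBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        InfoBean original = new InfoBean();
        original.set(1001, "20150710", "P0001", 2, "", 0, 0, "0");

        // Serialize the bean the same way the MR framework would.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh bean and print it; the fields should match the original.
        InfoBean copy = new InfoBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
        System.out.println(copy);
    }
}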
The MapReduce driver class (mapper, reducer, and main):
package com.empire.hadoop.mr.rjoin;

import java.io.IOException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Joins the order table and the product table into one result.
 *
 * order.txt   (order id, date, product id, amount)
 *   1001 20150710 P0001 2
 *   1002 20150710 P0001 3
 *   1002 20150710 P0002 3
 *   1003 20150710 P0003 3
 *
 * product.txt (product id, product name, category id, price)
 *   P0001 小米5 1001 2
 *   P0002 錘子T1 1000 3
 *   P0003 錘子 1002 4
 */
public class RJoin {

    static class RJoinMapper extends Mapper<LongWritable, Text, Text, InfoBean> {

        InfoBean bean = new InfoBean();
        Text     k    = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();

            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            String name = inputSplit.getPath().getName();
            // Tell the two kinds of input apart by the file name
            String pid = "";
            if (name.startsWith("order")) {
                String[] fields = line.split("\t");
                // id  date  pid  amount
                pid = fields[2];
                bean.set(Integer.parseInt(fields[0]), fields[1], pid, Integer.parseInt(fields[3]), "", 0, 0, "0");
            } else {
                String[] fields = line.split("\t");
                // id  pname  category_id  price
                pid = fields[0];
                bean.set(0, "", pid, 0, fields[1], Integer.parseInt(fields[2]), Float.parseFloat(fields[3]), "1");
            }
            k.set(pid);
            context.write(k, bean);
        }
    }

    static class RJoinReducer extends Reducer<Text, InfoBean, InfoBean, NullWritable> {

        @Override
        protected void reduce(Text pid, Iterable<InfoBean> beans, Context context)
                throws IOException, InterruptedException {
            // Each pid group holds one product record (flag "1") plus all of its order records (flag "0").
            InfoBean pdBean = new InfoBean();
            ArrayList<InfoBean> orderBeans = new ArrayList<InfoBean>();

            for (InfoBean bean : beans) {
                if ("1".equals(bean.getFlag())) {
                    // product record
                    try {
                        BeanUtils.copyProperties(pdBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                } else {
                    // order record: copy it, because the framework reuses the bean instance
                    InfoBean odbean = new InfoBean();
                    try {
                        BeanUtils.copyProperties(odbean, bean);
                        orderBeans.add(odbean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }

            // Stitch the two kinds of records together to form the final result
            for (InfoBean bean : orderBeans) {
                bean.setPname(pdBean.getPname());
                bean.setCategory_id(pdBean.getCategory_id());
                bean.setPrice(pdBean.getPrice());
                context.write(bean, NullWritable.get());
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Use a tab as the key/value separator in the text output
        conf.set("mapred.textoutputformat.separator", "\t");

        Job job = Job.getInstance(conf);

        // Specify the local path of this program's jar
        // job.setJarByClass(RJoin.class);
        // job.setJar("D:/join.jar");
        job.setJarByClass(RJoin.class);

        // Specify the mapper/reducer classes used by this job
        job.setMapperClass(RJoinMapper.class);
        job.setReducerClass(RJoinReducer.class);

        // Specify the kv types of the mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(InfoBean.class);

        // Specify the kv types of the final output
        job.setOutputKeyClass(InfoBean.class);
        job.setOutputValueClass(NullWritable.class);

        // Specify the directory of the job's raw input files
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Specify the directory of the job's output
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job configuration and the jar containing the job classes to YARN
        /* job.submit(); */
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
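To make the data flow concrete (this trace is not in the original post; it is derived from the sample files shown in the class comment above): every map output is keyed by pid, so the reduce call for key P0001 receives one product bean and two order beans, roughly:

P0001 -> [ (order 1001, 20150710, P0001, amount=2, flag=0),
           (order 1002, 20150710, P0001, amount=3, flag=0),
           (product P0001, 小米5, category_id=1001, price=2, flag=1) ]

The reducer sets the product bean aside, buffers the order beans in the ArrayList, then copies pname/category_id/price onto each buffered order bean and emits it, which yields the first two lines of the output in section 5. Note that all order records of a pid are buffered in memory, so an extremely hot product id would need a different strategy.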
3. Running the Program
# Upload the jar (from Windows, via the terminal's sftp session: Alt+p)
lcd d:/
put rjoin.jar

# Prepare the data files for Hadoop
cd /home/hadoop/apps/hadoop-2.9.1
hadoop fs -mkdir -p /rjoin/input
hdfs dfs -put order.txt product.txt /rjoin/input

# Run the rjoin program
hadoop jar rjoin.jar com.empire.hadoop.mr.rjoin.RJoin /rjoin/input /rjoin/outputs
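For reference (not shown in the original post, but implied by the class comment in RJoin and by the output in section 5), the two tab-separated input files would look like this; the separators must be real tab characters because the mapper splits on \t:

order.txt
1001	20150710	P0001	2
1002	20150710	P0001	3
1002	20150710	P0002	3
1003	20150710	P0003	3

product.txt
P0001	小米5	1001	2
P0002	錘子T1	1000	3
P0003	錘子	1002	4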
4. Execution Log
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 5ms
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:328)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop sending #116 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getJobReport
[IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop got value #116
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 7ms
[main] INFO org.apache.hadoop.mapreduce.Job - Job job_1544487152077_0003 completed successfully
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.getCounters(Job.java:817)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop sending #117 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getCounters
[IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop got value #117
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getCounters took 111ms
[main] INFO org.apache.hadoop.mapreduce.Job - Counters: 49
	File System Counters
		FILE: Number of bytes read=339
		FILE: Number of bytes written=569177
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=378
		HDFS: Number of bytes written=452
		HDFS: Number of read operations=9
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
	Job Counters
		Launched map tasks=2
		Launched reduce tasks=1
		Data-local map tasks=2
		Total time spent by all maps in occupied slots (ms)=17791
		Total time spent by all reduces in occupied slots (ms)=3709
		Total time spent by all map tasks (ms)=17791
		Total time spent by all reduce tasks (ms)=3709
		Total vcore-milliseconds taken by all map tasks=17791
		Total vcore-milliseconds taken by all reduce tasks=3709
		Total megabyte-milliseconds taken by all map tasks=18217984
		Total megabyte-milliseconds taken by all reduce tasks=3798016
	Map-Reduce Framework
		Map input records=7
		Map output records=7
		Map output bytes=319
		Map output materialized bytes=345
		Input split bytes=230
		Combine input records=0
		Combine output records=0
		Reduce input groups=3
		Reduce shuffle bytes=345
		Reduce input records=7
		Reduce output records=4
		Spilled Records=14
		Shuffled Maps =2
		Failed Shuffles=0
		Merged Map outputs=2
		GC time elapsed (ms)=552
		CPU time spent (ms)=3590
		Physical memory (bytes) snapshot=554237952
		Virtual memory (bytes) snapshot=2538106880
		Total committed heap usage (bytes)=259047424
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=148
	File Output Format Counters
		Bytes Written=452
[main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:hadoop (auth:SIMPLE) from:org.apache.hadoop.mapreduce.Job.updateStatus(Job.java:328)
[IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop sending #118 org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB.getJobReport
[IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop] DEBUG org.apache.hadoop.ipc.Client - IPC Client (1318427113) connection to centos-aaron-h3/192.168.29.146:34672 from hadoop got value #118
[main] DEBUG org.apache.hadoop.ipc.ProtobufRpcEngine - Call: getJobReport took 2ms
[pool-4-thread-1] DEBUG org.apache.hadoop.ipc.Client - stopping client from cache: org.apache.hadoop.ipc.Client@4b5fd811
[Thread-3] DEBUG org.apache.hadoop.util.ShutdownHookManager - ShutdownHookManger complete shutdown.
5. Execution Results
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -ls /rjoin/outputs
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2018-12-11 08:44 /rjoin/outputs/_SUCCESS
-rw-r--r--   2 hadoop supergroup        452 2018-12-11 08:44 /rjoin/outputs/part-r-00000
[hadoop@centos-aaron-h1 ~]$ hdfs dfs -cat /rjoin/outputs/part-r-00000
order_id=1002, dateString=20150710, p_id=P0001, amount=3, pname=小米5, category_id=1001, price=2.0, flag=0
order_id=1001, dateString=20150710, p_id=P0001, amount=2, pname=小米5, category_id=1001, price=2.0, flag=0
order_id=1002, dateString=20150710, p_id=P0002, amount=3, pname=錘子T1, category_id=1000, price=3.0, flag=0
order_id=1003, dateString=20150710, p_id=P0003, amount=3, pname=錘子, category_id=1002, price=4.0, flag=0
[hadoop@centos-aaron-h1 ~]$
6. Additional Notes
The logs written by a MapReduce task typically end up under a path such as:
/home/hadoop/apps/hadoop-2.9.1/logs/userlogs/application_1544487152077_0004/container_1544487152077_0004_01_000003
where /home/hadoop/apps/hadoop-2.9.1/logs/ is the logs directory under the Hadoop installation directory.
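If YARN log aggregation is enabled on the cluster, the same container logs can also be fetched through the YARN CLI instead of reading the NodeManager's local directories, for example:

yarn logs -applicationId application_1544487152077_0004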
To close: that is all for this post. If you found it useful, please give it a like; if you are interested in the blogger's other posts on server and big-data technology, or in the blogger himself, please follow the blog and feel free to get in touch at any time.