Hadoop MapReduce 處理2表join編程案例

假設文件1(表1)結構(hdfs文件名:t_user.txt):

1 wangming 男 計算機

2 hanmei 男 機械

3 lilei 女 法學

4 hanmeixiu 女 通訊

5 chenyangxiu 男 設計

6 yangxiuping 男 英語

文件2(表2)結構(hdfs文件名:t_user_card.txt):

1 wangming 360781100207230023

2 hanmei 362781100207300033

3 lilei 36201100207100033

4 hanmeixiu 362202199697652519

5 chenyangxiu 363654678906542785

6 yangxiuping 360876187618971008

7 machao 370875468820186543

現在使用MapReduce把表1和表2按姓名進行join, 使輸出結果中同時包含用戶的身份證號

簡述思路:

編程思路:

* 在map階段會分別讀取filePath = /xxx/xxx/t_user.txt的文件和

* filePath = /xxx/xxx/t_user_card.txt的文件, 讀取2個不同的文件時會得到不同的filePath

* 先把JoinBean定義好, 讀取不同文件的時候, set進對應的屬性值

* 然後把連接字段作爲map階段的key輸出

* 使得JoinBean在Reduce階段自動聚合成Iterable<JoinBean>

代碼如下:

package com.chenjun.MRstudy.join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Reduce-side join of two space-delimited text tables on the user-name column.
 *
 * <p>Input files are recognized by file name: rows of {@code t_user.txt} are
 * {@code id name sex profession} and rows of {@code t_user_card.txt} are
 * {@code id name idNumber}. The mapper emits every row keyed by the user name,
 * and the reducer merges all rows that share a name into one output line
 * {@code id name sex profession idNumber}.
 *
 * <p>Usage: {@code hadoop jar <jar> com.chenjun.MRstudy.join.MrJoinTest [inputPaths outputPath]}.
 * When the two paths are omitted the built-in default paths are used.
 */
public class MrJoinTest extends Configured implements Tool {

	/** File name of the user table. */
	private static final String USER_TABLE = "t_user.txt";
	/** File name of the card table. */
	private static final String CARD_TABLE = "t_user_card.txt";
	/** Minimum number of columns in a usable user-table row. */
	private static final int USER_COLUMNS = 4;
	/** Minimum number of columns in a usable card-table row. */
	private static final int CARD_COLUMNS = 3;
	/** Comma-separated input paths used when none are given on the command line. */
	private static final String DEFAULT_INPUT_PATHS = "/mrtest/joinInput/t_user.txt,/mrtest/joinInput/t_user_card.txt";
	/** Output directory used when none is given on the command line. */
	private static final String DEFAULT_OUTPUT_PATH = "/mrtest/joinOutput";
	/** Counter group under which skipped input rows are reported. */
	private static final String COUNTER_GROUP = "MrJoinTest";

	/**
	 * Tags each input row with its join key.
	 *
	 * <p>The two tables are told apart by the file name of the input split. For
	 * each row a {@link JoinBean} is filled with the columns that table provides,
	 * and the bean is emitted with the user name as the map output key so that
	 * rows of both tables with the same name meet in one reduce call.
	 *
	 * @author CJ
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, Text, JoinBean> {

		/** File name of the table this task reads, or "" when the file is not a join input. */
		String tableFlag = "";

		/**
		 * Determines once per task which table the split belongs to; the file of a
		 * split does not change between records, so there is no need to re-derive it
		 * in every {@code map} call.
		 */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			FileSplit fileSplit = (FileSplit) context.getInputSplit();
			String fileName = fileSplit.getPath().getName();
			if (USER_TABLE.equals(fileName) || CARD_TABLE.equals(fileName)) {
				tableFlag = fileName;
			} else {
				tableFlag = "";
			}
		}

		/**
		 * Parses one space-delimited row and emits it keyed by user name.
		 *
		 * <p>Rows from unrecognized files are ignored. Rows with too few columns
		 * (including blank lines) are skipped and counted instead of failing the
		 * task with an {@code ArrayIndexOutOfBoundsException}.
		 *
		 * @param key     byte offset of the line in the file (unused)
		 * @param value   the line text
		 * @param context task context used to emit the record
		 */
		@Override
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			if (tableFlag.isEmpty()) {
				return;
			}
			String[] lineArray = value.toString().split(" ");
			boolean userTable = USER_TABLE.equals(tableFlag);
			int requiredColumns = userTable ? USER_COLUMNS : CARD_COLUMNS;
			if (lineArray.length < requiredColumns) {
				context.getCounter(COUNTER_GROUP, "SKIPPED_RECORDS").increment(1);
				return;
			}

			// Column 1 is the user name in both tables; it is the join key.
			String userName = lineArray[1];
			JoinBean joinBean = new JoinBean();
			joinBean.setUserName(userName);
			if (userTable) {
				joinBean.setUserId(lineArray[0]);
				joinBean.setUserSex(lineArray[2]);
				joinBean.setProfession(lineArray[3]);
			} else {
				joinBean.setIdNumber(lineArray[2]);
			}
			context.write(new Text(userName), joinBean);
		}
	}

	/**
	 * Merges all beans that share a user name into one output line.
	 */
	public static class MyReducer extends Reducer<Text, JoinBean, NullWritable, Text> {

		/**
		 * Copies every non-blank attribute of the incoming beans into one result
		 * bean and writes it as {@code id name sex profession idNumber}.
		 *
		 * <p>A name that occurs in only one table is still written; the columns of
		 * the missing table are left empty. If several rows of the same table share
		 * a name, the attribute values seen last in the iteration win.
		 *
		 * @param key     the user name
		 * @param values  all beans emitted for that name
		 * @param context task context used to emit the joined line
		 */
		@Override
		public void reduce(Text key, Iterable<JoinBean> values, Context context) throws IOException, InterruptedException {
			JoinBean joinBean = new JoinBean();
			// Field values are copied out immediately, so it does not matter whether
			// the framework reuses the bean instance between iterations.
			for (JoinBean bean : values) {
				if (StringUtils.isNotBlank(bean.getUserId())) {
					joinBean.setUserId(bean.getUserId());
				}
				if (StringUtils.isNotBlank(bean.getUserName())) {
					joinBean.setUserName(bean.getUserName());
				}
				if (StringUtils.isNotBlank(bean.getUserSex())) {
					joinBean.setUserSex(bean.getUserSex());
				}
				if (StringUtils.isNotBlank(bean.getProfession())) {
					joinBean.setProfession(bean.getProfession());
				}
				if (StringUtils.isNotBlank(bean.getIdNumber())) {
					joinBean.setIdNumber(bean.getIdNumber());
				}
			}
			Text text = new Text(joinBean.getUserId() + " " + joinBean.getUserName() + " " + joinBean.getUserSex() + " " + joinBean.getProfession() + " "
					+ joinBean.getIdNumber());
			context.write(NullWritable.get(), text);
		}
	}

	/**
	 * Configures and runs the join job.
	 *
	 * @param allArgs command-line arguments; after generic Hadoop options are
	 *                removed, the first two remaining arguments (if present) are
	 *                the comma-separated input paths and the output directory.
	 *                With fewer than two remaining arguments the default paths
	 *                are used.
	 * @return 0 when the job succeeds, 1 otherwise
	 */
	@Override
	public int run(String[] allArgs) throws Exception {
		// Parse generic options BEFORE creating the job: Job.getInstance copies the
		// configuration, so options applied afterwards would never reach the job.
		String[] args = new GenericOptionsParser(getConf(), allArgs).getRemainingArgs();
		String inputPaths = args.length >= 2 ? args[0] : DEFAULT_INPUT_PATHS;
		String outputPath = args.length >= 2 ? args[1] : DEFAULT_OUTPUT_PATH;

		Job job = Job.getInstance(getConf());
		job.setJarByClass(MrJoinTest.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(JoinBean.class);

		job.setReducerClass(MyReducer.class);
		job.setNumReduceTasks(1);

		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);

		FileInputFormat.addInputPaths(job, inputPaths);
		FileOutputFormat.setOutputPath(job, new Path(outputPath));

		return job.waitForCompletion(true) ? 0 : 1;
	}

	/**
	 * Command-line entry point; the process exit code reflects the job result.
	 */
	public static void main(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		System.exit(ToolRunner.run(configuration, new MrJoinTest(), args));
	}

}

/**
 * Map output value carrying the columns of either input table.
 *
 * <p>All attributes are kept non-null: they start as empty strings and the
 * setters turn {@code null} into {@code ""}. This matters because
 * {@link DataOutput#writeUTF(String)} throws {@code NullPointerException} for a
 * {@code null} argument, which would fail the map task during serialization.
 */
class JoinBean implements Writable {
	private String userId = "";
	private String userName = "";
	private String userSex = "";
	private String profession = "";
	private String idNumber = "";

	/** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
	private static String nullToEmpty(String value) {
		return value == null ? "" : value;
	}

	public String getUserId() {
		return userId;
	}

	public void setUserId(String userId) {
		this.userId = nullToEmpty(userId);
	}

	public String getUserName() {
		return userName;
	}

	public void setUserName(String userName) {
		this.userName = nullToEmpty(userName);
	}

	public String getUserSex() {
		return userSex;
	}

	public void setUserSex(String userSex) {
		this.userSex = nullToEmpty(userSex);
	}

	public String getProfession() {
		return profession;
	}

	public void setProfession(String profession) {
		this.profession = nullToEmpty(profession);
	}

	public String getIdNumber() {
		return idNumber;
	}

	public void setIdNumber(String idNumber) {
		this.idNumber = nullToEmpty(idNumber);
	}

	/**
	 * Serializes the five attributes in a fixed order; {@link #readFields} must
	 * read them back in exactly the same order.
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.userId);
		out.writeUTF(this.userName);
		out.writeUTF(this.userSex);
		out.writeUTF(this.profession);
		out.writeUTF(this.idNumber);
	}

	/**
	 * Restores the five attributes in the order used by {@link #write}.
	 */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.userId = in.readUTF();
		this.userName = in.readUTF();
		this.userSex = in.readUTF();
		this.profession = in.readUTF();
		this.idNumber = in.readUTF();
	}

	@Override
	public String toString() {
		return "JoinBean [userId=" + userId + ", userName=" + userName + ", userSex=" + userSex + ", profession=" + profession + ", IdNumber=" + idNumber + "]";
	}

}

_____________________________________________________________________________________________

編程過程中遇到的錯誤:

錯誤1:

hadoop jar MRstudy-1.0.jar com.chenjun.MRstudy.join.MrJoinTest 
18/03/15 16:35:07 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/03/15 16:35:08 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
18/03/15 16:35:08 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
18/03/15 16:35:09 INFO input.FileInputFormat: Total input paths to process : 1
18/03/15 16:35:09 INFO mapreduce.JobSubmitter: number of splits:1
18/03/15 16:35:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local2133625459_0001
18/03/15 16:35:09 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
18/03/15 16:35:09 INFO mapred.LocalJobRunner: OutputCommitter set in config null
18/03/15 16:35:09 INFO mapreduce.Job: Running job: job_local2133625459_0001
18/03/15 16:35:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
18/03/15 16:35:09 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
18/03/15 16:35:09 INFO mapred.LocalJobRunner: Waiting for map tasks
18/03/15 16:35:09 INFO mapred.LocalJobRunner: Starting task: attempt_local2133625459_0001_m_000000_0
18/03/15 16:35:09 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
18/03/15 16:35:09 INFO util.ProcfsBasedProcessTree: ProcfsBasedProcessTree currently is supported only on Linux.
18/03/15 16:35:09 INFO mapred.Task:  Using ResourceCalculatorProcessTree : null
18/03/15 16:35:09 INFO mapred.MapTask: Processing split: hdfs://localhost:8000/mrtest/joinInput/t_user.txt:0+137
18/03/15 16:35:09 INFO mapred.MapTask: (EQUATOR) 0 kvi 26214396(104857584)
18/03/15 16:35:09 INFO mapred.MapTask: mapreduce.task.io.sort.mb: 100
18/03/15 16:35:09 INFO mapred.MapTask: soft limit at 83886080
18/03/15 16:35:09 INFO mapred.MapTask: bufstart = 0; bufvoid = 104857600
18/03/15 16:35:09 INFO mapred.MapTask: kvstart = 26214396; length = 6553600
18/03/15 16:35:09 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
	at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
18/03/15 16:35:09 INFO mapred.LocalJobRunner: map task executor complete.
18/03/15 16:35:09 WARN mapred.LocalJobRunner: job_local2133625459_0001
java.lang.Exception: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:415)
	at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
	... 10 more
18/03/15 16:35:10 INFO mapreduce.Job: Job job_local2133625459_0001 running in uber mode : false
18/03/15 16:35:10 INFO mapreduce.Job:  map 0% reduce 0%
18/03/15 16:35:10 INFO mapreduce.Job: Job job_local2133625459_0001 failed with state FAILED due to: NA
18/03/15 16:35:10 INFO mapreduce.Job: Counters: 0

這個錯誤搜索了很久, 後來發現原因其實是JoinBean沒有實現Writable接口

_____________________________________________________________________________________________

錯誤2:

java.lang.Exception: java.lang.NullPointerException
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.NullPointerException
	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:347)
	at java.io.DataOutputStream.writeUTF(DataOutputStream.java:323)
	at com.chenjun.MRstudy.join.JoinBean.write(MrJoinTest.java:199)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:98)
	at org.apache.hadoop.io.serializer.WritableSerialization$WritableSerializer.serialize(WritableSerialization.java:82)
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1157)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715)
	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
	at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
	at com.chenjun.MRstudy.join.MrJoinTest$MyMapper.map(MrJoinTest.java:76)
	at com.chenjun.MRstudy.join.MrJoinTest$MyMapper.map(MrJoinTest.java:29)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

 

這個錯誤的原因是JoinBean的String字段沒有初始化(值爲null), write()方法調用writeUTF(null)時拋出了空指針異常,

解決辦法:

在每個private String xxx字段聲明後面加入初始化賦值, 例如 private String userId = "";

 

最後運行結果:

________________________________________________________________________________

5 chenyangxiu 男 設計 363654678906542785

2 hanmei 男 機械 362781100207300033

4 hanmeixiu 女 通訊 362202199697652519

3 lilei 女 法學 36201100207100033

machao 370875468820186543

1 wangming 男 計算機 360781100207230023

6 yangxiuping 男 英語 360876187618971008

相關文章
相關標籤/搜索