mapreduce的join

一篇超級詳細的文章:this one

讀完以後感觸頗深:什麼時候該在map階段做join,什麼時候該在reduce階段做join。

以前處理兩個輸入時寫的代碼多麼可笑,效率極低。

先按這篇文章的方法用了一遍分佈式緩存,在reduce階段讀取緩存文件;由於Hadoop版本過低,又結合好幾篇文章做了修改,結果如下:

版本:Hadoop 0.20.203.0

package bjut.edu.ting;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import com.opencsv.CSVParser;

//有兩個輸入:GPS(數據量大),Dictionary(數據量小);
//二者經過屬性bus_line在reduce過程當中鏈接,將dictonary放在內存,讀取之時用hashtable,存儲檢索
//其中mapper過程經過passtime計算date,並賦值給GPS數據
public class DateLineJob{ public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{ //處理GPS數據 protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ CSVParser parser = new CSVParser(); String[] gpsData = parser.parseLine(value.toString()); Integer date_label=null; try { date_label=getDateStamp(gpsData[2]); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } if(date_label==12&&date_label!=-1){//這兒的date_label須要修改 String outValue=date_label.toString()+","+gpsData[0]+","+gpsData[2]+","+gpsData[3]+","+gpsData[4]; //key:bus_line  value:0:date,1:vehicle,2:pass,3:lon,4:lat context.write(new Text(gpsData[1]),new Text(outValue)); } } } public static class JoinReducer extends Reducer<Text, Text, NullWritable, Text>{ //定義HashTable存放緩存數據 private Hashtable <String,String> table=new Hashtable<String,String>(); /** * 獲取分佈式緩存文件 */ private Path[] modelPath; private BufferedReader modelBR; protected void setup(Context context) throws IOException {
//返回本地文件路徑 Configuration conf = context.getConfiguration(); modelPath = DistributedCache.getLocalCacheFiles(conf); if(modelPath.length==0){ throw new FileNotFoundException("Distributed cache file not found"); } modelBR = new BufferedReader(new FileReader(modelPath[0].toString())); //按行讀取並解析字典數據, String infoDic=null; while((infoDic=modelBR.readLine())!=null){ String[] records=infoDic.split(","); //key爲bus_line value爲line_code table.put(records[1],records[0]);//將相應的字段存入Hashtable裏面 } modelBR.close(); } public void reduce(Text key,Iterable<Text> values, Context context) throws IOException, InterruptedException{ //字典數據根據bus_line獲取line_code String line_code=table.get(key.toString());//從Hashtable中獲取line_code if(line_code!=null){//有些線路在字典中沒有 for(Text value:values){ String outValue=value.toString(); String[] valueData=outValue.split(","); //0:date,1:vehicle,2:pass,3:lon,4:lat String out=valueData[0]+","+line_code+","+valueData[1]+","+valueData[2]+","+valueData[3]+","+valueData[4]; context.write(null, new Text(out)); } } } } public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); conf.set("mapreduce.admin.reduce.child.java.opts", "-Xmx512m"); DistributedCache.addCacheFile(new Path("hdfs://172.18.49.17:8020/Anewday/line_route_dict_update08_nohead.csv").toUri(), conf); DistributedCache.createSymlink(conf); Job job = new Job(conf,"join"); job.setJarByClass(DateLineJob.class); //設置GPS做爲輸入 FileInputFormat.addInputPath(job,new Path(args[0])); //輸出目錄 FileOutputFormat.setOutputPath(job, new Path(args[1])); job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); System.exit(job.waitForCompletion(true)?0:1); } private static int getDateStamp(String timeStr) throws ParseException{ if(timeStr.length()==19){//若不是這個形式,則不考慮 SimpleDateFormat formatter=new 
SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date data=formatter.parse(timeStr); int dateStamp=-1; dateStamp=data.getDate(); //這個地方會出現日期不存在的狀況錯誤提示,但不影響運行,並且這樣的數據量特別少。 return dateStamp; }else{//返回-1 return -1; } } }
相關文章
相關標籤/搜索