The program below has a bug; it should read:
    Path[] cacheFiles = context.getLocalCacheFiles();
    if (null != cacheFiles && cacheFiles.length > 0) {
        readFile(cacheFiles[0].toString(), context);
    }
This is left uncorrected in the code below; please apply the fix yourself, thanks!
There are two data sources: one is HDFS, with a large volume of data; the other is MongoDB, with a small volume.
The two sources need to be joined together and then analyzed. A code example follows:
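For reference, a minimal sketch of the corrected setup() method, assuming the single-argument readFile(String) defined in MapClass below (getLocalCacheFiles() is deprecated in newer Hadoop releases, but it returns local paths that FileReader can open):

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // getLocalCacheFiles() returns paths on the task node's local disk,
        // which is what FileReader needs; getCacheFiles() only returns the HDFS URIs.
        Path[] cacheFiles = context.getLocalCacheFiles();
        if (null != cacheFiles && cacheFiles.length > 0) {
            readFile(cacheFiles[0].toString());
        }
    }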
-----------------------------------------------------------------------
Main idea:
Export the data from MongoDB into HDFS, then use Hadoop's distribution mechanism to ship that file
to every compute node as "background" data.
Each compute node then simply reads the file in the map class's setup() method (a minimal driver-side sketch follows).
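Before the full listing, a hedged driver-side sketch of this idea; the class name and argument usage here are placeholders, and the real job setup lives in ComputeProfileHDFS.run() below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;

    // Hypothetical driver sketch: register the small MongoDB export (already copied
    // to HDFS) as a cache file so every map task receives a local copy of it.
    public class CacheFileSketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "join-with-background-data");
            job.addCacheFile(new Path(args[0]).toUri()); // args[0]: HDFS path of the exported file
            // ... mapper, reducer, input and output settings omitted;
            // each map task then loads its local copy of the file in setup().
        }
    }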
-----------------------------------------------------------------------
The actual code follows:
package com.dew.task;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.mongodb.BasicDBObject;

public class ComputeProfileHDFS extends Configured implements Tool {

    // map: joins each HDFS record against the cached MongoDB "background" data
    public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {

        private Hashtable<String, String> joinData = new Hashtable<String, String>();

        private void readFile(String file) {
            BufferedReader joinReader = null;
            String line = null;
            try {
                joinReader = new BufferedReader(new FileReader(file));
                while ((line = joinReader.readLine()) != null) {
                    String[] array = line.split("\t");
                    if (null == array || array.length < 2)
                        continue;
                    String pkg = array[0];
                    if (null == pkg || pkg.length() <= 0)
                        continue;
                    String tagStr = array[1];
                    if (null == tagStr)
                        continue;
                    tagStr = tagStr.trim();
                    if (tagStr.length() <= 0)
                        continue;
                    joinData.put(pkg, tagStr);
                    System.out.println("[map,setup] " + pkg + " | " + tagStr);
                }
            } catch (Exception e) {
                // XXX
                System.out.println("--------------------------------------------\n" + e.toString());
            } finally {
                if (null != joinReader)
                    try {
                        joinReader.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
            }
        }

        protected void setup(Context context) throws java.io.IOException, java.lang.InterruptedException {
            try {
                // Configuration conf = context.getConfiguration();
                // NOTE: this is the buggy spot mentioned at the top of the post;
                // the fix there uses context.getLocalCacheFiles() so the file can be read locally.
                URI[] cacheFiles = context.getCacheFiles();
                if (null != cacheFiles && cacheFiles.length > 0) {
                    readFile(cacheFiles[0].getPath().toString());
                }
            } catch (IOException e) {
                // xxx
            }
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // key neglected
            if (null == value)
                return;
            String content = value.toString();
            if (null == content || content.trim().length() == 0)
                return;
            // split
            String[] strArray = content.split("\t");
            if (null == strArray || strArray.length < 29)
                return;
            String sender = strArray[12].trim();
            String receiver = strArray[14].trim();
            String pkg = strArray[28].trim();
            if (null == sender || sender.length() == 0 || null == receiver
                    || receiver.length() == 0 || null == pkg || pkg.length() == 0) {
                return;
            }
            String tags = this.joinData.get(pkg);
            if (null == tags || tags.trim().length() == 0)
                return;
            // okay, output it
            System.out.println("sender---" + sender + " tags---" + tags);
            System.out.println("receiver---" + receiver + " tags---" + tags);
            context.write(new Text(sender), new Text(tags));
            context.write(new Text(receiver), new Text(tags));
        }
    }

    public static class Combiner extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            // okay
            // System.out.println("combiner function invoked....********************");
            context.write(key, new Text(totalTags));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            String totalTags = "";
            for (Text tag : values) {
                totalTags += " " + tag;
            }
            totalTags = totalTags.trim();
            if (totalTags.length() <= 0)
                return;
            // okay, let us do it now!
            String[] tagArray = totalTags.split(" ");
            if (null == tagArray || tagArray.length <= 0)
                return;
            // context.write(arg0, arg1);
            HttpApiClient.writeRecord(key.toString(), tagArray);
        }

        protected void cleanup(Context context) throws java.io.IOException, java.lang.InterruptedException {
            HttpApiClient.submit();
        }
    }

    public static class HttpApiClient {
        private static ArrayList<BasicDBObject> persons = new ArrayList<BasicDBObject>();
        private static int lock = 50;

        public static void writeRecord(String key, String[] arrays) {
            System.out.println("writeRecord 4");
            // validate input
            if (null == key || key.length() <= 0 || null == arrays || arrays.length <= 0)
                return;
            // string array ---> count occurrences per tag
            Hashtable<String, Integer> table = new Hashtable<String, Integer>();
            for (String tag : arrays) {
                Integer number = table.get(tag);
                int count = (null == number ? 0 : number.intValue());
                count++;
                table.put(tag, count);
            }
            // build the tag list for one person
            ArrayList<BasicDBObject> tagDocument = new ArrayList<BasicDBObject>();
            Set<String> tagSet = table.keySet();
            for (String tag : tagSet) {
                BasicDBObject doc = new BasicDBObject();
                doc.put("n", tag);
                doc.put("#", table.get(tag).intValue());
                tagDocument.add(doc);
            }
            // build the document for one person
            BasicDBObject person = new BasicDBObject();
            person.put("_id", key);
            person.put("t", tagDocument);
            System.out.println("*************************************");
            System.out.println(person.toString());
            System.out.println("*************************************");
            // add to the global buffer
            persons.add(person);
            if (persons.size() >= lock) {
                submit();
            }
        }

        public static void submit() {
            // submit the buffered documents and send the HTTP request...
            // persons.clear();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, "ComputeProfileHDFSPlusMongoDB");
        // add distributed file
        job.addCacheFile(new Path(args[1]).toUri());
        // DistributedCache.addCacheFile(new Path(args[1]).toUri(),
        // job.getConfiguration());
        // prepare
        FileInputFormat.setInputPaths(job, new Path(args[2]));
        FileOutputFormat.setOutputPath(job, new Path(args[3]));
        // FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setJobName("ComputeProfileHDFSPlusMongoDB");
        job.setMapperClass(MapClass.class);
        job.setCombinerClass(Combiner.class);
        job.setReducerClass(Reduce.class); // job.setNumReduceTasks(0);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        // execute
        int exitCode = job.waitForCompletion(true) ? 0 : 1;
        try {
            FileSystem fs = FileSystem.get(conf);
            fs.delete(new Path(args[1]));
            fs.delete(new Path(args[3]));
        } catch (Exception e) {
        }
        return exitCode;
    }

    public static void main(String[] args) throws Exception {
        int res;
        res = ToolRunner.run(new Configuration(), new PullMongoDB(), args);
        res = ToolRunner.run(new Configuration(), new ComputeProfileHDFS(), args);
        System.exit(res);
    }
}
package com.dew.task;

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.Tool;

import com.mongodb.BasicDBObject;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;

public class PullMongoDB extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (null == args || args.length < 4) {
            return 0;
        }
        List list = new ArrayList();
        String[] array = args[0].split(":");
        list.add(new ServerAddress(array[0], Integer.parseInt(array[1])));
        MongoClient mongoClient = new MongoClient(list);
        DB database = mongoClient.getDB("" + array[2]);
        DBCollection collection = database.getCollection("" + array[3]);

        // build the query: only records that contain both "pkg" and "tags"
        BasicDBObject query = new BasicDBObject();
        query.put("pkg", new BasicDBObject("$exists", true));
        query.put("tags", new BasicDBObject("$exists", true));
        BasicDBObject fields = new BasicDBObject();
        fields.put("pkg", 1);
        fields.put("tags", 1);

        // prepare the HDFS file to write to
        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);
        FSDataOutputStream outHandler = hdfs.create(new Path("" + args[1]));

        // write one "pkg \t tags" line per record
        DBCursor cursor = collection.find(query, fields);
        while (cursor.hasNext()) {
            BasicDBObject record = (BasicDBObject) cursor.next();
            String pkg = record.getString("pkg");
            ArrayList<String> als = (ArrayList<String>) record.get("tags");
            String tags = "";
            for (String s : als) {
                tags += " " + s.trim();
            }
            tags = tags.trim();
            String finalString = pkg + "\t" + tags + System.getProperty("line.separator");
            outHandler.write(finalString.getBytes("UTF8"));
        }

        // close handles
        outHandler.close();
        cursor.close();
        mongoClient.close();
        return 0;
    }
}
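For readers wiring the two tools together, the shared argument layout implied by PullMongoDB.run() and ComputeProfileHDFS.run() above is sketched below; the concrete values are placeholders only.

    // Hypothetical example of the shared argument array (values are placeholders):
    String[] args = new String[] {
            "127.0.0.1:27017:mydb:mycollection", // args[0]: mongo host:port:database:collection
            "/tmp/pkg_tags.txt",                 // args[1]: HDFS file the export is written to and cached
            "/data/input",                       // args[2]: HDFS input path of the large data set
            "/data/output"                       // args[3]: HDFS output path (deleted again after the job)
    };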
Tested and verified.
Addendum: also add job.setJarByClass(ComputeProfileHDFS.class); !!!
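A hedged sketch of where that call fits, assuming it is added right after the Job is constructed in run():

    Job job = new Job(conf, "ComputeProfileHDFSPlusMongoDB");
    // Without setJarByClass, the task JVMs may not find MapClass/Combiner/Reduce
    // (typically a ClassNotFoundException), because Hadoop cannot locate the job jar.
    job.setJarByClass(ComputeProfileHDFS.class);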