Hadoop MapReduce編程 API入門系列之Crime數據分析(二十五)(未完)

 

 

  很少說,直接上代碼。

  一共12列,咱們只需提取有用的列:第二列(犯罪類型)、第四列(一週的哪一天)、第五列(具體時間)和第七列(犯罪場所)。

 

 

思路分析

        基於項目的需求,咱們經過如下幾步完成:

一、首先根據數據集,分別統計出不一樣犯罪類別在周時段內發生犯罪次數和不一樣區域在周時段內發生犯罪的次數。

二、而後根據第一步的輸出結果,再按日期統計出天天每種犯罪類別在每一個區域發生的犯罪次數。

三、將前兩步的輸出結果,按需求插入數據庫,便於對犯罪數據的分析。

 

 

程序開發

        咱們要編寫5個文件:

編寫基類,MapReduceJobBase.java

數據處理類,DataFile.java

編寫第一個任務類,SanFranciscoCrime.java

編寫第二個任務類,SanFranciscoCrimePrepOlap.java

編寫第三個任務,插入數據庫類,LoadStarDB.java

 

 

 

 

 

    Hive那邊的 數據庫首先須要建立4個表,

分別爲:category(name,cid)、

district(name,did)、

fact(fid,district_id,category_id,time_id,crimes)和

timeperiod(tpid,year,month,week,day)。

 

 

 

 

編譯和執行MapReduce做業

  一、用MyEclipse將項目編譯和打包爲crime.jar,使用SSH將crime.jar上傳至hadoop的/home/hadoop/目錄下。

  二、使用cd /home/hadoop/djt 切換到當前目錄,經過命令行執行任務。

         2.1 首先執行第一個做業 SanFranciscoCrime.java。

hadoop    jar    crime.jar     zhouls.bigdata.myMapReduce.SanFranciscoCrime.SanFranciscoCrime

     2.2    而後執行第二個做業SanFranciscoCrimePrepOlap.java。

hadoop    jar    crime.jar    zhouls.bigdata.myMapReduce.SanFranciscoCrime.SanFranciscoCrimePrepOlap

       2.3      最後執行第三個做業LoadStarDB.java,將數據插入數據庫。

hadoop     jar     crime.jar     zhouls.bigdata.myMapReduce.SanFranciscoCrime.LoadStarDB

 

 

 

 

 

運行結果

        任務的最終結果插入數據庫,數據結果以下圖所示。字段分別爲:區域主鍵district_id、類別主鍵category_id、時間主鍵time_id、犯罪次數crimes和主鍵fid。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代碼

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;
import java.text.DateFormat;


import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configured;

/**
 * Base class shared by the MapReduce job classes in this package.
 * It centralizes the column indexes of the crime CSV file and the
 * date formats, so the individual jobs do not redeclare them.
 */
public class MapReduceJobBase extends Configured
{

    /** Crime category lives at index 1 of a parsed CSV row. */
    protected static final int CATEGORY_COLUMN_INDEX = 1;

    /** Day of the week lives at index 3 of a parsed CSV row. */
    protected static final int DAY_OF_WEEK_COLUMN_INDEX = 3;

    /** Incident date lives at index 4 of a parsed CSV row. */
    protected static final int DATE_COLUMN_INDEX = 4;

    /** Police district lives at index 6 of a parsed CSV row. */
    protected static final int DISTRICT_COLUMN_INDEX = 6;

    /**
     * Input date format of the raw data.
     * NOTE: SimpleDateFormat is not thread-safe, which is why getDate()
     * synchronizes on this instance before parsing.
     */
    protected static final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");

    /** Date format used for keys in the map/reduce job output. */
    protected static final DateFormat outputDateFormat = new SimpleDateFormat("yyyy/MM/dd");

    /**
     * Parses the date portion of a raw field value into a Date.
     *
     * @param value full raw field, e.g. "MM/dd/yyyy hh:mm" — only the part
     *              before the first space is parsed
     * @return the parsed Date, or null when the input yields no date token
     * @throws ParseException if the token is not a valid MM/dd/yyyy date
     */
    protected static Date getDate(String value) throws ParseException
    {
        Date retVal = null;
        String[] dp = value.split(" ");
        if (dp.length > 0) {
            // Guard the shared, non-thread-safe DateFormat.
            synchronized (df) {
                retVal = df.parse(dp[0]);
            }
        }
        return retVal;
    }

}

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import com.opencsv.CSVReader;

 

/**
 * Helper for reading data produced by the map/reduce jobs and for
 * parsing raw CSV lines.
 */
public abstract class DataFile
{

    /**
     * Extracts the sorted list of keys (first tab-separated field of each
     * line) from a map/reduce job output file on HDFS.
     *
     * @param fn path of the file on HDFS
     * @param fs file system to read from
     * @return sorted list of key values
     * @throws IOException on any read failure
     */
    public static List<String> extractKeys(String fn, FileSystem fs) throws IOException
    {
        List<String> retVal = new ArrayList<String>();
        // try-with-resources: the reader (and the underlying HDFS stream)
        // is closed even when readLine() throws.
        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(fn)))))
        {
            String line = br.readLine();
            while (line != null)
            {
                String[] lp = line.split("\t");
                if (lp.length > 0)
                {
                    retVal.add(lp[0]); // key = first field of the line
                }
                line = br.readLine();
            }
        }
        Collections.sort(retVal);
        return retVal;
    }

    /**
     * Parses one CSV-formatted line into its column values.
     *
     * @param line one raw CSV line
     * @return array of column values (quotes handled by opencsv)
     * @throws IOException if the line cannot be parsed
     */
    public static String[] getColumns(String line) throws IOException
    {
        try (CSVReader reader = new CSVReader(
                new InputStreamReader(new ByteArrayInputStream(line.getBytes()))))
        {
            return reader.readNext();
        }
    }

}

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;

import java.text.MessageFormat;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Background: in time-phased systems (MRP/DRP) the time-phased data is
 * accumulated into periods called "buckets"; when the accumulation unit is
 * one week the buckets are called "weekly buckets".
 *
 * First job of the pipeline: for every crime category and for every police
 * district, counts how many incidents fall into each weekly bucket.
 */
public class SanFranciscoCrime extends MapReduceJobBase implements Tool
{

    private static Logger log = Logger
            .getLogger(SanFranciscoCrime.class.getCanonicalName());

    /**
     * Common mapper parent: emits (column[keyID], column[valueID]) for each
     * CSV record. Subclasses choose the key and value columns.
     */
    public static class CrimeMapper extends Mapper<LongWritable, Text, Text, Text>
    {

        protected int keyID = 0;

        protected int valueID = 0;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            try {
                String[] col = DataFile.getColumns(line);
                if (col != null)
                {
                    // Guard against short rows (array index out of bounds).
                    if (col.length >= (DISTRICT_COLUMN_INDEX + 1))
                    {
                        // Skip the CSV header row.
                        if (!"date".equalsIgnoreCase(col[valueID]))
                        {
                            Text tk = new Text();
                            tk.set(col[keyID]);
                            Text tv = new Text();
                            tv.set(col[valueID]);
                            context.write(tk, tv);
                        }
                    } else
                    {
                        log.warning(MessageFormat.format(
                                "Data {0} did not parse into columns.",
                                new Object[] { line }));
                    }
                } else
                {
                    log.warning(MessageFormat.format(
                            "Data {0} did not parse into columns.",
                            new Object[] { line }));
                }
            } catch (NumberFormatException nfe)
            {
                log.log(Level.WARNING, MessageFormat
                        .format("Expected {0} to be a number.\n",
                                new Object[] { line }), nfe);
            } catch (IOException e) {
                log.log(Level.WARNING, MessageFormat.format(
                        "Cannot parse {0} into columns.\n",
                        new Object[] { line }), e);
            }
        }
    }

    /** Emits key = crime category, value = incident date. */
    public static class CategoryMapByDate extends CrimeMapper
    {
        public CategoryMapByDate()
        {
            keyID = CATEGORY_COLUMN_INDEX;
            valueID = DATE_COLUMN_INDEX;
        }
    }

    /** Emits key = police district, value = incident date. */
    public static class DistrictMapByDate extends CrimeMapper
    {
        public DistrictMapByDate()
        {
            keyID = DISTRICT_COLUMN_INDEX;
            valueID = DATE_COLUMN_INDEX;
        }
    }

    /**
     * Reducer: counts, per key (category or district), how many incidents
     * fall into each weekly bucket, and emits the counts as a comma
     * separated list.
     */
    public static class CrimeReducerByWeek extends Reducer<Text, Text, Text, Text>
    {

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {

            List<String> incidents = new ArrayList<String>();
            // Materialize the dates for this key.
            for (Text value : values)
            {
                incidents.add(value.toString());
            }
            if (incidents.size() > 0)
            {
                Collections.sort(incidents);
                java.util.Map<Integer, Integer> weekSummary = new HashMap<Integer, Integer>();
                // The data covers Jan-Mar, so buckets 0..15 are pre-seeded;
                // out-of-range buckets are added lazily below.
                for (int i = 0; i < 16; i++)
                {
                    weekSummary.put(i, 0);
                }
                // Count incidents per weekly bucket.
                for (String incidentDay : incidents)
                {
                    try
                    {
                        Date d = getDate(incidentDay);
                        Calendar cal = Calendar.getInstance();
                        cal.setTime(d);
                        int week = cal.get(Calendar.WEEK_OF_MONTH); // week within the month
                        int month = cal.get(Calendar.MONTH);        // 0-based month
                        // Weekly-bucket formula: 5 bucket slots per month.
                        int bucket = (month * 5) + week;
                        if (weekSummary.containsKey(bucket))
                        {
                            weekSummary.put(bucket,
                                    Integer.valueOf(weekSummary.get(bucket).intValue() + 1));
                        } else
                        {
                            weekSummary.put(bucket, Integer.valueOf(1));
                        }
                    } catch (ParseException pe)
                    {
                        log.warning(MessageFormat.format("Invalid date {0}",
                                new Object[] { incidentDay }));
                    }
                }
                // Emit per-bucket counts as a comma separated list.
                // FIX: iterate buckets in sorted order rather than raw
                // HashMap order, so the column order is deterministic even
                // when a bucket outside 0..15 was added lazily.
                List<Integer> buckets = new ArrayList<Integer>(weekSummary.keySet());
                Collections.sort(buckets);
                StringBuffer rpt = new StringBuffer();
                boolean first = true;
                for (int week : buckets)
                {
                    if (first)
                    {
                        first = false;
                    } else
                    {
                        rpt.append(",");
                    }
                    rpt.append(weekSummary.get(week).toString());
                }
                String list = rpt.toString();
                Text tv = new Text();
                tv.set(list);
                // value = incident counts for weekly buckets 0..15
                context.write(key, tv);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception
    {
        Configuration conf1 = new Configuration();

        Path out1 = new Path(args[1]);

        // Delete a stale output directory, otherwise the job refuses to run.
        FileSystem hdfs1 = out1.getFileSystem(conf1);
        if (hdfs1.isDirectory(out1))
        {
            hdfs1.delete(out1, true);
        }

        // Job 1: category counts by date.
        Job job1 = new Job(conf1, "crime");
        job1.setJarByClass(SanFranciscoCrime.class);

        job1.setMapperClass(CategoryMapByDate.class);
        job1.setReducerClass(CrimeReducerByWeek.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));

        // Job 2: district counts by date.
        Configuration conf2 = new Configuration();
        Path out2 = new Path(args[2]);
        FileSystem hdfs2 = out2.getFileSystem(conf2);
        if (hdfs2.isDirectory(out2))
        {
            hdfs2.delete(out2, true);
        }
        Job job2 = new Job(conf2, "crime");
        job2.setJarByClass(SanFranciscoCrime.class);

        job2.setMapperClass(DistrictMapByDate.class);
        job2.setReducerClass(CrimeReducerByWeek.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(args[0]));
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));

        // Wrap both jobs for JobControl.
        ControlledJob cJob1 = new ControlledJob(conf1);
        cJob1.setJob(job1);

        ControlledJob cJob2 = new ControlledJob(conf2);
        cJob2.setJob(job2);

        //cJob2.addDependingJob(cJob1);// the two jobs are independent

        JobControl jobControl = new JobControl("12");

        jobControl.addJob(cJob1);
        jobControl.addJob(cJob2);

        // JobControl implements Runnable; run it on a worker thread.
        Thread t = new Thread(jobControl);
        t.start();
        // FIX: poll with a short sleep instead of a CPU-burning busy loop.
        while (!jobControl.allFinished())
        {
            Thread.sleep(500);
        }
        jobControl.stop();
        return 0;

    }

    public static void main(String[] args) throws Exception
    {
        String[] args0 =
        {
            "hdfs://HadoopMaster:9000/middle/crime/crime.csv",
            "hdfs://HadoopMaster:9000/middle/test/out1/",
            "hdfs://HadoopMaster:9000/middle/test/out2/" };
        int ec = ToolRunner.run(new Configuration(), new SanFranciscoCrime(), args0);
        System.exit(ec);
    }

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.IOException;

import java.net.URI;
import java.text.MessageFormat;
import java.text.ParseException;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Second job of the pipeline: counts, for every day, how many crimes of
 * each category occurred in each district. Output key is the date
 * (yyyy/MM/dd); output value is "categoryIndex,districtIndex,count".
 */
public class SanFranciscoCrimePrepOlap extends MapReduceJobBase implements Tool
{

    private static Logger log = Logger.getLogger(SanFranciscoCrimePrepOlap.class.getCanonicalName());
    // Sorted key lists produced by the first job, loaded by setup().
    // NOTE(review): these static fields are populated in the driver JVM;
    // that only works while map/reduce tasks run in the same JVM (local
    // mode). On a real cluster the reduce tasks would see them as null —
    // confirm, and consider distributing them via the job Configuration.
    private static List<String> categories = null;
    private static List<String> districts = null;
    private static final java.util.Map<String, Integer> categoryLookup = new HashMap<String, Integer>();
    private static final java.util.Map<String, Integer> districtLookup = new HashMap<String, Integer>();

    /**
     * Abstract mapper: emits key = formatted column[keyID], value =
     * "\"district\",\"category\"". Subclasses define the key formatting.
     */
    public static abstract class Map extends Mapper<LongWritable, Text, Text, Text>
    {
        protected int keyID = 0;
        protected int valueID = 0;
        protected int value2ID = 0;

        /**
         * Normalizes a raw key value into its canonical output form.
         *
         * @param value raw key value from the CSV row
         * @return normalized key
         * @throws ParseException if the raw value cannot be parsed
         */
        protected abstract String formatKey(String value) throws ParseException;

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            String line = value.toString();
            try
            {
                String[] col = DataFile.getColumns(line); // parse the CSV row
                if (col != null)
                {
                    // Guard against short rows.
                    if (col.length >= (DISTRICT_COLUMN_INDEX + 1))
                    {
                        Text tk = new Text();
                        tk.set(formatKey(col[keyID])); // key = formatted date
                        Text tv = new Text();
                        StringBuffer sv = new StringBuffer();
                        sv.append("\"");
                        sv.append(col[valueID]);  // district
                        sv.append("\"");
                        sv.append(",");
                        sv.append("\"");
                        sv.append(col[value2ID]); // category
                        sv.append("\"");
                        tv.set(sv.toString());
                        context.write(tk, tv);
                    } else
                    {
                        log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));
                    }
                } else
                {
                    log.warning(MessageFormat.format("Data {0} did not parse into columns.", new Object[]{line}));
                }
            } catch (NumberFormatException nfe)
            {
                log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a number.\n", new Object[]{line}), nfe);
            } catch (IOException e)
            {
                log.log(Level.WARNING, MessageFormat.format("Cannot parse {0} into columns.\n", new Object[]{line}), e);
            } catch (ParseException e)
            {
                log.log(Level.WARNING, MessageFormat.format("Expected {0} to be a date but it was not.\n", new Object[]{line}), e);
            }
        }
    }

    /**
     * Concrete mapper: key = incident date, value = district and category.
     */
    public static class DateMapByCategoryAndDistrict extends Map
    {
        public DateMapByCategoryAndDistrict()
        {
            keyID = DATE_COLUMN_INDEX;      // date column
            valueID = DISTRICT_COLUMN_INDEX; // district column
            value2ID = CATEGORY_COLUMN_INDEX; // category column
        }

        @Override
        protected String formatKey(String value) throws ParseException
        {
            return outputDateFormat.format(getDate(value));
        }
    }

    /**
     * Reducer: per date, tallies a category x district matrix of crime
     * counts and emits every non-zero cell.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            // Category x district count matrix. Java zero-initializes the
            // array, so no explicit clearing loop is needed.
            int[][] crimes = new int[categories.size()][districts.size()];
            // Tally each (district, category) pair for this date.
            for (Text crime : values)
            {
                String[] cols = DataFile.getColumns(crime.toString());
                if (cols.length == 2)
                {
                    if (categoryLookup.containsKey(cols[1]))
                    {
                        if (districtLookup.containsKey(cols[0]))
                        {
                            int cat = categoryLookup.get(cols[1]);
                            int dist = districtLookup.get(cols[0]);
                            crimes[cat][dist]++;
                        } else
                        {
                            log.warning(MessageFormat.format("District {0} not found.", new Object[]{cols[0]}));
                        }
                    } else
                    {
                        log.warning(MessageFormat.format("Category {0} not found.", new Object[]{cols[1]}));
                    }
                } else
                {
                    log.warning(MessageFormat.format("Input {0} was in unexpected format", new Object[]{crime}));
                }
            }
            // Emit "categoryIndex,districtIndex,count" for non-zero cells.
            for (int i = 0; i < categories.size(); i++)
            {
                for (int j = 0; j < districts.size(); j++)
                {
                    if (crimes[i][j] > 0)
                    {
                        StringBuffer sv = new StringBuffer();
                        sv.append(Integer.toString(i)); // category index
                        sv.append(",");
                        sv.append(Integer.toString(j)); // district index
                        sv.append(",");
                        sv.append(Integer.toString(crimes[i][j])); // count
                        Text tv = new Text();
                        tv.set(sv.toString());
                        context.write(key, tv);
                    }
                }
            }
        }
    }

    /**
     * Loads the category and district key lists emitted by the first job
     * and builds the name-to-index lookup maps.
     *
     * @param categoryReport path of the category output of SanFranciscoCrime
     * @param districtReport path of the district output of SanFranciscoCrime
     * @param fs file system to read from
     * @throws IOException on any read failure
     */
    private static void setup(String categoryReport, String districtReport, FileSystem fs) throws IOException
    {
        categories = DataFile.extractKeys(categoryReport, fs);
        districts = DataFile.extractKeys(districtReport, fs);
        int i = 0;
        for (String category : categories)
        {
            categoryLookup.put(category, i++);
        }
        i = 0;
        for (String district : districts)
        {
            districtLookup.put(district, i++);
        }
    }

    @Override
    public int run(String[] arg0) throws Exception
    {
        Configuration conf = new Configuration();

        Path out = new Path(arg0[3]);

        // Delete a stale output directory, otherwise the job refuses to run.
        FileSystem hdfs = out.getFileSystem(conf);
        if (hdfs.isDirectory(out))
        {
            hdfs.delete(out, true);
        }

        Job job = new Job(conf, "SanFranciscoCrimePrepOlap");
        job.setJarByClass(SanFranciscoCrimePrepOlap.class);

        job.setMapperClass(DateMapByCategoryAndDistrict.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(arg0[0]));
        FileOutputFormat.setOutputPath(job, new Path(arg0[3]));
        // FIX: propagate the job's success flag instead of always returning 0.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception
    {
        String[] args0 = {
            "hdfs://HadoopMaster:9000/middle/crime/crime.csv",
            "hdfs://HadoopMaster:9000/middle/test/out1/part-r-00000",
            "hdfs://HadoopMaster:9000/middle/test/out2/part-r-00000",
            "hdfs://HadoopMaster:9000/middle/test/out3/"};
        if (args0.length == 4)
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
            // Build the lookup tables before launching the job.
            setup(args0[1], args0[2], fs);
            int ec = ToolRunner.run(conf, new SanFranciscoCrimePrepOlap(), args0);
            System.exit(ec);
        } else
        {
            System.err.println("\nusage: bin/hadoop jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar SanFranciscoCrimePrepOlap path/to/category/report path/to/district/report path/to/input/data path/to/output/data");
        }
    }
}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.SanFranciscoCrime;

import java.io.BufferedReader;

import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

 

 

/***
 * Loads the output of the MapReduce jobs into the MySQL star schema
 * (category, district, timeperiod and fact tables).
 */
public class LoadStarDB
{

    private Connection db = null; // MySQL connection

    // Tracks the last auto-generated primary key per table.
    // NOTE(review): assumes reset() truncated the tables and auto-increment
    // restarts at 1, so the ids assigned here mirror the database's — verify
    // against the schema.
    private Map<String, Integer> lastPrimaryKey = new HashMap<String, Integer>();

    private List<String> categories = null; // crime category names

    private List<String> districts = null;  // district names

    // Maps a date to its timeperiod primary key.
    private final java.util.Map<Date, Integer> timeperiodLookup = new HashMap<Date, Integer>();

    private final DateFormat df = new SimpleDateFormat("MM/dd/yyyy");  // format for dates inserted into SQL

    private final DateFormat kdf = new SimpleDateFormat("yyyy/MM/dd"); // format of dates in the job output

    /***
     * Inserts one record into a table.
     *
     * @param table table name
     * @param row   field/value pairs to insert
     * @return the primary key id assigned to the record
     * @throws SQLException on any database error
     */
    private int insert(String table, DataRecord row) throws SQLException
    {
        int retVal = 0;
        // try-with-resources: FIX for the original Statement leak.
        try (Statement s = db.createStatement())
        {
            StringBuffer sql = new StringBuffer();
            sql.append("insert into ");
            sql.append(table);
            sql.append(" ");
            sql.append(row.toString());
            s.execute(sql.toString());
        }
        if (lastPrimaryKey.containsKey(table))
        {
            retVal = lastPrimaryKey.get(table) + 1;
            lastPrimaryKey.put(table, retVal);
        } else
        {
            lastPrimaryKey.put(table, 1);
            retVal = 1;
        }
        return retVal;
    }

    /***
     * Inserts one crime category record.
     *
     * @param category value for the name column
     * @return the record's primary key id
     * @throws SQLException on any database error
     */
    private int insertCategory(String category) throws SQLException
    {
        DataRecord dr = new DataRecord();
        dr.put("name", category);
        return insert("category", dr);
    }

    /***
     * Inserts one district record.
     *
     * @param district value for the name column
     * @return the record's primary key id
     * @throws SQLException on any database error
     */
    private int insertDistrict(String district) throws SQLException
    {
        DataRecord dr = new DataRecord();
        dr.put("name", district);
        return insert("district", dr);
    }

    /***
     * Splits a date into the year, month, week and day fields of a
     * timeperiod record.
     *
     * @param dr record receiving the split fields
     * @param d  date to split
     */
    private void setTimePeriod(DataRecord dr, Date d)
    {
        Calendar cal = Calendar.getInstance();
        cal.setTime(d);
        dr.put("year", cal.get(Calendar.YEAR));
        dr.put("month", cal.get(Calendar.MONTH));
        dr.put("week", cal.get(Calendar.WEEK_OF_MONTH));
        dr.put("day", cal.get(Calendar.DAY_OF_MONTH));
    }

    /***
     * Returns the primary key for a date, inserting a timeperiod record
     * first if the date has not been seen yet.
     *
     * @param d the date
     * @return the date's primary key id
     * @throws SQLException on any database error
     */
    private int insertTimePeriod(Date d) throws SQLException
    {
        int retVal = 0;
        if (timeperiodLookup.containsKey(d))
        {
            retVal = timeperiodLookup.get(d);
        } else
        {
            DataRecord dr = new DataRecord();
            setTimePeriod(dr, d);
            retVal = insert("timeperiod", dr);
            timeperiodLookup.put(d, retVal);
        }
        return retVal;
    }

    /***
     * Inserts one record into the fact table.
     *
     * @param districtId district foreign key
     * @param categoryId category foreign key
     * @param timeId     timeperiod foreign key
     * @param crimes     number of crimes of this category, in this district,
     *                   at this time
     * @throws SQLException on any database error
     */
    private void insertFact(int districtId, int categoryId, int timeId, int crimes) throws SQLException
    {
        DataRecord dr = new DataRecord();
        dr.put("district_id", districtId);
        dr.put("category_id", categoryId);
        dr.put("time_id", timeId);
        dr.put("crimes", crimes);
        insert("fact", dr);
    }

    /***
     * Reads the SanFranciscoCrime job output and populates the category and
     * district dimension tables.
     *
     * @param categoryReport path of the category report file
     * @param districtReport path of the district report file
     * @param fs             file system to read from
     * @throws IOException  on any read failure
     * @throws SQLException on any database error
     */
    private void setup(String categoryReport, String districtReport, FileSystem fs) throws IOException, SQLException
    {
        categories = DataFile.extractKeys(categoryReport, fs);
        districts = DataFile.extractKeys(districtReport, fs);
        for (String category : categories)
        {
            insertCategory(category);
        }
        for (String district : districts)
        {
            insertDistrict(district);
        }
    }

    /***
     * Empties one table.
     *
     * @param name table name
     * @throws SQLException on any database error
     */
    private void truncate(String name) throws SQLException
    {
        try (Statement s = db.createStatement())
        {
            s.execute("truncate table ".concat(name));
        }
    }

    /***
     * Empties all four star-schema tables.
     *
     * @throws SQLException on any database error
     */
    private void reset() throws SQLException
    {
        truncate("fact");
        truncate("category");
        truncate("district");
        truncate("timeperiod");
    }

    /***
     * Connects to the database, clears the tables and loads the dimension
     * data.
     *
     * @param categoryReport path of the category report file
     * @param districtReport path of the district report file
     * @param dbhost     database host (host:port)
     * @param dbname     database name
     * @param dbuser     database user
     * @param dbpassword database password
     * @param fs         file system to read from
     * @throws ClassNotFoundException if the MySQL driver is missing
     * @throws SQLException on any database error
     * @throws IOException  on any read failure
     */
    private LoadStarDB(String categoryReport, String districtReport,
            String dbhost, String dbname, String dbuser, String dbpassword, FileSystem fs)
            throws ClassNotFoundException, SQLException, IOException
    {
        Class.forName("com.mysql.jdbc.Driver");
        // FIX: the original hard-coded the URL and ignored the connection
        // arguments; build it from the parameters instead.
        String cs = MessageFormat
                .format("jdbc:mysql://{0}/{1}?user={2}&password={3}&autoReconnect=true",
                        new Object[] { dbhost, dbname, dbuser, dbpassword });
        db = DriverManager.getConnection(cs);
        reset();
        setup(categoryReport, districtReport, fs);
    }

    /***
     * Processes the SanFranciscoCrimePrepOlap job output and fills the
     * timeperiod and fact tables.
     *
     * @param dataFile path of the job output file
     * @param fs       file system to read from
     * @throws IOException    on any read failure
     * @throws ParseException if a date key cannot be parsed
     */
    private void processData(String dataFile, FileSystem fs) throws IOException, ParseException
    {
        // try-with-resources: the reader is closed even on error paths.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(fs.open(new Path(dataFile)))))
        {
            String line = br.readLine();
            while (line != null)
            {
                String[] lp = line.split("\t");
                if (lp.length > 0)
                {
                    Date d = kdf.parse(lp[0]); // the date key
                    String[] data = DataFile.getColumns(lp[1]);
                    if (data.length == 3)
                    {
                        try
                        {
                            int categoryId = Integer.parseInt(data[0]) + 1; // category id (1-based)
                            int districtId = Integer.parseInt(data[1]) + 1; // district id (1-based)
                            int crimes = Integer.parseInt(data[2]);         // crime count
                            int timeId = insertTimePeriod(d);               // timeperiod id
                            insertFact(districtId, categoryId, timeId, crimes);
                        } catch (NumberFormatException nfe)
                        {
                            System.err.println("invalid data: " + line);
                        } catch (SQLException e)
                        {
                            e.printStackTrace();
                        }
                    } else
                    {
                        System.err.println("invalid data: " + line);
                    }
                }
                line = br.readLine();
            }
        }
    }

    /***
     * Entry point: loads the three report files into the database.
     *
     * @param args unused; the paths and credentials are hard-coded below
     * @throws IOException on any read failure
     */
    public static void main(String[] args) throws IOException
    {
        String[] args0 =
        {
            "hdfs://HadoopMaster:9000/middle/crime/out1/part-r-00000",
            "hdfs://HadoopMaster:9000/middle/crime/out2/part-r-00000",
            "hdfs://HadoopMaster:9000/middle/crime/out3/part-r-00000",
            "192.168.80.128:3306",
            "test",
            "root",
            "root"};
        if (args0.length == 7)
        {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create("hdfs://HadoopMaster:9000"), conf);
            try
            {
                LoadStarDB m = new LoadStarDB(args0[0], args0[1], args0[3], args0[4], args0[5], args0[6], fs);
                m.processData(args0[2], fs);
            } catch (ClassNotFoundException e)
            {
                e.printStackTrace();
            } catch (SQLException e)
            {
                e.printStackTrace();
            } catch (IOException e)
            {
                e.printStackTrace();
            } catch (ParseException e)
            {
                e.printStackTrace();
            }
        } else {
            System.err
                    .println("\nusage: java -jar sfcrime.hadoop.mapreduce.jobs-0.0.1-SNAPSHOT.jar com.dynamicalsoftware.olap.etl.LoadStarDB path/to/category/report path/to/district/report path/to/star/data dbhost dbname dbuser dbpassword\n");
        }
    }

    /***
     * One record to insert, rendered as an SQL "(cols) values (vals)" clause.
     * NOTE(review): values are concatenated into the SQL text rather than
     * bound via PreparedStatement — acceptable only because the inputs are
     * our own job output; single quotes are escaped below as a minimal guard.
     */
    class DataRecord extends HashMap<String, Object>
    {
        private static final long serialVersionUID = 1L;

        @Override
        public String toString()
        {
            StringBuffer retVal = new StringBuffer();
            // Column list.
            retVal.append("(");
            boolean first = true;
            for (String key : keySet())
            {
                if (first)
                {
                    first = false;
                } else
                {
                    retVal.append(",");
                }
                retVal.append(key);
            }
            // Value list, in the same iteration order as the columns.
            retVal.append(") values (");
            first = true;
            for (String key : keySet())
            {
                Object o = get(key);
                if (first)
                {
                    first = false;
                } else
                {
                    retVal.append(",");
                }
                if (o instanceof Long)
                {
                    retVal.append(((Long) o).toString());
                } else if (o instanceof Integer)
                {
                    retVal.append(((Integer) o).toString());
                } else if (o instanceof Date)
                {
                    Date d = (Date) o;
                    retVal.append("'");
                    retVal.append(df.format(d));
                    retVal.append("'");
                } else if (o instanceof String)
                {
                    retVal.append("'");
                    // FIX: escape embedded single quotes so values such as
                    // names containing an apostrophe do not break the SQL.
                    retVal.append(o.toString().replace("'", "''"));
                    retVal.append("'");
                }
            }
            retVal.append(")");
            return retVal.toString();
        }
    }

}

相關文章
相關標籤/搜索