Data Cleaning

1. Data cleaning: clean the data as required and import the cleaned data into the Hive database.

Two-stage data cleaning:

(1) Stage 1: extract the required fields from the raw logs

ip:    199.30.25.88

time:  10/Nov/2016:00:01:03 +0800

traffic:  62

article: article/11325

video: video/3235

(2) Stage 2: refine the extracted fields (a short parsing sketch follows this list)

ip ---> city (cityIP)

date--> time:2016-11-10 00:01:03

day: 10

traffic: 62

type: article/video

id: 11325
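
To make the two stages concrete, the following standalone sketch (separate from the MapReduce job further down) turns one extracted record into the refined fields; the exact field order of the input string is an assumption for illustration only.

// Stage-2 refinement sketch: reformat the date and split "article/11325" into type and id.
// The input field order (ip, time, traffic, type/id) is assumed for illustration only.
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class RefineSketch {
	static final SimpleDateFormat RAW = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
	static final SimpleDateFormat CLEAN = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

	public static void main(String[] args) throws Exception {
		String record = "199.30.25.88,10/Nov/2016:00:01:03 +0800,62,article/11325";
		String[] f = record.split(",");
		Date d = RAW.parse(f[1]);                 // trailing " +0800" is ignored by this pattern
		String time = CLEAN.format(d);            // 2016-11-10 00:01:03
		String day = f[1].split("/")[0];          // 10
		String[] typeAndId = f[3].split("/");     // article, 11325
		System.out.println(f[0] + "," + time + "," + day + "," + f[2] + ","
				+ typeAndId[0] + "," + typeAndId[1]);
	}
}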

(3) Hive table schema:

create table data(ip string, time string, day string, traffic bigint, type string, id string)

In the map stage of the MapReduce job, each source record is split on its commas and the date is converted to the new format; the cleaned data is then inserted into the Hive database using the same approach as was used for MySQL.

Source code:

MapReduce class:

package MapReduceMethod;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import Entity.datainfo;
import Service.Service;

public class QingxiRuku {

	static Service service = new Service();

	public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH); // original time format
	public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // target time format

	// convert the time string from the log format to the target format
	private static Date parseDateFormat(String string) {
		Date parse = null;
		try {
			parse = FORMAT.parse(string);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return parse;
	}

	public static class Map extends Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			String line = value.toString();
			System.out.println(line);
			String arr[] = line.split(",");
			Date date = parseDateFormat(arr[1]);
			// key: ip; value: reformatted time plus the remaining fields
			context.write(new Text(arr[0]),
					new Text(dateformat1.format(date) + "," + arr[2] + "," + arr[3] + "," + arr[4] + "," + arr[5]));
		}
	}

	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		static datainfo info = new datainfo();

		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
			for (Text value : values) {
				String[] strNlist = value.toString().split(",");
				info.setTime(strNlist[0]);
				info.setDay(strNlist[1]);
				info.setTraffic(strNlist[2]);
				info.setType(strNlist[3]);
				info.setId(strNlist[4]);
				// insert the cleaned record into the data table
				service.add("data", info);
				context.write(new Text(key), new Text(value));
			}
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		System.out.println("start");
		Job job = new Job(conf, "QingxiRuku");
		job.setJarByClass(QingxiRuku.class);
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		// set the map output key type
		job.setMapOutputKeyClass(Text.class);
		// set the map output value type
		job.setMapOutputValueClass(Text.class);
		// set the final output key type
		job.setOutputKeyClass(Text.class);
		// set the final output value type
		job.setOutputValueClass(Text.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		Path in = new Path("hdfs://localhost:9000/mymapreduce1/in/result");
		Path out = new Path("hdfs://localhost:9000/mymapreduce1/test");
		FileInputFormat.addInputPath(job, in);
		FileOutputFormat.setOutputPath(job, out);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

  DBUtil class (database connection):

package DBUtil;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import Entity.EntityToString;
import Service.Service;

/**
 * Database connection utility
 * @author Hu
 *
 */
public class DBUtil {
	
	public static String db_url = "jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true";
	public static String db_user = "hive";
	public static String db_pass = "hive";
	
	public static Connection getConn () {
		Connection conn = null;
		
		try {
			Class.forName("org.apache.hive.jdbc.HiveDriver"); // load the JDBC driver
			
			conn = DriverManager.getConnection(db_url, db_user, db_pass);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
		return conn;
	}
	
	/**
	 * Close the connection
	 * @param state
	 * @param conn
	 */
	public static void close (Statement state, Connection conn) {
		if (state != null) {
			try {
				state.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
		
		if (conn != null) {
			try {
				conn.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static void close (ResultSet rs, Statement state, Connection conn) {
		if (rs != null) {
			try {
				rs.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
		
		if (state != null) {
			try {
				state.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
		
		if (conn != null) {
			try {
				conn.close();
			} catch (SQLException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) throws SQLException {
		Service cs=new Service();
		EntityToString ets=new EntityToString();
		/*System.out.println(ets.getStringList(cs.list("data1", InfoNo2.class)));
		System.out.println(ets.getStringList(cs.list("data4", InfoNo3.class)));
		System.out.println(ets.getStringList(cs.list("data5", InfoNo4.class)));*/
	}
}
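
For reference, a minimal usage sketch of DBUtil as written above: open a connection, run a query, and release the resources. The query and column names follow the data table schema from earlier.

// Minimal usage sketch for the DBUtil class above; query and columns follow the data table schema.
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;

import DBUtil.DBUtil;

public class DBUtilUsage {
	public static void main(String[] args) throws Exception {
		Connection conn = DBUtil.getConn();                     // connection from the configured URL/user/password
		Statement state = conn.createStatement();
		ResultSet rs = state.executeQuery("select * from data");
		while (rs.next()) {
			System.out.println(rs.getString("ip") + "\t" + rs.getString("time"));
		}
		DBUtil.close(rs, state, conn);                          // closes ResultSet, Statement and Connection
	}
}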

  Dao layer (database access class):

package Dao;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import DBUtil.DBUtil;
import Entity.EntityToString;


/**
 * Generic Dao class
 * Dao-layer data access
 * @author HDQ
 *
 */
public class Dao {

	/**
	 * Insert an entity into the given table
	 * @return true if the insert succeeded
	 */
	public <T> boolean add(String table,T obj) {
		StringHandle sh=new StringHandle();
		EntityToString ets=new EntityToString();
		String []strList=sh.StringListToStringNlist(ets.getNameList(obj.getClass()));
		String []strList1=sh.StringListToStringNlist(ets.getStringListSingle(obj));
		
		if(strList.length==0)
			return false;
		String sql = "insert into "+table+"(";
		for(int i=0;i<strList.length;i++)
		{
			if(i!=strList.length-1)
				sql+=strList[i]+",";
			else sql+=strList[i]+")";
		}
		sql+=" values('";
		for(int i=0;i<strList1.length;i++)
		{
			if(i!=strList1.length-1)
				sql+=strList1[i]+"','";
			else sql+=strList1[i]+"')";
		}
		// open the database connection
		Connection conn = DBUtil.getConn();
		Statement state = null;
		boolean f = false;
		int a = 0;
		
		try {
			state = conn.createStatement();
			a=state.executeUpdate(sql);
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			// close the connection
			DBUtil.close(state, conn);
		}
		
		if (a > 0) {
			f = true;
		}
		return f;
	}
	
	
	
	
}
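
A note on the string-built INSERT above: it quotes every value by hand, which breaks as soon as a value contains a quote or a comma. Below is a hedged alternative sketch using a fixed-column PreparedStatement for the data table; the getter names on datainfo are assumed, and this is not what the project currently does.

// Hedged alternative to the string-concatenated INSERT: a fixed-column PreparedStatement
// for the data table. Getter names on datainfo are assumed; this is a sketch only.
import java.sql.Connection;
import java.sql.PreparedStatement;

import DBUtil.DBUtil;
import Entity.datainfo;

public class PreparedAddSketch {
	public static boolean add(String table, datainfo info) {
		String sql = "insert into " + table + "(time, day, traffic, type, id) values(?, ?, ?, ?, ?)";
		Connection conn = DBUtil.getConn();
		PreparedStatement ps = null;
		int a = 0;
		try {
			ps = conn.prepareStatement(sql);
			ps.setString(1, info.getTime());
			ps.setString(2, info.getDay());
			ps.setString(3, info.getTraffic());
			ps.setString(4, info.getType());
			ps.setString(5, info.getId());
			a = ps.executeUpdate();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			DBUtil.close(ps, conn);   // PreparedStatement extends Statement, so the existing close() applies
		}
		return a > 0;
	}
}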

  Service layer:

package Service;


import java.util.List;
import java.util.Map;

import Dao.Dao;


/**
 * Service
 * Service layer
 * @author HDQ
 *
 */
public class Service {

	Dao dao = new Dao();
	
	
	
	
	/**
	 * Insert an entity into the given table
	 * @param table
	 * @param obj
	 * @return true if the insert succeeded
	 */
	public <T> boolean add(String table,T obj) {
		boolean f = dao.add(table,obj);
		return f;
	}
	
	
	
	
}

  Run result:

However, an error appears:

Table 'hive.data' doesn't exist

 

But my table definitely exists; I can still run show tables; and select * from data; successfully.
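
One thing I have not verified yet: the message reads like a MySQL error, and DBUtil connects with a jdbc:mysql:// URL pointing at a database named hive (the metastore) even though it loads the Hive driver class. If the goal is to write into Hive itself, a HiveServer2 connection would look roughly like the sketch below; the port and empty credentials are assumptions.

// Hedged sketch of a HiveServer2 JDBC connection (default port 10000 and empty credentials
// are assumptions); a guess at the mismatch, not a confirmed fix for this setup.
import java.sql.Connection;
import java.sql.DriverManager;

public class HiveConnSketch {
	public static Connection getConn() throws Exception {
		Class.forName("org.apache.hive.jdbc.HiveDriver");   // same driver class DBUtil already loads
		return DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");
	}
}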

 

This problem remains unresolved for now.