1. Data cleaning: clean the data as specified below and import the cleaned data into the Hive database.
Two-stage data cleaning:
(1) Stage 1: extract the required fields from the raw log
ip: 199.30.25.88
time: 10/Nov/2016:00:01:03 +0800
traffic: 62
article: article/11325
video: video/3235
(2) Stage 2: refine the extracted fields (a short sketch follows this list)
ip ---> city: city(IP)
date ---> time: 2016-11-10 00:01:03
day: 10
traffic: 62
type: article/video
id: 11325
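A minimal standalone sketch of this stage-2 refinement, using the same SimpleDateFormat patterns as the MapReduce code further down. The class name is illustrative, and it assumes the page field always has the form type/id; the city(IP) lookup is not shown, since it is also not part of the original code:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

public class RefineSketch {
    // source log time format and the two target formats (full timestamp and day of month)
    static final SimpleDateFormat IN  = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    static final SimpleDateFormat OUT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    static final SimpleDateFormat DAY = new SimpleDateFormat("dd");

    public static void main(String[] args) throws Exception {
        Date d = IN.parse("10/Nov/2016:00:01:03");   // raw time, "+0800" offset omitted
        System.out.println(OUT.format(d));           // 2016-11-10 00:01:03
        System.out.println(DAY.format(d));           // 10
        String[] page = "article/11325".split("/");  // split the page field into type and id
        System.out.println(page[0]);                 // type: article
        System.out.println(page[1]);                 // id:   11325
    }
}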
(3) Hive table structure (see the loading sketch after the DDL):
create table data(
    ip string, time string, day string, traffic bigint,
    type string, id string)
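The table above uses Hive's default delimiters and is filled row by row over JDBC. As an alternative, and only as a hedged sketch of standard Hive usage rather than what the original code does, the table could declare the comma delimiter and the cleaned file could be loaded from HDFS directly. The path below is the job's output path used further down; note that TextOutputFormat separates the key and value with a tab, so the job output would first need a uniform delimiter for this to load cleanly:

create table if not exists data(
  ip string, time string, day string, traffic bigint, type string, id string)
row format delimited fields terminated by ',';

load data inpath 'hdfs://localhost:9000/mymapreduce1/test' into table data;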
The map program of the MapReduce job splits the source data on commas and converts the date format; the cleaned rows are then added to the Hive database in the same way as they would be added to MySQL, via JDBC from the reducer.
Source code:
MapReduce class:
package MapReduceMethod;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import Entity.datainfo;
import Service.Service;

public class QingxiRuku {

    static Service service = new Service();
    // original time format of the log, e.g. 10/Nov/2016:00:01:03
    public static final SimpleDateFormat FORMAT =
            new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
    // target time format, e.g. 2016-11-10 00:01:03
    public static final SimpleDateFormat dateformat1 =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // convert the time format
    private static Date parseDateFormat(String string) {
        Date parse = null;
        try {
            parse = FORMAT.parse(string);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return parse;
    }

    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            System.out.println(line);
            String[] arr = line.split(",");
            Date date = parseDateFormat(arr[1]);
            // key = ip, value = reformatted time plus the remaining fields
            context.write(new Text(arr[0]),
                    new Text(dateformat1.format(date) + "," + arr[2] + "," + arr[3]
                            + "," + arr[4] + "," + arr[5]));
        }
    }

    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        static datainfo info = new datainfo();
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                String[] strNlist = value.toString().split(",");
                info.setTime(strNlist[0]);
                info.setDay(strNlist[1]);
                info.setTraffic(strNlist[2]);
                info.setType(strNlist[3]);
                info.setId(strNlist[4]);
                // insert the cleaned record into the database via the service layer
                service.add("data", info);
                context.write(new Text(key), new Text(value));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        System.out.println("start");
        Job job = new Job(conf, "QingxiRuku");
        job.setJarByClass(QingxiRuku.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);    // map output key type
        job.setMapOutputValueClass(Text.class);  // map output value type
        job.setOutputKeyClass(Text.class);       // output key type
        job.setOutputValueClass(Text.class);     // output value type
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        Path in = new Path("hdfs://localhost:9000/mymapreduce1/in/result");
        Path out = new Path("hdfs://localhost:9000/mymapreduce1/test");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
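The Entity.datainfo class referenced by the reducer is not shown in the post. A minimal sketch consistent with the setters called above (the field names are assumptions; note that the reducer never sets the ip column) might look like this:

package Entity;

// Sketch of the datainfo entity: one String field per column set in the reducer,
// with getters so a reflection helper such as EntityToString can read the values back.
public class datainfo {
    private String time;
    private String day;
    private String traffic;
    private String type;
    private String id;

    public String getTime()               { return time; }
    public void   setTime(String time)    { this.time = time; }
    public String getDay()                { return day; }
    public void   setDay(String day)      { this.day = day; }
    public String getTraffic()            { return traffic; }
    public void   setTraffic(String t)    { this.traffic = t; }
    public String getType()               { return type; }
    public void   setType(String type)    { this.type = type; }
    public String getId()                 { return id; }
    public void   setId(String id)        { this.id = id; }
}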
DBUtil class (database connection):
package DBUtil;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import Entity.EntityToString;
import Service.Service;

/**
 * Database connection utility
 * @author Hu
 */
public class DBUtil {

    public static String db_url = "jdbc:mysql://localhost:3306/hive?createDatabaseIfNotExist=true";
    public static String db_user = "hive";
    public static String db_pass = "hive";

    public static Connection getConn() {
        Connection conn = null;
        try {
            Class.forName("org.apache.hive.jdbc.HiveDriver"); // load the driver
            conn = DriverManager.getConnection(db_url, db_user, db_pass);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return conn;
    }

    /**
     * Close the connection
     * @param state
     * @param conn
     */
    public static void close(Statement state, Connection conn) {
        if (state != null) {
            try { state.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
        if (conn != null) {
            try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
    }

    public static void close(ResultSet rs, Statement state, Connection conn) {
        if (rs != null) {
            try { rs.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
        if (state != null) {
            try { state.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
        if (conn != null) {
            try { conn.close(); } catch (SQLException e) { e.printStackTrace(); }
        }
    }

    public static void main(String[] args) throws SQLException {
        Service cs = new Service();
        EntityToString ets = new EntityToString();
        /*System.out.println(ets.getStringList(cs.list("data1", InfoNo2.class)));
        System.out.println(ets.getStringList(cs.list("data4", InfoNo3.class)));
        System.out.println(ets.getStringList(cs.list("data5", InfoNo4.class)));*/
    }
}
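Note that DBUtil loads the Hive JDBC driver (org.apache.hive.jdbc.HiveDriver) while the URL is a jdbc:mysql URL pointing at a MySQL database named hive. For comparison only, here is a minimal sketch of connecting straight to HiveServer2 over JDBC; the jdbc:hive2 URL, the default port 10000 and the empty credentials are assumptions about a local default setup, not taken from the original code:

package DBUtil;

import java.sql.Connection;
import java.sql.DriverManager;

// Sketch: obtain a JDBC connection to HiveServer2 instead of MySQL.
public class HiveConnSketch {
    public static Connection getHiveConn() throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");  // HiveServer2 JDBC driver
        return DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "", "");
    }
}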
Dao layer (database access class):
package Dao;

import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import DBUtil.DBUtil;
import Entity.EntityToString;

/**
 * Generic Dao class
 * Dao layer: operates on the data
 * @author HDQ
 */
public class Dao {

    /**
     * Insert an object of type T into the given table
     * @return true if a row was inserted
     */
    public <T> boolean add(String table, T obj) {
        StringHandle sh = new StringHandle();
        EntityToString ets = new EntityToString();
        // column names and values are taken from the entity via reflection
        String[] strList = sh.StringListToStringNlist(ets.getNameList(obj.getClass()));
        String[] strList1 = sh.StringListToStringNlist(ets.getStringListSingle(obj));
        if (strList.length == 0)
            return false;
        // build "insert into table(col1,...,colN)"
        String sql = "insert into " + table + "(";
        for (int i = 0; i < strList.length; i++) {
            if (i != strList.length - 1)
                sql += strList[i] + ",";
            else
                sql += strList[i] + ")";
        }
        // append " values('v1',...,'vN')"
        sql += " values('";
        for (int i = 0; i < strList1.length; i++) {
            if (i != strList1.length - 1)
                sql += strList1[i] + "','";
            else
                sql += strList1[i] + "')";
        }
        // open the database connection
        Connection conn = DBUtil.getConn();
        Statement state = null;
        boolean f = false;
        int a = 0;
        try {
            state = conn.createStatement();
            a = state.executeUpdate(sql);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // close the connection
            DBUtil.close(state, conn);
        }
        if (a > 0) {
            f = true;
        }
        return f;
    }
}
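The add() method builds the INSERT statement by string concatenation, so any value containing a quote would break the statement. Below is a minimal sketch of the same insert for the data table using a PreparedStatement; the method name and the fixed column list are illustrative, not part of the original Dao:

// Sketch: parameterized insert into the data table.
public boolean addData(String ip, String time, String day, String traffic, String type, String id) {
    String sql = "insert into data (ip, time, day, traffic, type, id) values (?, ?, ?, ?, ?, ?)";
    try (java.sql.Connection conn = DBUtil.getConn();
         java.sql.PreparedStatement ps = conn.prepareStatement(sql)) {
        String[] vals = { ip, time, day, traffic, type, id };
        for (int i = 0; i < vals.length; i++) {
            ps.setString(i + 1, vals[i]); // JDBC parameter indexes are 1-based
        }
        return ps.executeUpdate() > 0;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}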
Service layer:
package Service;

import java.util.List;
import java.util.Map;
import Dao.Dao;

/**
 * CourseService
 * Service layer
 * @author HDQ
 */
public class Service {

    Dao dao = new Dao();

    /**
     * Insert an object of type T into the given table
     * @param table target table name
     * @param obj   entity holding the column values
     * @return true if a row was inserted
     */
    public <T> boolean add(String table, T obj) {
        boolean f = dao.add(table, obj);
        return f;
    }
}
Run result:
But an error is reported:
Table 'hive.data' doesn't exist
But the table definitely exists; I can still run show tables; and select * from data;.
The problem has not been solved yet.