本篇博客,小菌爲你們帶來關於如何將本地的多個文件導入到Hive分區表中對應的分區上的方法。一共有四種方法,本篇將介紹第一種—Java代碼。
首先編寫代碼,經過MapReduce將處理好的數據寫入到HDFS的目錄下。下面提供一種參考!java
Map
public class Mapper01 extends Mapper<LongWritable, Text,Text,Text> { /** * * @param key 行首偏移量 * @param value 一整行的數據 * @param context 上下文對象 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //思路: //獲取一行數據,使用\t進行分割。若分割後造成的數組大於11(角標爲11的字段爲日期格式數據),而且角標爲14的字段不等於空。 String[] splits = value.toString().trim().split("\t"); if(splits.length> 15 && !"" .equals(splits[14])){ //截取出數據中的日期數據(含時間格式爲yyyy-MM-dd HH:mm:ss) String dataTime = splits[14]; //若數據中包含空格 if (dataTime.contains(" ")){ //截取出數據中的日期(格式爲:yyyy-MM-dd) String data = dataTime.substring(0, dataTime.indexOf(" ")); //分別獲取年份,月份,日期 String[] split = data.split("-"); String year = split[0]; String month = split[1]; String day = split[2]; //只有年份大於2000年之後而且月份和日數爲兩位數的才爲有效數據 if (Integer.parseInt(year)>=2000 && Integer.parseInt(year)<=2019 && month.length()==2 && day.length()==2){ // 進一步獲取時分秒 int i = dataTime.indexOf(" "); String time = dataTime.substring(i).trim(); //按照 : 進行切分 String[] split1 = time.split(":"); //若是切分的長度等於3才繼續作判斷 if (split1.length==3){ //獲取到時分秒 String hour = split1[0]; String min = split1[1]; String sec = split1[2]; if (hour.length()==2&&min.length()==2&&sec.length()==2){ //符合上述的全部條件以後就能夠輸出了 context.write(new Text(data),value); } } } } } } }
Reduce
public class Reducer01 extends Reducer<Text,Text, NullWritable,NullWritable> { private static FileSystem hdfs; static { try { hdfs = FileSystem.get(new URI("hdfs://node01:8020/"), new Configuration(),"root"); } catch (IOException e) { e.printStackTrace(); } catch (URISyntaxException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * * @param key * @param values yyyy * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { // 在輸入key值中切分字符串 if (key.toString().contains("-")){ StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { stringBuilder.append(value.toString()).append("\r\n"); } //建立一個目錄 FSDataOutputStream outputStream = hdfs.create(new Path("/cells_info/results/"+key+".txt")); //寫入數據 outputStream.writeBytes(stringBuilder.toString()); //關閉資源 outputStream.close(); } } }
Runner01
public class Runner01 { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //新建一個配置文件對象 Configuration conf = new Configuration(); //實例化job對象 Job job = new Job(conf); //設置本地下載 //job.setJarByClass(Runner01.class); //設置map輸出類型 job.setMapperClass(Mapper01.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //設置reduce類型 job.setReducerClass(Reducer01.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(NullWritable.class); //設置輸入和輸出 job.setInputFormatClass(TextInputFormat.class); //TextInputFormat.addInputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/cell_strength_data.sql")); TextInputFormat.addInputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/cell_strength_data.sql")); job.setOutputFormatClass(TextOutputFormat.class); TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/all_sort")); // 設置reduceTask的數量 // 等待執行 boolean result = job.waitForCompletion(true); System.out.println("status:"+result); } }
執行到這裏咱們已經成功將數據清洗後寫入到HDFS的目錄下了。
node
接下來咱們須要作的,就是把HDFS上的多個文件經過Java寫入到Hive的分區表。linux
自定義一個類書寫數據導入類LoadDataweb
LoadData
public class LoadData{ public static void main(String[] args) throws Exception { //設置連接的服務器 ConnBean connBean=new ConnBean("node01", "root","123456" ); //連接服務器 SSHExec sshExec =SSHExec.getInstance(connBean); sshExec.connect(); FileSystem hdfs = FileSystem.get(new URI("hdfs://node01:8020/"), new Configuration(), "root"); //獲取某一目錄下的全部文件 FileStatus[] status = hdfs.listStatus(new Path("/cells_info/result/")); //遍歷輸出 for (FileStatus fileStatus : status) { // 獲取文件名 String string = fileStatus.getPath().getName(); String[] split1 = string.split("-"); // 獲取年份 String year = split1[0]; // 獲取月份 String month = split1[1]; String days = split1[2]; // 獲取天數 String day = days.substring(0,days.indexOf(".txt")); // 設置命令,執行以後至關於在Linux上執行 // // ExecCommand add = new ExecCommand("hive -e \"LOAD DATA INPATH '"+fileStatus.getPath()+"' OVERWRITE INTO TABLE telecom.cell_strength_2 PARTITION (DS='local',year='"+year+"',month = '"+month+"' , day = '"+day+"'); \""); ExecCommand execCommand2 = new ExecCommand("hive -e \"LOAD DATA INPATH '"+fileStatus.getPath()+"' OVERWRITE INTO TABLE telecom.cell_strength PARTITION (YEAR='"+year+"',MONTH = '"+month+"' , DAY = '"+day+"'); \""); //執行命令 Result exec2 = sshExec.exec(execCommand2); } //關閉鏈接 sshExec.disconnect(); hdfs.close(); } }
經過在LoadData 類中設置命令以後,而後執行Java程序執行命令,就能夠作到用Java代碼實如今linux中從外部文件導入分區表的操做!sql
導入成功後的在HDFS,能夠經過目錄結構查看分區後的詳細狀況!
數組
到這裏咱們就實現了經過Java代碼把本地的文件數據導入到Hive的分區表中的操做!
下一篇博客,將介紹的是經過Linux腳本的方式批量導入數據至不一樣的分區,敬請期待!服務器
本文同步分享在 博客「Alice菌」(CSDN)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。app