把本地的文件數據導入到Hive分區表--系列①Java代碼

        本篇博客,小菌爲你們帶來關於如何將本地的多個文件導入到Hive分區表中對應的分區上的方法。一共有四種方法,本篇將介紹第一種—Java代碼。
        
        首先編寫代碼,經過MapReduce將處理好的數據寫入到HDFS的目錄下。下面提供一種參考!java

        

Map

public class Mapper01 extends Mapper<LongWritable, Text,Text,Text> { 
 
   
    /** * * @param key 行首偏移量 * @param value 一整行的數據 * @param context 上下文對象 * @throws IOException * @throws InterruptedException */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 
 
   

        //思路:
        //獲取一行數據,使用\t進行分割。若分割後造成的數組大於11(角標爲11的字段爲日期格式數據),而且角標爲14的字段不等於空。
        String[] splits = value.toString().trim().split("\t");

        if(splits.length> 15 && !"" .equals(splits[14])){ 
 
   

            //截取出數據中的日期數據(含時間格式爲yyyy-MM-dd HH:mm:ss)
            String dataTime = splits[14];

            //若數據中包含空格
            if (dataTime.contains(" ")){ 
 
   

                //截取出數據中的日期(格式爲:yyyy-MM-dd)
                String data = dataTime.substring(0, dataTime.indexOf(" "));

                //分別獲取年份,月份,日期
                String[] split = data.split("-");
                String year = split[0];
                String month  = split[1];
                String day = split[2];

                //只有年份大於2000年之後而且月份和日數爲兩位數的才爲有效數據
                if (Integer.parseInt(year)>=2000 && Integer.parseInt(year)<=2019 && month.length()==2 && day.length()==2){ 
 
   

                    // 進一步獲取時分秒

                    int i = dataTime.indexOf(" ");

                    String time = dataTime.substring(i).trim();

                    //按照 : 進行切分
                    String[] split1 = time.split(":");

                    //若是切分的長度等於3才繼續作判斷
                    if (split1.length==3){ 
 
   

                        //獲取到時分秒
                        String hour = split1[0];
                        String min = split1[1];
                        String sec = split1[2];

                        if (hour.length()==2&&min.length()==2&&sec.length()==2){ 
 
   

                            //符合上述的全部條件以後就能夠輸出了
                            context.write(new Text(data),value);

                        }
                    }


                }

            }
        }


    }
}

        

Reduce

public class Reducer01 extends Reducer<Text,Text, NullWritable,NullWritable> { 
 
   

    private static FileSystem hdfs;

    static { 
 
   
        try { 
 
   
            hdfs = FileSystem.get(new URI("hdfs://node01:8020/"), new Configuration(),"root");
        } catch (IOException e) { 
 
   
            e.printStackTrace();

        } catch (URISyntaxException e) { 
 
   

            e.printStackTrace();

        } catch (InterruptedException e) { 
 
   
            e.printStackTrace();
        }
    }

    /** * * @param key * @param values yyyy * @param context * @throws IOException * @throws InterruptedException */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { 
 
   

        // 在輸入key值中切分字符串
        if (key.toString().contains("-")){ 
 
   

            StringBuilder stringBuilder = new StringBuilder();

            for (Text value : values) { 
 
   

                stringBuilder.append(value.toString()).append("\r\n");

            }

            //建立一個目錄
            FSDataOutputStream outputStream = hdfs.create(new Path("/cells_info/results/"+key+".txt"));

            //寫入數據
            outputStream.writeBytes(stringBuilder.toString());

            //關閉資源
            outputStream.close();


        }

    }
}

        

Runner01

public class Runner01 { 
 
   

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
 
   

        //新建一個配置文件對象
        Configuration conf = new Configuration();

        //實例化job對象
        Job job = new Job(conf);

        //設置本地下載
        //job.setJarByClass(Runner01.class);

        //設置map輸出類型
        job.setMapperClass(Mapper01.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        //設置reduce類型
        job.setReducerClass(Reducer01.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(NullWritable.class);

        //設置輸入和輸出
        job.setInputFormatClass(TextInputFormat.class);
        //TextInputFormat.addInputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/cell_strength_data.sql"));
        TextInputFormat.addInputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/cell_strength_data.sql"));

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("hdfs://192.168.100.100:8020/cells_info/all_sort"));

        // 設置reduceTask的數量

        // 等待執行
        boolean result = job.waitForCompletion(true);

        System.out.println("status:"+result);


    }
}

        
執行到這裏咱們已經成功將數據清洗後寫入到HDFS的目錄下了。
在這裏插入圖片描述node

接下來咱們須要作的,就是把HDFS上的多個文件經過Java寫入到Hive的分區表linux

自定義一個類書寫數據導入類LoadDataweb

LoadData

public class LoadData{ 
 
   

    public static void main(String[] args) throws Exception { 
 
   

        //設置連接的服務器
        ConnBean connBean=new ConnBean("node01", "root","123456" );
        //連接服務器
        SSHExec sshExec =SSHExec.getInstance(connBean);
        sshExec.connect();

   
        FileSystem hdfs = FileSystem.get(new URI("hdfs://node01:8020/"), new Configuration(), "root");

        //獲取某一目錄下的全部文件
        FileStatus[] status = hdfs.listStatus(new Path("/cells_info/result/"));

        //遍歷輸出
        for (FileStatus fileStatus : status) { 
 
   


            // 獲取文件名
            String string = fileStatus.getPath().getName();

            String[] split1 = string.split("-");

            // 獲取年份
            String year = split1[0];
            // 獲取月份
            String month = split1[1];

            String days = split1[2];
            // 獲取天數
            String day = days.substring(0,days.indexOf(".txt"));

            // 設置命令,執行以後至關於在Linux上執行
// 
            // ExecCommand add = new ExecCommand("hive -e \"LOAD DATA INPATH '"+fileStatus.getPath()+"' OVERWRITE INTO TABLE telecom.cell_strength_2 PARTITION (DS='local',year='"+year+"',month = '"+month+"' , day = '"+day+"'); \"");
            ExecCommand execCommand2 = new ExecCommand("hive -e \"LOAD DATA INPATH '"+fileStatus.getPath()+"' OVERWRITE INTO TABLE telecom.cell_strength PARTITION (YEAR='"+year+"',MONTH = '"+month+"' , DAY = '"+day+"'); \"");

            //執行命令
            Result exec2 = sshExec.exec(execCommand2);

        }

        //關閉鏈接
        sshExec.disconnect();

        hdfs.close();


    }
}

經過在LoadData 類中設置命令以後,而後執行Java程序執行命令,就能夠作到用Java代碼實如今linux中從外部文件導入分區表的操做!sql

導入成功後的在HDFS,能夠經過目錄結構查看分區後的詳細狀況!
在這裏插入圖片描述
在這裏插入圖片描述
在這裏插入圖片描述數組

到這裏咱們就實現了經過Java代碼把本地的文件數據導入到Hive的分區表中的操做!
        
        
下一篇博客,將介紹的是經過Linux腳本的方式批量導入數據至不一樣的分區,敬請期待!服務器

        
        
        
在這裏插入圖片描述

本文同步分享在 博客「Alice菌」(CSDN)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。app

相關文章
相關標籤/搜索