學習hadoop有幾天了,記錄一下心得。html
初期的目標是:java
1:數據採集的方式,以shell腳本爲主,系統配置某一些文件夾,每一個文件夾是一個採集器,這樣的話,一旦發現有對應的文件,那麼就調用shell去進行文件上傳。node
2:map reduce計算,也是以shell爲主。當有數據時,就自動對該數據進行計算,彙總成對應的批量入庫文件。mysql
3:調用批量入庫腳本,將數據批量執行到數據庫中。sql
安裝的過程:shell
先下載hadoop到本地,解壓後。數據庫
設置環境變量:apache
export HADOOP_HOME=/app/data/hadoop/hadoop export PATH=.:$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH
設置完成後,就能夠調用hadoop的命令了。bash
而後配置,環境變量,路徑爲:
服務器
etc/hadoop/
我主要配置五個文件:hdfs-site.xml,core-site.xml,hadoop-env.sh,yarn-env.sh,mapred-site.xml
這幾個文件的做用沒有徹底搞清楚:
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> </configuration> core-site.xml <configuration> <property> <name>fs.default.name</name> <value>hdfs://localhost:9000</value> </property> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>hadoop.tmp.dir</name> <value>/app/data/hadoop/hadoop/tmp</value> </property> </configuration> hadoop-env.sh export JAVA_HOME=/usr/java/default/ yarn-env.sh export JAVA_HOME=/usr/java/default/ mapred-site.xml <configuration> <property> <name>mapred.job.tracker</name> <value>localhost:9001</value> </property> </configuration>
一個簡單的配置,配置完成後便可啓動服務:
格式化分佈式文件系統./hadoop namenode -format
基本上就能夠了。
對於數據的採集,這裏是從別人那裏拿了一個shell
#!/bin/bash ################################ #2015/07/15 #做者 董磊 #hadoop數據文件採集 ############################### date ROOT=`dirname $0` CONF=./test.conf #能夠利用讀取配置方法加載多個服數據 #for conf in `cat $CONF` #do # echo $conf #done #是用SCP採集1,WGET採集0 is_scp=1 #scp www@121.9.245.117:/data/resin/aclocal.m4 /data/test1.m4 nowtime=`date "+%s"` startTime=$(($nowtime-7200)) #hadoop保存地址 hadoopPath=/usr/hdfs/log_kpi #採集的服務器地址 urlserver=http://service.uwan.com #採集帳號地址 IPserver=######## #採集服務器保存數據地址 getPath=/app/mspsys/msplatform/log/ #日誌保存目錄 LOG=/app/bigdata/collectiondata/log/ #採集後保存的地址 savePath=/app/bigdata/collectiondata/ #檢查LOCK LOCK=.lock if [ -f $LOCK ] then echo 'error!app is run!' exit fi #上鎖 touch $LOCK #判斷地址是否已經建立 if [ -d $savePath ] then echo 'file is Ok!' else mkdir $savePath fi #檢查日誌目錄 if [ -d $LOG ] then echo 'log path is ok!' else mkdir $LOG fi #記錄運行日誌2>&1記錄全部日誌 #exec >> $LOG/gather_`date +%Y%m%d`.log 2>&1 #執行採集 while [ $startTime -le $nowtime ] do #拼湊採集文件的名稱 filename=`date +%Y%m%d%H -d "@$nowtime"`.log echo aaa$filename savefilename=$savePath$filename #判斷文件是否已經存在 if [ -f $savefilename ] then is_repeat=1 else is_repeat=0 fi if [ $is_scp -eq 1 ] then scp $IPserver$getPath$filename $savefilename else url=$urlserver$getPath$filename wget -O $savefilename $url fi #若是文件存在,推送到hadoop服務器 if [ -f $savefilename ] then if [ $is_repeat -eq 1 ] then $HADOOP_HOME/bin/hadoop fs -rm $hadoopPath/$filename fi $HADOOP_HOME/bin/hadoop fs -put $savefilename $hadoopPath/$filename fi nowtime=$(($nowtime-3600)) done #解除鎖 rm -rf $LOCK
從hadoop的管理頁面中能夠查看一下:
http://10.136.2.14:50070/explorer.html#/
有了這個數據後,就能夠編寫mapreduce程序了。我這邊一開始是要用maven的,但好多的jar包找不到,只能是使用了一種比較笨的辦法,將例子中的jar拿過來用。
代碼以下:
package org.conan.myhadoop.mr.kpi; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class KPIPV { public static class KPIPVMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { KPI kpi = KPI.filterPVs(value.toString()); if (kpi.isValid()) { word.set(kpi.getRequest()); context.write(word, one); } } } public static class KPIPVReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { try { sum += Integer.parseInt(String.valueOf(val.get())); } catch (Exception ex) { ex.printStackTrace(); sum += 0; } } result.set(sum); context.write(key, result); } } public void start() throws IOException, ClassNotFoundException, InterruptedException { String input = "hdfs://127.0.0.1:9000/usr/hdfs/log_kpi/"; String output = "hdfs://127.0.0.1:9000/usr/hdfs/outlog"; Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(KPIPV.class); job.setMapperClass(KPIPVMapper.class); job.setCombinerClass(KPIPVReducer.class); job.setReducerClass(KPIPVReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(input)); FileOutputFormat.setOutputPath(job, new Path(output)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
好了,執行完成後,在對應的目錄中就能夠查看處處理結果了。
接下來準備用這個玩意來分析一下實際的日誌文件,而後把結果生成一個批量入庫的數據文件,批量寫到mysql數據庫中。未完待續。