hadoop的一些記錄

學習hadoop有幾天了,記錄一下心得。html

初期的目標是:java

1:數據採集的方式,以shell腳本爲主,系統配置某一些文件夾,每一個文件夾是一個採集器,這樣的話,一旦發現有對應的文件,那麼就調用shell去進行文件上傳。node

2:map reduce計算,也是以shell爲主。當有數據時,就自動對該數據進行計算,彙總成對應的批量入庫文件。mysql

3:調用批量入庫腳本,將數據批量執行到數據庫中。sql

安裝的過程:shell

先下載hadoop到本地,解壓後。數據庫

設置環境變量:apache

export HADOOP_HOME=/app/data/hadoop/hadoop
export PATH=.:$HADOOP_HOME/bin:$JAVA_HOME/bin:$PATH

設置完成後,就能夠調用hadoop的命令了。bash

而後配置,環境變量,路徑爲:
服務器

etc/hadoop/

我主要配置五個文件:hdfs-site.xml,core-site.xml,hadoop-env.sh,yarn-env.sh,mapred-site.xml

這幾個文件的做用沒有徹底搞清楚:

  

<configuration>
<property>
             <name>dfs.replication</name>
             <value>1</value>
      </property>
</configuration>

core-site.xml
<configuration>
<property>
            <name>fs.default.name</name>
            <value>hdfs://localhost:9000</value>
     </property>
     <property>
             <name>dfs.replication</name>
             <value>1</value>
     </property>
     <property>
              <name>hadoop.tmp.dir</name>
              <value>/app/data/hadoop/hadoop/tmp</value>
     </property>
</configuration>

hadoop-env.sh
export JAVA_HOME=/usr/java/default/
yarn-env.sh
export JAVA_HOME=/usr/java/default/

mapred-site.xml

<configuration>
<property>
     <name>mapred.job.tracker</name>
     <value>localhost:9001</value>
</property>
</configuration>

一個簡單的配置,配置完成後便可啓動服務:

 格式化分佈式文件系統./hadoop namenode -format

 基本上就能夠了。

 

對於數據的採集,這裏是從別人那裏拿了一個shell

#!/bin/bash
################################
#2015/07/15
#做者 董磊
#hadoop數據文件採集
###############################
date
ROOT=`dirname $0`
CONF=./test.conf


#能夠利用讀取配置方法加載多個服數據
#for conf in `cat $CONF`
#do
# echo $conf
#done
#是用SCP採集1,WGET採集0
is_scp=1
#scp www@121.9.245.117:/data/resin/aclocal.m4 /data/test1.m4
nowtime=`date "+%s"`
startTime=$(($nowtime-7200))
#hadoop保存地址
hadoopPath=/usr/hdfs/log_kpi
#採集的服務器地址
urlserver=http://service.uwan.com
#採集帳號地址
IPserver=########
#採集服務器保存數據地址
getPath=/app/mspsys/msplatform/log/
#日誌保存目錄
LOG=/app/bigdata/collectiondata/log/
#採集後保存的地址
savePath=/app/bigdata/collectiondata/
#檢查LOCK
LOCK=.lock
if [ -f $LOCK ]
  then
    echo 'error!app is run!'
  exit
fi
#上鎖
touch $LOCK
#判斷地址是否已經建立
if [ -d $savePath ]
  then
    echo 'file is Ok!'
  else
    mkdir $savePath
  fi
#檢查日誌目錄
if [ -d $LOG ]
  then
    echo 'log path is ok!'
  else
    mkdir $LOG
  fi
#記錄運行日誌2>&1記錄全部日誌
#exec >> $LOG/gather_`date +%Y%m%d`.log 2>&1
#執行採集
while [ $startTime -le $nowtime ]
  do
   #拼湊採集文件的名稱
    filename=`date +%Y%m%d%H -d "@$nowtime"`.log
    echo aaa$filename
    savefilename=$savePath$filename
    #判斷文件是否已經存在
    if [ -f $savefilename ]
      then
       is_repeat=1
      else
       is_repeat=0
   fi
if [ $is_scp -eq 1 ]
   then
     scp $IPserver$getPath$filename $savefilename
   else
     url=$urlserver$getPath$filename
     wget -O $savefilename $url
   fi
#若是文件存在,推送到hadoop服務器
if [ -f $savefilename ]
   then
    if [ $is_repeat -eq 1 ]
    then
      $HADOOP_HOME/bin/hadoop fs -rm $hadoopPath/$filename
    fi
      $HADOOP_HOME/bin/hadoop fs -put $savefilename $hadoopPath/$filename
fi
nowtime=$(($nowtime-3600))
done
#解除鎖
rm -rf $LOCK

   從hadoop的管理頁面中能夠查看一下: 

http://10.136.2.14:50070/explorer.html#/

有了這個數據後,就能夠編寫mapreduce程序了。我這邊一開始是要用maven的,但好多的jar包找不到,只能是使用了一種比較笨的辦法,將例子中的jar拿過來用。

代碼以下:

package org.conan.myhadoop.mr.kpi;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class KPIPV {

	  public static class KPIPVMapper 
	       extends Mapper<Object, Text, Text, IntWritable>{
	    
	    private final static IntWritable one = new IntWritable(1);
	    private Text word = new Text();
	      
	    public void map(Object key, Text value, Context context
	                    ) throws IOException, InterruptedException {
	      
			KPI kpi = KPI.filterPVs(value.toString());
			if (kpi.isValid()) {
				word.set(kpi.getRequest());
				context.write(word, one);
			}
	    }
	  }
	  
	  public static class KPIPVReducer 
	       extends Reducer<Text,IntWritable,Text,IntWritable> {
	    private IntWritable result = new IntWritable();

	    public void reduce(Text key, Iterable<IntWritable> values, 
	                       Context context
	                       ) throws IOException, InterruptedException {
	    	int sum = 0;
			
	      for (IntWritable val : values) {
	    		try {
					sum += Integer.parseInt(String.valueOf(val.get()));
				} catch (Exception ex) {
					ex.printStackTrace();
					sum += 0;
				}

	      }
	      result.set(sum);
	      context.write(key, result); 
	    }
	  }


	public void start() throws IOException, ClassNotFoundException, InterruptedException {
		String input = "hdfs://127.0.0.1:9000/usr/hdfs/log_kpi/";
		String output = "hdfs://127.0.0.1:9000/usr/hdfs/outlog";

	    Configuration conf = new Configuration();

	    Job job = Job.getInstance(conf, "word count");
	    job.setJarByClass(KPIPV.class);
	    
	    job.setMapperClass(KPIPVMapper.class);
	    job.setCombinerClass(KPIPVReducer.class);
	    job.setReducerClass(KPIPVReducer.class);
	    job.setOutputKeyClass(Text.class);
	    job.setOutputValueClass(IntWritable.class);
	    FileInputFormat.addInputPath(job, new Path(input));
	    FileOutputFormat.setOutputPath(job,
	      new Path(output));
	    System.exit(job.waitForCompletion(true) ? 0 : 1);
		
	}

}

好了,執行完成後,在對應的目錄中就能夠查看處處理結果了。 

接下來準備用這個玩意來分析一下實際的日誌文件,而後把結果生成一個批量入庫的數據文件,批量寫到mysql數據庫中。未完待續。

相關文章
相關標籤/搜索