主流大數據技術全體系參數與搭建與後臺代碼工程框架的編寫(百分之70)

以前查閱源碼啊,性能測試啊調優啊。。基本告一段落,項目也接近尾聲,那麼整理下spark全部配置參數與優化策略,方便之後開發與配置:javascript

Spark安裝配置與代碼框架php

 

 spark-default.conf 配置css

spark.executor.instance 參數,向Yarn申請建立的資源池實例數html

 

spark.executor.cores 參數,每一個container中所包含的core數量java

 

spark.executor.memory 參數,每一個資源池所具備的內存數node

 

spark.dirver.memory 參數,driver端所佔用的資源數mysql

 

spark.storage.memoryFraction 參數,用於cache數據與計算數據集的內存使用比例linux

 

spark.kryoserializer.buffer.max 參數,序列化最大值,默認64Mnginx

 

spark.shuffle.consolidateFiles 參數,shuffle是否合併文件c++

 

spark.rdd.compress 參數,rdd是否進行壓縮

 

spark.sql.shuffle.partitions 參數,shuffle過程當中所建立的partition個數

 

spark.reducer.maxSizeInFlight 參數,設置shuffle read task的buffer緩衝大小,它將決定每次數據從遠程的executors中拉取大小。這個拉取過程是由5個並行的request,從不一樣的executor中拉取過來,從而提高了fetch的效率。

 

spark.shuffle.io.retryWait 參數,每次拉取數據的等待間隔

 

spark.shuffle.manage 參數,使用hash,同時與參數spark.shuffle.consolidateFiles true並用。由於不須要對中間結果進行排序,同時合併中間文件的個數,從而減小打開文件的性能消耗,在spark2.0.2中不可直接配置hash,會報錯,其餘優化參數包括:Sort Shuffle、Tungsten Sort,這裏咱們要根據數據量進行選擇,優缺點請參考本博客《Spark Shuffle詳細過程》

 

spark.executor.heartbeatInterval 參數,與driver的通信間隔,使driver知道executor有木有掛

 

spark.driver.maxResultSize 參數,全部分區的序列化結果的總大小限制

 

spark.yarn.am.cores 參數,在yarn-client模式下,申請Yarn App Master所用的CPU核數

 

spark.master 參數,選用的模式

 

spark.task.maxFailures 參數,task失敗多少次後丟棄job(防止由於網絡IO等問題失敗,從新拉取)

 

spark.shuffle.file.buffer 參數,會增大Map任務的寫磁盤前的cache緩存

 

spark-env.sh 配置

export HADOOP_CONF_DIR 參數,配置hadoop所在配置文件路徑

export HADOOP_HOME 參數,配置hadoop Client的所在路徑

export JAVA_HOME 參數,配置JAVA的環境變量地址

export SPARK_YARN_APP_NAME 參數,配置application的名稱

export SPARK_LOG_DIR 參數,配置Spark log的輸出路徑

export SPARK_PID_DIR 參數,配置spark的Pid輸出路徑

 

 將hive-site.xml文件放入spark的conf下 修改spark thrift port,使其與hive的thrift的port分離開來,同時配置mysql的數據源,由於hive的meta信息存在mysql中,以及配置meta指定的hdfs路徑:

<property>
  <name>hive.server2.thrift.port</name>
  <value>10000</value>
  <description>Port number of HiveServer2 Thrift interface.
  Can be overridden by setting $HIVE_SERVER2_THRIFT_PORT</description>
</property>

 

<property>
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hive/warehouse</value>
</property>

 

<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.jdbc.Driver</value>
  <description>Driver class name for a JDBC metastore</description>
</property>

 

<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>root</value>
  <description>username to use against metastore database</description>
</property>

 

<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>root</value>
  <description>password to use against metastore database</description>
</property>

 

Spark動態資源分配測試:

spark.dynamicAllocation.cachedExecutorIdleTimeout 360000000 若是executor中有數據則不移除

spark.dynamicAllocation.executorIdleTimeout 60s executor空閒時間達到規定值,則將該executor移除

spark.dynamicAllocation.initialExecutors 3 若是全部的executor都移除了,從新請求時啓動的初始executor數

spark.dynamicAllocation.maxExecutors 30 可以啓動的最大executor數目

spark.dynamicAllocation.minExecutors 1 可以啓動的最小executor數目

spark.dynamicAllocation.schedulerBacklogTimeout 1s task等待運行時間超過該值後開始啓動executor

spark.dynamicAllocation.enabled True 開啓動態參數配置

spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 1s 啓動executor的時間間隔

 

啓動腳本:

/usr/local/spark-1.6.1/sbin/start-thriftserver.sh \

--conf spark.shuffle.service.enabled=true \

--conf spark.dynamicAllocation.enabled=true \

--conf spark.dynamicAllocation.minExecutors=2 \

--conf spark.dynamicAllocation.maxExecutors=30 \

--conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout = 5s \

--conf spark.dynamicAllocation.schedulerBacklogTimeout=1s \

--conf spark.dynamicAllocation.initialExecutors=2 \

--conf spark.dynamicAllocation.executorIdleTimeout=60s \

--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=360000000s \

--conf spark.driver.memory=50g

 

代碼框架:

首先,咱們引入須要依賴的包括hadoop、spark、hbase等jar包,pom.xml配置以下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>sparkApp</groupId>
  <artifactId>sparkApp</artifactId>
  <version>1.0-SNAPSHOT</version>
  <inceptionYear>2008</inceptionYear>
  <properties>
      <scala.version>2.10.0</scala.version>
      <spring.version>4.0.2.RELEASE</spring.version>
      <hadoop.version>2.6.0</hadoop.version>
      <jedis.version>2.8.1</jedis.version>
  </properties>

  <repositories>
    <repository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </repository>
  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>scala-tools.org</id>
      <name>Scala-Tools Maven2 Repository</name>
      <url>http://scala-tools.org/repo-releases</url>
    </pluginRepository>
  </pluginRepositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>

      <!-- SPARK START -->
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.6.1</version>
      </dependency>

      <!--<dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming-kafka_2.10</artifactId>
          <version>1.6.1</version>
      </dependency>-->

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.10</artifactId>
          <version>1.6.1</version>
      </dependency>

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-mllib_2.10</artifactId>
          <version>1.6.1</version>
      </dependency>

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.10</artifactId>
          <version>1.6.1</version>
      </dependency>

      <!-- SPARK END -->

      <!-- HADOOP START -->
      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-hdfs</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
              <exclusion>
                  <groupId>commons-logging</groupId>
                  <artifactId>commons-logging</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>log4j</groupId>
                  <artifactId>log4j</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-log4j12</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>org.slf4j</groupId>
                  <artifactId>slf4j-api</artifactId>
              </exclusion>
              <exclusion>
                  <groupId>javax.servlet.jsp</groupId>
                  <artifactId>jsp-api</artifactId>
              </exclusion>
          </exclusions>
      </dependency>

      <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
      </dependency>

      <!-- HADOOP END -->


      <!-- hbase START  -->
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-server</artifactId>
          <version>1.0.2</version>
      </dependency>

      <dependency>
          <groupId>org.apache.htrace</groupId>
          <artifactId>htrace-core</artifactId>
          <version>3.1.0-incubating</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>1.0.2</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-common</artifactId>
          <version>1.0.2</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-protocol</artifactId>
          <version>1.0.2</version>
      </dependency>

      <!-- hbase END  -->

      <dependency>
          <groupId>com.google.guava</groupId>
          <artifactId>guava</artifactId>
          <version>11.0.2</version>
      </dependency>

      <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.7.12</version>
      </dependency>


      <!-- REDIS START -->
      <dependency>
          <groupId>redis.clients</groupId>
          <artifactId>jedis</artifactId>
          <version>${jedis.version}</version>
      </dependency>
      <!-- REDIS END -->

      <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.1</version>
      </dependency>

      <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-pool2</artifactId>
          <version>2.3</version>
      </dependency>

      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.26</version>
      </dependency>

      <!--<dependency>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-api-jdo</artifactId>
          <version>3.2.6</version>
      </dependency>

      <dependency>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-core</artifactId>
          <version>3.2.1</version>
      </dependency>

      <dependency>
          <groupId>org.datanucleus</groupId>
          <artifactId>datanucleus-rdbms</artifactId>
          <version>3.2.9</version>
      </dependency>-->

      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>5.0.0.Alpha1</version>
      </dependency>

     <!-- <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-service</artifactId>
          <version>1.2.1</version>
      </dependency>-->

      <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-jdbc</artifactId>
          <version>2.1.0</version>
      </dependency>

      <dependency>
          <groupId>org.apache.hive</groupId>
          <artifactId>hive-service</artifactId>
          <version>1.2.1</version>
      </dependency>

  </dependencies>

  <build>
<!--    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>-->
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>

而後將集羣中hive-site.xml、hdfs-site.xml、hbase-site.xml引入項目中。

編寫HBase公共方法(部分代碼):

  1 package hbase
  2 
  3 import java.util.{Calendar, Date}
  4 
  5 import org.apache.hadoop.hbase.HBaseConfiguration
  6 import org.apache.hadoop.hbase.client.{Result, Scan}
  7 import org.apache.hadoop.hbase.filter._
  8 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  9 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
 10 import org.apache.hadoop.hbase.protobuf.ProtobufUtil
 11 import org.apache.hadoop.hbase.util.{Base64, Bytes}
 12 import org.apache.spark.{Logging, SparkContext}
 13 import org.apache.spark.rdd.RDD
 14 import org.slf4j.{Logger, LoggerFactory}
 15 
 16 /**
 17   * Created by ysy on 2016/11/6.
 18   */
 19 object HBaseTableHelper extends Serializable {
 20 
 21   val logger: Logger = LoggerFactory.getLogger(HBaseTableHelper.getClass)
 22 
 23   //根據timestramp過濾加載Hbase數據
 24   def tableInitByTime(sc : SparkContext,tablename:String,columns :String,fromdate: Date,todate:Date):RDD[(ImmutableBytesWritable,Result)] = {
 25     val configuration = HBaseConfiguration.create()
 26     configuration.set(TableInputFormat.INPUT_TABLE, tablename)
 27 
 28     val scan = new Scan
 29     scan.setTimeRange(fromdate.getTime,todate.getTime)
 30     val column = columns.split(",")
 31     for(columnName <- column){
 32       scan.addColumn("f1".getBytes, columnName.getBytes)
 33     }
 34     configuration.set(TableInputFormat.SCAN, convertScanToString(scan))
 35     val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
 36     logger.info("-------count-------" + hbaseRDD.count() + "------------------")
 37     hbaseRDD
 38   }
 39 
 40   def convertScanToString(scan : Scan) = {
 41     val proto = ProtobufUtil.toScan(scan)
 42     Base64.encodeBytes(proto.toByteArray)
 43   }
 44 
 45   //根據時間條件filter數據
 46   def tableInitByFilter(sc : SparkContext,tablename : String,columns : String,time : String) : RDD[(ImmutableBytesWritable,Result)] = {
 47     val configuration = HBaseConfiguration.create()
 48     configuration.set(TableInputFormat.INPUT_TABLE,tablename)
 49     val filter: Filter = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new SubstringComparator(time))
 50     val scan = new Scan
 51     scan.setFilter(filter)
 52     val column = columns.split(",")
 53     for(columnName <- column){
 54       scan.addColumn("f1".getBytes, columnName.getBytes)
 55     }
 56     val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
 57     logger.info("-------count-------" + hbaseRDD.count() + "------------------")
 58     hbaseRDD
 59   }
 60 
 61   def HBaseTableInit(): Unit ={
 62 
 63   }
 64 
 65   def hbaseToHiveTable(): Unit ={
 66 
 67   }
 68 
 69   //前N天的時間戳獲取
 70   def getPassDays(beforeDay : Int): Date ={
 71     val calendar = Calendar.getInstance()
 72     var year = calendar.get(Calendar.YEAR)
 73     var dayOfYear  = calendar.get(Calendar.DAY_OF_YEAR)
 74     var j = 0
 75     for(i <- 0 to beforeDay){
 76       calendar.set(Calendar.DAY_OF_YEAR, dayOfYear - j);
 77       if (calendar.get(Calendar.YEAR) < year) {
 78         //跨年了
 79         j = 1;
 80         //更新 標記年
 81         year = year + 1;
 82         //重置日曆
 83         calendar.set(year,Calendar.DECEMBER,31);
 84         //從新獲取dayOfYear
 85         dayOfYear = calendar.get(Calendar.DAY_OF_YEAR);
 86       }else{
 87         j = j + 1
 88       }
 89     }
 90     calendar.getTime()
 91   }
 92 
 93   //根據startRow與endRow進行過濾
 94   def scanHbaseByStartAndEndRow(sc : SparkContext,startRow : String,stopRow : String,tableName : String) : RDD[(ImmutableBytesWritable,Result)] ={
 95     val configuration = HBaseConfiguration.create()
 96     val scan = new Scan()
 97     scan.setCacheBlocks(false)
 98     scan.setStartRow(Bytes.toBytes(startRow))
 99     scan.setStopRow(Bytes.toBytes(stopRow))
100     val filterList = new FilterList()
101     filterList.addFilter(new KeyOnlyFilter())
102     filterList.addFilter(new InclusiveStopFilter(Bytes.toBytes(stopRow)))
103     scan.setFilter(filterList)
104     configuration.set(TableInputFormat.INPUT_TABLE,tableName)
105     configuration.set(TableInputFormat.SCAN, convertScanToString(scan))
106     val hbaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])
107     logger.info("-------ScanHbaseCount-------" + hbaseRDD.count() + "------------------")
108     hbaseRDD
109   }
110 
111 }

 編寫hive公共方法(部分代碼):

 1 package hive
 2 
 3 import org.apache.spark.{Logging, SparkContext}
 4 import org.apache.spark.rdd.RDD
 5 import org.apache.spark.sql.Row
 6 import org.apache.spark.sql.hive.HiveContext
 7 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 8 
 9 /**
10   * Created by uatcaiwy on 2016/11/6.
11   */
12 object HiveTableHelper extends Logging {
13 
14     def hiveTableInit(sc:SparkContext): HiveContext ={
15       val sqlContext = new HiveContext(sc)
16       sqlContext
17     }
18 
19     def writePartitionTable(HCtx:HiveContext,inputRdd:RDD[Row],tabName:String,colNames:String):Unit ={
20       val schema = StructType(
21         colNames.split(" ").map(fieldName => StructField(fieldName,StringType,true))
22       )
23       val table = colNames.replace(" dt","").split(" ").map(name => name + " String").toList.toString().replace("List(","").replace(")","")
24       val df = HCtx.createDataFrame(inputRdd,schema)
25       df.show(20)
26       logInfo("----------------------------------begin write table-----------------------------------")
27       val temptb = "temp" + tabName
28       HCtx.sql("drop table if exists " + tabName)
29       df.registerTempTable(temptb)
30       HCtx.sql("CREATE EXTERNAL TABLE if not exists " + tabName +" ("+ table+ ") PARTITIONED BY (`dt` string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS INPUTFORMAT  'org.apache.hadoop.mapred.SequenceFileIn      putFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat'")
31       HCtx.sql("set hive.exec.dynamic.partition.mode = nonstrict")
32       HCtx.sql("insert overwrite table " + tabName + " partition(`dt`)" + " select * from " + temptb)
33     }
34 }

讀取hdfs文件,有時咱們須要根據文件的編碼來讀取,不然會亂碼,並改變編碼公共方法:

 1 package importSplitFiletoHive
 2 
 3 import org.apache.hadoop.io.{LongWritable, Text}
 4 import org.apache.hadoop.mapred.TextInputFormat
 5 import org.apache.spark.SparkContext
 6 import org.apache.spark.rdd.RDD
 7 
 8 /**
 9   * Created by ysy on 2016/12/7.
10   */
11 object changeEncode {
12 
13   def changeFileEncoding(sc:SparkContext,path:String,encode : String):RDD[String]={
14     sc.hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable],classOf[Text],1)
15       .map(p => new String(p._2.getBytes,0,p._2.getLength,encode))
16   }

 spark進行xml解析(部分代碼):

 1 import hive.HiveTableHelper
 2 import org.apache.spark.Logging
 3 import org.apache.spark.rdd.RDD
 4 import org.apache.spark.sql.Row
 5 import org.apache.spark.sql.hive.HiveContext
 6 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 7 import org.slf4j.LoggerFactory
 8 
 9 import scala.xml._
10 
11 object xmlParse extends Logging{
12   val schemaString = "column1,column2...."
13   def getPBOC_V1_F1(HCtx:HiveContext,rdd:RDD[String],outputTablename:String):Unit = {
14     val tbrdd = rdd.filter(_.split("\t").length == 9)
15       .filter(_.split("\t")(8) != "RESULTS")
16       .map(data => {
17         val sp = data.split("\t")
18         val dt = sp(5).substring(0, 10).replaceAll("-", "")
19         ((sp(0), sp(1), sp(2), sp(3), sp(4), sp(5), sp(6), "RN"), sp(8), dt)
20       }).filter(_._2 != "")
21       .filter(_._2.split("<").length > 2)
22       .filter(data => !(data._2.indexOf("SingleQueryResultMessage0009") == -1 || data._2.indexOf("ReportMessage") == -1))
23       .map(data => {
24         val xml = if (XML.loadString(data._2) != null) XML.loadString(data._2) else null
25         logDebug("%%%%%%%%%%%%%%%%%%finding xml-1:" + xml + "%%%%%%%%%%%%%%%%%%")
26         val column1 = if ((xml \ "PBOC" \ "TYPE") != null) (xml \ "PBOC" \ "TYPE").text else "null"
27         val column2 = if ((xml \ "HEAD" \ "VER") != null) (xml \ "HEAD" \ "VER").text else "null"
28         val column3 = if ((xml \ "HEAD" \ "SRC") != null) (xml \ "HEAD" \ "SRC").text else "null"
29         val column4 = if ((xml \ "HEAD" \ "DES") != null) (xml \ "HEAD" \ "DES").text else "null"
30         ....
31         (data._1,column1,column2,column3...)
32       })
33  ROW(....)
34     HiveTableHelper.writePartitionTable(HCtx, tbrdd, outputTablename, schemaString)

Redis編碼公共方法(部分代碼):

 1 package redis
 2 
 3 import org.slf4j.{Logger, LoggerFactory}
 4 import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}
 5 
 6 import scala.collection.mutable.ArrayBuffer
 7 import scala.util.Random
 8 
 9 /**
10   * Created by ysy on 2016/11/21.
11   */
12 object RedisClient extends Serializable{
13   val logger: Logger = LoggerFactory.getLogger(RedisClient.getClass)
14   @transient private var jedisPool : JedisPool = null
15     private var jedisPoolList  = new ArrayBuffer[JedisPool]
16     private val poolSize = 0
17     makePool
18 
19   def makePool() ={
20       val pc = new PropConfig("redis.properties");
21     if(jedisPool == null){
22       val poolConfig : JedisPoolConfig = new JedisPoolConfig
23       poolConfig.setMaxIdle(pc.getProperty("redis.pool.maxActive").toInt)
24       poolConfig.setMaxTotal(pc.getProperty("redis.pool.maxActive").toInt)
25       poolConfig.setMaxWaitMillis(pc.getProperty("redis.pool.maxWait").toInt)
26       poolConfig.setTestOnBorrow(pc.getProperty("redis.pool.testOnBorrow").toBoolean)
27       poolConfig.setTestOnReturn(pc.getProperty("redis.pool.testOnReturn").toBoolean)
28       val hosts = pc.getProperty("redis.pool.servers").split(",")
29         .map(data => data.split(":"))
30       for(host <- hosts){
31         jedisPool = new JedisPool(poolConfig,host(0),host(1).toInt,pc.getProperty("redis.server.timeout").toInt)
32         jedisPoolList += jedisPool
33       }
34     }
35 
36   }
37 
38 
39   def getForString(key : String) : String = {
40       var value = ""
41     if(key != null && !key.isEmpty()){
42       val jedis = getJedis
43       value = jedis.get(key)
44     }
45     value
46   }
47 
48   def setForString(key : String,value : String) ={
49     if(key != null && !key.isEmpty){
50       var jedis = getJedis
51        jedis.set(key,value)
52     }else{
53 
54     }
55   }
56 
57   def zexsist(key : String) : Boolean ={
58       var flag = false
59       if(key != null && !key.isEmpty){
60         val jedis = getJedis
61 
62         val resultNum = jedis.zcard(key)
63         if("0".equals(resultNum.toLong)){
64             flag = true
65         }
66       }
67     flag
68   }
69 
70   def getJedis() : Jedis ={
71       var ramNub = 0
72       if(poolSize == 1){
73         ramNub = 0
74       }else{
75         val random = new Random
76         ramNub = Math.abs(random.nextInt % 1)
77       }
78     jedisPool = jedisPoolList(ramNub)
79     jedisPool.getResource
80   }
81 
82   def returnJedisResource(redis : Jedis): Unit ={
83     if(redis != null){
84       redis.close()
85     }
86   }
87 
88   def close: Unit = {
89       for(jedisPool <- jedisPoolList){
90           jedisPool.close
91       }
92     if(jedisPool!=null && !jedisPool.isClosed()){
93       jedisPool.close
94     }else{
95       jedisPool=null
96     }
97   }
98 
99 }

 詳細就不寫了,那麼完整的工程框架搭建完畢:

隨後經過main方法建立sparkContext對象,開始數據分析與處理,在spark路徑的bin目錄下或者寫成腳本文件執行:

./spark-submit --conf spark.ui.port=5566 --name "sparkApp" --master yarn-client --num-executors 3 --executor-cores 2 --executor-memory 10g --class impl.spark /usr/local/spark1.6.1/sparkApp/sparkApp.jar

(注意:這裏的配置參數會覆蓋spark-default.conf中配置的變量,從新聲明spark.ui.port的緣由也是由於在同時啓動spark的thrfit的時候,提交submit會形成UI佔用的問題,至此spark完結)

 

Hadoop安裝配置與MapReduce代碼框架

安裝:

yum install gcc 

yum install gcc-c++

yum install make

yum install autoconfautomake libtool cmake

yum install ncurses-devel

yum install openssl-devel

安裝protoc(需用root用戶)

 tar -xvf protobuf-2.5.0.tar.bz2 

cd protobuf-2.5.0 

 ./configure --prefix=/opt/protoc/ 

 make && make install

編譯hadoop

mvn clean package -Pdist,native -DskipTests -Dtar 

編譯完的hadoop在 /home/hadoop/ocdc/hadoop-2.6.0-src/hadoop-dist/target 路徑下

配置hosts文件

10.1.245.244 master

10.1.245.243 slave1

命令行輸入 hostname master

免密碼登陸:

執行命令生成密鑰: ssh-keygen -t rsa -P ""

進入文件夾cd  .ssh (進入文件夾後能夠執行ls  -a 查看文件)

將生成的公鑰id_rsa.pub 內容追加到authorized_keys(執行命令:cat id_rsa.pub >> authorized_keys)

 

core-site.xml

<configuration>

 <!--指定hdfs的nameservice爲ns1-->

<property>

  <name>fs.defaultFS</name>

  <value>hdfs://master</value>

 </property>

 <property>

  <name>io.file.buffer.size</name>

  <value>131072</value>

 </property>

<!--指定hadoop數據存放目錄-->

 <property>

  <name>hadoop.tmp.dir</name>

  <value>/home/hadoop/ocdc/hadoop-2.6.0/tmp</value>

  <description>Abasefor other temporary directories.</description>

 </property>

 <property>

  <name>hadoop.proxyuser.spark.hosts</name>

  <value>*</value>

 </property>

<property>

  <name>hadoop.proxyuser.spark.groups</name>

  <value>*</value>

 </property>

</configuration>

 

<!--指定zookeeper地址-->

 <property>

   <name>ha.zookeeper.quorum</name>

   <value>h4:2181,h5:2181,h6:2181</value>

 </property>

</configuration>

 

hdfs-site.xml

<configuration>

 <property>

  <name>dfs.namenode.secondary.http-address</name>

  <value>master:9001</value>

 </property>

 

  <property>

   <name>dfs.namenode.name.dir</name>

   <value>/home/hadoop/ocdc/hadoop-2.6.0/name</value>

 </property>

 

 <property>

  <name>dfs.datanode.data.dir</name>

  <value>/home/hadoop/ocdc/hadoop-2.6.0/data</value>

  </property>

 

 <property>

  <name>dfs.replication</name>

  <value>3</value>

 </property>

 

 <property>

  <name>dfs.webhdfs.enabled</name>

  <value>true</value>

 </property>

 

<property>

   <name>dfs.nameservices</name>

   <value>ns1</value>

</property>

 

<!-- ns1下面有兩個NameNode,分別是nn1,nn2 -->

<property>

   <name>dfs.ha.namenodes.ns1</name>

   <value>nn1,nn2</value>

</property>

</configuration>

 

yarn-site.xml

<configuration>

<!-- Site specific YARN configuration properties -->

<!-- 指定nodemanager啓動時加載server的方式爲shuffle server -->

<property>

   <name>yarn.nodemanager.aux-services</name>

   <value>mapreduce_shuffle</value>

  </property>

  <property>

   <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

   <value>org.apache.hadoop.mapred.ShuffleHandler</value>

  </property>

  <property>

   <name>yarn.resourcemanager.address</name>

   <value>master:8032</value>

  </property>

  <property>

   <name>yarn.resourcemanager.scheduler.address</name>

   <value>master:8030</value>

  </property>

  <property>

   <name>yarn.resourcemanager.resource-tracker.address</name>

   <value>master:8035</value>

  </property>

  <property>

   <name>yarn.resourcemanager.admin.address</name>

   <value>master:8033</value>

  </property>

  <property>

   <name>yarn.resourcemanager.webapp.address</name>

   <value>master:8088</value>

  </property>

<property>

<name>yarn.nodemanager.resource.memory-mb</name>

<value>16384</value>

</property>

<!-- 指定resourcemanager地址 -->

   <property>

       <name>yarn.resourcemanager.hostname</name>

            <value>h3</value>

   </property>

</configuration>

 

mapred-site.xml

<configuration>

<property>

<name>mapreduce.framework.name</name>

   <value>yarn</value>

 </property>

 <property>

  <name>mapreduce.jobhistory.address</name>

  <value>master:10020</value>

 </property>

 <property>

  <name>mapreduce.jobhistory.webapp.address</name>

  <value>master:19888</value>

 </property>

<property>

    <name>yarn.scheduler.maximum-allocation-mb</name>

    <value>16384</value>

  </property>

</configuration>

 

關於MapReduce,1年多前你們都以爲很神祕,其實 就至關於在Map階段或者Reduce階段中,進行數據的處理,也能夠在Map中讀取寫入hbase、redis均可以~其實就至關於在MapReduce中寫業務處理邏輯,代碼以下:

 1 public static class Map extends MapReduceBase implments Mapper<LongWritable,Text,Text,IntWritable>{
 2     //設置常量1,用來造成<word,1>形式的輸出
 3     private fianll static IntWritable one = new IntWritable(1)
 4     private Text word = new Text();
 5 
 6 public void map(LongWritable key,Text value,OutputCollector<Text,output,Reporter reporter) throws IOException{
 7    //hadoop執行map函數時爲是一行一行的讀取數據處理,有多少行,就會執行多少次map函數
 8     String line = value.toString();
 9     //進行單詞的分割,能夠多傳入進行分割的參數
10     StringTokenizer tokenizer = new StringTokenizer(line);
11     //遍歷單詞
12     while(tokenizer.hasMoreTokens()){
13        //往Text中寫入<word,1>
14         word.set(tokenizer.nextToken());
15         output.collect(word,one);
16     }
17     }
18 }
19 //須要注意的是,reduce將相同key值(這裏是word)的value值收集起來,造成<word,list of 1>的形式,再將這些1累加
20 public static class Reduce extends MapReduceBase implements Reducer<Text IntWritable,Text,IntWritable>{
21         public void reduce(Text key,Iterator<IntWritable> values,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException{
22     //初始word個數設置
23     int sum = 0;
24     while(values,hasNext()){
25      //單詞個數相加   
26         sum += value.next().get();
27     }
28     output.collect(key,new IntWritbale(sum));
29     }
30 }

 

HBASE安裝配置與hbase代碼框架

因爲我使用的是外置的zookeeper,因此這裏HBASE_MANAGES_ZK設置爲,設置參數:

export JAVA_HOME=/usr/local/yangsy/jdk1.7.0_55

export HBASE_CLASSPATH=/usr/local/hbase-1.0.2/conf

export HBASE_MANAGES_ZK=false

 

hbase-site.xml

<configuration>

//設置將數據寫入hdfs的目錄
<property>
  <name>hbase.rootdir</name>
  <value>hdfs://master:9000/usr/local/hadoop-2.6.0/hbaseData</value>
</property>

//設置hbase的模式爲集羣模式
<property>
  <name>hbase.cluster.distributed</name>
  <value>true</value>
</property>

//設置hbase的master端口地址

<property>
  <name>hbase.master</name>
  <value>hdfs://master:60000</value>
</property>

//HBase Master Web借鑑綁定的默認端口,默認爲0.0.0.0

<property>
  <name>hbase.master.info.port</name>
  <value>60010</value>
</property>

//設置zookeeper的鏈接地址(必須爲基數個)

<property>
  <name>hbase.zookeeper.property.clientPort</name>
  <value>2183</value>
</property>

//zookeeper的節點

<property>
  <name>hbase.zookeeper.quorum</name>
  <value>master,slave1,slave2</value>
</property>

//zookeeper數據地址

<property>
  <name>hbase.zookeeper.property.dataDir</name>
  <value>/usr/local/zookeeper-3.4.6/data</value>
</property>

//zookeeper鏈接超時時間

<property>
  <name>zookeeper.session.timeout</name>

  <value>60000</value>
  </property>

</configuration>

這裏要注意的是,若是選擇外置的zookeeper集羣,則須要將zookeeper的zoo.cfg拷貝至HBase的conf下。在啓動HBase時,將會自動加載該配置文件。

regionServers中配置regionserver節點的地址

 

代碼結構:

  1 package HbaseTest;
  2 
  3 import akka.io.Tcp;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.hbase.*;
  6 import org.apache.hadoop.hbase.client.*;
  7 
  8 import java.util.ArrayList;
  9 import java.util.List;
 10 
 11 /**
 12  * Created by root on 5/30/16.
 13  */
 14 public class HbaseTest {
 15     private Configuration conf;
 16     public void init(){
 17         conf = HBaseConfiguration.create();
 18     }
 19 
 20    public void createTable(){
 21        Connection conn = null;
 22        try{
 23            conn = ConnectionFactory.createConnection(conf);
 24            HBaseAdmin hadmin = (HBaseAdmin)conn.getAdmin();
 25            HTableDescriptor desc = new HTableDescriptor("TableName".valueOf("yangsy"));
 26 
 27            desc.addFamily(new HColumnDescriptor("f1"));
 28            if(hadmin.tableExists("yangsy")){
 29                System.out.println("table is exists!");
 30                System.exit(0);
 31            }else{
 32                hadmin.createTable(desc);
 33                System.out.println("create table success");
 34            }
 35        }catch (Exception e){
 36            e.printStackTrace();
 37        }finally {
 38            {
 39                if(null != conn){
 40                    try{
 41                        conn.close();
 42                    }catch(Exception e){
 43                        e.printStackTrace();
 44                    }
 45                }
 46            }
 47        }
 48    }
 49 
 50     public void query(){
 51         Connection conn = null;
 52         HTable table = null;
 53         ResultScanner scan = null;
 54         try{
 55             conn = ConnectionFactory.createConnection(conf);
 56             table = (HTable)conn.getTable(TableName.valueOf("yangsy"));
 57 
 58             scan = table.getScanner(new Scan());
 59 
 60             for(Result rs : scan){
 61                 System.out.println("rowkey:" + new String(rs.getRow()));
 62 
 63                 for(Cell cell : rs.rawCells()){
 64                     System.out.println("column:" + new String(CellUtil.cloneFamily(cell)));
 65 
 66                     System.out.println("columnQualifier:"+new String(CellUtil.cloneQualifier(cell)));
 67 
 68                     System.out.println("columnValue:" + new String(CellUtil.cloneValue(cell)));
 69 
 70                     System.out.println("----------------------------");
 71                 }
 72             }
 73         }catch(Exception e){
 74             e.printStackTrace();
 75         }finally{
 76             try {
 77                 table.close();
 78                 if(null != conn) {
 79                     conn.close();
 80                 }
 81             }catch (Exception e){
 82                 e.printStackTrace();
 83             }
 84         }
 85     }
 86 
 87     public void queryByRowKey(){
 88         Connection conn = null;
 89         ResultScanner scann = null;
 90         HTable table = null;
 91         try {
 92             conn = ConnectionFactory.createConnection(conf);
 93             table = (HTable)conn.getTable(TableName.valueOf("yangsy"));
 94 
 95             Result rs = table.get(new Get("1445320222118".getBytes()));
 96             System.out.println("yangsy the value of rokey:1445320222118");
 97             for(Cell cell : rs.rawCells()){
 98                 System.out.println("family" + new String(CellUtil.cloneFamily(cell)));
 99                 System.out.println("value:"+new String(CellUtil.cloneValue(cell)));
100             }
101         }catch (Exception e){
102             e.printStackTrace();
103         }finally{
104             if(null != table){
105                 try{
106                     table.close();
107                 }catch (Exception e){
108                     e.printStackTrace();
109                 }
110             }
111         }
112     }
113 
114     public void insertData(){
115         Connection conn = null;
116         HTable hTable = null;
117         try{
118             conn = ConnectionFactory.createConnection(conf);
119             hTable = (HTable)conn.getTable(TableName.valueOf("yangsy"));
120 
121             Put put1 = new Put(String.valueOf("1445320222118").getBytes());
122 
123             put1.addColumn("f1".getBytes(),"Column_1".getBytes(),"123".getBytes());
124             put1.addColumn("f1".getBytes(),"Column_2".getBytes(),"456".getBytes());
125             put1.addColumn("f1".getBytes(),"Column_3".getBytes(),"789".getBytes());
126 
127             Put put2 = new Put(String.valueOf("1445320222119").getBytes());
128 
129             put2.addColumn("f1".getBytes(),"Column_1".getBytes(),"321".getBytes());
130             put2.addColumn("f1".getBytes(),"Column_2".getBytes(),"654".getBytes());
131             put2.addColumn("f1".getBytes(),"Column_3".getBytes(),"987".getBytes());
132 
133             List<Put> puts = new ArrayList<Put>();
134             puts.add(put1);
135             puts.add(put2);
136             hTable.put(puts);
137         }catch(Exception e){
138             e.printStackTrace();
139         }finally{
140             try {
141                 if (null != hTable) {
142                     hTable.close();
143                 }
144             }catch(Exception e){
145                 e.printStackTrace();
146             }
147         }
148     }
149 
150 public static void main(String args[]){
151     HbaseTest test = new HbaseTest();
152     test.init();
153     test.createTable();
154     test.insertData();
155     test.query();
156 }
157 
158 
159 }

 

 

Storm安裝配置與代碼框架

拓撲構造:
編寫topology實體類,在構造方法中加入配置參數,序列化等。經過CommandlLine獲取啓動時的workers與calc數量,最終調用StormSubmitter的submitTopologyWithProgressBar,傳入topo的名稱,配置項,以及TopologyBuilder實例。


數據接收Spout:

Storm從kafka中獲取數據,建立於BasicTopology,其中配置參數:

kafka.brokerZkStr Kafka使用的zookeeper服務器地址

kafka.brokerZkPath 保存offset的zookeeper服務器地址

kafka.offset.zkPort 保存offset的zookeeper端口 默認2181

kafka.offset.zkRoot 保存offset的zookeeper路徑 /kafka-offset

stateUpdateIntervalMs 把offset信息寫入zookeeper的間隔時間 30000

spout、bolt初始化時,構建對象stormBeanFactory,其後使用getBean方法從BeanFactory中獲取對象


Bolt數據處理:

自定義bolt繼承自extendsBaseRichBolt,實現它的prepare、declareOutputFileds方法。在prepare方法中,獲取StormBeanFactory的配置,加載業務實體類。

 

 

Storm配置參數:

 

dev.zookeeper.path : '/tmp/dev-storm-zookeeper'     以dev.zookeeper.path配置的值做爲本地目錄,以storm.zookeeper.port配置的值做爲端口,啓動一個新的zookeeper服務
drpc.childopts: '-Xms768m'  
drpc.invocations.port 3773
drpc.port : 3772
drpc.queue.size : 128
drpc.request.timeout.secs : 600
drpc.worker.threads : 64
java.library.path : ''
logviewer.appender.name : 'A1'
logviewer.childopts : '-Xms128m'
logviewr.port : 8000
metrics.reporter.register : 'org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsReporter'
nimbus.childopts : '-Xmx1024m -javaagent:/usr/hdp/current/storm-nimbus/contrib/storm-jmxetric/lib/jmxetric-1.0.4.jar=host=localhost,port=8649,wireformat31x-true...'
nimbus.cleanup.inbox.freq.secs : 600
nimbus.file.copy.exiration.secs : 600

storm.yaml

nimbus.host  nimbus的配置地址

nimbus.inbox.jar.expiration.secs   主要負責清理nimbus的inbox文件夾最後一次修改時間,默認3600秒

nimbus.monitor.freq.secs : 120
nimbus.reassign : true  當發現task失敗時nimbus是否從新分配執行。默認爲真,不建議修改
nimbus.supervisor.timeout.secs : 60
nimbus.task.launch.secs : 120
nimbus.task.timeout.secs : 30  心跳超時時間,超時後nimbus會認爲task死掉並重分配給另外一個地址
nimbus.thrift.max_buffer_size : 1048576
nimbus.thrift.port : 6627
nimbus.topology.validator : 'backtype.storm.nimbus.DefaultTopologyValidator'
storm.cluster.metrics.consumer.register : [{"class" : "org.apache.hadoop.metrics2.sink.storm.Strorm.StormTimeLineMetricsReporter"}]
storm.cluster.mode : 'distributed'
storm.local.dir : '/storage/disk1/hadoop/storm'
storm.local.mode.zmq : false
storm.log.dir : '/var/log/storm'
storm.messaging.netty.buffer_size : 5242880  爲每次批量發送的Tuple 序列化以後的Task Message 消息的大小
storm.messaging.netty.client_worker_threads : 10  指定netty服務器工做線程數量 
storm.messaging.netty.max_retries : 60  指定最大重試次數
storm.messaging.netty.max_wait_ms : 2000  指定最大等待時間(毫秒)
storm.messaging.netty.min_wait_ms : 100 指定最小等待時間(毫秒)
storm.messaging.netty.server_worker_threads : 10 指定netty服務器工做線程數量
storm.messaging.transport : 'backtype.storm.messaging.netty.Context' 指定傳輸協議
storm.thrift.transport : 'backtype.storm.security.auth.SimpleTransportPlugin'
storm.zookeeper.connection.timeout : 15000
storm.zookeeper.port : 2181
storm.zookeeper.retry.interval : 1000
storm.zookeeper.retry.intervalceiling.millis : 30000
storm.zookeeper.retry.times : 5
storm.zookeeper.root : '/storm'  ZooKeeper中Storm的根目錄位置
storm.zookeeper.servers : ['',''.....]   zookeeper服務器列表
storm.zookeeper.session.timeout : 20000  客戶端鏈接ZooKeeper超時時間
supervisor.childopts :    在storm-deploy項目中使用,用來配置supervisor守護進程的jvm選項
supervisor.heartbeat.frequency.secs : 5  supervisor心跳發送頻率(多久發送一次)
supervisor.monitor.frequency.secs : 3  supervisor檢查worker心跳的頻率
supervisor.slots.ports : [6700,6701,....]   supervisor上可以運行workers的端口列表.每一個worker佔用一個端口,且每一個端口只運行一個worker.經過這項配置能夠調整每臺機器上運行的worker數.(調整slot數/每機)
supervisor.worker.start.timeout.secs : 120
supervisor.worker.timeout.secs : 30  supervisor中的worker心跳超時時間,一旦超時supervisor會嘗試重啓worker進程.
task.heartbeat.frequency.secs : 3  task彙報狀態心跳時間間隔
task.refresh.poll.secs : 10  ask與其餘tasks之間連接同步的頻率.(若是task被重分配,其餘tasks向它發送消息須要刷新鏈接).通常來說,重分配發生時其餘tasks會理解獲得通知。該配置僅僅爲了防止未通知的狀況。
topology.acker.executors : null
topology.builtin.metrics.bucket.size.secs : 60
topology.debug : false
topology.disruptor.wait.strategy : 'com.lmax.disruptor.BlockingWaitStrategy'
topology.enable.message.timeouts : true
topology.error.throttle.interval.secs : 10
topology.executor.receive.buffer.size : 1024
topology.executor.send.buffer.size : 1024
topology.fall.back.on.java.serialization : true
topology.kryo.factory : 'backtype.storm.serialization.DefaultKryoFactory'
topology.max.error.report.per.interval : 5
topology.max.spout.pending : null  一個spout task中處於pending狀態的最大的tuples數量.該配置應用於單個task,而不是整個spouts或topology.
topology.max.task.parallelism : null  在一個topology中可以容許的最大組件並行度.該項配置主要用在本地模式中測試線程數限制.
topology.message.timeout.secs : 30  topology中spout發送消息的最大處理超時時間.若是一條消息在該時間窗口內未被成功ack,Storm會告知spout這條消息失敗。而部分spout實現了失敗消息重播功能。
topology.metrics.aggregate.metric.evict.secs : 5
topology.metrics.aggregate.per.worker : true
topology.metrics.consumer.register :
topology.metrics.expand.map.type : true
topology.metrics.metric.name.separator : ','
topology.optimize : true
topology.skip.missing.kryo.registrations : false  Storm是否應該跳過它不能識別的kryo序列化方案.若是設置爲否task可能會裝載失敗或者在運行時拋出錯誤.
topology.sleep.spout.wait.strategy.time.ms : 1
topology.spoout.wait.strategy : 'backtype.storm.spout.SleeSpoutWaitStrategy'
topology.state.synchronization.timeout.secs : 60
topology.stats.sample.rate : 0.05
topology.tick.tuple.freq.secs : null
topology.transfer.buffer.size : 1024
topology.trident.batch.emit.interval.millis : 500
topology.tuple.serializer : 'backtype.storm.serialization.types.ListDelegateSerializer'
topology.worker.childopts : null
topology.worker.shared.thread.pool.size : 4
topology.workers : 40  執行該topology集羣中應當啓動的進程數量.每一個進程內部將以線程方式執行必定數目的tasks.topology的組件結合該參數和並行度提示來優化性能
transactional.zookeeper.port : null
transactional.zookeeper.root : '/transactional'
transactional.zookeeper.servers : null
ui.childopts : '-Xmx2048m'
ui.filter : null
ui.port : 8744  Storm UI的服務端口
worker.childopts :
worker.heartbeet.frequency.secs : 1
zmq.hwm : 0
zmq.linger.millis : 5000  當鏈接關閉時,連接嘗試從新發送消息到目標主機的持續時長.這是一個不經常使用的高級選項,基本上能夠忽略.
zmq.threads : 1  每一個worker進程內zeromq通信用到的線程數

 

 

storm代碼框架總結

基類topology,BasicTopology.java

  1 import java.io.UnsupportedEncodingException;
  2 import java.math.BigDecimal;
  3 import java.util.Date;
  4 import java.util.HashMap;
  5 import java.util.List;
  6 import java.util.Map;
  7 
  8 import org.apache.commons.cli.CommandLine;
  9 import org.apache.commons.cli.CommandLineParser;
 10 import org.apache.commons.cli.DefaultParser;
 11 import org.apache.commons.cli.HelpFormatter;
 12 import org.apache.commons.cli.Options;
 13 import org.apache.commons.lang3.StringUtils;
 14 import org.apache.storm.guava.base.Preconditions;
 15 import org.edm.storm.topo.util.DelayKafkaSpout;
 16 import org.edm.storm.topo.util.StormBeanFactory;
 17 import org.joda.time.DateTime;
 18 
 19 import storm.kafka.BrokerHosts;
 20 import storm.kafka.KafkaSpout;
 21 import storm.kafka.SpoutConfig;
 22 import storm.kafka.ZkHosts;
 23 import backtype.storm.Config;
 24 import backtype.storm.LocalCluster;
 25 import backtype.storm.StormSubmitter;
 26 import backtype.storm.topology.TopologyBuilder;
 27 import backtype.storm.tuple.Fields;
 28 
 29 public abstract class BasicTopology {
 30     
 31     public static final String HASH_TAG = "hashTag";
 32     
 33     public static final Fields HASH_FIELDS = new Fields(HASH_TAG);
 34     
 35     protected Options options = new Options();
 36     
 37     protected StormBeanFactory stormBeanFactory;
 38     
 39     protected Config config = new Config();
 40     
 41     protected String configFile;
 42     
 43     public BasicTopology(){
 44         config.setFallBackOnJavaSerialization(false);
 45         
 46         config.setSkipMissingKryoRegistrations(false);
 47         config.registerSerialization(Date.class);
 48         config.registerSerialization(BigDecimal.class);
 49         config.registerSerialization(HashMap.class);
 50         config.registerSerialization(Map.class);
 51 
 52         options.addOption("name", true, "拓撲運行時名稱");
 53         options.addOption("conf", false, "配置文件路徑");
 54         options.addOption("workers", true, "虛擬機數量");
 55     }
 56     
 57     protected void setupConfig(CommandLine cmd) throws UnsupportedEncodingException{
 58         //配置文件名稱
 59         String confLocation = cmd.getOptionValue("conf",getConfigName());
 60         //建立stormBeanFactory
 61         stormBeanFactory = new StormBeanFactory(confLocation);
 62         Map<String,Object> stormConfig = stormBeanFactory.getBean("stormConfig",Map.class);
 63         Preconditions.checkNotNull(stormConfig);
 64         config.putAll(stormConfig);
 65         config.put(StormBeanFactory.SPRING_BEAN_FACTORY_XML, stormBeanFactory.getXml());
 66         //先默認加載,而後再加載命令行
 67         String numWorkers = cmd.getOptionValue("workers");
 68         if(numWorkers != null){
 69             config.setNumWorkers(Integer.parseInt(numWorkers));
 70         }else{
 71             config.setNumWorkers(getNumWorkers());
 72         }
 73         config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 180);
 74         config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 2000);
 75         config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
 76         config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
 77         config.put(Config.TOPOLOGY_ACKER_EXECUTORS, config.get(Config.TOPOLOGY_WORKERS));
 78     }
 79     
 80     @SuppressWarnings("unchecked")
 81     protected SpoutConfig getSpoutConfig(String topic){
 82         String brokerZkStr = (String)config.get("kafka.brokerZkStr");
 83         String brokerZkPath = (String)config.get("kafka.brokerZkPath");
 84         
 85         List<String> zkServers = (List<String>)config.get("kafka.offset.zkServers");
 86         Integer zkPort = Integer.parseInt(String.valueOf(config.get("kafka.offset.zkPort")));
 87         String zkRoot = (String)config.get("kafka.offset.zkRoot");
 88         String id = StringUtils.join(getTopoName(),"-",topic);
 89         BrokerHosts kafkaBrokerZk = new ZkHosts(brokerZkStr, brokerZkPath);
 90         SpoutConfig spoutConfig = new SpoutConfig(kafkaBrokerZk, topic, zkRoot, id);
 91         spoutConfig.zkServers = zkServers;
 92         spoutConfig.zkPort = zkPort;
 93         spoutConfig.zkRoot = zkRoot;
 94         spoutConfig.stateUpdateIntervalMs = 30000;
 95         return spoutConfig;
 96     }
 97     
 98     //建立kafkaspout
 99     public KafkaSpout getKafkaSpout(String topic){
100         SpoutConfig spoutConfig = getSpoutConfig(topic);
101         return new DelayKafkaSpout(spoutConfig);
102     }
103     
104     
105     /**
106      * 拓撲可部署屢次,但從kafka獲取數據,作惟一次過濾等用的
107      * 
108      * @return
109      */
110     public abstract String getTopoName();
111     
112     public abstract String getConfigName();
113     
114     public abstract int getNumWorkers();
115     
116     public void registerKryo(Config config){
117         
118     }
119     
120     public abstract void addOptions(Options options);
121         
122     public abstract void setupOptionValue(CommandLine cmd);
123     
124     public abstract void createTopology(TopologyBuilder builder);
125     
126     public void runLocat(String[] args) throws Exception{
127         CommandLineParser parser = new DefaultParser();
128         CommandLine cmd = parser.parse(options, args);
129         HelpFormatter formatter = new HelpFormatter();
130         formatter.printHelp("topology", options);
131         setupConfig(cmd);
132         
133         config.setDebug(true);
134         config.setNumWorkers(1);
135         TopologyBuilder builder = new TopologyBuilder();
136         createTopology(builder);
137         LocalCluster cluster = new LocalCluster();
138         String topoName = cmd.getOptionValue("name",
139                 StringUtils.join(getTopoName(), "-", new DateTime().toString("yyyyMMdd-HHmmss")));
140         cluster.submitTopology(topoName,config,builder.createTopology());
141         
142     }
143     
144     public void run(String args[]) throws Exception{
145         CommandLineParser parser = new DefaultParser();
146         CommandLine cmd = parser.parse(options, args);
147         HelpFormatter formatter = new HelpFormatter();
148         formatter.printHelp("topology", options);
149         setupConfig(cmd);
150         setupOptionValue(cmd);
151         
152         TopologyBuilder builder = new TopologyBuilder();
153         createTopology(builder);
154         String topoName = cmd.getOptionValue("name",StringUtils.join(getTopoName(),new DateTime().toString("yyyyMMdd-HHmmss")));
155         StormSubmitter.submitTopologyWithProgressBar(topoName, config, builder.createTopology());
156     }
157 }

 

 用於實現數據流spout,bolt的業務實現Topology:

 1 import org.apache.commons.cli.CommandLine;
 2 import org.apache.commons.cli.Options;
 3 
 4 import storm.kafka.KafkaSpout;
 5 
 6 import backtype.storm.topology.TopologyBuilder;
 7 
 8 import com.practice.impl.BasicTopology;
 9 
10 public class LbsCalcTopology extends BasicTopology{
11     
12     private int spoutParallelism = 1;
13     
14     private int onceParallelism = 1;
15     
16     private int calculateParallelism = 1;
17     
18 
19     @Override
20     public void addOptions(Options options) {
21         options.addOption("spout",true,"spoutParallelism");
22         options.addOption("once",true,"onceParallelism");
23         options.addOption("calc",true,"calculateParallelism");
24     }
25 
26     @Override
27     public void setupOptionValue(CommandLine cmd) {
28         spoutParallelism = Integer.parseInt(cmd.getOptionValue("spout","1"));
29         onceParallelism = Integer.parseInt(cmd.getOptionValue("once","1"));
30         calculateParallelism = Integer.parseInt(cmd.getOptionValue("calc","1"));
31     }
32 
33     @Override
34     public void createTopology(TopologyBuilder builder) {
35         KafkaSpout kafkaSpout = getKafkaSpout("lbs");
36         LbsOnceBolt exactlyOnceBolt = new LbsOnceBolt(getTopoName(),"lbs","lbs");
37         LbsCalcBolt calculateBolt = new LbsCalcBolt();
38         
39         builder.setSpout("kafkaSpout", kafkaSpout,spoutParallelism);
40         builder.setBolt("onceParallelism", exactlyOnceBolt,onceParallelism).shuffleGrouping("kafkaSpout");
41         builder.setBolt("calculateBolt", calculateBolt, calculateParallelism).fieldsGrouping("onceParallelism", BasicTopology.HASH_FIELDS);
42     }
43     
44     @Override
45     public String getTopoName() {
46         return "lbs";
47     }
48 
49     @Override
50     public String getConfigName() {
51         return "lbs-topology.xml";
52     }
53 
54     @Override
55     public int getNumWorkers() {
56         return 3;
57     }
58     
59     public static void main(String args[]) throws Exception{
60         LbsCalcTopology topo = new LbsCalcTopology();
61         topo.run(args);
62     }
63 }

 

具體業務時間bolt:

 1 import java.util.Map;
 2 
 3 import backtype.storm.task.OutputCollector;
 4 import backtype.storm.task.TopologyContext;
 5 import backtype.storm.topology.OutputFieldsDeclarer;
 6 import backtype.storm.topology.base.BaseRichBolt;
 7 import backtype.storm.tuple.Tuple;
 8 
 9 public class LbsCalcBolt extends BaseRichBolt{
10 
11     @Override
12     public void execute(Tuple input) {
13         try{
14             String msg = input.getString(0);
15             if(msg != null){
16                 System.out.println(msg);
17             }
18         }catch(Exception e){
19             e.printStackTrace();
20         }
21     }
22 
23     @Override
24     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
25         // TODO Auto-generated method stub
26         
27     }
28 
29     @Override
30     public void declareOutputFields(OutputFieldsDeclarer arg0) {
31         // TODO Auto-generated method stub
32         
33     }
34 
35 }

 

 

Kafka安裝配置與代碼框架

 

一、主要配置參數說明

-- 日誌數據存儲目錄
log.dirs = /storage/disk17/kafka-logs

-- Kafka server端口
port = 6667

-- 強制切分日誌文件的間隔時間(切分的文件大小由log.segment.bytes決定,默認1073741824 bytes)
log.roll.hours = 168

-- 日誌文件保留時間
log.retention.hours = 168

-- 每次flush到磁盤的最大消息數量,數量太大會讓寫磁盤時間過長(IO阻塞),致使客戶端發生延遲
log.flush.interval.messages=10000

-- 每次flush消息的間隔時間
log.flush.scheduler.interval.ms=3000

-- zookeeper 鏈接超時時間
zookeeper.connection.timeout.ms = 20000

-- zookeeper 集羣的節點地址,多個以,號分隔
zookeeper.connect=127.0.0.1:2181

-- zooKeeper集羣中leader和follower之間的同步時間
zookeeper.sync.time.ms=2000

-- 是否容許自動建立主題
auto.create.topics.enable=true

-- 默認複製因子數量
default.replication.factor=1

-- 默認分區數量
num.partitions=1


二、消息以及日誌數據的存儲策略

kafka和JMS(Java Message Service)實現(activeMQ)不一樣的是:即便消息被消費,消息仍然不會被當即刪除,日誌文件將會根據broker中的配置要求,保留必定的時間以後刪除,好比log.retention.hours=168,那麼七天後,文件會被清除,不管其中的消息是否被消費。kafka經過這種簡單的手段,來釋放磁盤空間,以及減小消息消費以後對文件內容改動的磁盤IO開銷。

對於consumer而言,它須要保存消費消息的offset(若是不指定則存儲在默認的zookeeper目錄kafak-broker),對於offset的保存和使用,由consumer來控制。當consumer正常消費消息時,offset將會"線性"的向前驅動,即消息將依次順序被消費。事實上consumer可使用任意順序消費消息,它只須要將offset重置爲任意值。

kafka集羣幾乎不須要維護任何consumer和producer狀態信息,這些信息由zookeeper保存。所以producer和consumer的客戶端實現很是輕量級,它們能夠隨意離開,而不會對集羣形成額外的影響。

三、經常使用操做命令

3.一、建立消息主題(kafka topic)

./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topicName --partitions 2 --replication-factor 3 --create

--zookeeper zookeeper 服務端地址
--topic 消息主題名稱
--partitions 分區數量(若是不指定數量則由Kafka的配置決定)
--replication-factor 複製因子數量(副本數量,若是不指定數量則由Kafka的配置決定)
--create 建立主題

3.二、刪除消息主題

* 慎用,這條命令只是刪除zookeeper中主題的元數據,日誌文件並不會刪除,不建議直接刪除日誌文件。如需刪除,安全的措施是中止kafka集羣服務,而後執行這條命令再刪除日誌文件,最後恢復服務,恢復服務後執行--describe命令(3.3)檢查主題是否已經移除!

./kafka-run-class.sh kafka.admin.DeleteTopicCommand --zookeeper 127.0.0.1:2181 --topic topicName

3.三、查看消息主題

./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topicName --describe

--zookeeper zookeeper 服務端地址
--topic 消息主題名稱
----describe 主題信息

命令將輸出以下信息,如topicName主題有2個分區,複製因子爲3,以及主題所在的leader。

Topic:topicName PartitionCount:2 ReplicationFactor:3 Configs:
Topic: topicName Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: topicName Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

3.四、其餘命令

-- 列出全部的主題
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

-- 修改主題分區數量
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topicName --partitions 4 --alter

 

 

Redis安裝配置與代碼框架

 

 

Jettry配置與代碼框架

 Jetty服務一共啓動8個線程進行數據的接入。存儲於bLOCKINGqUEUE中,進行消費,啓啓動多個線程的做用在於併發消費接入的報文數據。同時,同一事件啓動多個服務,經過nginx進行負載均衡。

1、Jetty服務的啓動

一、建立ApplicationContext,經過AbstractApplicationContext建立。
AbstractApplicationContext applicationContext = new FileSystemXmlApplicationContext(configXmlFile);

二、建立PlatformMBeanServer
MBeanServer mbs = java.lang.management.ManagementFactory.getPlatformMBeanServer();

三、註冊ApplicationServer

ObjectName mbeanName = getApplicationObjectName();
ApplicationServer applicationServer = new ApplicationServer();
....
mbs.registerMBean(applicationServer,mbeanNamer);

 

2、Jettry服務的中止
......

 

 

 

HIVE安裝配置與代碼框架

 

 

Rstudio與Spark安裝配置與代碼框架

 

 

Nginx配置

 

user www-data; #運行用戶
worker_processes 1; #啓動進程,一般設置成和cpu的數量相等
error_log /var/log/nginx/error.log; #全局錯誤日誌及PID文件
pid /var/run/nginx.pid;#PID文件

#工做模式及鏈接數上限
events {
use epoll; #epoll是多路複用IO(I/O Multiplexing)中的一種方式,可是僅用於linux2.6以上內核,能夠大大提升nginx的性能
worker_connections 1024;#單個後臺worker process進程的最大併發連接數
# multi_accept on;
}

#設定http服務器,利用它的反向代理功能提供負載均衡支持
http {
#設定mime類型,類型由mime.type文件定義
include /etc/nginx/mime.types;
default_type application/octet-stream;
#設定日誌格式
access_log /var/log/nginx/access.log;

#sendfile 指令指定 nginx 是否調用 sendfile 函數(zero copy 方式)來輸出文件,對於普通應用,
#必須設爲 on,若是用來進行下載等應用磁盤IO重負載應用,可設置爲 off,以平衡磁盤與網絡I/O處理速度,下降系統的uptime.
sendfile on;
#tcp_nopush on;

#鏈接超時時間
#keepalive_timeout 0;
keepalive_timeout 65;
tcp_nodelay on;

#開啓gzip壓縮
gzip on;
gzip_disable "MSIE [1-6]\.(?!.*SV1)";

#設定請求緩衝
client_header_buffer_size 1k;
large_client_header_buffers 4 4k;

include /etc/nginx/conf.d/*.conf;
include /etc/nginx/sites-enabled/*;

#設定負載均衡的服務器列表
upstream mysvr {
#weigth參數表示權值,權值越高被分配到的概率越大
#本機上的Squid開啓3128端口
server 192.168.8.1:3128 weight=5;
server 192.168.8.2:80 weight=1;
server 192.168.8.3:80 weight=6;
}


server {
#偵聽80端口
listen 80;
#定義使用www.xx.com訪問
server_name www.xx.com;(虛擬主機ip)

#設定本虛擬主機的訪問日誌
access_log logs/www.xx.com.access.log main;

#默認請求
location / {
root /root; #定義服務器的默認網站根目錄位置
index index.php index.html index.htm; #定義首頁索引文件的名稱

fastcgi_pass www.xx.com;
fastcgi_param SCRIPT_FILENAME $document_root/$fastcgi_script_name;
include /etc/nginx/fastcgi_params;
}

# 定義錯誤提示頁面
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /root;
}

#靜態文件,nginx本身處理
location ~ ^/(images|javascript|js|css|flash|media|static)/ {
root /var/www/virtual/htdocs;
#過時30天,靜態文件不怎麼更新,過時能夠設大一點,若是頻繁更新,則能夠設置得小一點。
expires 30d;
}
#PHP 腳本請求所有轉發到 FastCGI處理. 使用FastCGI默認配置.
location ~ \.php$ {
root /root;
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME /home/www/www$fastcgi_script_name;
include fastcgi_params;
}
#設定查看Nginx狀態的地址
location /NginxStatus {
stub_status on;
access_log on;
auth_basic "NginxStatus";
auth_basic_user_file conf/htpasswd;
}
#禁止訪問 .htxxx 文件
location ~ /\.ht {
deny all;
}

}
}

 

 

Sentry配置

 

 

Hue配置(3.11版本)

hue安裝
rpm -ivh hue-* --force --nodeps
rpm -ivh sentry-* --force --nodeps

hue 啓動
/usr/lib/hue/build/env/bin/hue runserver ip:port > /var/log/hue/hue-start.out 2> /var/log/hue/hue-error.out &


spark livy 集成
hue sqlited is locked
desktopdefault_hdfs_superuserhadoopHDFS管理用戶desktophttp_host10.10.4.125Hue Web Server所在主機/IPdesktophttp_port8000Hue Web Server服務端口desktopserver_userhadoop運行Hue Web Server的進程用戶desktopserver_grouphadoop運行Hue Web Server的進程用戶組desktopdefault_useryanjunHue管理員


1. 新建hue用戶
#usradd hue
將包放到/home/hue/底下(能夠本身選擇位置)
#rpm -ivh hue-* --force --nodeps
安裝完成以後,hue的目錄在/usr/lib/hue,配置文件目錄在/etc/hue/conf/hue.ini
2. 配置hue(/etc/hue/conf/hue.ini)
[beeswax]
# Host where HiveServer2 is running.
# If Kerberos security is enabled, use fully-qualified domain name (FQDN).
hive_server_host=172.19.189.50(hiveserver2的地址)

# Port where HiveServer2 Thrift server runs on.
hive_server_port=10010(hiveserver2端口)

# Hive configuration directory, where hive-site.xml is located
hive_conf_dir=/etc/conf(hive配置文件路徑)

# Timeout in seconds for thrift calls to Hive service
server_conn_timeout=120(鏈接hive超時時間)

# Choose whether to use the old GetLog() thrift call from before Hive 0.14 to retrieve the logs.
# If false, use the FetchResults() thrift call from Hive 1.0 or more instead.
## use_get_log_api=false

# Limit the number of partitions that can be listed.
list_partitions_limit=10000(不限制時查詢界面上list的partition數據)

# The maximum number of partitions that will be included in the SELECT * LIMIT sample query for partitioned tables.
query_partitions_limit=10(查詢語句中最多查詢10個partition)

# A limit to the number of cells (rows * columns) that can be downloaded from a query
# (e.g. - 10K rows * 1K columns = 10M cells.)
# A value of -1 means there will be no limit.
download_cell_limit=10000000(download最大量)

# Hue will try to close the Hive query when the user leaves the editor page.
# This will free all the query resources in HiveServer2, but also make its results inaccessible.
close_queries=true(關閉界面時直接斷開該用戶與hiveserver2的鏈接)

# Thrift version to use when communicating with HiveServer2.
# New column format is from version 7.
thrift_version=7(默認值,不建議改會報錯)

[[yarn_clusters]]
[[[default]]]
# Enter the host on which you are running the ResourceManager
resourcemanager_host= (RM地址)

# The port where the ResourceManager IPC listens on
resourcemanager_port=8032(RM端口)

# Whether to submit jobs to this cluster
submit_to=True

# Resource Manager logical name (required for HA)
logical_name=ActiveResourceManager(HA,給activeRM命名)

# Change this if your YARN cluster is Kerberos-secured
## security_enabled=false

# URL of the ResourceManager API
resourcemanager_api_url=http://172.19.xxx.xx:8088(RM URL)

# URL of the ProxyServer API
## proxy_api_url=http://localhost:8088

# URL of the HistoryServer API
history_server_api_url=http://172.19.xx.xx:19888

# URL of the Spark History Server
#spark_history_server_url=http://172.30.xx.xx:18088

# In secure mode (HTTPS), if SSL certificates from YARN Rest APIs
# have to be verified against certificate authority
## ssl_cert_ca_verify=True

# HA support by specifying multiple clusters.
# Redefine different properties there.
# e.g.

[[[ha]]]
# Resource Manager logical name (required for HA)
logical_name=StandbyResourceManager(RM的HA從節點name)

# Un-comment to enable
submit_to=True

# URL of the ResourceManager API
resourcemanager_api_url=http://172.19.189.50:8088(RM從節點地址)

[hadoop]
# Configuration for HDFS NameNode
# ------------------------------------------------------------------------
[[hdfs_clusters]]
# HA support by using HttpFs
[[[default]]]
# Enter the filesystem uri
fs_defaultfs=hdfs://hdp(HDFS的filesystem名字)

# NameNode logical name.
logical_name=MasterNamenode(NN name)

# Use WebHdfs/HttpFs as the communication mechanism.
# Domain should be the NameNode or HttpFs host.
# Default port is 14000 for HttpFs.
webhdfs_url=http://172.19.189.50:50070/webhdfs/v1(hue的獲取hdfs的nn HA是經過WebHDFS來切換的,這個地址是啓動Webhdfs的地址)

# Change this if your HDFS cluster is Kerberos-secured
## security_enabled=false

# In secure mode (HTTPS), if SSL certificates from YARN Rest APIs
# have to be verified against certificate authority
## ssl_cert_ca_verify=True

# Directory of the Hadoop configuration
hadoop_conf_dir=/etc/hadoop/conf(hadoop配置文件路徑)

[desktop]
# Set this to a random string, the longer the better.
# This is used for secure hashing in the session store.
## secret_key=

# Execute this script to produce the Django secret key. This will be used when
# 'secret_key' is not set.
## secret_key_script=

# Webserver listens on this address and port
http_host=0.0.0.0
http_port=8888

# Time zone name
time_zone=Asia/Shanghai

# Enable or disable Django debug mode.
#django_debug_mode=false

# Enable or disable database debug mode.
#database_logging=false

# Whether to send debug messages from JavaScript to the server logs.
#send_dbug_messages=true

# Enable or disable backtrace for server error
http_500_debug_mode=true

# Enable or disable memory profiling.
## memory_profiler=false

# Server email for internal error messages
## django_server_email='hue@localhost.localdomain'

# Email backend
## django_email_backend=django.core.mail.backends.smtp.EmailBackend

# Webserver runs as this user
server_user=hue
server_group=hadoop(hue界面登陸用戶)

# This should be the Hue admin and proxy user
default_user=hue(hue管理員用戶)

# This should be the hadoop cluster admin
default_hdfs_superuser=hue(hue在hdfs上的管理員用戶)
[[database]]
#engine=sqlite3
#name=/var/lib/hue/desktop.db
# Database engine is typically one of:
# postgresql_psycopg2, mysql, sqlite3 or oracle.
# Note that for sqlite3, 'name', below is a path to the filename. For other backends, it is the database name
# Note for Oracle, options={"threaded":true} must be set in order to avoid crashes.
# Note for Oracle, you can use the Oracle Service Name by setting "host=" and "port=" and then "name=<host>:<port>/<service_name>".
# Note for MariaDB use the 'mysql' engine.
engine=mysql
host=172.30.115.60
port=3306
user=hue
password=hue
name=hue
(hue的元數據存儲位置)
# Execute this script to produce the database password. This will be used when 'password' is not set.
## password_script=/path/script
## name=desktop/desktop.db
## options={}

[librdbms]
# The RDBMS app can have any number of databases configured in the databases
# section. A database is known by its section name
# (IE sqlite, mysql, psql, and oracle in the list below).
[[databases]]
# sqlite configuration.
## [[[sqlite]]]
# Name to show in the UI.
## nice_name=SQLite

# For SQLite, name defines the path to the database.
## name=/tmp/sqlite.db

# Database backend to use.
## engine=sqlite

# Database options to send to the server when connecting.
# https://docs.djangoproject.com/en/1.4/ref/databases/
## options={}

# mysql, oracle, or postgresql configuration.
[[[mysql]]](開啓mysql)
# Name to show in the UI.
nice_name="HUE DB"(界面顯示mysql的名字)

# For MySQL and PostgreSQL, name is the name of the database.
# For Oracle, Name is instance of the Oracle server. For express edition
# this is 'xe' by default.
name=hue(鏈接的mysql數據庫)

# Database backend to use. This can be:
# 1. mysql
# 2. postgresql
# 3. oracle
engine=mysql(鏈接引擎)

# IP or hostname of the database to connect to.
host=172.30.115.60(mysql地址)

# Port the database server is listening to. Defaults are:
# 1. MySQL: 3306
# 2. PostgreSQL: 5432
# 3. Oracle Express Edition: 1521
port=3306(端口)

# Username to authenticate with when connecting to the database.
user=hue(鏈接數據庫的用戶)

# Password matching the username to authenticate with when
# connecting to the database.
password=hue(鏈接數據庫的密碼)

# Database options to send to the server when connecting.
# https://docs.djangoproject.com/en/1.4/ref/databases/
## options={}

修改hue源碼
1. /usr/lib/hue/desktop/libs/notebook/src/notebook/connectors/hiveserver2.py
添加調用query_server方法:
def _get_db(self, snippet):

LOG.info('log by ck get_db')
if snippet['type'] == 'hive':

name = 'beeswax'

elif snippet['type'] == 'impala':

name = 'impala'

else:

name = 'spark-sql'

 

return dbms.get_by_type(self.user, 'data', query_server=get_query_server_config_for_meta(name=name))

修改計算(exector)的執行方法
@query_error_handler
def execute(self, notebook, snippet):
db = self._get_db(snippet)

statement = self._get_current_statement(db, snippet)
session = self._get_session(notebook, snippet['type'])
query = self._prepare_hql_query(snippet, statement['statement'], session)

try:
if statement.get('statement_id') == 0:
db.use(query.database)
handle = db.client.query(query)
except QueryServerException, ex:
raise QueryError(ex.message, handle=statement)

# All good
server_id, server_guid = handle.get()
response = {
'secret': server_id,
'guid': server_guid,
'operation_type': handle.operation_type,
'has_result_set': handle.has_result_set,
'modified_row_count': handle.modified_row_count,
'log_context': handle.log_context,
}
response.update(statement)

return response
2. /usr/lib/hue/apps/beeswax/src/beeswax/server/dbms.py
添加query執行的路徑:(修改ip爲spark的ip和端口)
def get_query_server_config_for_meta(name='beeswax', server=None):

if name == 'impala':

from impala.dbms import get_query_server_config as impala_query_server_config

query_server = impala_query_server_config()

else:

kerberos_principal = hive_site.get_hiveserver2_kerberos_principal(HIVE_SERVER_HOST.get())

 

query_server = {

'server_name': 'beeswax', # Aka HiveServer2 now

'server_host': '172.30.115.58',#HIVE_SERVER_HOST.get(),

'server_port': '10010',#HIVE_SERVER_PORT.get(),

'principal': kerberos_principal,

'http_url': '%(protocol)s://%(host)s:%(port)s/%(end_point)s' % {

'protocol': 'https' if hiveserver2_use_ssl() else 'http',

'host': '172.30.115.58',#HIVE_SERVER_HOST.get(),

'port': hive_site.hiveserver2_thrift_http_port(),

'end_point': hive_site.hiveserver2_thrift_http_path()

},

'transport_mode': 'http' if hive_site.hiveserver2_transport_mode() == 'HTTP' else 'socket',

'auth_username': AUTH_USERNAME.get(),

'auth_password': AUTH_PASSWORD.get()

}

 

if name == 'sparksql': # Spark SQL is almost the same as Hive

from spark.conf import SQL_SERVER_HOST as SPARK_SERVER_HOST, SQL_SERVER_PORT as SPARK_SERVER_PORT

 

query_server.update({

'server_name': 'sparksql',

'server_host': SPARK_SERVER_HOST.get(),

'server_port': SPARK_SERVER_PORT.get()

})

 

debug_query_server = query_server.copy()

debug_query_server['auth_password_used'] = bool(debug_query_server.pop('auth_password'))

LOG.info("Query Server: %s" % debug_query_server)

 

return query_server

添加query的cache locking
DBMS_CACHE = {}

DBMS_CACHE_LOCK = threading.Lock()


DBMS_META_CACHE = {}

DBMS_META_CACHE_LOCK = threading.Lock()

DBMS_DATA_CACHE = {}

DBMS_DATA_CACHE_LOCK = threading.Lock()

 

 

def get(user, query_server=None):

global DBMS_CACHE

global DBMS_CACHE_LOCK

 

if query_server is None:

query_server = get_query_server_config()

 

DBMS_CACHE_LOCK.acquire()

try:

DBMS_CACHE.setdefault(user.username, {})

 

if query_server['server_name'] not in DBMS_CACHE[user.username]:

# Avoid circular dependency

from beeswax.server.hive_server2_lib import HiveServerClientCompatible

 

if query_server['server_name'] == 'impala':

from impala.dbms import ImpalaDbms

from impala.server import ImpalaServerClient

DBMS_CACHE[user.username][query_server['server_name']] = ImpalaDbms(HiveServerClientCompatible(ImpalaServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])

else:

from beeswax.server.hive_server2_lib import HiveServerClient

DBMS_CACHE[user.username][query_server['server_name']] = HiveServer2Dbms(HiveServerClientCompatible(HiveServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])

 

return DBMS_CACHE[user.username][query_server['server_name']]

finally:

DBMS_CACHE_LOCK.release()

 

 

def get_by_type(user, type, query_server=None):

global DBMS_META_CACHE

global DBMS_META_CACHE_LOCK

global DBMS_DATA_CACHE

global DBMS_DATA_CACHE_LOCK

 

if query_server is None:

query_server = get_query_server_config()

if type == 'meta':

DBMS_META_CACHE_LOCK.acquire()

try:

DBMS_META_CACHE.setdefault(user.username, {})

 

if query_server['server_name'] not in DBMS_META_CACHE[user.username]:

# Avoid circular dependency

from beeswax.server.hive_server2_lib import HiveServerClientCompatible

 

if query_server['server_name'] == 'impala':

from impala.dbms import ImpalaDbms

from impala.server import ImpalaServerClient

DBMS_META_CACHE[user.username][query_server['server_name']] = ImpalaDbms(HiveServerClientCompatible(ImpalaServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])

else:

from beeswax.server.hive_server2_lib import HiveServerClient

DBMS_META_CACHE[user.username][query_server['server_name']] = HiveServer2Dbms(HiveServerClientCompatible(HiveServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])

 

return DBMS_META_CACHE[user.username][query_server['server_name']]

finally:

DBMS_META_CACHE_LOCK.release()

else:

DBMS_DATA_CACHE_LOCK.acquire()

try:

DBMS_DATA_CACHE.setdefault(user.username, {})

 

if query_server['server_name'] not in DBMS_DATA_CACHE[user.username]:

# Avoid circular dependency

from beeswax.server.hive_server2_lib import HiveServerClientCompatible

 

if query_server['server_name'] == 'impala':

from impala.dbms import ImpalaDbms

from impala.server import ImpalaServerClient

DBMS_DATA_CACHE[user.username][query_server['server_name']] = ImpalaDbms(HiveServerClientCompatible(ImpalaServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])

else:

from beeswax.server.hive_server2_lib import HiveServerClient

DBMS_DATA_CACHE[user.username][query_server['server_name']] = HiveServer2Dbms(HiveServerClientCompatible(HiveServerClient(query_server, user)), QueryHistory.SERVER_TYPE[1][0])

 

return DBMS_DATA_CACHE[user.username][query_server['server_name']]

finally:

DBMS_DATA_CACHE_LOCK.release()

3. /usr/lib/hue/apps/filebrowser/src/filebrowser/templates/listdir.mako
刪除filebrower中的delete和drop
< !-- ko if: $root.isS3 -->
< button

 

class ="btn fileToolbarBtn delete-link" title="${_('Delete forever')}" data-bind="enable: selectedFiles().length > 0, click: deleteSelected" > < i class ="fa fa-bolt" > < / i > ${_('Delete forever')} < / button >

 

< !-- / ko -->


< ul

class ="dropdown-menu" >

< li > < a

href = "#"

class ="delete-link" title="${_('Delete forever')}" data-bind="enable: selectedFiles().length > 0, click: deleteSelected" > < i class ="fa fa-bolt" > < / i > ${_('Delete forever')} < / a > < / li >

< / ul >

4. /usr/lib/hue/apps/filebrowser/src/filebrowser/templates/listdir_components.mako
< li > < a href = "#" class ="delete-link" title="${_('Delete forever')}" data-bind="enable: $root.selectedFiles().length > 0, click: $root.deleteSelected" > < i class ="fa fa-fw fa-bolt" > < / i > ${_('Delete forever')} < / a > < / li >

5. /usr/lib/hue/apps/filebrowser/src/filebrowser/views.py
request.fs.do_as_user(request.user, request.fs.rmtree, arg['path'], True) # the old code was ('skip_trash' in request.GET)

修改hdfs配置
hdfs-site.xml文件
修改:fs.permissions.umask-mode 000
(hue建表用戶爲web登陸用戶,在用spark作計算引擎時會在hdfs的表目錄中建立文件,後臺spark的用戶爲hive沒法寫入)

添加:添加hue用戶爲hdfs代理用戶(superuser)hadoop.proxyuser.hue.hosts *hadoop.proxyuser.hue.groups *

相關文章
相關標籤/搜索