中國移動實時數據分析-基於spark+kafka+flume

  這兩天主要是作了中國移動的實時數據分析一個小項目(能夠說是demo了),這裏記錄下來整個過程裏面遇到的坑,首先安裝好flume,kafka,spark(基於代碼本地運行能夠不安裝),redis,zookeeper 主要是爲了熟悉一下整個的一個spark-streaming的一個整個流程,還有就是了解調優的地方。java

  上述假設已經安裝好了相應的組件,而後就開始正式的踩坑之路:redis

  1.編寫一個java程序去讀取原始數據文件,模擬1s進行文件的插入一行,原始的數據文件格式以下:sql

    

     坑aapache

    .整個的數據格式是json,可是是一整行的。。。。json

    解決a1:因而就想這去把這樣的數據轉化爲json格式的,就去搗鼓了一下notepad++轉json格式的方法:notepad++上面的菜單欄中,插件-> plugins Admin..->search中直接查找就行了,而後找找有個install的按鈕點擊一下就ok了,而後各類肯定,以後notepad++會自動重啓,重啓以後上面的菜單欄中,插件->就會多出一個JSON Viewer,而後就能夠了。可是我操做的時候遇到了notepad++重啓以後沒有出現JSON Viewer(可是後來又出現了),bootstrap

    解決a2:因而又去找了idea實現json格式的方法:setting->keymap->main enum->code->reformat code 這個功能是將文本格式化,該功能的快捷鍵默認是ctrl+shift+l,可是這個快捷鍵組合是有衝突的,因此將其轉化爲ctrl+shift+s,修改後進行保存,而後建立一個xxx.json的文件,複製一行json數據到該文件中,而後全選,按下ctrl+shift+s便可轉化爲標準的json文件格式windows

    

    相應的java實現代碼以下:數據結構

 
 
import java.io.*;
import java.util.ArrayList;
import java.util.List;

public class WriteCMCC {
public static void main(String[] args) {
List<String> allLines = getCmcc(args[0]);
System.out.println(allLines.size());
writeCmcc(allLines, args[1]);
}

/**
* 一次性讀取cmcc中的數據
* @return 存放在list中
*/
private static List<String> getCmcc(String path) {
BufferedReader br = null;
List<String> allLines = new ArrayList<String>();
try {
br = new BufferedReader(new FileReader(new File(path)));
String line = "";
while ((line = br.readLine()) != null) {
allLines.add(line);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (br != null) br.close();
} catch (IOException e) {
e.printStackTrace();
}
}

return allLines;
}

/**
* 寫入cmcc中的數據,一次寫入一個list的數據集
*/
private static void writeCmcc(List<String> cmcc, String path) {

BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(new File(path)));
for(String line : cmcc) {
bw.write(line);
bw.flush();
Thread.sleep(1000);
bw.newLine();
}

} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (bw != null) bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
 

    代碼寫好,而後測試完,而後打成jar包,丟到Linux準備運行。app

    java -jar /home/soft/jar/write_cmcc_5_seconds.jar /home/soft/cmcc.log /home/soft/cmcc/cmcc_write.logide

  2.flume編寫相應的conf去把數據抽取到kafka中(cmcc.conf)

    先啓動zookeeper,啓動kafka並建立topic(cmcc):

      zookeeper啓動命令:

        /home/soft/zookeeper-3.4.6/bin/zkServer.sh start(每一個節點都須要啓動)
      kafka啓動命令:
        /home/soft/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /home/soft/kafka_2.11-0.10.1.0/config/server.properties &
      kafka建立topic:
        bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --create --topic cmcc --partitions 6 --replication-factor

      kafka查看全部的topic:
        bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --list

    而後編寫conf測試(cmcc.conf):

      

a1.sources = s1
a1.channels = c1

#這裏先不使用該種方式去讀取文件,由於該方式flume會出以下的錯誤
#java.lang.IllegalStateException: File has been modified since being read: /home/soft/cmcc/cmcc_write.log
#緣由:出現這個問題的緣由是,當咱們拷貝一個文件的時候,一些對文件進行了修改
#解決:最好的方法就是,確保大文件徹底拷貝後,再讓flume來讀取,思路是將拷貝中的文件加上一個多餘的後綴,flume一開始不會讀取文件,當文件拷貝完成後去掉多餘的後綴,這個時候flume就會針對新文件進行讀取。
#a1.sources.s1.type =spooldir
#a1.sources.s1.spoolDir =/home/soft/cmcc
#a1.sources.s1.fileHeader= true

a1.sources.s1.type=exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = os1:9092,os2:9092,os3:9092
#建立好相應的topic
a1.channels.c1.kafka.topic = cmcc
#這個是本身定義的沒啥事情
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#這個必定要設置,不然就是個坑,寫入到kafka中的數據會被追加進一些數據,並且仍是亂碼的
a1.channels.c1.parseAsFlumeEvent = false

#拼接source和channel
a1.sources.s1.channels=c1

     flume啓動命令:下面的a1就對應着上面的a1(控制檯打印信息)
      bin/flume-ng agent -n a1 -c conf -f conf/cmcc.conf -Dflume.root.logger=INFO,console

  3.spark程序去讀取kafka的中的數據並將結果存放至redis中

    啓動redis:/usr/local/redis/bin/redis-server  /usr/local/redis/etc/redis.conf

    程序相應的配置:resources -> application.conf 

#kafka的相關參數
kafka.topic = "cmcc"
kafka.broker.list="os1:9092,os2:9092,os3:9092"
kafka.group.id="cmcc"
redis.host="xxx.xxx.xxx.xxx"
redis.db.index="0"

    主程序代碼:scala -> BootStarpApp

package app

import java.text.SimpleDateFormat

import com.alibaba.fastjson.JSON
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import utils.{AppParams, Jpools}

object BootStarpApp {
  def main(args: Array[String]): Unit = {

    /**
      * 錯誤集:
      * 1.Caused by: org.apache.kafka.common.KafkaException: org.codehaus.jackson.map.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
      *   錯誤解釋,kafka在進行序列化實例對象的時候出錯
      *   查找緣由:
      *   org.codehaus.jackson.map.deser.std.StringDeserializer是咱們AppParas中導入的類型,多是導錯了,
      *   查看後發現應該導入:import org.apache.kafka.common.serialization.StringDeserializer
      * 2. 程序出現INFO:Marking the coordinator os3:9092 (id: 2147483645 rack: null) dead for group cmcc_test2,且程序再也不執行下去
      *   緣由:由於kafka-clent程序默認讀取到kafka上的信息以後將host:os3返回做爲主機節點去獲取數據,可是在本機中沒有配置相應的host與ip的映射,全部這裏就沒法直接進行訪問os3
      *   解決辦法;在windows中配置相應的ip與hostname的映射(kafka中的broker節點)
      * 3.json解析出錯:error parse false
      *   緣由json格式錯誤
      *
      * 4.flume的坑:a0.channels.c1.parseAsFlumeEvent = false  1.7之後默認爲true
      * 若是設置此項爲 true,Kafka Sink 則會把數據按照標準的 Flume Event 格式(即Headers域和body域結合的數據結構)發送。Flume Event 中的 Headers 域一般是一些附加字段,能夠是時間戳(好比時間戳攔截器指定的時間戳)、文件名(好比 spooldir Source 開啓的 fileHeader = true)等信息。可是 1.7.0 版本的 Flume 一旦開啓此配置,會致使 Headers 域裏面的信息亂碼
      *
      * 5.flume異常崩潰 File has been modified since being read
      *   緣由:出現這個問題的緣由是,當咱們拷貝一個文件的時候,一些對文件進行了修改,就會出現這個錯誤
      *   解決:最好的方法就是,確保大文件徹底拷貝後,再讓flume來讀取,思路是將拷貝中的文件加上一個多餘的後綴,flume一開始不會讀取文件,當文件拷貝完成後去掉多餘的後綴,這個時候flume就會針對新文件進行讀取。
      *   另外針對大文件,flume的解決方案能夠設置一個文件完成後綴:
      */

    val sparkConf = new SparkConf()

    sparkConf.setAppName("中國移動運營實時監控平臺")
    sparkConf.setMaster("local[*]")

    /**
      *將rdd以kryo的序列化保存,以減小內存的使用
      */
    sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
    /**
      * 對rdd進行壓縮,使用內存空間換去處理時間的方式,減小內存的使用
     */
    sparkConf.set("spark.rdd.compress", "true")

    /**
      *
      */
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100")

    /**
      * 進行優雅的中止程序
      */
    sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")

    /**
      * 每兩秒執行一個批次
      */
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    /**
      * 獲取kafka的數據
      * LocationStrategies:位置策略,若是kafka的broker節點與Excutor在同一臺機器上給一種策略,再也不一臺機器上給另外一種策略
      * 設定策略以後會以最有的策略進行獲取數據
      * 通常在企業中kafka節點與Excutor不會放到一臺機器的,緣由是kafka是消息存儲的,Executor是用來作消息計算的
      * 所以計算與存儲須要分開,存儲對磁盤要求高,計算對內存和cpu的要求更高
      * 若是Executor節點跟Broker的節點在一塊兒的話就使用PreferBrokers策略,再也不一塊兒的話就使用preferConsisent策略
      * 使用preferConsisent策略的話,未來在kafka中拉去數據之後儘可能將數據分散到全部的Executor上
      */
    val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent
      , ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams))


    stream.foreachRDD(rdd => {

      /**
        * {
        * "bussinessRst": "0000",
        * "channelCode": "0705",
        * "chargefee": "10000",
        * "clientIp": "125.82.117.133",
        * "endReqTime": "20170412080609613",
        * "idType": "01",
        * "interFacRst": "0000",
        * "logOutTime": "20170412080609613",
        * "orderId": "384681890175026754",
        * "prodCnt": "1",
        * "provinceCode": "280",
        * "requestId": "20170412080450886738519397327610",
        * "retMsg": "成功",
        * "serverIp": "172.16.59.241",
        * "serverPort": "8088",
        * "serviceName": "sendRechargeReq",
        * "shouldfee": "9950",
        * "startReqTime": "20170412080609503",
        * "sysId": "15"
        * }
        */

      /**
        *  業務邏輯:
        *   serviceName:reChargeNotifyReq,則爲充值通知的記錄
        *   requestId:包含充值的日期(訂單開始時間)
        *   bussinessRst:是否成功 0000 爲成功,其餘爲不成功
        *   chargefee:充值的金額
        *   receiveNotifyTime:訂單結束時間
        *
        */

      /**
        * 咱們能夠經過serviceName字段來肯定,若是該字段是reChargeNotifyReq則表明該條數據是充值通知部分的數據。
        * 獲取全部的充值通知日誌
        */
      val baseData = rdd.map(cr => {
        print(cr.value())
        JSON.parseObject(cr.value())
      }).filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache()

      /**
        * 獲取天天充值成功的訂單筆數
        * 回憶:
        *   wordcount flatMap-》map-》reduceByKey
        */
      val totalSucc = baseData.map(obj=> {
        //獲取日期
        val reqId = obj.getString("requestId")
        //獲取日期
        val day = reqId.substring(0, 8)
        //取出該條充值是否成功的標誌
        val result = obj.getString("bussinessRst")
        val flag = if(result.equals("0000")) 1 else 0
        (day, flag)
      }).reduceByKey(_+_)

      /**
        * 獲取充值成功的訂單金額
        */
      val totalMoney = baseData.map(obj=> {
        val reqId = obj.getString("requestId")
        //獲取日期
        val day = reqId.substring(0, 8)
        //去除該條充值是否成功的標記
        val result = obj.getString("bussinessRst")
        val fee = if(result.equals("0000")) obj.getString("chargefee").toDouble else 0
        (day, fee)
      }).reduceByKey(_+_)

      //總訂單數
      val total = baseData.count()

      /**
        * 獲取充值成功的充值時長
        */
      val totalTime = baseData.map(obj=> {
        var reqId = obj.getString("requestId")
        //獲取日期
        val day = reqId.substring(0, 8)

        //取出該條充值是否成功的標示
        val result = obj.getString("bussinessRst")
        //時間格式爲:yyyyMMddHHmissSSS
        val endTime = obj.getString("receiveNotifyTime")
        val startTime = reqId.substring(0, 17)

        val format = new SimpleDateFormat("yyyyMMddHHmissSSS")

        val cost = if(result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0
        (day, cost)
      }).reduceByKey(_+_)

      /**
        * 將數據存儲到redis中:
        * (CMCC-20170412,35)
        */
      totalSucc.foreachPartition(itr=> {
       val jedis = Jpools.getJedis
        itr.foreach(tp => {
         // print("CMCC-"+tp._1, tp._2)
          jedis.incrBy("CMCC-"+tp._1, tp._2)
        })
      })
    })


    ssc.start()
    ssc.awaitTermination()
  }
}

  兩個工具類:

package utils

import com.typesafe.config.ConfigFactory
import org.apache.kafka.common.serialization.StringDeserializer

object AppParams {
  /**Scala中使用關鍵字lazy來定義惰性變量,實現延遲加載(懶加載)。
  惰性變量只能是不可變變量,而且只有在調用惰性變量時,纔會去實例化這個變量。
    load中能夠指定相應的配置文件,可是不指定的狀況下默認去讀取resources下的application.conf文件
      默認規則:application.conf->application.json->application.properties
    **/
  private lazy val config = ConfigFactory.load()

  val redisHost = config.getString("redis.host")
  val selectDBIndex = config.getInt("redis.db.index")
  /**
    * 返回訂閱的主題
    */
  val topic = config.getString("kafka.topic").split(",")

  /**
    * kafka集羣所在的主機和端口
    */
  val brokers:String = config.getString("kafka.broker.list")

  /**
    * 消費者的id
    */
  val groupId = config.getString("kafka.group.id")

  /**
    * 將kafka的相關參數進行分裝到map中
    */
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer"-> classOf[StringDeserializer],
    "group.id"-> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "false"
  )
}
package utils
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.JedisPool

/**
  * 建立一個redis的線程池
  */
object Jpools {
  private val poolConfig = new GenericObjectPoolConfig
  poolConfig.setMaxIdle(5) //最大的空閒鏈接數爲5,鏈接池中最大的空閒鏈接數,默認是8
  poolConfig.setMaxTotal(2000) //最大支持的鏈接數量,默認也是8

  //鏈接池是私有的,不能對外進行公開訪問
  private lazy val  jedisPool = new JedisPool(poolConfig, AppParams.redisHost)


  def getJedis = {
    val jedis = jedisPool.getResource
    jedis.select(AppParams.selectDBIndex)
    jedis
  }
}

  pom文件

<dependencies>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>


        <!-- 導入kafka的依賴-->
       <!-- <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.1.0</version>
        </dependency>-->
        <!-- 指定kafka-client API的版本-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <!-- 導入spark streaming 與kafka的依賴包-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>

        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>

    </dependencies>

 

 

問題總結:

  1.json格式的轉換 (已解決)

  2.flume讀取數據到kafka後數據亂碼增多問題(已解決)

  3.flume  spooldir  讀取文件的同時對文件更改形成的java.lang.IllegalStateException:File has been modified since being read:問題 (待解決)

  4.上述spark主程序代碼優化問題 (待解決)

相關文章
相關標籤/搜索