這兩天主要是作了中國移動的實時數據分析一個小項目(能夠說是demo了),這裏記錄下來整個過程裏面遇到的坑,首先安裝好flume,kafka,spark(基於代碼本地運行能夠不安裝),redis,zookeeper 主要是爲了熟悉一下整個的一個spark-streaming的一個整個流程,還有就是了解調優的地方。java
上述假設已經安裝好了相應的組件,而後就開始正式的踩坑之路:redis
1.編寫一個java程序去讀取原始數據文件,模擬1s進行文件的插入一行,原始的數據文件格式以下:sql
坑aapache
.整個的數據格式是json,可是是一整行的。。。。json
解決a1:因而就想這去把這樣的數據轉化爲json格式的,就去搗鼓了一下notepad++轉json格式的方法:notepad++上面的菜單欄中,插件-> plugins Admin..->search中直接查找就行了,而後找找有個install的按鈕點擊一下就ok了,而後各類肯定,以後notepad++會自動重啓,重啓以後上面的菜單欄中,插件->就會多出一個JSON Viewer,而後就能夠了。可是我操做的時候遇到了notepad++重啓以後沒有出現JSON Viewer(可是後來又出現了),bootstrap
解決a2:因而又去找了idea實現json格式的方法:setting->keymap->main enum->code->reformat code 這個功能是將文本格式化,該功能的快捷鍵默認是ctrl+shift+l,可是這個快捷鍵組合是有衝突的,因此將其轉化爲ctrl+shift+s,修改後進行保存,而後建立一個xxx.json的文件,複製一行json數據到該文件中,而後全選,按下ctrl+shift+s便可轉化爲標準的json文件格式windows
相應的java實現代碼以下:數據結構
import java.io.*;
import java.util.ArrayList;
import java.util.List;
public class WriteCMCC {
public static void main(String[] args) {
List<String> allLines = getCmcc(args[0]);
System.out.println(allLines.size());
writeCmcc(allLines, args[1]);
}
/**
* 一次性讀取cmcc中的數據
* @return 存放在list中
*/
private static List<String> getCmcc(String path) {
BufferedReader br = null;
List<String> allLines = new ArrayList<String>();
try {
br = new BufferedReader(new FileReader(new File(path)));
String line = "";
while ((line = br.readLine()) != null) {
allLines.add(line);
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (br != null) br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
return allLines;
}
/**
* 寫入cmcc中的數據,一次寫入一個list的數據集
*/
private static void writeCmcc(List<String> cmcc, String path) {
BufferedWriter bw = null;
try {
bw = new BufferedWriter(new FileWriter(new File(path)));
for(String line : cmcc) {
bw.write(line);
bw.flush();
Thread.sleep(1000);
bw.newLine();
}
} catch (Exception e) {
e.printStackTrace();
}finally {
try {
if (bw != null) bw.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
代碼寫好,而後測試完,而後打成jar包,丟到Linux準備運行。app
java -jar /home/soft/jar/write_cmcc_5_seconds.jar /home/soft/cmcc.log /home/soft/cmcc/cmcc_write.logide
2.flume編寫相應的conf去把數據抽取到kafka中(cmcc.conf)
先啓動zookeeper,啓動kafka並建立topic(cmcc):
zookeeper啓動命令:
/home/soft/zookeeper-3.4.6/bin/zkServer.sh start(每一個節點都須要啓動)
kafka啓動命令:
/home/soft/kafka_2.11-0.10.1.0/bin/kafka-server-start.sh /home/soft/kafka_2.11-0.10.1.0/config/server.properties &
kafka建立topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --create --topic cmcc --partitions 6 --replication-factor
kafka查看全部的topic:
bin/kafka-topics.sh --zookeeper os1:2181,os2:2181,os3:2181 --list
而後編寫conf測試(cmcc.conf):
a1.sources = s1
a1.channels = c1
#這裏先不使用該種方式去讀取文件,由於該方式flume會出以下的錯誤
#java.lang.IllegalStateException: File has been modified since being read: /home/soft/cmcc/cmcc_write.log
#緣由:出現這個問題的緣由是,當咱們拷貝一個文件的時候,一些對文件進行了修改
#解決:最好的方法就是,確保大文件徹底拷貝後,再讓flume來讀取,思路是將拷貝中的文件加上一個多餘的後綴,flume一開始不會讀取文件,當文件拷貝完成後去掉多餘的後綴,這個時候flume就會針對新文件進行讀取。
#a1.sources.s1.type =spooldir
#a1.sources.s1.spoolDir =/home/soft/cmcc
#a1.sources.s1.fileHeader= true
a1.sources.s1.type=exec
a1.sources.r1.command = tail -F /root/app_weichat_login.log
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = os1:9092,os2:9092,os3:9092
#建立好相應的topic
a1.channels.c1.kafka.topic = cmcc
#這個是本身定義的沒啥事情
a1.channels.c1.kafka.consumer.group.id = flume-consumer
#這個必定要設置,不然就是個坑,寫入到kafka中的數據會被追加進一些數據,並且仍是亂碼的
a1.channels.c1.parseAsFlumeEvent = false
#拼接source和channel
a1.sources.s1.channels=c1
flume啓動命令:下面的a1就對應着上面的a1(控制檯打印信息)
bin/flume-ng agent -n a1 -c conf -f conf/cmcc.conf -Dflume.root.logger=INFO,console
3.spark程序去讀取kafka的中的數據並將結果存放至redis中
啓動redis:/usr/local/redis/bin/redis-server /usr/local/redis/etc/redis.conf
程序相應的配置:resources -> application.conf
#kafka的相關參數 kafka.topic = "cmcc" kafka.broker.list="os1:9092,os2:9092,os3:9092" kafka.group.id="cmcc" redis.host="xxx.xxx.xxx.xxx" redis.db.index="0"
主程序代碼:scala -> BootStarpApp
package app import java.text.SimpleDateFormat import com.alibaba.fastjson.JSON import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} import utils.{AppParams, Jpools} object BootStarpApp { def main(args: Array[String]): Unit = { /** * 錯誤集: * 1.Caused by: org.apache.kafka.common.KafkaException: org.codehaus.jackson.map.deser.std.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer * 錯誤解釋,kafka在進行序列化實例對象的時候出錯 * 查找緣由: * org.codehaus.jackson.map.deser.std.StringDeserializer是咱們AppParas中導入的類型,多是導錯了, * 查看後發現應該導入:import org.apache.kafka.common.serialization.StringDeserializer * 2. 程序出現INFO:Marking the coordinator os3:9092 (id: 2147483645 rack: null) dead for group cmcc_test2,且程序再也不執行下去 * 緣由:由於kafka-clent程序默認讀取到kafka上的信息以後將host:os3返回做爲主機節點去獲取數據,可是在本機中沒有配置相應的host與ip的映射,全部這裏就沒法直接進行訪問os3 * 解決辦法;在windows中配置相應的ip與hostname的映射(kafka中的broker節點) * 3.json解析出錯:error parse false * 緣由json格式錯誤 * * 4.flume的坑:a0.channels.c1.parseAsFlumeEvent = false 1.7之後默認爲true * 若是設置此項爲 true,Kafka Sink 則會把數據按照標準的 Flume Event 格式(即Headers域和body域結合的數據結構)發送。Flume Event 中的 Headers 域一般是一些附加字段,能夠是時間戳(好比時間戳攔截器指定的時間戳)、文件名(好比 spooldir Source 開啓的 fileHeader = true)等信息。可是 1.7.0 版本的 Flume 一旦開啓此配置,會致使 Headers 域裏面的信息亂碼 * * 5.flume異常崩潰 File has been modified since being read * 緣由:出現這個問題的緣由是,當咱們拷貝一個文件的時候,一些對文件進行了修改,就會出現這個錯誤 * 解決:最好的方法就是,確保大文件徹底拷貝後,再讓flume來讀取,思路是將拷貝中的文件加上一個多餘的後綴,flume一開始不會讀取文件,當文件拷貝完成後去掉多餘的後綴,這個時候flume就會針對新文件進行讀取。 * 另外針對大文件,flume的解決方案能夠設置一個文件完成後綴: */ val sparkConf = new SparkConf() sparkConf.setAppName("中國移動運營實時監控平臺") sparkConf.setMaster("local[*]") /** *將rdd以kryo的序列化保存,以減小內存的使用 */ sparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") /** * 對rdd進行壓縮,使用內存空間換去處理時間的方式,減小內存的使用 */ sparkConf.set("spark.rdd.compress", "true") /** * */ sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "100") /** * 進行優雅的中止程序 */ sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true") /** * 每兩秒執行一個批次 */ val ssc = new StreamingContext(sparkConf, Seconds(2)) /** * 獲取kafka的數據 * LocationStrategies:位置策略,若是kafka的broker節點與Excutor在同一臺機器上給一種策略,再也不一臺機器上給另外一種策略 * 設定策略以後會以最有的策略進行獲取數據 * 通常在企業中kafka節點與Excutor不會放到一臺機器的,緣由是kafka是消息存儲的,Executor是用來作消息計算的 * 所以計算與存儲須要分開,存儲對磁盤要求高,計算對內存和cpu的要求更高 * 若是Executor節點跟Broker的節點在一塊兒的話就使用PreferBrokers策略,再也不一塊兒的話就使用preferConsisent策略 * 使用preferConsisent策略的話,未來在kafka中拉去數據之後儘可能將數據分散到全部的Executor上 */ val stream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent , ConsumerStrategies.Subscribe[String, String](AppParams.topic, AppParams.kafkaParams)) stream.foreachRDD(rdd => { /** * { * "bussinessRst": "0000", * "channelCode": "0705", * "chargefee": "10000", * "clientIp": "125.82.117.133", * "endReqTime": "20170412080609613", * "idType": "01", * "interFacRst": "0000", * "logOutTime": "20170412080609613", * "orderId": "384681890175026754", * "prodCnt": "1", * "provinceCode": "280", * "requestId": "20170412080450886738519397327610", * "retMsg": "成功", * "serverIp": "172.16.59.241", * "serverPort": "8088", * "serviceName": "sendRechargeReq", * "shouldfee": "9950", * "startReqTime": "20170412080609503", * "sysId": "15" * } */ /** * 業務邏輯: * serviceName:reChargeNotifyReq,則爲充值通知的記錄 * requestId:包含充值的日期(訂單開始時間) * bussinessRst:是否成功 0000 爲成功,其餘爲不成功 * chargefee:充值的金額 * receiveNotifyTime:訂單結束時間 * */ /** * 咱們能夠經過serviceName字段來肯定,若是該字段是reChargeNotifyReq則表明該條數據是充值通知部分的數據。 * 獲取全部的充值通知日誌 */ val baseData = rdd.map(cr => { print(cr.value()) JSON.parseObject(cr.value()) }).filter(obj => obj.getString("serviceName").equalsIgnoreCase("reChargeNotifyReq")).cache() /** * 獲取天天充值成功的訂單筆數 * 回憶: * wordcount flatMap-》map-》reduceByKey */ val totalSucc = baseData.map(obj=> { //獲取日期 val reqId = obj.getString("requestId") //獲取日期 val day = reqId.substring(0, 8) //取出該條充值是否成功的標誌 val result = obj.getString("bussinessRst") val flag = if(result.equals("0000")) 1 else 0 (day, flag) }).reduceByKey(_+_) /** * 獲取充值成功的訂單金額 */ val totalMoney = baseData.map(obj=> { val reqId = obj.getString("requestId") //獲取日期 val day = reqId.substring(0, 8) //去除該條充值是否成功的標記 val result = obj.getString("bussinessRst") val fee = if(result.equals("0000")) obj.getString("chargefee").toDouble else 0 (day, fee) }).reduceByKey(_+_) //總訂單數 val total = baseData.count() /** * 獲取充值成功的充值時長 */ val totalTime = baseData.map(obj=> { var reqId = obj.getString("requestId") //獲取日期 val day = reqId.substring(0, 8) //取出該條充值是否成功的標示 val result = obj.getString("bussinessRst") //時間格式爲:yyyyMMddHHmissSSS val endTime = obj.getString("receiveNotifyTime") val startTime = reqId.substring(0, 17) val format = new SimpleDateFormat("yyyyMMddHHmissSSS") val cost = if(result.equals("0000")) format.parse(endTime).getTime - format.parse(startTime).getTime else 0 (day, cost) }).reduceByKey(_+_) /** * 將數據存儲到redis中: * (CMCC-20170412,35) */ totalSucc.foreachPartition(itr=> { val jedis = Jpools.getJedis itr.foreach(tp => { // print("CMCC-"+tp._1, tp._2) jedis.incrBy("CMCC-"+tp._1, tp._2) }) }) }) ssc.start() ssc.awaitTermination() } }
兩個工具類:
package utils import com.typesafe.config.ConfigFactory import org.apache.kafka.common.serialization.StringDeserializer object AppParams { /**Scala中使用關鍵字lazy來定義惰性變量,實現延遲加載(懶加載)。 惰性變量只能是不可變變量,而且只有在調用惰性變量時,纔會去實例化這個變量。 load中能夠指定相應的配置文件,可是不指定的狀況下默認去讀取resources下的application.conf文件 默認規則:application.conf->application.json->application.properties **/ private lazy val config = ConfigFactory.load() val redisHost = config.getString("redis.host") val selectDBIndex = config.getInt("redis.db.index") /** * 返回訂閱的主題 */ val topic = config.getString("kafka.topic").split(",") /** * kafka集羣所在的主機和端口 */ val brokers:String = config.getString("kafka.broker.list") /** * 消費者的id */ val groupId = config.getString("kafka.group.id") /** * 將kafka的相關參數進行分裝到map中 */ val kafkaParams = Map[String, Object]( "bootstrap.servers" -> brokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer"-> classOf[StringDeserializer], "group.id"-> groupId, "auto.offset.reset" -> "earliest", "enable.auto.commit" -> "false" ) }
package utils import org.apache.commons.pool2.impl.GenericObjectPoolConfig import redis.clients.jedis.JedisPool /** * 建立一個redis的線程池 */ object Jpools { private val poolConfig = new GenericObjectPoolConfig poolConfig.setMaxIdle(5) //最大的空閒鏈接數爲5,鏈接池中最大的空閒鏈接數,默認是8 poolConfig.setMaxTotal(2000) //最大支持的鏈接數量,默認也是8 //鏈接池是私有的,不能對外進行公開訪問 private lazy val jedisPool = new JedisPool(poolConfig, AppParams.redisHost) def getJedis = { val jedis = jedisPool.getResource jedis.select(AppParams.selectDBIndex) jedis } }
pom文件
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> <version>1.3.3</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.2.0</version> </dependency> <!-- 導入kafka的依賴--> <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.1.0</version> </dependency>--> <!-- 指定kafka-client API的版本--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency> <!-- 導入spark streaming 與kafka的依賴包--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.2.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.46</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> </dependencies>
問題總結:
1.json格式的轉換 (已解決)
2.flume讀取數據到kafka後數據亂碼增多問題(已解決)
3.flume spooldir 讀取文件的同時對文件更改形成的java.lang.IllegalStateException:File has been modified since being read:問題 (待解決)
4.上述spark主程序代碼優化問題 (待解決)