[TOC]html
前面在個人另外一篇文章中《大數據採集、清洗、處理:使用MapReduce進行離線數據分析完整案例》中已經有說起到,這裏依然給出下面的圖示:前端
前面給出的那篇文章是基於MapReduce
的離線數據分析案例,其經過對網站產生的用戶訪問日誌進行處理並分析出該網站在某天的PV、UV等數據,對應上面的圖示,其走的就是離線處理的數據處理方式,而這裏即將要介紹的是另一條路線的數據處理方式,即基於Storm的在線處理,在下面給出的完整案例中,咱們將會完成下面的幾項工做:java
若是你對上面說起的大數據組件已經有所認識,或者對如何構建大數據實時處理系統感興趣,那麼就能夠盡情閱讀下面的內容了。git
須要注意的是,核心在於如何構建實時處理系統,而這裏給出的案例是實時統計某個網站的PV、UV,在實際中,基於每一個人的工做環境不一樣,業務不一樣,所以業務系統的複雜度也不盡相同,相對來講,這裏統計PV、UV的業務是比較簡單的,但也足夠讓咱們對大數據實時處理系統有一個基本的、清晰的瞭解與認識,是的,它再也不那麼神祕了。github
咱們的實時處理系統總體架構以下:redis
即從上面的架構中咱們能夠看出,其由下面的幾部分構成:shell
從構建實時處理系統的角度出發,咱們須要作的是,如何讓數據在各個不一樣的集羣系統之間打通(從上面的圖示中也能很好地說明這一點),即須要作各個系統以前的整合,包括Flume與Kafka的整合,Kafka與Storm的整合。固然,各個環境是否使用集羣,依我的的實際須要而定,在咱們的環境中,Flume、Kafka、Storm都使用集羣。數據庫
對於Flume而言,關鍵在於如何採集數據,而且將其發送到Kafka上,而且因爲咱們這裏了使用Flume集羣的方式,Flume集羣的配置也是十分關鍵的。而對於Kafka,關鍵就是如何接收來自Flume的數據。從總體上講,邏輯應該是比較簡單的,便可以在Kafka中建立一個用於咱們實時處理系統的topic,而後Flume將其採集到的數據發送到該topic上便可。apache
在咱們的場景中,兩個Flume Agent分別部署在兩臺Web服務器上,用來採集Web服務器上的日誌數據,而後其數據的下沉方式都爲發送到另一個Flume Agent上,因此這裏咱們須要配置三個Flume Agent.後端
該Flume Agent部署在一臺Web服務器上,用來採集產生的Web日誌,而後發送到Flume Consolidation Agent上,建立一個新的配置文件flume-sink-avro.conf
,其配置內容以下:
######################################################### ## ##主要做用是監聽文件中的新增數據,採集到數據以後,輸出到avro ## 注意:Flume agent的運行,主要就是配置source channel sink ## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #對於source的配置描述 監聽文件中的新增數據 exec a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log #對於sink的配置描述 使用avro日誌作數據的消費 a1.sinks.k1.type = avro a1.sinks.k1.hostname = uplooking03 a1.sinks.k1.port = 44444 #對於channel的配置描述 使用文件作數據的臨時緩存 這種的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #經過channel c1將source r1和sink k1關聯起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成後, 啓動Flume Agent,便可對日誌文件進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
該Flume Agent部署在一臺Web服務器上,用來採集產生的Web日誌,而後發送到Flume Consolidation Agent上,建立一個新的配置文件flume-sink-avro.conf
,其配置內容以下:
######################################################### ## ##主要做用是監聽文件中的新增數據,採集到數據以後,輸出到avro ## 注意:Flume agent的運行,主要就是配置source channel sink ## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #對於source的配置描述 監聽文件中的新增數據 exec a1.sources.r1.type = exec a1.sources.r1.command = tail -F /home/uplooking/data/data-clean/data-access.log #對於sink的配置描述 使用avro日誌作數據的消費 a1.sinks.k1.type = avro a1.sinks.k1.hostname = uplooking03 a1.sinks.k1.port = 44444 #對於channel的配置描述 使用文件作數據的臨時緩存 這種的安全性要高 a1.channels.c1.type = file a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint a1.channels.c1.dataDirs = /home/uplooking/data/flume/data #經過channel c1將source r1和sink k1關聯起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成後, 啓動Flume Agent,便可對日誌文件進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-sink-avro.conf >/dev/null 2>&1 &
該Flume Agent用於接收其它兩個Agent發送過來的數據,而後將其發送到Kafka上,建立一個新的配置文件flume-source_avro-sink_kafka.conf
,配置內容以下:
######################################################### ## ##主要做用是監聽目錄中的新增文件,採集到數據以後,輸出到kafka ## 注意:Flume agent的運行,主要就是配置source channel sink ## 下面的a1就是agent的代號,source叫r1 channel叫c1 sink叫k1 ######################################################### a1.sources = r1 a1.sinks = k1 a1.channels = c1 #對於source的配置描述 監聽avro a1.sources.r1.type = avro a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 44444 #對於sink的配置描述 使用kafka作數據的消費 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink a1.sinks.k1.topic = f-k-s a1.sinks.k1.brokerList = uplooking01:9092,uplooking02:9092,uplooking03:9092 a1.sinks.k1.requiredAcks = 1 a1.sinks.k1.batchSize = 20 #對於channel的配置描述 使用內存緩衝區域作數據的臨時緩存 a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #經過channel c1將source r1和sink k1關聯起來 a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
配置完成後, 啓動Flume Agent,便可對avro的數據進行監聽:
$ flume-ng agent --conf conf -n a1 -f app/flume/conf/flume-source_avro-sink_kafka.conf >/dev/null 2>&1 &
在咱們的Kafka
中,先建立一個topic
,用於後面接收Flume
採集過來的數據:
kafka-topics.sh --create --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 --partitions 3 --replication-factor 3
啓動Kafka
的消費腳本:
$ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181
若是在Web服務器上有新增的日誌數據,就會被咱們的Flume程序監聽到,而且最終會傳輸到到Kafka的f-k-s
topic中,這裏做爲驗證,咱們上面啓動的是一個kafka終端消費的腳本,這時會在終端中看到數據的輸出:
$ kafka-console-consumer.sh --topic f-k-s --zookeeper uplooking01:2181,uplooking02:2181,uplooking03:2181 1003 221.8.9.6 80 0f57c8f5-13e2-428d-ab39-9e87f6e85417 10709 0 GET /index HTTP/1.1 null null Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1 1523107496164 1002 220.194.55.244 fb953d87-d166-4cb4-8a64-de7ddde9054c 10201 0 GET /check/detail HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107497165 1003 211.167.248.22 9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48 10022 1 GET /user/add HTTP/1.1 null null Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0) 1523107496664 1002 61.172.249.96 null 10608 0 POST /updateById?id=21 HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107498166 1000 202.98.11.101 aa7f62b3-a6a1-44ef-81f5-5e71b5c61368 20202 0 GET /getDataById HTTP/1.0 404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666
這樣的話,咱們的整合就沒有問題,固然kafka中的數據應該是由咱們的storm來進行消費的,這裏只是做爲整合的一個測試,下面就會來作kafka+storm的整合。
Kafka和Storm的整合其實在Storm的官網上也有很是詳細清晰的文檔:http://storm.apache.org/releases/1.0.6/storm-kafka.html,想對其有更多瞭解的同窗能夠參考一下。
在此次的大數據實時處理系統的構建中,Kafka至關因而做爲消息隊列(或者說是消息中間件)的角色,其產生的消息須要有消費者去消費,因此Kafka與Storm的整合,關鍵在於咱們的Storm如何去消費Kafka消息topic中的消息(kafka消息topic中的消息正是由Flume採集而來,如今咱們須要在Storm中對其進行消費)。
在Storm中,topology是很是關鍵的概念。
對比MapReduce,在MapReduce中,咱們提交的做業稱爲一個job,在一個Job中,又包含若干個Mapper和Reducer,正是在Mapper和Reducer中有咱們對數據的處理邏輯:
在Storm中,咱們提交的一個做業稱爲topology,其又包含了spout和bolt,在Storm中,對數據的處理邏輯正是在spout和bolt中體現:
即在spout中,正是咱們數據的來源,又由於其實時的特性,因此能夠把它比做一個「水龍頭」,表示其源源不斷地產生數據:
因此,問題的關鍵是spout如何去獲取來自kafka的數據?
好在,storm-kafka
的整合庫中提供了這樣的API來供咱們進行操做。
在代碼的邏輯中只須要建立一個由storm-kafka
API提供的KafkaSpout
對象便可:
SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id); return new KafkaSpout(spoutConf);
下面給出完整的整合代碼:
package cn.xpleaf.bigdata.storm.statics; import kafka.api.OffsetRequest; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.BrokerHosts; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; /** * Kafka和storm的整合,用於統計實時流量對應的pv和uv */ public class KafkaStormTopology { // static class MyKafkaBolt extends BaseRichBolt { static class MyKafkaBolt extends BaseBasicBolt { /** * kafkaSpout發送的字段名爲bytes */ @Override public void execute(Tuple input, BasicOutputCollector collector) { byte[] binary = input.getBinary(0); // 跨jvm傳輸數據,接收到的是字節數據 // byte[] bytes = input.getBinaryByField("bytes"); // 這種方式也行 String line = new String(binary); System.out.println(line); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ KafkaSpout kafkaSpout = createKafkaSpout(); builder.setSpout("id_kafka_spout", kafkaSpout); builder.setBolt("id_kafka_bolt", new MyKafkaBolt()) .shuffleGrouping("id_kafka_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = KafkaStormTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } /** * BrokerHosts hosts kafka集羣列表 * String topic 要消費的topic主題 * String zkRoot kafka在zk中的目錄(會在該節點目錄下記錄讀取kafka消息的偏移量) * String id 當前操做的標識id */ private static KafkaSpout createKafkaSpout() { String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; BrokerHosts hosts = new ZkHosts(brokerZkStr); // 經過zookeeper中的/brokers便可找到kafka的地址 String topic = "f-k-s"; String zkRoot = "/" + topic; String id = "consumer-id"; SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id); // 本地環境設置以後,也能夠在zk中創建/f-k-s節點,在集羣環境中,不用配置也能夠在zk中創建/f-k-s節點 //spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"}); //spoutConf.zkPort = 2181; spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 設置以後,剛啓動時就不會把以前的消費也進行讀取,會從最新的偏移量開始讀取 return new KafkaSpout(spoutConf); } }
其實代碼的邏輯很是簡單,咱們只建立了 一個由storm-kafka
提供的KafkaSpout
對象和一個包含咱們處理邏輯的MyKafkaBolt
對象,MyKafkaBolt
的邏輯也很簡單,就是把kafka的消息打印到控制檯上。
須要注意的是,後面咱們分析網站PV、UV的工做,正是在上面這部分簡單的代碼中完成的,因此其是很是重要的基礎。
上面的整合代碼,能夠在本地環境中運行,也能夠將其打包成jar包上傳到咱們的Storm集羣中並提交業務來運行。若是Web服務器可以產生日誌,而且前面Flume+Kafka的整合也沒有問題的話,將會有下面的效果。
若是是在本地環境中運行上面的代碼,那麼能夠在控制檯中看到日誌數據的輸出:
...... 45016548 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Refreshing partition manager connections 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.DynamicBrokersReader - Read partition info from zookeeper: GlobalPartitionInformation{topic=f-k-s, partitionMap={0=uplooking02:9092, 1=uplooking03:9092, 2=uplooking01:9092}} 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.KafkaUtils - Task [1/1] assigned [Partition{host=uplooking02:9092, topic=f-k-s, partition=0}, Partition{host=uplooking03:9092, topic=f-k-s, partition=1}, Partition{host=uplooking01:9092, topic=f-k-s, partition=2}] 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Deleted partition managers: [] 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] New partition managers: [] 45016552 [Thread-16-id_kafka_spout-executor[3 3]] INFO o.a.s.k.ZkCoordinator - Task [1/1] Finished refreshing 1003 221.8.9.6 80 0f57c8f5-13e2-428d-ab39-9e87f6e85417 10709 0 GET /index HTTP/1.1 null null Mozilla/5.0 (Windows; U; Windows NT 5.2)Gecko/2008070208 Firefox/3.0.1 1523107496164 1000 202.98.11.101 aa7f62b3-a6a1-44ef-81f5-5e71b5c61368 20202 0 GET /getDataById HTTP/1.0 404 /check/init Mozilla/5.0 (Windows; U; Windows NT 5.1)Gecko/20070803 Firefox/1.5.0.12 1523107497666 1002 220.194.55.244 fb953d87-d166-4cb4-8a64-de7ddde9054c 10201 0 GET /check/detail HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107497165 1003 211.167.248.22 9d7bb7c2-00bf-4102-9c8c-3d49b18d1b48 10022 1 GET /user/add HTTP/1.1 null null Mozilla/4.0 (compatible; MSIE 8.0; Windows NT6.0) 1523107496664 1002 61.172.249.96 null 10608 0 POST /updateById?id=21 HTTP/1.1 null null Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko 1523107498166 ......
若是是在Storm集羣中提交的做業運行,那麼也能夠在Storm的日誌中看到Web服務器產生的日誌數據:
這樣的話就完成了Kafka+Storm的整合。
其實所謂Storm和Redis的整合,指的是在咱們的實時處理系統中的數據的落地方式,即在Storm中包含了咱們處理數據的邏輯,而數據處理完畢後,產生的數據處理結果該保存到什麼地方呢?顯然就有不少種方式了,關係型數據庫、NoSQL、HDFS、HBase等,這應該取決於具體的業務和數據量,在這裏,咱們使用Redis來進行最後分析數據的存儲。
因此實際上作這一步的整合,其實就是開始寫咱們的業務處理代碼了,由於經過前面Flume-Kafka-Storm的整合,已經打通了整個數據的流通路徑,接下來關鍵要作的是,在Storm中,如何處理咱們的數據並保存到Redis中。
而在Storm中,spout已經不須要咱們來寫了(由storm-kafka
的API提供了KafkaSpout
對象),因此問題就變成,如何根據業務編寫分析處理數據的bolt。
咱們實時獲取的日誌格式以下:
1002 202.103.24.68 1976dc2e-f03a-44f0-892f-086d85105f7e 14549 1 GET /top HTTP/1.1 200 /tologin Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806916373 1000 221.8.9.6 80 542ccf0a-9b14-49a0-93cd-891d87ddabf3 12472 1 GET /index HTTP/1.1 500 /top Mozilla/4.0 (compatible; MSIE 5.0; WindowsNT) 1523806916874 1003 211.167.248.22 0e4c1875-116c-400e-a4f8-47a46ad04a42 12536 0 GET /tologin HTTP/1.1 200 /stat Mozilla/5.0 (Windows; U; Windows NT 5.2) AppleWebKit/525.13 (KHTML,like Gecko) Chrome/0.2.149.27 Safari/525.13 1523806917375 1000 219.147.198.230 07eebc1a-740b-4dac-b53f-bb242a45c901 11847 1 GET /userList HTTP/1.1 200 /top Mozilla/4.0 (compatible; MSIE 6.0; Windows NT5.1) 1523806917876 1001 222.172.200.68 4fb35ced-5b30-483b-9874-1d5917286675 13550 1 GET /getDataById HTTP/1.0 504 /tologin Mozilla/5.0 (Windows; U; Windows NT 5.2)AppleWebKit/525.13 (KHTML, like Gecko) Version/3.1Safari/525.13 1523806918377
其中須要說明的是第二個字段和第三個字段,由於它對咱們統計pv和uv很是有幫助,它們分別是ip字段和mid字段,說明以下:
ip:用戶的IP地址 mid:惟一的id,此id第一次會種在瀏覽器的cookie裏。若是存在則再也不種。做爲瀏覽器惟一標示。移動端或者pad直接取機器碼。
所以,根據IP地址,咱們能夠經過查詢獲得其所在的省份,而且建立一個屬於該省份的變量,用於記錄pv數,每來一條屬於該省份的日誌記錄,則該省份的pv就加1,以此來完成pv的統計。
而對於mid,咱們則能夠建立屬於該省的一個set集合,每來一條屬於該省份的日誌記錄,則能夠將該mid添加到set集合中,由於set集合存放的是不重複的數據,這樣就能夠幫咱們自動過濾掉重複的mid,根據set集合的大小,就能夠統計出uv。
在咱們storm的業務處理代碼中,咱們須要編寫兩個bolt:
固然上面只是說明了總體的思路,實際上還有不少須要注意的細節問題和技巧問題,這都在咱們的代碼中進行體現,我在後面寫的代碼中都加了很是詳細的註釋進行說明。
根據上面的分析,編寫用於數據預處理的bolt,代碼以下:
package cn.xpleaf.bigdata.storm.statistic; import cn.xpleaf.bigdata.storm.utils.JedisUtil; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import redis.clients.jedis.Jedis; /** * 日誌數據預處理Bolt,實現功能: * 1.提取實現業務需求所須要的信息:ip地址、客戶端惟一標識mid * 2.查詢IP地址所屬地,併發送到下一個Bolt */ public class ConvertIPBolt extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { byte[] binary = input.getBinary(0); String line = new String(binary); String[] fields = line.split("\t"); if(fields == null || fields.length < 10) { return; } // 獲取ip和mid String ip = fields[1]; String mid = fields[2]; // 根據ip獲取其所屬地(省份) String province = null; if (ip != null) { Jedis jedis = JedisUtil.getJedis(); province = jedis.hget("ip_info_en", ip); // 須要釋放jedis的資源,不然會報can not get resource from the pool JedisUtil.returnJedis(jedis); } // 發送數據到下一個bolt,只發送實現業務功能須要的province和mid collector.emit(new Values(province, mid)); } /** * 定義了發送到下一個bolt的數據包含兩個域:province和mid */ @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("province", "mid")); } }
這個bolt包含咱們統計網站pv、uv的代碼邏輯,所以很是重要,其代碼以下:
package cn.xpleaf.bigdata.storm.statistic; import cn.xpleaf.bigdata.storm.utils.JedisUtil; import org.apache.storm.Config; import org.apache.storm.Constants; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; import redis.clients.jedis.Jedis; import java.text.SimpleDateFormat; import java.util.*; /** * 日誌數據統計Bolt,實現功能: * 1.統計各省份的PV、UV * 2.以天爲單位,將省份對應的PV、UV信息寫入Redis */ public class StatisticBolt extends BaseBasicBolt { Map<String, Integer> pvMap = new HashMap<>(); Map<String, HashSet<String>> midsMap = null; SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd"); @Override public void execute(Tuple input, BasicOutputCollector collector) { if (!input.getSourceComponent().equalsIgnoreCase(Constants.SYSTEM_COMPONENT_ID)) { // 若是收到非系統級別的tuple,統計信息到局部變量mids String province = input.getStringByField("province"); String mid = input.getStringByField("mid"); pvMap.put(province, pvMap.get(province) + 1); // pv+1 if(mid != null) { midsMap.get(province).add(mid); // 將mid添加到該省份所對應的set中 } } else { // 若是收到系統級別的tuple,則將數據更新到Redis中,釋放JVM堆內存空間 /* * 以 廣東 爲例,其在Redis中保存的數據格式以下: * guangdong_pv(Redis數據結構爲hash) * --20180415 * --pv數 * --20180416 * --pv數 * guangdong_mids_20180415(Redis數據結構爲set) * --mid * --mid * --mid * ...... * guangdong_mids_20180415(Redis數據結構爲set) * --mid * --mid * --mid * ...... */ Jedis jedis = JedisUtil.getJedis(); String dateStr = sdf.format(new Date()); // 更新pvMap數據到Redis中 String pvKey = null; for(String province : pvMap.keySet()) { int currentPv = pvMap.get(province); if(currentPv > 0) { // 當前map中的pv大於0才更新,不然沒有意義 pvKey = province + "_pv"; String oldPvStr = jedis.hget(pvKey, dateStr); if(oldPvStr == null) { oldPvStr = "0"; } Long oldPv = Long.valueOf(oldPvStr); jedis.hset(pvKey, dateStr, oldPv + currentPv + ""); pvMap.replace(province, 0); // 將該省的pv從新設置爲0 } } // 更新midsMap到Redis中 String midsKey = null; HashSet<String> midsSet = null; for(String province: midsMap.keySet()) { midsSet = midsMap.get(province); if(midsSet.size() > 0) { // 當前省份的set的大小大於0才更新到,不然沒有意義 midsKey = province + "_mids_" + dateStr; jedis.sadd(midsKey, midsSet.toArray(new String[midsSet.size()])); midsSet.clear(); } } // 釋放jedis資源 JedisUtil.returnJedis(jedis); System.out.println(System.currentTimeMillis() + "------->寫入數據到Redis"); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { } /** * 設置定時任務,只對當前bolt有效,系統會定時向StatisticBolt發送一個系統級別的tuple */ @Override public Map<String, Object> getComponentConfiguration() { Map<String, Object> config = new HashMap<>(); config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 10); return config; } /** * 初始化各個省份的pv和mids信息(用來臨時存儲統計pv和uv須要的數據) */ public StatisticBolt() { pvMap = new HashMap<>(); midsMap = new HashMap<String, HashSet<String>>(); String[] provinceArray = {"shanxi", "jilin", "hunan", "hainan", "xinjiang", "hubei", "zhejiang", "tianjin", "shanghai", "anhui", "guizhou", "fujian", "jiangsu", "heilongjiang", "aomen", "beijing", "shaanxi", "chongqing", "jiangxi", "guangxi", "gansu", "guangdong", "yunnan", "sicuan", "qinghai", "xianggang", "taiwan", "neimenggu", "henan", "shandong", "shanghai", "hebei", "liaoning", "xizang"}; for(String province : provinceArray) { pvMap.put(province, 0); midsMap.put(province, new HashSet()); } } }
咱們須要編寫一個topology用來組織前面編寫的Bolt,代碼以下:
package cn.xpleaf.bigdata.storm.statistic; import kafka.api.OffsetRequest; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.StormTopology; import org.apache.storm.kafka.BrokerHosts; import org.apache.storm.kafka.KafkaSpout; import org.apache.storm.kafka.SpoutConfig; import org.apache.storm.kafka.ZkHosts; import org.apache.storm.topology.TopologyBuilder; /** * 構建topology */ public class StatisticTopology { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); /** * 設置spout和bolt的dag(有向無環圖) */ KafkaSpout kafkaSpout = createKafkaSpout(); builder.setSpout("id_kafka_spout", kafkaSpout); builder.setBolt("id_convertIp_bolt", new ConvertIPBolt()).shuffleGrouping("id_kafka_spout"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 builder.setBolt("id_statistic_bolt", new StatisticBolt()).shuffleGrouping("id_convertIp_bolt"); // 經過不一樣的數據流轉方式,來指定數據的上游組件 // 使用builder構建topology StormTopology topology = builder.createTopology(); String topologyName = KafkaStormTopology.class.getSimpleName(); // 拓撲的名稱 Config config = new Config(); // Config()對象繼承自HashMap,但自己封裝了一些基本的配置 // 啓動topology,本地啓動使用LocalCluster,集羣啓動使用StormSubmitter if (args == null || args.length < 1) { // 沒有參數時使用本地模式,有參數時使用集羣模式 LocalCluster localCluster = new LocalCluster(); // 本地開發模式,建立的對象爲LocalCluster localCluster.submitTopology(topologyName, config, topology); } else { StormSubmitter.submitTopology(topologyName, config, topology); } } /** * BrokerHosts hosts kafka集羣列表 * String topic 要消費的topic主題 * String zkRoot kafka在zk中的目錄(會在該節點目錄下記錄讀取kafka消息的偏移量) * String id 當前操做的標識id */ private static KafkaSpout createKafkaSpout() { String brokerZkStr = "uplooking01:2181,uplooking02:2181,uplooking03:2181"; BrokerHosts hosts = new ZkHosts(brokerZkStr); // 經過zookeeper中的/brokers便可找到kafka的地址 String topic = "f-k-s"; String zkRoot = "/" + topic; String id = "consumer-id"; SpoutConfig spoutConf = new SpoutConfig(hosts, topic, zkRoot, id); // 本地環境設置以後,也能夠在zk中創建/f-k-s節點,在集羣環境中,不用配置也能夠在zk中創建/f-k-s節點 //spoutConf.zkServers = Arrays.asList(new String[]{"uplooking01", "uplooking02", "uplooking03"}); //spoutConf.zkPort = 2181; spoutConf.startOffsetTime = OffsetRequest.LatestTime(); // 設置以後,剛啓動時就不會把以前的消費也進行讀取,會從最新的偏移量開始讀取 return new KafkaSpout(spoutConf); } }
將上面的程序打包成jar包,並上傳到咱們的集羣提交業務後,若是前面的整合沒有問題,而且Web服務也有Web日誌產生,那麼一段時間後,咱們就能夠在Redis數據庫中看到數據的最終處理結果,即各個省份的uv和pv信息:
須要說明的是mid信息是一個set集合,只要求出該set集合的大小,也就能夠求出uv值。
至此,準確來講,咱們的統計pv、uv的大數據實時處理系統是構建完成了,處理的數據結果的用途根據不一樣的業務需求而不一樣,可是對於網站的pv、uv數據來講,是很是適合用做可視化處理的,即用網頁動態將數據展現出來,咱們下一步正是要構建一個簡單的Web應用將pv、uv數據動態展現出來。
數據可視化處理目前咱們須要完成兩部分的工做:
對於Web項目的開發,因我的的技術棧能力而異,選擇的語言和技術也有所不一樣,只要可以達到咱們最終數據可視化的目的,其實都行的。這個項目中咱們要展現的是pv和uv數據,難度不大,所以能夠選擇Java Web,如Servlet、SpringMVC等,或者Python Web,如Flask、Django等,Flask我我的很是喜歡,由於開發很是快,但由於前面一直用的是Java,所以這裏我仍是選擇使用SpringMVC來完成。
至於UI這一塊,我前端能力通常,普通的開發沒有問題,可是要作出像上面這種地圖類型的UI界面來展現數據的話,確實有點無能爲力。好在如今第三方的UI框架比較多,對於圖表類展現的,好比就有highcharts和echarts,其中echarts是百度開源的,有豐富的中文文檔,很是容易上手,因此這裏我選擇使用echarts來做爲UI,而且其恰好就有可以知足咱們需求的地圖類的UI組件。
由於難度不大,具體的開發流程的這裏就不說起了,有興趣的同窗能夠直接參考後面我提供的源代碼,這裏咱們就直接來看一下效果好了。
由於實際上在本次項目案例中,這一塊的代碼也是很是少的,使用SpringMVC開發的話,只要把JavaEE三層構架搭起來了,把依賴引入好了,後面的開發確實不難的;而若是有同窗會Flask或者Django的話,其項目自己的構建和代碼上也都會更容易。
啓動咱們的Web項目後,輸入地址就能夠訪問到數據的展現界面了:
能夠看到,echarts的這個UI仍是比較好看的,並且也真的可以知足咱們的需求。每一個省份上的兩個不一樣顏色的點表示目前咱們須要展現的數據有兩種,分別爲pv和uv,在左上角也有體現,而顏色的深淺就能夠體現pv或者uv的數量大小關係了。
在這個界面上,點擊左上角的uv,表示不查看uv的數據,這樣咱們就會只看到pv的狀況:
固然,也能夠只查看uv的狀況:
當鼠標停留在某個省份上時,就能夠查看這個省份具體的pv或uv值,好比下面咱們把鼠標停留在「廣東」上時,就能夠看到其此時的pv值爲170,查看其它省份的也是如此:
那麼數據是能夠查看了,又怎麼體現動態呢?
對於頁面數據的動態刷新有兩種方案,一種是定時刷新頁面,另一種則是定時向後端異步請求數據。
目前我採用的是第一種,頁面定時刷新,有興趣的同窗也能夠嘗試使用第二種方法,只須要在後端開發相關的返回JSON數據的API便可。
那麼至此,從整個大數據實時處理系統的構建到最後的數據可視化處理工做,咱們都已經完成了,能夠看到整個過程下來涉及到的知識層面仍是比較多的,不過我我的以爲,只要把核心的原理緊緊掌握了,對於大部分狀況而言,環境的搭建以及基於業務的開發都可以很好地解決。
寫此文,一來是對本身實踐中的一些總結,二來也是但願把一些比較不錯的項目案例分享給你們,總之但願可以對你們有所幫助。
項目案例涉及到的代碼我已經上傳到GitHub上面,分爲兩個,一個是storm的項目代碼,另一個是數據可視化處理的代碼,以下:
storm-statistic:https://github.com/xpleaf/storm-statistic
dynamic-show:https://github.com/xpleaf/dynamic-show