正文前先來一波福利推薦:html
福利一:java
百萬年薪架構師視頻,該視頻能夠學到不少東西,是本人花錢買的VIP課程,學習消化了一年,爲了支持一下女友公衆號也方便你們學習,共享給你們。mysql
福利二:面試
畢業答辯以及工做上各類答辯,平時積累了很多精品PPT,如今共享給你們,大大小小加起來有幾千套,總有適合你的一款,不少是網上是下載不到。sql
獲取方式:數據庫
微信關注 精品3分鐘 ,id爲 jingpin3mins,關注後回覆 百萬年薪架構師 ,精品收藏PPT 獲取雲盤連接,謝謝你們支持!apache
-----------------------正文開始---------------------------緩存
承接以前的博:億級流量場景下,大型緩存架構設計實現
微信
續寫本博客:架構
****************** start:
接下來,咱們是要講解商品詳情頁緩存架構,緩存預熱和解決方案,緩存預熱可能致使整個系統崩潰的問題以及解決方案;
緩存--->熱: 預熱;熱數據
解決方案中和架構設計中,會引入大數據的實時計算技術---> storm;
爲何引入這storm,必須是storm嗎,咱們後面面去講解那個解決方案的時候再說;
爲何引入storm:
由於一些熱點數據相關的一些實時處理方案,好比快速預熱,熱點數據的實時感知以及快速降級,都會用到storm,
由於咱們可能須要實時的去計算出熱點緩存數據,實時計算,億級流量,高併發,大量的請求處理。這個時候你要作一些實時計算,
那麼必須涉及到分佈式一些技術,分佈式技術才能處理高併發,大量的請求,目前在計算的領域,最成熟的大數據技術就是storm;storm分佈式的大數據實時計算技術/系統;
java工程師跟storm之間的關係是什麼呢?
大公司的java工程師都會用到一些大數據的技術:好比:storm,hbase,zookeeper,或者hive,spark等。
Storm: 實時緩存熱點數據統計--->緩存預熱,緩存熱點數據自動降級;
Hive:Hadoop生態棧中作數據倉庫的系統,高併發訪問下,海量請求日誌的批量統計分析,日報,月報,週報,接口調用狀況等;好比有一些公司將海量的請求日誌達到hive裏邊
作離線分析,而後反過來優化本身的系統。
Spark:離線批量數據處理,好比從DB中一次性批量處理幾億的數據,清洗和處理後寫入Redis中提供後續系統使用;大型互聯網公司的用戶相關數據等。
zookeeper:分佈式協調,分佈式鎖。分佈式選舉-->高可用HA架構,輕量級元數據存儲。
HBase:海量數據的在線存儲和簡單查詢,替代mysql的分庫分表,提供更好的伸縮性。
storm,說句實話,在作熱數據這塊,若是要作複雜的熱數據的統計和分析,億流量,高併發的場景下,我還真以爲,最合適的技術就是storm,沒有其餘
緩存架構,熱數據先關的架構設計,熱數據相關的架構中最重要的惟一的可選技術。
************************入門介紹*********************************
storm的特色是什麼?
(1)支撐各類實時類的項目場景:實時處理消息以及更新數據庫,基於最基礎的實時計算語義和API(實時數據處理領域);對實時的數據流持續的進行查詢或計算,同時將最新的計算結果持續的推送給客戶端展現,一樣基於最基礎的實時計算語義和API(實時數據分析領域);對耗時的查詢進行並行化,基於DRPC,即分佈式RPC調用,單表30天數據,並行化,每一個進程查詢一天數據,最後組裝結果
storm作各類實時類的項目都ok
(2)高度的可伸縮性:若是要擴容,直接加機器,調整storm計算做業的並行度就能夠了,storm會自動部署更多的進程和線程到其餘的機器上去,無縫快速擴容
擴容起來,超方便
(3)數據不丟失的保證:storm的消息可靠機制開啓後,能夠保證一條數據都不丟
數據不丟失,也不重複計算
(4)超強的健壯性:從歷史經驗來看,storm比hadoop、spark等大數據類系統,健壯的多的多,由於元數據所有放zookeeper,不在內存中,隨便掛都沒關係
特別的健壯,穩定性和可用性很高
(5)使用的便捷性:核心語義很是的簡單,開發起來效率很高
用起來很簡單,開發API仍是很簡單的
2、Storm的集羣架構以及核心概念
一、Storm的集羣架構
Nimbus,Supervisor,ZooKeeper,Worker,Executor,Task
二、Storm的核心概念
Topology,Spout,Bolt,Tuple,Stream
拓撲:務虛的一個概念
Spout:數據源的一個代碼組件,就是咱們能夠實現一個spout接口,寫一個java類,在這個spout代碼中,咱們能夠本身嘗試去數據源獲取數據,好比說從kafka中消費數據
bolt:一個業務處理的代碼組件,spout會將數據傳送給bolt,各類bolt還能夠串聯成一個計算鏈條,java類實現了一個bolt接口,一堆spout+bolt,就會組成一個topology,就是一個拓撲,實時計算做業,spout+bolt,一個拓撲涵蓋數據源獲取/生產+數據處理的全部的代碼邏輯,topology
tuple:就是一條數據,每條數據都會被封裝在tuple中,在多個spout和bolt之間傳遞
stream:就是一個流,務虛的一個概念,抽象的概念,源源不斷過來的tuple,就組成了一條數據流
什麼是並行度?什麼是流分組?
好多年前,我第一次接觸storm的時候,真的,我以爲都沒幾我的能完全講清楚,用一句話講清楚什麼是並行度,什麼是流分組
不少時候,你之外你明白了,其實你不明白
好比我常常面試一些作過storm的人過來,我就問一個問題,就知道它的水深水淺,流分組的時候,數據在storm集羣中的流向,你畫一下
好比你本身隨便設想一個拓撲結果出來,幾個spout,幾個bolt,各類流分組狀況下,數據是怎麼流向的,要求具體畫出集羣架構中的流向
worker,executor,task,supervisor,流的
幾乎沒幾我的能畫對,爲何呢,不少人就沒搞明白這個並行度和流分組究竟是什麼
並行度:Worker->Executor->Task,沒錯,是Task
流分組:Task與Task之間的數據流向關係
Shuffle Grouping:隨機發射,【負載均衡】
Fields Grouping:根據某一個,或者某些個,fields,進行分組,那一個或者多個fields若是值徹底相同的話,那麼這些tuple,就會發送給下游bolt的其中固定的一個task
你發射的每條數據是一個tuple,每一個tuple中有多個field做爲字段
好比tuple,3個字段,name,age,salary
{"name": "tom", "age": 25, "salary": 10000} -> tuple -> 3個field,name,age,salary
其餘方式:
All Grouping
Global Grouping
None Grouping
Direct Grouping
Local or Shuffle Grouping
************************代碼實例*********************************
package com.roncoo.eshop.storm;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 單詞計數拓撲
*
* 我認識不少java工程師,都是會一些大數據的技術的,不會太精通,沒有那麼多的時間去研究
* storm的課程,我就只是講到,最基本的開發,就夠了,java開發廣告計費系統,大量的流量的引入和接入,就是用storm作得
* 用storm,主要是用它的成熟的穩定的易於擴容的分佈式系統的特性
* java工程師,來講,作一些簡單的storm開發,掌握到這個程度差很少就夠了
*
* @author Administrator
*
*/
public class WordCountTopology {
/**
* spout
*
* spout,繼承一個基類,實現接口,這個裏面主要是負責從數據源獲取數據
*
* 咱們這裏做爲一個簡化,就不從外部的數據源去獲取數據了,只是本身內部不斷髮射一些句子
*
* @author Administrator
*
*/
public static class RandomSentenceSpout extends BaseRichSpout {
private static final long serialVersionUID = 3699352201538354417L;
private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);
private SpoutOutputCollector collector;
private Random random;
/**
* open方法
*
* open方法,是對spout進行初始化的
*
* 好比說,建立一個線程池,或者建立一個數據庫鏈接池,或者構造一個httpclient
*
*/
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// 在open方法初始化的時候,會傳入進來一個東西,叫作SpoutOutputCollector
// 這個SpoutOutputCollector就是用來發射數據出去的
this.collector = collector;
// 構造一個隨機數生產對象
this.random = new Random();
}
/**
* nextTuple方法
*
* 這個spout類,以前說過,最終會運行在task中,某個worker進程的某個executor線程內部的某個task中
* 那個task會負責去不斷的無限循環調用nextTuple()方法
* 只要的話呢,無限循環調用,能夠不斷髮射最新的數據出去,造成一個數據流
*
*/
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};
String sentence = sentences[random.nextInt(sentences.length)];
LOGGER.info("【發射句子】sentence=" + sentence);
// 這個values,你能夠認爲就是構建一個tuple
// tuple是最小的數據單位,無限個tuple組成的流就是一個stream
collector.emit(new Values(sentence));
}
/**
* declareOutputFielfs這個方法
*
* 很重要,這個方法是定義一個你發射出去的每一個tuple中的每一個field的名稱是什麼
*
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}
}
/**
* 寫一個bolt,直接繼承一個BaseRichBolt基類
*
* 實現裏面的全部的方法便可,每一個bolt代碼,一樣是發送到worker某個executor的task裏面去運行
*
* @author Administrator
*
*/
public static class SplitSentence extends BaseRichBolt {
private static final long serialVersionUID = 6604009953652729483L;
private OutputCollector collector;
/**
* 對於bolt來講,第一個方法,就是prepare方法
*
* OutputCollector,這個也是Bolt的這個tuple的發射器
*
*/
@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
/**
* execute方法
*
* 就是說,每次接收到一條數據後,就會交給這個executor方法來執行
*
*/
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words) {
collector.emit(new Values(word));
}
}
/**
* 定義發射出去的tuple,每一個field的名稱
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}
public static class WordCount extends BaseRichBolt {
private static final long serialVersionUID = 7208077706057284643L;
private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);
private OutputCollector collector;
private Map<String, Long> wordCounts = new HashMap<String, Long>();
@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");
Long count = wordCounts.get(word);
if(count == null) {
count = 0L;
}
count++;
wordCounts.put(word, count);
LOGGER.info("【單詞計數】" + word + "出現的次數是" + count);
collector.emit(new Values(word, count));
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
public static void main(String[] args) {
// 在main方法中,會去將spout和bolts組合起來,構建成一個拓撲
TopologyBuilder builder = new TopologyBuilder();
// 這裏的第一個參數的意思,就是給這個spout設置一個名字
// 第二個參數的意思,就是建立一個spout的對象
// 第三個參數的意思,就是設置spout的executor有幾個
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("SplitSentence", new SplitSentence(), 5)
.setNumTasks(10)
.shuffleGrouping("RandomSentence");
// 這個很重要,就是說,相同的單詞,從SplitSentence發射出來時,必定會進入到下游的指定的同一個task中
// 只有這樣子,才能準確的統計出每一個單詞的數量
// 好比你有個單詞,hello,下游task1接收到3個hello,task2接收到2個hello
// 5個hello,全都進入一個task
builder.setBolt("WordCount", new WordCount(), 10)
.setNumTasks(20)
.fieldsGrouping("SplitSentence", new Fields("word"));
Config config = new Config();
// 說明是在命令行執行,打算提交到storm集羣上去
if(args != null && args.length > 0) {
config.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
// 說明是在eclipse裏面本地運行
config.setMaxTaskParallelism(20);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCountTopology", config, builder.createTopology());
Utils.sleep(60000);
cluster.shutdown();
}
}
}
************************部署一個storm集羣*********************************
講了手寫了storm wordcount程序
蘊含了不少的知識點
(1)Spout
(2)Bolt
(3)OutputCollector,Declarer
(4)Topology
(5)設置worker,executor,task,流分組
storm的核心基本原理,基本的開發,學會了
storm集羣部署,怎麼將storm的拓撲扔到storm集羣上去跑
部署一個storm集羣
(1)安裝Java 7和Pythong 2.6.6 -- 此處略過
(2)下載storm安裝包,解壓縮,重命名,配置環境變量
-------------------------------------------------
上傳apache-storm-1.0.6.tar.gz到 /usr/local
cd /usr/local
tar -zxvf apache-storm-1.0.6.tar.gz
mv apache-storm-1.0.5 storm
cd ./storm/conf
編輯storm.yaml
-------------------------------------------------------
storm.zookeeper.servers:
- "eshop-cache01"
- "eshop-cache02"
- "eshop-cache03"
nimbus.seeds: ["eshop-cache01"]
storm.local.dir: "/var/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703
-------------------------------------------------------
建立數據文件目錄:mkdir /var/storm
將storm分發到其餘主機上:
scp -r /usr/local/storm/ root@eshop-cache02:/usr/local
scp -r/usr/localstorm/ root@eshop-cache03:/usr/local
在全部主機上添加storm的環境變量: vi /etc/profile
export STORM_HOME=/usr/local/storm
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/latest/bin:/root/bin:/usr/local/zookeeper/bin:$STORM_HOME/bin
source /etc/profile
(4)啓動storm集羣和ui界面
eshop-cache01:
storm nimbus >/dev/null 2>&1 &
storm supervisor >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &
eshop-cache02:
storm supervisor >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &
eshop-cache03:
storm supervisor >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &
訪問一下ui界面,8080端口:
--------------------------------界面簡單介紹:
下圖爲細節圖:
--------------------------------中止storm執行的拓撲任務:
線上日誌截圖:
---------------------------------------------- end