億級流量場景下,大型架構設計實現【2】---storm篇

 正文前先來一波福利推薦:html

 福利一:java

百萬年薪架構師視頻,該視頻能夠學到不少東西,是本人花錢買的VIP課程,學習消化了一年,爲了支持一下女友公衆號也方便你們學習,共享給你們。mysql

福利二:面試

畢業答辯以及工做上各類答辯,平時積累了很多精品PPT,如今共享給你們,大大小小加起來有幾千套,總有適合你的一款,不少是網上是下載不到。sql

獲取方式:數據庫

微信關注 精品3分鐘 ,id爲 jingpin3mins,關注後回覆   百萬年薪架構師 ,精品收藏PPT  獲取雲盤連接,謝謝你們支持!apache

-----------------------正文開始---------------------------緩存

 

承接以前的博:億級流量場景下,大型緩存架構設計實現
微信

續寫本博客:架構

****************** start:

  接下來,咱們是要講解商品詳情頁緩存架構,緩存預熱和解決方案,緩存預熱可能致使整個系統崩潰的問題以及解決方案;

  緩存--->熱: 預熱;熱數據

  解決方案中和架構設計中,會引入大數據的實時計算技術---> storm;

  爲何引入這storm,必須是storm嗎,咱們後面面去講解那個解決方案的時候再說;

  爲何引入storm:

  由於一些熱點數據相關的一些實時處理方案,好比快速預熱,熱點數據的實時感知以及快速降級,都會用到storm,

由於咱們可能須要實時的去計算出熱點緩存數據,實時計算,億級流量,高併發,大量的請求處理。這個時候你要作一些實時計算,

那麼必須涉及到分佈式一些技術,分佈式技術才能處理高併發,大量的請求,目前在計算的領域,最成熟的大數據技術就是storm;storm分佈式的大數據實時計算技術/系統;

  java工程師跟storm之間的關係是什麼呢?

  大公司的java工程師都會用到一些大數據的技術:好比:storm,hbase,zookeeper,或者hive,spark等。

  Storm: 實時緩存熱點數據統計--->緩存預熱,緩存熱點數據自動降級;

  Hive:Hadoop生態棧中作數據倉庫的系統,高併發訪問下,海量請求日誌的批量統計分析,日報,月報,週報,接口調用狀況等;好比有一些公司將海量的請求日誌達到hive裏邊

作離線分析,而後反過來優化本身的系統。

  Spark:離線批量數據處理,好比從DB中一次性批量處理幾億的數據,清洗和處理後寫入Redis中提供後續系統使用;大型互聯網公司的用戶相關數據等。

  zookeeper:分佈式協調,分佈式鎖。分佈式選舉-->高可用HA架構,輕量級元數據存儲。

  HBase:海量數據的在線存儲和簡單查詢,替代mysql的分庫分表,提供更好的伸縮性。

  

       

         

  

  storm,說句實話,在作熱數據這塊,若是要作複雜的熱數據的統計和分析,億流量,高併發的場景下,我還真以爲,最合適的技術就是storm,沒有其餘

緩存架構,熱數據先關的架構設計,熱數據相關的架構中最重要的惟一的可選技術。

************************入門介紹*********************************

  

 

 

storm的特色是什麼?

(1)支撐各類實時類的項目場景:實時處理消息以及更新數據庫,基於最基礎的實時計算語義和API(實時數據處理領域);對實時的數據流持續的進行查詢或計算,同時將最新的計算結果持續的推送給客戶端展現,一樣基於最基礎的實時計算語義和API(實時數據分析領域);對耗時的查詢進行並行化,基於DRPC,即分佈式RPC調用,單表30天數據,並行化,每一個進程查詢一天數據,最後組裝結果

storm作各類實時類的項目都ok

(2)高度的可伸縮性:若是要擴容,直接加機器,調整storm計算做業的並行度就能夠了,storm會自動部署更多的進程和線程到其餘的機器上去,無縫快速擴容

擴容起來,超方便

(3)數據不丟失的保證:storm的消息可靠機制開啓後,能夠保證一條數據都不丟

數據不丟失,也不重複計算

(4)超強的健壯性:從歷史經驗來看,storm比hadoop、spark等大數據類系統,健壯的多的多,由於元數據所有放zookeeper,不在內存中,隨便掛都沒關係

特別的健壯,穩定性和可用性很高

(5)使用的便捷性:核心語義很是的簡單,開發起來效率很高

用起來很簡單,開發API仍是很簡單的

2、Storm的集羣架構以及核心概念

一、Storm的集羣架構

Nimbus,Supervisor,ZooKeeper,Worker,Executor,Task

二、Storm的核心概念

Topology,Spout,Bolt,Tuple,Stream

拓撲:務虛的一個概念

Spout:數據源的一個代碼組件,就是咱們能夠實現一個spout接口,寫一個java類,在這個spout代碼中,咱們能夠本身嘗試去數據源獲取數據,好比說從kafka中消費數據

bolt:一個業務處理的代碼組件,spout會將數據傳送給bolt,各類bolt還能夠串聯成一個計算鏈條,java類實現了一個bolt接口,一堆spout+bolt,就會組成一個topology,就是一個拓撲,實時計算做業,spout+bolt,一個拓撲涵蓋數據源獲取/生產+數據處理的全部的代碼邏輯,topology

tuple:就是一條數據,每條數據都會被封裝在tuple中,在多個spout和bolt之間傳遞

stream:就是一個流,務虛的一個概念,抽象的概念,源源不斷過來的tuple,就組成了一條數據流

 

 什麼是並行度?什麼是流分組?

  好多年前,我第一次接觸storm的時候,真的,我以爲都沒幾我的能完全講清楚,用一句話講清楚什麼是並行度,什麼是流分組

不少時候,你之外你明白了,其實你不明白

好比我常常面試一些作過storm的人過來,我就問一個問題,就知道它的水深水淺,流分組的時候,數據在storm集羣中的流向,你畫一下

好比你本身隨便設想一個拓撲結果出來,幾個spout,幾個bolt,各類流分組狀況下,數據是怎麼流向的,要求具體畫出集羣架構中的流向

worker,executor,task,supervisor,流的

幾乎沒幾我的能畫對,爲何呢,不少人就沒搞明白這個並行度和流分組究竟是什麼


並行度:Worker->Executor->Task,沒錯,是Task

流分組:Task與Task之間的數據流向關係

Shuffle Grouping:隨機發射,【負載均衡】
Fields Grouping:根據某一個,或者某些個,fields,進行分組,那一個或者多個fields若是值徹底相同的話,那麼這些tuple,就會發送給下游bolt的其中固定的一個task

  你發射的每條數據是一個tuple,每一個tuple中有多個field做爲字段

好比tuple,3個字段,name,age,salary

{"name": "tom", "age": 25, "salary": 10000} -> tuple -> 3個field,name,age,salary

  其餘方式:

All Grouping
Global Grouping
None Grouping
Direct Grouping
Local or Shuffle Grouping

 

************************代碼實例*********************************

package com.roncoo.eshop.storm;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* 單詞計數拓撲
*
* 我認識不少java工程師,都是會一些大數據的技術的,不會太精通,沒有那麼多的時間去研究
* storm的課程,我就只是講到,最基本的開發,就夠了,java開發廣告計費系統,大量的流量的引入和接入,就是用storm作得
* 用storm,主要是用它的成熟的穩定的易於擴容的分佈式系統的特性
* java工程師,來講,作一些簡單的storm開發,掌握到這個程度差很少就夠了
*
* @author Administrator
*
*/
public class WordCountTopology {

/**
* spout
*
* spout,繼承一個基類,實現接口,這個裏面主要是負責從數據源獲取數據
*
* 咱們這裏做爲一個簡化,就不從外部的數據源去獲取數據了,只是本身內部不斷髮射一些句子
*
* @author Administrator
*
*/
public static class RandomSentenceSpout extends BaseRichSpout {

private static final long serialVersionUID = 3699352201538354417L;

private static final Logger LOGGER = LoggerFactory.getLogger(RandomSentenceSpout.class);

private SpoutOutputCollector collector;
private Random random;

/**
* open方法
*
* open方法,是對spout進行初始化的
*
* 好比說,建立一個線程池,或者建立一個數據庫鏈接池,或者構造一個httpclient
*
*/
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context,
SpoutOutputCollector collector) {
// 在open方法初始化的時候,會傳入進來一個東西,叫作SpoutOutputCollector
// 這個SpoutOutputCollector就是用來發射數據出去的
this.collector = collector;
// 構造一個隨機數生產對象
this.random = new Random();
}

/**
* nextTuple方法
*
* 這個spout類,以前說過,最終會運行在task中,某個worker進程的某個executor線程內部的某個task中
* 那個task會負責去不斷的無限循環調用nextTuple()方法
* 只要的話呢,無限循環調用,能夠不斷髮射最新的數據出去,造成一個數據流
*
*/
public void nextTuple() {
Utils.sleep(100);
String[] sentences = new String[]{"the cow jumped over the moon", "an apple a day keeps the doctor away",
"four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};
String sentence = sentences[random.nextInt(sentences.length)];
LOGGER.info("【發射句子】sentence=" + sentence);
// 這個values,你能夠認爲就是構建一個tuple
// tuple是最小的數據單位,無限個tuple組成的流就是一個stream
collector.emit(new Values(sentence));
}

/**
* declareOutputFielfs這個方法
*
* 很重要,這個方法是定義一個你發射出去的每一個tuple中的每一個field的名稱是什麼
*
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}

}

/**
* 寫一個bolt,直接繼承一個BaseRichBolt基類
*
* 實現裏面的全部的方法便可,每一個bolt代碼,一樣是發送到worker某個executor的task裏面去運行
*
* @author Administrator
*
*/
public static class SplitSentence extends BaseRichBolt {

private static final long serialVersionUID = 6604009953652729483L;

private OutputCollector collector;

/**
* 對於bolt來講,第一個方法,就是prepare方法
*
* OutputCollector,這個也是Bolt的這個tuple的發射器
*
*/
@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

/**
* execute方法
*
* 就是說,每次接收到一條數據後,就會交給這個executor方法來執行
*
*/
public void execute(Tuple tuple) {
String sentence = tuple.getStringByField("sentence");
String[] words = sentence.split(" ");
for(String word : words) {
collector.emit(new Values(word));
}
}

/**
* 定義發射出去的tuple,每一個field的名稱
*/
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}

}

public static class WordCount extends BaseRichBolt {

private static final long serialVersionUID = 7208077706057284643L;

private static final Logger LOGGER = LoggerFactory.getLogger(WordCount.class);

private OutputCollector collector;
private Map<String, Long> wordCounts = new HashMap<String, Long>();

@SuppressWarnings("rawtypes")
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}

public void execute(Tuple tuple) {
String word = tuple.getStringByField("word");

Long count = wordCounts.get(word);
if(count == null) {
count = 0L;
}
count++;

wordCounts.put(word, count);

LOGGER.info("【單詞計數】" + word + "出現的次數是" + count);

collector.emit(new Values(word, count));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}

}

public static void main(String[] args) {
// 在main方法中,會去將spout和bolts組合起來,構建成一個拓撲
TopologyBuilder builder = new TopologyBuilder();

// 這裏的第一個參數的意思,就是給這個spout設置一個名字
// 第二個參數的意思,就是建立一個spout的對象
// 第三個參數的意思,就是設置spout的executor有幾個
builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
builder.setBolt("SplitSentence", new SplitSentence(), 5)
.setNumTasks(10)
.shuffleGrouping("RandomSentence");
// 這個很重要,就是說,相同的單詞,從SplitSentence發射出來時,必定會進入到下游的指定的同一個task中
// 只有這樣子,才能準確的統計出每一個單詞的數量
// 好比你有個單詞,hello,下游task1接收到3個hello,task2接收到2個hello
// 5個hello,全都進入一個task
builder.setBolt("WordCount", new WordCount(), 10)
.setNumTasks(20)
.fieldsGrouping("SplitSentence", new Fields("word"));

Config config = new Config();

// 說明是在命令行執行,打算提交到storm集羣上去
if(args != null && args.length > 0) {
config.setNumWorkers(3);
try {
StormSubmitter.submitTopology(args[0], config, builder.createTopology());
} catch (Exception e) {
e.printStackTrace();
}
} else {
// 說明是在eclipse裏面本地運行
config.setMaxTaskParallelism(20);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCountTopology", config, builder.createTopology());

Utils.sleep(60000);

cluster.shutdown();
}
}

}

 ************************部署一個storm集羣*********************************

講了手寫了storm wordcount程序

蘊含了不少的知識點

(1)Spout
(2)Bolt
(3)OutputCollector,Declarer
(4)Topology
(5)設置worker,executor,task,流分組

storm的核心基本原理,基本的開發,學會了

storm集羣部署,怎麼將storm的拓撲扔到storm集羣上去跑

部署一個storm集羣

(1)安裝Java 7和Pythong 2.6.6 --  此處略過

(2)下載storm安裝包,解壓縮,重命名,配置環境變量

-------------------------------------------------

上傳apache-storm-1.0.6.tar.gz到 /usr/local

cd /usr/local

tar -zxvf apache-storm-1.0.6.tar.gz

mv apache-storm-1.0.5 storm

cd ./storm/conf

編輯storm.yaml

-------------------------------------------------------

storm.zookeeper.servers:
- "eshop-cache01"
- "eshop-cache02"
- "eshop-cache03"

nimbus.seeds: ["eshop-cache01"]
storm.local.dir: "/var/storm"
supervisor.slots.ports:
- 6700
- 6701
- 6702
- 6703

-------------------------------------------------------

建立數據文件目錄:mkdir /var/storm

 

將storm分發到其餘主機上:

scp -r  /usr/local/storm/   root@eshop-cache02:/usr/local
scp -r/usr/localstorm/   root@eshop-cache03:/usr/local

 

在全部主機上添加storm的環境變量: vi /etc/profile

export STORM_HOME=/usr/local/storm
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/latest/bin:/root/bin:/usr/local/zookeeper/bin:$STORM_HOME/bin

 source  /etc/profile

(4)啓動storm集羣和ui界面

eshop-cache01:

storm nimbus >/dev/null 2>&1 &
storm supervisor >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &

eshop-cache02:

storm supervisor >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &

eshop-cache03:

storm supervisor >/dev/null 2>&1 &
storm ui >/dev/null 2>&1 &

 

訪問一下ui界面,8080端口:

--------------------------------界面簡單介紹:

  • Used slots:使用的worker數。
  • Free slots:空閒的worker數。
  • Executors:每一個worker的物理線程數。

下圖爲細節圖:

 

 

--------------------------------中止storm執行的拓撲任務:

 

中止此任務命令寫法爲: 
kill 後面的名稱爲 上面截圖中 紅圈的名稱
這樣就關閉了累加的storm任務
 storm kill WordCountTopology
 

 

線上日誌截圖:

 ---------------------------------------------- end

相關文章
相關標籤/搜索