A First Look at Building Distributed Real-Time Processing Applications with Storm

  Recently I spent some spare time revisiting Storm. Comparing it carefully with Hadoop: Storm excels at real-time stream processing, while Hadoop excels at offline data analysis and computation over HDFS via MapReduce; Hadoop itself is not well suited to real-time data processing. What the two share is a distributed architecture, and both have a similar master/slave concept. In this article I will not go into how to deploy the Storm and Zookeeper clusters; instead I want to start from a practical case and walk through how to use Storm to process and analyze data in real time.

  Storm is an open-source distributed real-time computation system hosted by Apache; its predecessor was Twitter Storm. Before Storm appeared, processing massive volumes of real-time data usually meant combining message queues with worker processes/threads, which made such applications extremely complex to build: much of your business logic had to deal with sending and receiving messages, concurrency control between threads, and so on, while the actual business logic might occupy only a small part of the application and was hard to decouple. Storm changed this situation. It first introduced the abstraction of a data stream (Stream), an unbounded sequence of tuples, and then the concepts of Spouts and Bolts. A Spout is a data source in Storm, responsible for producing streams; a Bolt takes streams as input and produces new streams as output, and also specifies how its input streams should be partitioned. Finally, Storm uses the abstraction of a topology to organize a number of Spouts and Bolts into a distributed data-processing network. Storm was deliberately designed to expose the topology of Spouts and Bolts through a Thrift service, which means Spout and Bolt components can be implemented in virtually any mainstream language, giving the whole framework excellent compatibility and extensibility.

  The concept of a topology in Storm is quite similar to the concept of a MapReduce job in Hadoop. The difference is that once a Storm topology is started it keeps running until you kill it, whereas a MapReduce job eventually finishes. This model makes Storm very well suited to real-time data analysis, continuous computation, DRPC (distributed RPC), and the like.

  Now let us work through a concrete case and see how Storm can be used to improve an application's processing performance.

  A mobile carrier's spam-SMS monitoring platform uploads, in real time, files containing the spam-message content of suspected spam-SMS users for each province. Each province then parses those files, filters out the spam messages containing specified sensitive keywords, and stores them in a database. Users whose messages are stored this way are flagged as sensitive users and become key monitoring targets; after all, mass-sending such spam is quite wrong. The monitoring platform generates files at an astonishing rate. The traditional approach was to run one standalone application per city in each province, serially parsing the files, filtering on sensitive keywords, and writing to the database. In practice, however, this did not perform well: files frequently piled up and were not processed into the database in time.

  Now let us use Storm to reorganize the application scenario described above.

  First, let me describe how the Storm and Zookeeper clusters are deployed in this case, as shown in the figure below:

(figure: Storm and Zookeeper cluster deployment)

  Nimbus, the Storm master node, runs on host 192.168.95.134; the two Supervisor (slave) nodes run on 192.168.95.135 (hostname: slave1) and 192.168.95.136 (hostname: slave2). The Zookeeper cluster is deployed on the same nodes. The Storm cluster and the Zookeeper cluster communicate with each other, because Storm is built on top of Zookeeper. First start the Zookeeper service on each node, then start Storm's Nimbus and Supervisor services. You can start them from the bin directory of the Storm installation with `storm nimbus >/dev/null 2>&1 &` and `storm supervisor >/dev/null 2>&1 &` respectively, then use jps to verify they came up. If everything looks good, start the Storm UI service on the Nimbus host by running `storm ui >/dev/null 2>&1 &` from the same bin directory, then open a browser at http://{Nimbus host ip}:8080 (here, http://192.168.95.134:8080/) to inspect the deployment of the Storm cluster, as shown below:

(figure: Storm UI cluster summary)

  We can see that our Storm version is 0.9.5, that there are 2 Supervisor (slave) nodes, slave1 and slave2, and that there are 8 workers in total (Total slots). The Storm cluster is now deployed and started successfully. Next, let us rewrite this sensitive-information real-time monitoring and filtering application the Storm way. First, the Storm topology diagram:

  SensitiveFileReader-591 and SensitiveFileReader-592 (user SMS collectors, one per city) are the Spout components of the Storm topology. They are the data sources, reading the spam-message content files of suspected spam-SMS users from a specified directory on the server. Of course, you can extend this with as many Spouts as your actual requirements demand.

  After each line of a file is read, the next component analyzes the content: SensitiveFileAnalyzer (SMS content parsing and analysis), which is responsible for parsing the file's format.

  For simplicity of demonstration, I define the file format as follows (an arbitrary example): home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1, with fields joined by &. Here home_city=591 is the home-city code of the suspected spammer (591 is Fuzhou, 592 is Xiamen); user_id=5911000 is the suspected spammer's identifier; msisdn=10000 is the suspected spammer's phone number; and sms_content=abc-slave1 is the spam message content itself. SensitiveFileAnalyzer is a Storm Bolt component that processes the data "streamed" out of the Spouts.
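The field-splitting for this format can be sketched in plain Java without any libraries (the project code later uses Guava's Splitter instead); `SmsLineParser` and its `parse` method are hypothetical helper names for illustration only:

```java
import java.util.HashMap;
import java.util.Map;

public class SmsLineParser {
    // Split a "k1=v1&k2=v2&..." line into a key/value map.
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new HashMap<>();
        for (String pair : line.split("&")) {
            String[] kv = pair.split("=", 2);
            fields.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> m =
                parse("home_city=591&user_id=5911000&msisdn=10000&sms_content=abc-slave1");
        System.out.println(m.get("home_city"));   // 591
        System.out.println(m.get("sms_content")); // abc-slave1
    }
}
```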

  Finally, the parsed data is matched against the business-defined sensitive keywords, filtered, and persisted; here we store the filtered data in a MySQL database. The component responsible for this is SensitiveBatchBolt (sensitive-information collection and processing), which is of course also a Storm Bolt component. That completes the Storm topology.
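The matching rule used later boils down to: a message is sensitive if its content contains any of the configured keywords. A minimal sketch of just that rule (the `KeywordFilter` class and `containsSensitiveKeyword` method are hypothetical names; the project implements this with a commons-collections Predicate instead):

```java
public class KeywordFilter {
    // Same demo keywords as the RubbishUsers entity defines.
    static final String[] SENSITIVE_KEYWORDS = { "Bad", "racketeer" };

    // True if the SMS content contains at least one sensitive keyword.
    static boolean containsSensitiveKeyword(String smsContent) {
        for (String keyword : SENSITIVE_KEYWORDS) {
            if (smsContent.contains(keyword)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(containsSensitiveKeyword("abc-racketeer-slave1")); // true
        System.out.println(containsSensitiveKeyword("hello world"));          // false
    }
}
```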

  Now that we have an overall picture of the sensitive-information collection, filtering, and monitoring topology, let us look at the concrete implementation. First, the code layout of the whole project, shown below:

(figure: project code structure)

  First, the data structure we define for sensitive users, RubbishUsers. Suppose the sensitive users we want to filter are those whose SMS content contains sensitive keywords such as "racketeer" or "Bad". The code is as follows:

/**
 * @filename:RubbishUsers.java
 *
 * Newland Co. Ltd. All rights reserved.
 * 
 * @Description: sensitive user entity definition
 * @author tangjie
 * @version 1.0
 * 
 */

package newlandframework.storm.model;

import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import java.io.Serializable;

public class RubbishUsers implements Serializable {
    // home-city code of the user
    private Integer homeCity;
    // user id
    private Integer userId;
    // user phone number
    private Integer msisdn;
    // SMS content
    String smsContent;

    public final static String HOMECITY_COLUMNNAME = "home_city";
    public final static String USERID_COLUMNNAME = "user_id";
    public final static String MSISDN_COLUMNNAME = "msisdn";
    public final static String SMSCONTENT_COLUMNNAME = "sms_content";

    public final static Integer[] SENSITIVE_HOMECITYS = new Integer[] {
            591/* Fuzhou */, 592 /* Xiamen */};

    // sensitive keywords; these could later be moved into a cache or database, hard-coded here only for the demo
    public final static String SENSITIVE_KEYWORD1 = "Bad";
    public final static String SENSITIVE_KEYWORD2 = "racketeer";
    public final static String[] SENSITIVE_KEYWORDS = new String[] {
            SENSITIVE_KEYWORD1, SENSITIVE_KEYWORD2 };

    public Integer getHomeCity() {
        return homeCity;
    }

    public void setHomeCity(Integer homeCity) {
        this.homeCity = homeCity;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getMsisdn() {
        return msisdn;
    }

    public void setMsisdn(Integer msisdn) {
        this.msisdn = msisdn;
    }

    public String getSmsContent() {
        return smsContent;
    }

    public void setSmsContent(String smsContent) {
        this.smsContent = smsContent;
    }

    public String toString() {
        return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
                .append("homeCity", homeCity).append("userId", userId)
                .append("msisdn", msisdn).append("smsContent", smsContent)
                .toString();
    }
}

  Now let us look at the implementation of the sensitive-data source component SensitiveFileReader. It reads the spam-message content files of suspected spam users from a specified server directory and sends each line to the next Bolt (SensitiveFileAnalyzer). Once a file has been fully emitted, it renames the original file in place with a .bak suffix (you could of course instead create a dedicated backup directory for processed files). The implementation of SensitiveFileReader is as follows:

/**
 * @filename:SensitiveFileReader.java
 *
 * Newland Co. Ltd. All rights reserved.
 * 
 * @Description: user SMS collector
 * @author tangjie
 * @version 1.0
 * 
 */

package newlandframework.storm.spout;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.FileFilterUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SensitiveFileReader extends BaseRichSpout {
    // upload path for sensitive SMS files of Fuzhou users
    public static final String InputFuZhouPath = "/home/tj/data/591";
    // upload path for sensitive SMS files of Xiamen users
    public static final String InputXiaMenPath = "/home/tj/data/592";
    // processed files are renamed with a .bak suffix
    public static final String FinishFileSuffix = ".bak";

    private String sensitiveFilePath = "";

    private SpoutOutputCollector collector;

    public SensitiveFileReader(String sensitiveFilePath) {
        this.sensitiveFilePath = sensitiveFilePath;
    }

    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Collection<File> files = FileUtils.listFiles(
                new File(sensitiveFilePath),
                FileFilterUtils.notFileFilter(FileFilterUtils
                        .suffixFileFilter(FinishFileSuffix)), null);

        for (File f : files) {
            try {
                List<String> lines = FileUtils.readLines(f, "GBK");
                for (String line : lines) {
                    System.out.println("[SensitiveTrace]:" + line);
                    collector.emit(new Values(line));
                }
                FileUtils.moveFile(f,
                        new File(f.getPath() + System.currentTimeMillis()
                                + FinishFileSuffix));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sensitive"));
    }
}

  The SMS-content parsing Bolt, SensitiveFileAnalyzer, receives the data from the SensitiveFileReader Spout, parses each line of the file according to the format defined above, and then sends the parsed content on to the next Bolt component, SensitiveBatchBolt (sensitive-information collection and processing). Here is the implementation of the SensitiveFileAnalyzer Bolt:

/**
 * @filename:SensitiveFileAnalyzer.java
 *
 * Newland Co. Ltd. All rights reserved.
 * 
 * @Description: SMS content parsing and analysis
 * @author tangjie
 * @version 1.0
 * 
 */

package newlandframework.storm.bolt;

import java.util.Map;

import newlandframework.storm.model.RubbishUsers;

import org.apache.storm.guava.base.Splitter;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SensitiveFileAnalyzer extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);

        Map<String, String> join = Splitter.on("&").withKeyValueSeparator("=").split(line);

        collector.emit(new Values((String) join
                .get(RubbishUsers.HOMECITY_COLUMNNAME), (String) join
                .get(RubbishUsers.USERID_COLUMNNAME), (String) join
                .get(RubbishUsers.MSISDN_COLUMNNAME), (String) join
                .get(RubbishUsers.SMSCONTENT_COLUMNNAME)));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields(RubbishUsers.HOMECITY_COLUMNNAME,
                RubbishUsers.USERID_COLUMNNAME, RubbishUsers.MSISDN_COLUMNNAME,
                RubbishUsers.SMSCONTENT_COLUMNNAME));
    }
}

  The last Bolt component, SensitiveBatchBolt (sensitive-information collection and processing), matches the data sent by the upstream SensitiveFileAnalyzer Bolt against the business-defined sensitive keywords. If a match succeeds, the user is one we need to monitor closely, so we persist the record into MySQL via Hibernate for centralized management. Finally, note that SensitiveBatchBolt also implements a monitoring function: it periodically prints out the sensitive-user data collected so far. Here is the implementation of SensitiveBatchBolt:

/**
 * @filename:SensitiveBatchBolt.java
 *
 * Newland Co. Ltd. All rights reserved.
 * 
 * @Description: sensitive-information collection and processing
 * @author tangjie
 * @version 1.0
 * 
 */

package newlandframework.storm.bolt;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.iterators.FilterIterator;
import org.apache.commons.lang.StringUtils;

import org.hibernate.Criteria;
import org.hibernate.HibernateException;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.criterion.MatchMode;
import org.hibernate.criterion.Restrictions;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

import newlandframework.storm.model.RubbishUsers;

public class SensitiveBatchBolt implements IBasicBolt {
    // location of the Hibernate configuration
    private final static String HIBERNATE_APPLICATIONCONTEXT = "newlandframework/storm/resource/jdbc-hibernate-bean.xml";

    // the Spring/Hibernate context must not be serialized
    private static transient ApplicationContext hibernate = new ClassPathXmlApplicationContext(
            HIBERNATE_APPLICATIONCONTEXT);

    private static transient SessionFactory sessionFactory = (SessionFactory) hibernate
            .getBean("sessionFactory");

    public SensitiveBatchBolt() throws SQLException {
        super();
    }

    private static List list = new ArrayList(Arrays.asList(RubbishUsers.SENSITIVE_KEYWORDS));

    // the sensitive-keyword source; consider loading it from a cache or a database instead
    private class SensitivePredicate implements Predicate {
        private String sensitiveWord = null;

        SensitivePredicate(String sensitiveWord) {
            this.sensitiveWord = sensitiveWord;
        }

        public boolean evaluate(Object object) {
            return this.sensitiveWord.contains((String) object);
        }
    }

    // monitor thread that periodically prints the collection/processing status
    class SensitiveMonitorThread implements Runnable {
        private int sensitiveMonitorTimeInterval = 0;
        private Session session = null;

        SensitiveMonitorThread(int sensitiveMonitorTimeInterval) {
            this.sensitiveMonitorTimeInterval = sensitiveMonitorTimeInterval;
            session = sessionFactory.openSession();
        }

        public void run() {
            while (true) {
                try {
                    Criteria criteria1 = session.createCriteria(RubbishUsers.class);

                    criteria1.add(Restrictions.and(Restrictions.or(Restrictions
                            .like("smsContent", StringUtils
                                    .center(RubbishUsers.SENSITIVE_KEYWORD1,
                                            RubbishUsers.SENSITIVE_KEYWORD1
                                                    .length() + 2, "%"),
                                    MatchMode.ANYWHERE), Restrictions.like(
                            "smsContent", StringUtils
                                    .center(RubbishUsers.SENSITIVE_KEYWORD2,
                                            RubbishUsers.SENSITIVE_KEYWORD2
                                                    .length() + 2, "%"),
                            MatchMode.ANYWHERE)), Restrictions.in("homeCity",
                            RubbishUsers.SENSITIVE_HOMECITYS)));

                    List<RubbishUsers> rubbishList = (List<RubbishUsers>) criteria1.list();

                    System.out.println(StringUtils.center("[SensitiveTrace sensitive user list]", 40, "-"));

                    if (rubbishList != null) {
                        System.out.println("[SensitiveTrace sensitive user count]:" + rubbishList.size());
                        for (RubbishUsers rubbish : rubbishList) {
                            System.out.println(rubbish + rubbish.getSmsContent());
                        }
                    } else {
                        System.out.println("[SensitiveTrace sensitive user count]:0");
                    }
                } catch (HibernateException e) {
                    e.printStackTrace();
                }
                
                try {
                    Thread.sleep(sensitiveMonitorTimeInterval * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // must be synchronized in a distributed environment
    private synchronized void save(Tuple input) {
        Session session = sessionFactory.openSession();

        try {
            RubbishUsers users = new RubbishUsers();
            users.setUserId(Integer.parseInt(input
                    .getStringByField(RubbishUsers.USERID_COLUMNNAME)));
            users.setHomeCity(Integer.parseInt(input
                    .getStringByField(RubbishUsers.HOMECITY_COLUMNNAME)));
            users.setMsisdn(Integer.parseInt(input
                    .getStringByField(RubbishUsers.MSISDN_COLUMNNAME)));
            users.setSmsContent(input
                    .getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME));

            Predicate isSensitiveFileAnalysis = new SensitivePredicate(
                    (String) input.getStringByField(RubbishUsers.SMSCONTENT_COLUMNNAME));

            FilterIterator iterator = new FilterIterator(list.iterator(),isSensitiveFileAnalysis);

            if (iterator.hasNext()) {
                session.beginTransaction();
                // persist into MySQL
                session.save(users);
                session.getTransaction().commit();
            }
        } catch (HibernateException e) {
            e.printStackTrace();
            session.getTransaction().rollback();
        } finally {
            session.close();
        }
    }

    // Many Storm runtime errors are caused by exceptions thrown inside execute, so examine the execute logic closely.
    // The most common report is: ERROR backtype.storm.daemon.executor - java.lang.RuntimeException:java.lang.NullPointerException
    // backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java ...)
    // Errors like this can look baffling: everything runs fine at first, then a NullPointerException suddenly appears. I first suspected
    // the Storm deployment, but tracing with jstack showed the problem was in the execute logic, so stay calm and use jstack to locate it.
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        save(input);
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        final int sensitiveMonitorTimeInterval = Integer.parseInt(stormConf
                .get("RUBBISHMONITOR_INTERVAL").toString());

        SensitiveMonitorThread montor = new SensitiveMonitorThread(
                sensitiveMonitorTimeInterval);

        new Thread(montor).start();
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
        // TODO Auto-generated method stub
    }
}

  Since we persist into MySQL via Hibernate, here is the Hibernate configuration, starting with hibernate.cfg.xml:

<?xml version="1.0" encoding="utf-8"?>
<!DOCTYPE hibernate-configuration PUBLIC "-//Hibernate/Hibernate Configuration DTD 3.0//EN"
"http://hibernate.sourceforge.net/hibernate-configuration-3.0.dtd">
<hibernate-configuration>
    <session-factory>
        <property name="hibernate.bytecode.use_reflection_optimizer">false</property>
        <property name="hibernate.dialect">org.hibernate.dialect.MySQLDialect</property>
        <property name="show_sql">true</property>
        <mapping resource="newlandframework/storm/resource/rubbish-users.hbm.xml"/>
    </session-factory>
</hibernate-configuration>

  The corresponding ORM mapping file rubbish-users.hbm.xml is as follows:

<?xml version="1.0"?>
<!DOCTYPE hibernate-mapping PUBLIC "-//Hibernate/Hibernate Mapping DTD 3.0//EN"
      "http://hibernate.sourceforge.net/hibernate-mapping-3.0.dtd">
<hibernate-mapping>
    <class name="newlandframework.storm.model.RubbishUsers" table="rubbish_users" catalog="ccs">
        <id name="userId" type="java.lang.Integer">
            <column name="user_id"/>
            <generator class="assigned"/>
        </id>
        <property name="homeCity" type="java.lang.Integer">
            <column name="home_city" not-null="true"/>
        </property>
        <property name="msisdn" type="java.lang.Integer">
            <column name="msisdn" not-null="true"/>
        </property>
        <property name="smsContent" type="java.lang.String">
            <column name="sms_content" not-null="true"/>
        </property>
    </class>
</hibernate-mapping>

  Finally, Hibernate is wired together through Spring, with DBCP as the database connection pool. The Spring configuration file jdbc-hibernate-bean.xml is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd" default-autowire="byType" default-lazy-init="false">
  <bean id="placeholder" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
      <list>
        <value>newlandframework/storm/resource/jdbc.properties</value>
      </list>
    </property>
  </bean>
  <bean id="dbcpDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="${database.driverClassName}"/>
    <property name="url" value="${database.url}"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
    <property name="maxActive" value="32"/>
    <property name="initialSize" value="1"/>
    <property name="maxWait" value="60000"/>
    <property name="maxIdle" value="32"/>
    <property name="minIdle" value="5"/>
    <property name="removeAbandoned" value="true"/>
    <property name="removeAbandonedTimeout" value="180"/>
    <property name="connectionProperties" value="bigStringTryClob=true;clientEncoding=GBK;defaultRowPrefetch=50;serverEncoding=ISO-8859-1"/>
    <property name="timeBetweenEvictionRunsMillis">
      <value>60000</value>
    </property>
    <property name="minEvictableIdleTimeMillis">
      <value>1800000</value>
    </property>
  </bean>
  <!-- hibernate session factory -->
  <bean id="sessionFactory" class="org.springframework.orm.hibernate3.LocalSessionFactoryBean">
    <property name="dataSource" ref="dbcpDataSource"/>
    <property name="configLocation" value="newlandframework/storm/resource/hibernate.cfg.xml"/>
    <property name="eventListeners">
      <map></map>
    </property>
    <property name="entityCacheStrategies">
      <props></props>
    </property>
    <property name="collectionCacheStrategies">
      <props></props>
    </property>
    <property name="configurationClass">
      <value>org.hibernate.cfg.AnnotationConfiguration</value>
    </property>
  </bean>
  <bean id="hibernateTemplete" class="org.springframework.orm.hibernate3.HibernateTemplate">
    <property name="sessionFactory" ref="sessionFactory"/>
  </bean>
</beans>

  At this point we have finished developing all the Storm components for real-time sensitive-information monitoring. Now let us build the Storm topology. Since a topology can be run either locally or on a cluster, I wrapped both modes in a utility class, StormRunner (topology runner), with the following code:

/**
 * @filename:StormRunner.java
 *
 * Newland Co. Ltd. All rights reserved.
 * 
 * @Description: topology runner
 * @author tangjie
 * @version 1.0
 * 
 */

package newlandframework.storm.topology;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;

public final class StormRunner {

    private static final int MILLIS_IN_SEC = 1000;

    // local topology: Storm simulates the cluster with N threads inside one process
    public static void runTopologyLocally(StormTopology topology,
            String topologyName, Config conf, int runtimeInSeconds)
            throws InterruptedException {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, topology);
        Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
        cluster.killTopology(topologyName);
        cluster.shutdown();
    }

    // distributed topology: a real Storm cluster environment
    public static void runTopologyRemotely(StormTopology topology,
            String topologyName, Config conf) throws AlreadyAliveException,
            InvalidTopologyException {
        StormSubmitter.submitTopology(topologyName, conf, topology);
    }
}

  Now let us wire all the Spouts/Bolts above into a topology. Here we use the distributed topology for deployment and execution. The code for SensitiveTopology (the sensitive-user monitoring Storm topology) is as follows:

/**
 * @filename:SensitiveTopology.java
 *
 * Newland Co. Ltd. All rights reserved.
 * 
 * @Description: sensitive-user monitoring Storm topology
 * @author tangjie
 * @version 1.0
 * 
 */

package newlandframework.storm.topology;

import java.sql.SQLException;

import newlandframework.storm.bolt.SensitiveBatchBolt;
import newlandframework.storm.bolt.SensitiveFileAnalyzer;
import newlandframework.storm.model.RubbishUsers;
import newlandframework.storm.spout.SensitiveFileReader;

import org.apache.commons.lang.StringUtils;

import backtype.storm.Config;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class SensitiveTopology {
    // Spout/Bolt ID definitions
    public static final String SensitiveSpoutFuZhou = "SensitiveSpout591";
    public static final String SensitiveSpoutXiaMen = "SensitiveSpout592";
    public static final String SensitiveBoltAnalysis = "SensitiveBoltAnalysis";
    public static final String SensitiveBoltPersistence = "SensitiveBoltPersistence";

    public static void main(String[] args) throws SQLException {
        System.out.println(StringUtils.center("SensitiveTopology", 40, "*"));

        TopologyBuilder builder = new TopologyBuilder();

        // build the spouts, each with a parallelism of 2
        builder.setSpout(SensitiveSpoutFuZhou, new SensitiveFileReader(
                SensitiveFileReader.InputFuZhouPath), 2);
        builder.setSpout(SensitiveSpoutXiaMen, new SensitiveFileReader(
                SensitiveFileReader.InputXiaMenPath), 2);

        // build the bolt with a parallelism of 4
        builder.setBolt(SensitiveBoltAnalysis, new SensitiveFileAnalyzer(), 4)
                .shuffleGrouping(SensitiveSpoutFuZhou)
                .shuffleGrouping(SensitiveSpoutXiaMen);

        // build the bolt with a parallelism of 4
        SensitiveBatchBolt persistenceBolt = new SensitiveBatchBolt();

        builder.setBolt(SensitiveBoltPersistence, persistenceBolt, 4)
                .fieldsGrouping(
                        SensitiveBoltAnalysis,
                        new Fields(RubbishUsers.HOMECITY_COLUMNNAME,
                                RubbishUsers.USERID_COLUMNNAME,
                                RubbishUsers.MSISDN_COLUMNNAME));

        Config conf = new Config();
        conf.setDebug(true);
        // set the worker count; the cluster has 8 slots in total, so use them all
        conf.setNumWorkers(8);
        // check sensitive-information persistence into MySQL every 3 seconds
        conf.put("RUBBISHMONITOR_INTERVAL", 3);

        // run the distributed topology
        try {
            StormRunner.runTopologyRemotely(builder.createTopology(),"SensitiveTopology", conf);
        } catch (AlreadyAliveException e) {
            e.printStackTrace();
        } catch (InvalidTopologyException e) {
            e.printStackTrace();
        }
    }
}

  All the Storm components are now complete! Next we package the project into a jar and run it on the Storm cluster: go to the bin directory of the Storm installation on the Nimbus host and run storm jar {jar path}.

  In my case the command is: storm jar /home/tj/install/SensitiveTopology.jar newlandframework.storm.topology.SensitiveTopology. Then place the spam-message content files of suspected spam users into the designated server directories (/home/tj/data/591 and /home/tj/data/592), open the Storm UI again, and watch the task start and run, as shown below:

(figure: Storm UI topology summary)

  We can see that the topology we just submitted, SensitiveTopology, has been successfully submitted to the Storm cluster. If you click SensitiveTopology, a Spouts/Bolts monitoring page opens, as shown below:

(figure: Spouts/Bolts monitoring page)

  We can clearly see the executor thread counts and emitted-tuple counts for the Spout components (user SMS collectors) SensitiveFileReader591 and SensitiveFileReader592, as well as the running status of the Bolt components: the SMS content parser (SensitiveFileAnalyzer) and the sensitive-information processor (SensitiveBatchBolt). This makes monitoring very convenient. In addition, we can go to the logs directory under the Storm installation on the corresponding Supervisor machine and inspect the worker logs. Here is the output of the sensitive-information monitoring and filtering:

(figure: worker log output)

  Through the SensitiveBatchBolt monitoring thread we can see that 9 sensitive users have been collected so far. Now let us check whether these users containing sensitive keywords were actually persisted into MySQL:

(figure: MySQL query result)

  The database indeed contains 9 records, consistent with the count printed in the logs, and the spam content in sms_content really does contain the sensitive keywords "racketeer" and "Bad", fully matching our expectations. Moreover, if the file volume grows in the future, we can cope by tuning the parallelism of the Spouts/Bolts and the number of workers, or of course by scaling the cluster out horizontally.

  Storm's Apache project site is http://storm.apache.org/, and it is worth following. The site has authoritative technical documentation, including how to integrate Storm effectively with message queues, HDFS, and HBase. In China, in my personal view, the best work applying Storm has come from Alibaba, which improved on the original Storm and open-sourced JStorm; interested readers may want to follow that project as well.

  With Storm, we can develop distributed real-time processing applications with ease; the scenario designed above is just one example of a Storm application. Compared with traditional single-server applications, clustered parallel and cooperative computation is a clear trend of the cloud-computing and big-data era, and a direction I will keep studying. I am writing down my experience here to start a discussion; if anything is wrong, I welcome corrections from fellow readers!
