public class BlackListBolt extends BaseRichBolt{ private static Logger logger = Logger.getLogger(BlackListBolt.class); private OutputCollector collector_; private Map<String,List<String>> blacklistMap_ = new ConcurrentHashMap<String,List<String>>(); //實現了從數據庫中獲取黑名單基礎數據的表,加載至內存做爲比阿孃blacklistMap_維護 nextTuple()在每一個tuple到達時被調用,這裏主要實現了車牌在黑名單中的比對。 public void prepare(Map stormConf,TopologyContext context,OutputCollector collector) { collector_ = collector; Connection con = null; Statement jjhmd_statement = null; ResultSet jjhmd_resultSet = null; String jjhmd_queryString = "select ID,CPHID,SFSSBJ,SFCL from JJHMD"; try{ con = DBUtil.getDataSource().getConnection(); jjhmd_statement = con.createStatement(); jjhmd_resultSet = jjhmd_statement.executeQuery(jjhmd_queryString); while(jjhmd_resultSet.next()){ String jjhmd_cphid = jjhmd_resultSet.getString("CPHID"); String jjhmd_sfssbj = jjhmd_resultSet.getString("SFSSBJ"); String jjhmd_SFCL = jjhmd_resultSet.getString("SFCL"); String jjhmd_id = jjhmd_resultSet.getString("ID"); List<String> temp_info = new ArrayList<String>(); temp_info.add(jjhmd_sfssbj); temp_info.add(jjhmd_SFCL); temp_info.add(jjhmd_id); blacklistMap_.put(jjhmd_cphid,temp_info); } jjhmd_resultSet.close(); jjhmd_statement.close(); }catch(SQLException e){ e.printStackTrace(); }finally{ if(con!=null){ try{ con.close(); }catch(SQLException e){ logger.warn("",e); } } } } public void execute(Tuple tuple){ String no = tuple.getStringByField("no"); String location = tuple.getStringByFiled("location"); String tpid = tuple.getStringByField(tpidl:); try{ if(blacklistMap_.containsKey(no)){ List<String>temp_info = blacklistMap_.get(no); if(temp_info.get(1).equals("否")){ String msg = convertToMsg(tuple); conllector_.emit(new Values(msg)); } } }catch(Excetption e){ logger.error(e.getMessage()); }finally{ } } …… }
public class BlackListTopology{ //topicSpout接收來自JMS消息中間件的主題數據,且不設置並行度(這是由topic在JMS協議中的語義決定的) public static final String TOPIC_SPOUT = "topic_spout"; //以隨機分組的方式接收來自JmsSpout的數據,並行度被設置爲2. public static final String BLACKLIST_BOLT = "blacklist_bolt"; //下面這兩個均以隨機分組的方式接收來自BlackListBolt的數據,分別向消息中間件和數據庫寫入計算的結果數據. public static final String TOPIC_BOLT = "topic_bolt"; public static final String DB_BOLT = "db_bolt"; public static void main(String[] args) throws Exception{ ConfigUtil cu = ConfigUtil.getInstance(); JmsProvider jmsTopicProvbider_source = new ActiveMQProvider("failover:(tcp://"+cu.getMessage_ip()+":"+cu.getMessage_port+")",cu.getMessage_sb_topic(),"",""); //消息中間件的IP地址、端口和主題名稱,都是在配置文件中維護的,此處經過ConfigUtil對象從配置文件中獲取的。 JmsSpout topicSpout = new JmsSpout(); topicSpout.setJmsProvider(jmsTopicProvider_source); topicSpout.setJmsTupleProducer(new SB_Beijing_TupleProducer()); topicSpout.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); topicSpout.setDistributed(false); JmsProvider jmsTopicProvider_target = new ActiveMQProvider("failover:(tcp://"+cu.getMessage_ip()+":"+cu.getMessage_port()+")",cu.getMessage_ijhmdbj_topic(),"","") JmsBolt topicBolt = new JmsBolt(); topicBolt.setJmsProvider(jmsTopicProvider_target); topicBolt.setJmsMessageProducer(new JsonMessageProducer()); topicBolt.setJmsAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); TopologyBuilder builder = new ToplogyBuilder(); builder.setSpout(TOPIC_SPOUT,topicSpout; builder.setBolt(BLACKLIST_BOLT,new BlackListBolt(),2).shuffleGrouping(TOPIC_SPOUT); builder.setBolt(TOPIC_BOLT,topicBolt,1).shuffleGrouping(BLACKLIST_BOLT); RegisterBlackCarBolt dbBolt = new RegisterBlackCarBolt(); builder.setBolt(DB_BOLT,dbBolt,1).shuffleGrouping(BLACKLIST_BOLT); Config conf = new Config(); conf.setNumWorkers(2) if(args.length >0){ conf.setDebug(false); StormSubmitter.submitTopology(args[0],conf,builder.createTopology()); }else{ conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("storm-traffic-blcaklist",conf,builder.createTopology()); Utils.sleep(6000000); cluster.killTopology("storm-traffic-blacklist"); cluster.shutdown(); } } }
topicBolt是類JmsBolt的對象,它以隨機分組的方式,也接受來自BlackListBolt的數據,即黑名單檢索的即時結果,而後向消息中間件寫入計算的結果數據 public class JmsBolt extends BaseRichBolt{ private static Logger LOG = LoggerFactory.getLogger(JmsBolt.class); private Connection connection; …… public void prepare(Map stormConf,TopologyContext context,OutputCollector collector){ if(this.jmsProvider == null || this.producer == null){ throw new IllegalStateException("JMS Provider and MessageProducer not set."); } this.collector = collector; try{ ConnectionFactory cf = this.jmsProvider.connectionFactory(); Destination dest = this.jmsProvider.destination(); this.connection = cf.createConnection(); this.session = connection.createSeesion(this.jmsTransactional,this.jmsAcknowledgeMode); this.messageProducer = session.createProducer(dest); connection.start(); } catch(Exception e){ LOG.warn("Error creating JMS connection.",e); } } public void execute(Tuple input){ try{ Message msg = this.producer.toMessage(this.session,input); if(msg!=null){ if(msg.getJMSDestination()!=null){ this.messageProducer.sen(msg.getJMSDestination(),msg); }else{ this.messageProducer.send(msg); } } if(this.autoAck){ LOG.debug("ACKing tuple:"+input); this.collector.ack(intput); } }catch(JMSException e){ LOG.warn("Failing tuple:" + input + "Exception:" + e); this.collector.fail(input); } } …… }
JmsSpout類繼承了BaseRichSpout類並實現了MessageListener接口。做爲JMS的客戶端,JmsSpout實現了MessageListener接口,這裏分析一下該接口聲明的方法onMessage().方法onMessage()在Jms消息中間件向它推送一個消息時這裏的實現是將獲得的消息放入緩存隊列queue對象中. public class JmsSpout extends BaseRichSpout implements MessageListener{ private static final Logger LOG = LoggerFactory.getLogger(JmsSpout.class); private LinkedBlockingQueue<Message>queue; private ConcurrentHashMap<String,Message>pendingMessages; …… public void onMessage(Message msg){ try{ LOG.debug("Queuing msg ["+msg.getJMSMessageID()+"]"); }catch(JMSException e){ } this.queue.offer(msg); } //從queue對象獲取數據,組織爲tuple結構後發送(emit); public void nextTuple(){ Message msg = this.queue.poll(); if(msg == null){ Utils.sleep(50); }else{ LOG.debug("sending tuple:"+msg); try{ Values vals = this.tupleProducer.toTuple(msg); if(this.isDurableSubscription()||(msg.getJMSDeliveryMode()!=Session.AUTO_ACKNOWLEDGE)){ LOG.debug("Requesting acks."); this.collector.emit(vals,msg.getJMSMessageID()); this.pendingMessages.put(msg.getJMSMessageID(),msg); }else{ this.collector.emit(vals); }catch(JMSException e){ LOG.warn("Unable to convert JMS message:"+msg); } } } //在tuple須要被確認處理成功時調用,這裏的實現是從中間結果隊列pendingMessages移除相應數據項,並對這條消息調用JMS的方法acknowledge()進行確認. public void ack(Object msgId){ Message msg = this.pendingMessage.remove(msgId); if(msg!=null){ try{ msg.acknowledge(); LOG.debug("JMS Message Scked:"+msgId); }catch(JMSException e){ LOG.warn("Error acknowldging JMS message:" + msgId,e); } }else{ LOG.warn("Couldn't acknowledge unknown JMS message ID:"+msgId); } } //在tuple須要被確認處理失敗時調用,這裏的實現是從中間結果隊列pendingMessages移除相應數據項,並設置存在失敗的標誌位. public void fail(Object msgId){ LOG.warn("Message failed:" + msgId); this.pendingMessages.remove(msgId); synchronized(this.recoveryMutex);{ this.hasFailures = true; } } } …… }
//dbBolt是類RegisterBlackCarBolt的對象,它以隨機分組的方式,接受來自BlackListBolt的數據,也即黑名單檢索的即時結果,而後向數據庫寫入計算的結果數據。 public class RegisterBlackCarBolt implements IBasicBolt{ private static Logger log = Logger.getLogger(RegisterBlackCarBolt.class); private Connection con = null; private String tableName = "JJHMDBJ"; private void prepare(Map stormConf,TopologyContext context){ try{ con = DBUtil.getDataSource().getConnection(); }catch(SQLException el){ el.printStackTrace(); } } public void execute(Tuple input,BasicOutputCollector collector){ String json = (String)input.getValue(0); String[] tupleStrs = json.split(","); try{ String stmt = "insert into "+tableName+"("+TPID+","+JCDID+","+HMDID+","+CPHID+","+LRSJ+","+primaryKey+","+FQH+") values(?,?,?,?,?,?,?)"; PreparedStatment prepstmt = con.prepareStatement(stmt); if(tupleStrs.length==5){ prepstmt.setString(1,tupleStrs[0]); prepstmt.setString(2,tupleStrs[1]); prepstmt.setString(3,tupleStrs[2]); prepstmt.setString(4,tupleStrs[3]); prepstmt.setTimestamp(5,new Timestamp((TimeUtil.string2datetime(tupleStrs[4])).getTime())); prepstmt.setInt(6,1); prepstmt.setInt(7,getPartNO(tupleStrs[4])); }else{ log.error("tupple attribte size error!"); } int r = prepstmt.executeUpdate(); log.info("insert"+r+" row"); }catch(Exception e){ e.printStackTrace(); } } …… }