Integrating Storm with Spring Boot: compute the impression information from logs, store the real-time computed data in Redis, then inspect the counts in Redis to decide the next step, such as persisting to MySQL.
1. Configure the Redis parameters. Redis runs in cluster mode here, so the cluster nodes must be configured:
spring:
  redis:
    database: 0
    password:
    cluster:
      nodes:
        - 127.0.0.1:6380
      maxRedirects: 3
    pool:
      max-idle: 8
      min-idle: 0
      max-active: 8
      max-wait: -1
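To sanity-check that these properties bind as expected, a small startup runner can log the configured cluster nodes. This class is illustrative and not part of the original post; only RedisProperties and its getCluster().getNodes() accessor come from Spring Boot itself:

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedisPropertiesCheck {

    // Logs the bound cluster nodes at startup so a bad YAML indent is caught early.
    @Bean
    public CommandLineRunner redisPropertiesLogger(RedisProperties redisProperties) {
        return args -> System.out.println(
                "Redis cluster nodes: " + redisProperties.getCluster().getNodes());
    }
}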
2. The Redis configuration class:
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfig {

    @Autowired
    private JedisConnectionFactory jedisConnectionFactory;

    @Bean("redisTemplate")
    public RedisTemplate<?, ?> getRedisTemplate() {
        RedisTemplate<?, ?> template = new StringRedisTemplate(jedisConnectionFactory);
        // String keys, JSON-serialized values, for both plain entries and hash entries.
        GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        template.setKeySerializer(stringRedisSerializer);
        template.setValueSerializer(genericJackson2JsonRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setHashValueSerializer(genericJackson2JsonRedisSerializer);
        return template;
    }
}
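With this bean in place, other Spring components can inject the template and work with Redis hashes directly. A minimal usage sketch follows; the service class, method, hash key, and field names are illustrative and not part of the original code:

import javax.annotation.Resource;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class AdShowCountService {

    // Inject by bean name to sidestep generic-type matching on the RedisTemplate<?, ?> bean.
    @Resource(name = "redisTemplate")
    private RedisTemplate<String, Object> redisTemplate;

    // Atomically increments an impression counter in a Redis hash and returns the new total.
    public Long incrementShowCount(String hashKey, String field) {
        HashOperations<String, String, Long> ops = redisTemplate.opsForHash();
        return ops.increment(hashKey, field, 1L);
    }
}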
public class RedisConfUtils {

    /**
     * Mirrors the cluster-configuration logic in
     * {@link org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration}.
     * @param redisProperties the bound spring.redis.* properties
     * @return the cluster configuration, or null when no cluster is configured
     */
    public static RedisClusterConfiguration getClusterConfiguration(RedisProperties redisProperties) {
        if (redisProperties.getCluster() == null) {
            return null;
        }
        RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
        RedisClusterConfiguration config = new RedisClusterConfiguration(clusterProperties.getNodes());
        if (clusterProperties.getMaxRedirects() != null) {
            config.setMaxRedirects(clusterProperties.getMaxRedirects());
        }
        return config;
    }

    public static RedisTemplate<String, Long> buildRedisTemplate(byte[] redisProperties) {
        // Rebuild the connection factory from the serialized RedisProperties,
        // because a bolt cannot carry live Spring beans across Storm workers.
        JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(
                RedisConfUtils.getClusterConfiguration(
                        (RedisProperties) Serializer.INSTANCE.deserialize(redisProperties)));
        RedisTemplate<String, Long> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(jedisConnectionFactory);
        jedisConnectionFactory.afterPropertiesSet();
        GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
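The Serializer used above is not shown in the original post. A minimal stand-in based on Jackson JSON bytes could look like the sketch below; the enum-singleton shape and the JSON approach are assumptions (plain JDK serialization would require RedisProperties to implement Serializable, which it may not in your Spring Boot version, and depending on that version extra Jackson modules may be needed for fields such as Duration):

import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;

// Hypothetical stand-in for the Serializer referenced above: round-trips
// RedisProperties through JSON bytes so the bolt can carry it as a byte[].
public enum Serializer {
    INSTANCE;

    private final ObjectMapper mapper = new ObjectMapper();

    public byte[] serialize(RedisProperties properties) {
        try {
            return mapper.writeValueAsBytes(properties);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to serialize RedisProperties", e);
        }
    }

    public Object deserialize(byte[] bytes) {
        try {
            return mapper.readValue(bytes, RedisProperties.class);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to deserialize RedisProperties", e);
        }
    }
}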
3. The Storm log bolt builder: configure the Redis information here and pass it through to the bolt class.
@Getter
@Setter
@Configuration
@DependsOn("redisTemplate")
@ConfigurationProperties(prefix = "storm.bolt.logConsoleBolt")
public class LogConsoleBoltBuilder extends BoltBuilder {

    @Autowired
    private RedisProperties redisProperties;

    private int emitFrequencyInSeconds = 60; // emit data once every 60 seconds

    @Bean("logConsoleBolt")
    public LogConsoleBolt buildBolt() {
        super.setId("logConsoleBolt");
        LogConsoleBolt logConsoleBolt = new LogConsoleBolt();
        // Hand the Redis settings over as a serialized byte[]: the bolt is shipped
        // to Storm workers and therefore cannot hold live Spring beans.
        logConsoleBolt.setRedisProperties(Serializer.INSTANCE.serialize(redisProperties));
        logConsoleBolt.setEmitFrequencyInSeconds(emitFrequencyInSeconds);
        return logConsoleBolt;
    }
}
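The BoltBuilder base class and the topology assembly are not shown in the post. The sketch below assumes a kafkaSpout that emits the raw log line in a "value" field and wires the Spring-built bolt into a TopologyBuilder; the component ids and the shuffle grouping are illustrative choices, not taken from the original:

import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.TopologyBuilder;

public class AdLogTopologyFactory {

    // Assembles the topology from the Spring-built components.
    public StormTopology build(IRichSpout kafkaSpout, LogConsoleBolt logConsoleBolt) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", kafkaSpout);
        // Tuples are distributed randomly; the bolt only increments hash counters,
        // so no fields grouping is needed here.
        builder.setBolt("logConsoleBolt", logConsoleBolt)
               .shuffleGrouping("kafkaSpout");
        return builder.createTopology();
    }
}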
4. The Storm log bolt: rebuild the Redis client from the serialized properties and store the computed impression counts in Redis.
@Slf4j
public class LogConsoleBolt extends BaseRichBolt {

    private final static String AD_LIST_SHOW_COUNT = "AD_LIST_SHOW_COUNT";

    private OutputCollector collector;
    private HashOperations<String, String, Long> hashOperations;
    @Setter
    private byte[] redisProperties;
    @Setter
    private int emitFrequencyInSeconds;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        // Rebuild the Redis client on the worker from the serialized properties.
        this.hashOperations = RedisConfUtils.buildRedisTemplate(redisProperties).opsForHash();
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<String, Object>();
        // Configure the tick-tuple frequency here.
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }

    @Override
    public void execute(Tuple input) {
        try {
            log.info(input.toString());
            // Check the log type; tuples that are not the logs we need are acked and skipped.
            if (input.size() < 5) {
                collector.ack(input);
            } else {
                String value = input.getStringByField("value");
                AdShowLogEntity adShowLogEntity = AdShowLogEntity.logToEntity(value);
                if (adShowLogEntity != null) {
                    AdShowLogEntity.Message msg = adShowLogEntity.getMessage().get(0);
                    // Store the impression count in Redis; count holds the updated total.
                    Long count = hashOperations.increment(AD_LIST_SHOW_COUNT,
                            LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix() + msg.getCreativeId(), 1L);
                    collector.emit(new Values(Integer.parseInt(msg.getCreativeId()),
                            System.currentTimeMillis(), 0.01f));
                }
                // Must ack, otherwise the messages in Kafka will be consumed repeatedly.
                collector.ack(input);
            }
        } catch (Exception e) {
            e.printStackTrace();
            collector.fail(input);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("adId", "updateTime", "price"));
    }
}
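The last step from the introduction, checking the counts accumulated in Redis and persisting them to MySQL, is not included in the post. One possible shape is a terminal bolt that flushes the Redis hash on every tick tuple. In the sketch below, the ad_show_count table, the JDBC URL and credentials, and the use of JdbcTemplate are all assumptions:

import java.util.HashMap;
import java.util.Map;

import lombok.Setter;
import org.apache.storm.Config;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.TupleUtils;
import org.springframework.data.redis.core.HashOperations;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

public class AdShowCountPersistBolt extends BaseRichBolt {

    private static final String AD_LIST_SHOW_COUNT = "AD_LIST_SHOW_COUNT";

    private OutputCollector collector;
    private transient HashOperations<String, String, Long> hashOperations;
    private transient JdbcTemplate jdbcTemplate;
    @Setter
    private byte[] redisProperties;           // injected by a builder, as in step 3
    @Setter
    private int emitFrequencyInSeconds = 60;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.hashOperations = RedisConfUtils.buildRedisTemplate(redisProperties).opsForHash();
        // Placeholder connection settings; in practice these would also be passed in serialized.
        this.jdbcTemplate = new JdbcTemplate(new DriverManagerDataSource(
                "jdbc:mysql://127.0.0.1:3306/ad_stats", "root", "password"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
        return conf;
    }

    @Override
    public void execute(Tuple input) {
        if (TupleUtils.isTick(input)) {
            // On each tick, read the accumulated counters and upsert them into MySQL.
            Map<String, Long> counts = hashOperations.entries(AD_LIST_SHOW_COUNT);
            for (Map.Entry<String, Long> entry : counts.entrySet()) {
                jdbcTemplate.update(
                        "INSERT INTO ad_show_count (ad_key, show_count) VALUES (?, ?) "
                                + "ON DUPLICATE KEY UPDATE show_count = ?",
                        entry.getKey(), entry.getValue(), entry.getValue());
            }
        }
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }
}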