springboot 集成storm的redis

springboot 集成storm,計算日誌中的展現信息,將實時的計算數據存儲到redis中,並判斷redis中的數量信息進行下一步的操做,存儲到mysql中等node

1.配置redis參數,redis採用集羣模式,須要配置redis集羣mysql

spring:
redis:
database: 0
password:
cluster:
nodes:
- 127.0.0.1:6380
maxRedirects: 3
pool:
max-idle: 8
min-idle: 0
max-active: 8
max-wait: -1

2.redis配置類實現
@Configuration
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfig {

@Autowired
private JedisConnectionFactory jedisConnectionFactory;

@Bean("redisTemplate")
public RedisTemplate<?, ?> getRedisTemplate(){
RedisTemplate<?,?> template = new StringRedisTemplate(jedisConnectionFactory);
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(genericJackson2JsonRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(genericJackson2JsonRedisSerializer);
return template;
}


}

public class RedisConfUtils {

/**
* {@link org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration#}
* @param redisProperties
* @return
*/
public static RedisClusterConfiguration getClusterConfiguration(RedisProperties redisProperties) {
if (redisProperties.getCluster() == null) {
return null;
}
RedisProperties.Cluster clusterProperties = redisProperties.getCluster();
RedisClusterConfiguration config = new RedisClusterConfiguration(
clusterProperties.getNodes());

if (clusterProperties.getMaxRedirects() != null) {
config.setMaxRedirects(clusterProperties.getMaxRedirects());
}
return config;
}

public static RedisTemplate buildRedisTemplate(byte[] redisProperties){
JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(
RedisConfUtils.getClusterConfiguration(
(RedisProperties) Serializer.INSTANCE.deserialize(redisProperties)));
RedisTemplate<String, Long> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(jedisConnectionFactory);
jedisConnectionFactory.afterPropertiesSet();

GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
redisTemplate.setKeySerializer(stringRedisSerializer);
redisTemplate.setValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.setHashKeySerializer(stringRedisSerializer);
redisTemplate.setHashValueSerializer(genericJackson2JsonRedisSerializer);
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
}
3.storm 日誌控制Builder配置redis信息,將redis信息傳遞到控制類中
@Getter
@Setter
@Configuration
@DependsOn("redisTemplate")
@ConfigurationProperties(prefix = "storm.bolt.logConsoleBolt")
public class LogConsoleBoltBuilder extends BoltBuilder {
@Autowired
private RedisProperties redisProperties;
private int emitFrequencyInSeconds = 60;//每60s發射一次數據

@Bean("logConsoleBolt")
public LogConsoleBolt buildBolt() {
super.setId("logConsoleBolt");
LogConsoleBolt logConsoleBolt = new LogConsoleBolt();
logConsoleBolt.setRedisProperties(Serializer.INSTANCE.serialize(redisProperties));
logConsoleBolt.setEmitFrequencyInSeconds(emitFrequencyInSeconds);
return logConsoleBolt;
}
}

4.storm 日誌控制類獲取實例化redis信息,將計算獲得的信息存儲到redis中
@Slf4jpublic class LogConsoleBolt extends BaseRichBolt {    private final static String AD_LIST_SHOW_COUNT = "AD_LIST_SHOW_COUNT";    private OutputCollector collector;    private HashOperations<String, String, Long> hashOperations;    @Setter    private byte[] redisProperties;    @Setter    private int emitFrequencyInSeconds;    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {        this.collector=collector;        this.hashOperations = RedisConfUtils.buildRedisTemplate(redisProperties).opsForHash();    }    @Override    public Map<String, Object> getComponentConfiguration() {        Map<String, Object> conf = new HashMap<String, Object>();        /**         * 這裏配置TickTuple的發送頻率         */        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);        return conf;    }    @Override    public void execute(Tuple input) {        try {            log.info(input.toString());//判斷日誌類型,不是須要的日誌則不作處理            if (input.size()<5){                collector.ack(input);            }else {                String value = input.getStringByField("value").toString();                AdShowLogEntity adShowLogEntity = AdShowLogEntity.logToEntity(value);                if (adShowLogEntity != null){                    AdShowLogEntity.Message msg = adShowLogEntity.getMessage().get(0);                    // 輸出//                collector.emit(new Values(LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix()+msg.getCreativeId(), String.valueOf(1)));            //存儲信息到redis                    Long cont = hashOperations.increment(AD_LIST_SHOW_COUNT, LogAdIdKeyEnum.AD_LIST_TYPE.getPrefix()+msg.getCreativeId(), 1l);                    collector.emit(new Values(Integer.parseInt(msg.getCreativeId()),System.currentTimeMillis(),0.01f));                }else {//                collector.ack(input);                }                collector.ack(input);//            System.out.println("received from kafka : "+ value);                // 必須ack,不然會重複消費kafka中的消息            }        }catch (Exception e){            e.printStackTrace();            collector.fail(input);        }    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("adId","updateTime","price")); //分詞定義的field爲word    }}
相關文章
相關標籤/搜索