Storm 系列(七)—— Storm 集成 Redis 詳解

1、簡介

Storm-Redis 提供了 Storm 與 Redis 的集成支持,你只須要引入對應的依賴便可使用:html

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-redis</artifactId>
    <version>${storm.version}</version>
    <type>jar</type>
</dependency>

Storm-Redis 使用 Jedis 爲 Redis 客戶端,並提供了以下三個基本的 Bolt 實現:java

  • RedisLookupBolt:從 Redis 中查詢數據;
  • RedisStoreBolt:存儲數據到 Redis;
  • RedisFilterBolt : 查詢符合條件的數據;

RedisLookupBoltRedisStoreBoltRedisFilterBolt 均繼承自 AbstractRedisBolt 抽象類。咱們能夠經過繼承該抽象類,實現自定義 RedisBolt,進行功能的拓展。git

2、集成案例

2.1 項目結構

這裏首先給出一個集成案例:進行詞頻統計並將最後的結果存儲到 Redis。項目結構以下:github

用例源碼下載地址:storm-redis-integrationredis

2.2 項目依賴

項目主要依賴以下:shell

<properties>
    <storm.version>1.2.2</storm.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>${storm.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>${storm.version}</version>
    </dependency>
</dependencies>

2.3 DataSourceSpout

/**
 * 產生詞頻樣本的數據源
 */
public class DataSourceSpout extends BaseRichSpout {

    private List<String> list = Arrays.asList("Spark", "Hadoop", "HBase", "Storm", "Flink", "Hive");

    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        // 模擬產生數據
        String lineData = productData();
        spoutOutputCollector.emit(new Values(lineData));
        Utils.sleep(1000);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("line"));
    }


    /**
     * 模擬數據
     */
    private String productData() {
        Collections.shuffle(list);
        Random random = new Random();
        int endIndex = random.nextInt(list.size()) % (list.size()) + 1;
        return StringUtils.join(list.toArray(), "\t", 0, endIndex);
    }

}

產生的模擬數據格式以下:apache

Spark   HBase
Hive    Flink   Storm   Hadoop  HBase   Spark
Flink
HBase   Storm
HBase   Hadoop  Hive    Flink
HBase   Flink   Hive    Storm
Hive    Flink   Hadoop
HBase   Hive
Hadoop  Spark   HBase   Storm

2.4 SplitBolt

/**
 * 將每行數據按照指定分隔符進行拆分
 */
public class SplitBolt extends BaseRichBolt {

    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getStringByField("line");
        String[] words = line.split("\t");
        for (String word : words) {
            collector.emit(new Values(word, String.valueOf(1)));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.5 CountBolt

/**
 * 進行詞頻統計
 */
public class CountBolt extends BaseRichBolt {

    private Map<String, Integer> counts = new HashMap<>();

    private OutputCollector collector;


    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector=collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // 輸出
        collector.emit(new Values(word, String.valueOf(count)));

    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}

2.6 WordCountStoreMapper

實現 RedisStoreMapper 接口,定義 tuple 與 Redis 中數據的映射關係:即須要指定 tuple 中的哪一個字段爲 key,哪一個字段爲 value,而且存儲到 Redis 的何種數據結構中。服務器

/**
 * 定義 tuple 與 Redis 中數據的映射關係
 */
public class  WordCountStoreMapper implements RedisStoreMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountStoreMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return tuple.getStringByField("count");
    }
}

2.7 WordCountToRedisApp

/**
 * 進行詞頻統計 並將統計結果存儲到 Redis 中
 */
public class WordCountToRedisApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String COUNT_BOLT = "countBolt";
    private static final String STORE_BOLT = "storeBolt";

    //在實際開發中這些參數能夠將經過外部傳入 使得程序更加靈活
    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        // count
        builder.setBolt(COUNT_BOLT, new CountBolt()).shuffleGrouping(SPLIT_BOLT);
        // save to redis
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
        builder.setBolt(STORE_BOLT, storeBolt).shuffleGrouping(COUNT_BOLT);

        // 若是外部傳參 cluster 則表明線上環境啓動不然表明本地啓動
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterWordCountToRedisApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalWordCountToRedisApp",
                    new Config(), builder.createTopology());
        }
    }
}

2.8 啓動測試

能夠用直接使用本地模式運行,也能夠打包後提交到服務器集羣運行。本倉庫提供的源碼默認採用 maven-shade-plugin 進行打包,打包命令以下:數據結構

# mvn clean package -D maven.test.skip=true

啓動後,查看 Redis 中的數據:併發

3、storm-redis 實現原理

3.1 AbstractRedisBolt

RedisLookupBoltRedisStoreBoltRedisFilterBolt 均繼承自 AbstractRedisBolt 抽象類,和咱們自定義實現 Bolt 同樣,AbstractRedisBolt 間接繼承自 BaseRichBolt

AbstractRedisBolt 中比較重要的是 prepare 方法,在該方法中經過外部傳入的 jedis 鏈接池配置 ( jedisPoolConfig/jedisClusterConfig) 建立用於管理 Jedis 實例的容器 JedisCommandsInstanceContainer

public abstract class AbstractRedisBolt extends BaseTickTupleAwareRichBolt {
    protected OutputCollector collector;

    private transient JedisCommandsInstanceContainer container;

    private JedisPoolConfig jedisPoolConfig;
    private JedisClusterConfig jedisClusterConfig;

   ......
   
    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
        // FIXME: stores map (stormConf), topologyContext and expose these to derived classes
        this.collector = collector;

        if (jedisPoolConfig != null) {
            this.container = JedisCommandsContainerBuilder.build(jedisPoolConfig);
        } else if (jedisClusterConfig != null) {
            this.container = JedisCommandsContainerBuilder.build(jedisClusterConfig);
        } else {
            throw new IllegalArgumentException("Jedis configuration not found");
        }
    }

  .......
}

JedisCommandsInstanceContainerbuild() 方法以下,實際上就是建立 JedisPool 或 JedisCluster 並傳入容器中。

public static JedisCommandsInstanceContainer build(JedisPoolConfig config) {
        JedisPool jedisPool = new JedisPool(DEFAULT_POOL_CONFIG, config.getHost(), config.getPort(), config.getTimeout(), config.getPassword(), config.getDatabase());
        return new JedisContainer(jedisPool);
    }

 public static JedisCommandsInstanceContainer build(JedisClusterConfig config) {
        JedisCluster jedisCluster = new JedisCluster(config.getNodes(), config.getTimeout(), config.getTimeout(), config.getMaxRedirections(), config.getPassword(), DEFAULT_POOL_CONFIG);
        return new JedisClusterContainer(jedisCluster);
    }

3.2 RedisStoreBolt和RedisLookupBolt

RedisStoreBolt 中比較重要的是 process 方法,該方法主要從 storeMapper 中獲取傳入 key/value 的值,並按照其存儲類型 dataType 調用 jedisCommand 的對應方法進行存儲。

RedisLookupBolt 的實現基本相似,從 lookupMapper 中獲取傳入的 key 值,並進行查詢操做。

public class RedisStoreBolt extends AbstractRedisBolt {
    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

   public RedisStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;

        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    public RedisStoreBolt(JedisClusterConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;

        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }
       
  
    @Override
    public void process(Tuple input) {
        String key = storeMapper.getKeyFromTuple(input);
        String value = storeMapper.getValueFromTuple(input);

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();

            switch (dataType) {
                case STRING:
                    jedisCommand.set(key, value);
                    break;

                case LIST:
                    jedisCommand.rpush(key, value);
                    break;

                case HASH:
                    jedisCommand.hset(additionalKey, key, value);
                    break;

                case SET:
                    jedisCommand.sadd(key, value);
                    break;

                case SORTED_SET:
                    jedisCommand.zadd(additionalKey, Double.valueOf(value), key);
                    break;

                case HYPER_LOG_LOG:
                    jedisCommand.pfadd(key, value);
                    break;

                case GEO:
                    String[] array = value.split(":");
                    if (array.length != 2) {
                        throw new IllegalArgumentException("value structure should be longitude:latitude");
                    }

                    double longitude = Double.valueOf(array[0]);
                    double latitude = Double.valueOf(array[1]);
                    jedisCommand.geoadd(additionalKey, longitude, latitude, key);
                    break;

                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + dataType);
            }

            collector.ack(input);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(input);
        } finally {
            returnInstance(jedisCommand);
        }
    }

     .........
}

3.3 JedisCommands

JedisCommands 接口中定義了全部的 Redis 客戶端命令,它有如下三個實現類,分別是 Jedis、JedisCluster、ShardedJedis。Strom 中主要使用前兩種實現類,具體調用哪個實現類來執行命令,由傳入的是 jedisPoolConfig 仍是 jedisClusterConfig 來決定。

3.4 RedisMapper 和 TupleMapper

RedisMapper 和 TupleMapper 定義了 tuple 和 Redis 中的數據如何進行映射轉換。

1. TupleMapper

TupleMapper 主要定義了兩個方法:

  • getKeyFromTuple(ITuple tuple): 從 tuple 中獲取那個字段做爲 Key;

  • getValueFromTuple(ITuple tuple):從 tuple 中獲取那個字段做爲 Value;

2. RedisMapper

定義了獲取數據類型的方法 getDataTypeDescription(),RedisDataTypeDescription 中 RedisDataType 枚舉類定義了全部可用的 Redis 數據類型:

public class RedisDataTypeDescription implements Serializable { 

    public enum RedisDataType { STRING, HASH, LIST, SET, SORTED_SET, HYPER_LOG_LOG, GEO }
     ......
    }

3. RedisStoreMapper

RedisStoreMapper 繼承 TupleMapper 和 RedisMapper 接口,用於數據存儲時,沒有定義額外方法。

4. RedisLookupMapper

RedisLookupMapper 繼承 TupleMapper 和 RedisMapper 接口:

  • 定義了 declareOutputFields 方法,聲明輸出的字段。
  • 定義了 toTuple 方法,將查詢結果組裝爲 Storm 的 Values 的集合,並用於發送。

下面的例子表示從輸入 Tuple 的獲取 word 字段做爲 key,使用 RedisLookupBolt 進行查詢後,將 key 和查詢結果 value 組裝爲 values 併發送到下一個處理單元。

class WordCountRedisLookupMapper implements RedisLookupMapper {
    private RedisDataTypeDescription description;
    private final String hashKey = "wordCount";

    public WordCountRedisLookupMapper() {
        description = new RedisDataTypeDescription(
                RedisDataTypeDescription.RedisDataType.HASH, hashKey);
    }

    @Override
    public List<Values> toTuple(ITuple input, Object value) {
        String member = getKeyFromTuple(input);
        List<Values> values = Lists.newArrayList();
        values.add(new Values(member, value));
        return values;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("wordName", "count"));
    }

    @Override
    public RedisDataTypeDescription getDataTypeDescription() {
        return description;
    }

    @Override
    public String getKeyFromTuple(ITuple tuple) {
        return tuple.getStringByField("word");
    }

    @Override
    public String getValueFromTuple(ITuple tuple) {
        return null;
    }
}

5. RedisFilterMapper

RedisFilterMapper 繼承 TupleMapper 和 RedisMapper 接口,用於查詢數據時,定義了 declareOutputFields 方法,聲明輸出的字段。以下面的實現:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("wordName", "count"));
}

4、自定義RedisBolt實現詞頻統計

4.1 實現原理

自定義 RedisBolt:主要利用 Redis 中哈希結構的 hincrby key field 命令進行詞頻統計。在 Redis 中 hincrby 的執行效果以下。hincrby 能夠將字段按照指定的值進行遞增,若是該字段不存在的話,還會新建該字段,並賦值爲 0。經過這個命令能夠很是輕鬆的實現詞頻統計功能。

redis>  HSET myhash field 5
(integer) 1
redis>  HINCRBY myhash field 1
(integer) 6
redis>  HINCRBY myhash field -1
(integer) 5
redis>  HINCRBY myhash field -10
(integer) -5
redis>

4.2 項目結構

4.3 自定義RedisBolt的代碼實現

/**
 * 自定義 RedisBolt 利用 Redis 的哈希數據結構的 hincrby key field 命令進行詞頻統計
 */
public class RedisCountStoreBolt extends AbstractRedisBolt {

    private final RedisStoreMapper storeMapper;
    private final RedisDataTypeDescription.RedisDataType dataType;
    private final String additionalKey;

    public RedisCountStoreBolt(JedisPoolConfig config, RedisStoreMapper storeMapper) {
        super(config);
        this.storeMapper = storeMapper;
        RedisDataTypeDescription dataTypeDescription = storeMapper.getDataTypeDescription();
        this.dataType = dataTypeDescription.getDataType();
        this.additionalKey = dataTypeDescription.getAdditionalKey();
    }

    @Override
    protected void process(Tuple tuple) {
        String key = storeMapper.getKeyFromTuple(tuple);
        String value = storeMapper.getValueFromTuple(tuple);

        JedisCommands jedisCommand = null;
        try {
            jedisCommand = getInstance();
            if (dataType == RedisDataTypeDescription.RedisDataType.HASH) {
                jedisCommand.hincrBy(additionalKey, key, Long.valueOf(value));
            } else {
                throw new IllegalArgumentException("Cannot process such data type for Count: " + dataType);
            }

            collector.ack(tuple);
        } catch (Exception e) {
            this.collector.reportError(e);
            this.collector.fail(tuple);
        } finally {
            returnInstance(jedisCommand);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

4.4 CustomRedisCountApp

/**
 * 利用自定義的 RedisBolt 實現詞頻統計
 */
public class CustomRedisCountApp {

    private static final String DATA_SOURCE_SPOUT = "dataSourceSpout";
    private static final String SPLIT_BOLT = "splitBolt";
    private static final String STORE_BOLT = "storeBolt";

    private static final String REDIS_HOST = "192.168.200.226";
    private static final int REDIS_PORT = 6379;

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(DATA_SOURCE_SPOUT, new DataSourceSpout());
        // split
        builder.setBolt(SPLIT_BOLT, new SplitBolt()).shuffleGrouping(DATA_SOURCE_SPOUT);
        // save to redis and count
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost(REDIS_HOST).setPort(REDIS_PORT).build();
        RedisStoreMapper storeMapper = new WordCountStoreMapper();
        RedisCountStoreBolt countStoreBolt = new RedisCountStoreBolt(poolConfig, storeMapper);
        builder.setBolt(STORE_BOLT, countStoreBolt).shuffleGrouping(SPLIT_BOLT);

        // 若是外部傳參 cluster 則表明線上環境啓動,不然表明本地啓動
        if (args.length > 0 && args[0].equals("cluster")) {
            try {
                StormSubmitter.submitTopology("ClusterCustomRedisCountApp", new Config(), builder.createTopology());
            } catch (AlreadyAliveException | InvalidTopologyException | AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("LocalCustomRedisCountApp",
                    new Config(), builder.createTopology());
        }
    }
}

參考資料

  1. Storm Redis Integration

更多大數據系列文章能夠參見 GitHub 開源項目大數據入門指南

相關文章
相關標籤/搜索