Flink提供了專門操做redis的Redis Sinkjava
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
Redis Sink 提供用於向Redis發送數據的接口的類。接收器能夠使用三種不一樣的方法與不一樣類型的Redis環境進行通訊:redis
類 | 場景 | 備註 |
---|---|---|
FlinkJedisPoolConfig | 單Redis服務器 | 適用於本地、測試場景 |
FlinkJedisClusterConfig | Redis集羣 | |
FlinkJedisSentinelConfig | Redis哨兵 |
Redis Sink 核心類是 RedisMappe 是一個接口,使用時咱們要編寫本身的redis操做類實現這個接口中的三個方法apache
public interface RedisMapper<T> extends Function, Serializable { /** * 設置使用的redis數據結構類型,和key的名詞 * 經過RedisCommand設置數據結構類型 * Returns descriptor which defines data type. * * @return data type descriptor */ RedisCommandDescription getCommandDescription(); /** * 設置value中的鍵值對 key的值 * Extracts key from data. * * @param data source data * @return key */ String getKeyFromData(T data); /** * 設置value中的鍵值對 value的值 * Extracts value from data. * * @param data source data * @return value */ String getValueFromData(T data); }
使用RedisCommand設置數據結構類型時和redis結構對應關係。bootstrap
Data Type | Redis Command [Sink] |
---|---|
HASH | HSET |
LIST | RPUSH, LPUSH |
SET | SADD |
PUBSUB | PUBLISH |
STRING | SET |
HYPER_LOG_LOG | PFADD |
SORTED_SET | ZADD |
SORTED_SET | ZREM |
public class RedisSinkTest { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(2000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //鏈接kafka Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties); consumer.setStartFromEarliest(); DataStream<String> stream = env.addSource(consumer); DataStream<Tuple2<String, Integer>> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1); //實例化FlinkJedisPoolConfig 配置redis FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort("6379").build(); //實例化RedisSink,並經過flink的addSink的方式將flink計算的結果插入到redis counts.addSink(new RedisSink<>(conf,new RedisSinkExample())); env.execute("WordCount From Kafka To Redis"); } public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { String[] tokens = value.toLowerCase().split("\\W+"); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<String, Integer>(token, 1)); } } } } //指定Redis set public static final class RedisSinkExample implements RedisMapper<Tuple2<String,Integer>> { public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.SET, null); } public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; } public String getValueFromData(Tuple2<String, Integer> data) { return data.f1.toString(); } } }