A look at Flink's Broadcast State

This article takes a look at Flink's Broadcast State.

Example

    @Test
    public void testBroadcastState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> originStream = env.addSource(new RandomWordSource());

        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<>("dynamicConfig", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
        BroadcastStream<Tuple2<String,String>> configStream = env.addSource(new DynamicConfigSource()).broadcast(descriptor);

        BroadcastConnectedStream<String, Tuple2<String,String>> connectStream = originStream.connect(configStream);
        connectStream.process(new BroadcastProcessFunction<String, Tuple2<String,String>, Void>() {
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
                ReadOnlyBroadcastState<String,String> config = ctx.getBroadcastState(descriptor);
                String configValue = config.get("demoConfigKey");
                // process the value based on the current config (configValue is null until the first config item arrives)
                LOGGER.info("process value:{},config:{}",value,configValue);
            }

            @Override
            public void processBroadcastElement(Tuple2<String, String> value, Context ctx, Collector<Void> out) throws Exception {
                LOGGER.info("receive config item:{}",value);
                //update state
                ctx.getBroadcastState(descriptor).put(value.f0, value.f1);
            }
        });

        env.execute("testBroadcastState");
    }

public class DynamicConfigSource implements SourceFunction<Tuple2<String,String>> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) throws Exception {
        long idx = 1;
        while (isRunning){
            ctx.collect(Tuple2.of("demoConfigKey","value" + idx));
            idx++;
            TimeUnit.SECONDS.sleep(10);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
  • This simulates a configuration source: it emits a refreshed config item every 10 seconds, and each item is then broadcast to every task.
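To make the data flow of the example concrete, here is a minimal plain-Java sketch (no Flink dependency) of what happens at runtime, assuming a parallelism of 3. Each config item is delivered to every parallel task, which stores it in its own local map; each ordinary element goes to exactly one task and reads that task's map. The class `BroadcastStateSketch`, its methods, and the round-robin distribution of words are hypothetical and used only for illustration; none of them are part of Flink's API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the example job; this is NOT Flink code.
final class BroadcastStateSketch {

    // One local "broadcast state" map per parallel task.
    private final List<Map<String, String>> taskStates = new ArrayList<>();
    private int nextTask = 0;

    BroadcastStateSketch(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            taskStates.add(new HashMap<>());
        }
    }

    // Models processBroadcastElement: every task receives the item
    // and updates its own copy of the state.
    void broadcastConfig(String key, String value) {
        for (Map<String, String> state : taskStates) {
            state.put(key, value);
        }
    }

    // Models processElement: a single task handles the word (round-robin
    // here, which is an assumption) and reads the config from its local copy.
    String processWord(String word) {
        int task = nextTask;
        nextTask = (nextTask + 1) % taskStates.size();
        String config = taskStates.get(task).get("demoConfigKey");
        return "task " + task + " process value:" + word + ",config:" + config;
    }

    public static void main(String[] args) {
        BroadcastStateSketch job = new BroadcastStateSketch(3);
        // No config has arrived yet, so the lookup yields null.
        System.out.println(job.processWord("alpha"));
        job.broadcastConfig("demoConfigKey", "value1");
        // Every task now sees value1, whichever task handles the word.
        System.out.println(job.processWord("beta"));
        System.out.println(job.processWord("gamma"));
    }
}
```

The first call shows why the processing logic should tolerate a missing config: elements can be processed before any broadcast item has been received.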

MapStateDescriptor

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MapStateDescriptor.java

@PublicEvolving
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {

	private static final long serialVersionUID = 1L;

	/**
	 * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
	 *
	 * @param name The name of the {@code MapStateDescriptor}.
	 * @param keySerializer The type serializer for the keys in the state.
	 * @param valueSerializer The type serializer for the values in the state.
	 */
	public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
		super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
	}

	/**
	 * Create a new {@code MapStateDescriptor} with the given name and the given type information.
	 *
	 * @param name The name of the {@code MapStateDescriptor}.
	 * @param keyTypeInfo The type information for the keys in the state.
	 * @param valueTypeInfo The type information for the values in the state.
	 */
	public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
		super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
	}

	/**
	 * Create a new {@code MapStateDescriptor} with the given name and the given type information.
	 *
	 * <p>If this constructor fails (because it is not possible to describe the type via a class),
	 * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
	 *
	 * @param name The name of the {@code MapStateDescriptor}.
	 * @param keyClass The class of the type of keys in the state.
	 * @param valueClass The class of the type of values in the state.
	 */
	public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
		super(name, new MapTypeInfo<>(keyClass, valueClass), null);
	}

	@Override
	public Type getType() {
		return Type.MAP;
	}

	/**
	 * Gets the serializer for the keys in the state.
	 *
	 * @return The serializer for the keys in the state.
	 */
	public TypeSerializer<UK> getKeySerializer() {
		final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
		if (!(rawSerializer instanceof MapSerializer)) {
			throw new IllegalStateException("Unexpected serializer type.");
		}

		return ((MapSerializer<UK, UV>) rawSerializer).getKeySerializer();
	}

	/**
	 * Gets the serializer for the values in the state.
	 *
	 * @return The serializer for the values in the state.
	 */
	public TypeSerializer<UV> getValueSerializer() {
		final TypeSerializer<Map<UK, UV>> rawSerializer = getSerializer();
		if (!(rawSerializer instanceof MapSerializer)) {
			throw new IllegalStateException("Unexpected serializer type.");
		}

		return ((MapSerializer<UK, UV>) rawSerializer).getValueSerializer();
	}
}
  • MapStateDescriptor extends StateDescriptor, with MapState as the state type and Map as the value type.

DataStream.broadcast

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

	/**
	 * Sets the partitioning of the {@link DataStream} so that the output elements
	 * are broadcasted to every parallel instance of the next operation. In addition,
	 * it implicitly as many {@link org.apache.flink.api.common.state.BroadcastState broadcast states}
	 * as the specified descriptors which can be used to store the element of the stream.
	 *
	 * @param broadcastStateDescriptors the descriptors of the broadcast states to create.
	 * @return A {@link BroadcastStream} which can be used in the {@link #connect(BroadcastStream)} to
	 * create a {@link BroadcastConnectedStream} for further processing of the elements.
	 */
	@PublicEvolving
	public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
		Preconditions.checkNotNull(broadcastStateDescriptors);
		final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
		return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
	}

	/**
	 * Internal function for setting the partitioner for the DataStream.
	 *
	 * @param partitioner
	 *            Partitioner to set.
	 * @return The modified DataStream.
	 */
	protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
		return new DataStream<>(this.getExecutionEnvironment(), new PartitionTransformation<>(this.getTransformation(), partitioner));
	}

	/**
	 * Sets the partitioning of the {@link DataStream} so that the output elements
	 * are broadcast to every parallel instance of the next operation.
	 *
	 * @return The DataStream with broadcast partitioning set.
	 */
	public DataStream<T> broadcast() {
		return setConnectionType(new BroadcastPartitioner<T>());
	}
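  • DataStream's broadcast(MapStateDescriptor...) method first calls setConnectionType with a BroadcastPartitioner, then creates and returns a BroadcastStream using the MapStateDescriptors as arguments. DataStream also has a no-argument broadcast method, which simply calls setConnectionType and returns a DataStream.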
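The effect of choosing a broadcast partitioner can be illustrated with a small plain-Java sketch. It contrasts a selector that returns every output channel with one that picks a single channel by hash. The `ChannelSelector` interface below is a hypothetical stand-in defined for this sketch only; it is loosely modelled on the idea of channel selection and is not Flink's actual interface.

```java
import java.util.Arrays;

// Plain-Java model of channel selection; this is NOT Flink code.
final class PartitionerSketch {

    // Hypothetical interface: which output channels should a record go to?
    interface ChannelSelector<T> {
        int[] selectChannels(T record, int numChannels);
    }

    // Broadcast: every record goes to all channels.
    static <T> ChannelSelector<T> broadcast() {
        return (record, numChannels) -> {
            int[] all = new int[numChannels];
            for (int i = 0; i < numChannels; i++) {
                all[i] = i;
            }
            return all;
        };
    }

    // Hash: every record goes to exactly one channel.
    static <T> ChannelSelector<T> hash() {
        return (record, numChannels) ->
                new int[] { Math.floorMod(record.hashCode(), numChannels) };
    }

    public static void main(String[] args) {
        ChannelSelector<String> b = broadcast();
        ChannelSelector<String> h = hash();
        // Broadcast selects channels 0..3; hash selects a single channel.
        System.out.println(Arrays.toString(b.selectChannels("word", 4)));
        System.out.println(Arrays.toString(h.selectChannels("word", 4)));
    }
}
```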

DataStream.connect

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

	/**
	 * Creates a new {@link ConnectedStreams} by connecting
	 * {@link DataStream} outputs of (possible) different types with each other.
	 * The DataStreams connected using this operator can be used with
	 * CoFunctions to apply joint transformations.
	 *
	 * @param dataStream
	 *            The DataStream with which this stream will be connected.
	 * @return The {@link ConnectedStreams}.
	 */
	public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
		return new ConnectedStreams<>(environment, this, dataStream);
	}

	/**
	 * Creates a new {@link BroadcastConnectedStream} by connecting the current
	 * {@link DataStream} or {@link KeyedStream} with a {@link BroadcastStream}.
	 *
	 * <p>The latter can be created using the {@link #broadcast(MapStateDescriptor[])} method.
	 *
	 * <p>The resulting stream can be further processed using the {@code BroadcastConnectedStream.process(MyFunction)}
	 * method, where {@code MyFunction} can be either a
	 * {@link org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction KeyedBroadcastProcessFunction}
	 * or a {@link org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction BroadcastProcessFunction}
	 * depending on the current stream being a {@link KeyedStream} or not.
	 *
	 * @param broadcastStream The broadcast stream with the broadcast state to be connected with this stream.
	 * @return The {@link BroadcastConnectedStream}.
	 */
	@PublicEvolving
	public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
		return new BroadcastConnectedStream<>(
				environment,
				this,
				Preconditions.checkNotNull(broadcastStream),
				broadcastStream.getBroadcastStateDescriptor());
	}
  • DataStream's connect method accepts either a DataStream or a BroadcastStream. With a BroadcastStream it returns a BroadcastConnectedStream; otherwise it returns an ordinary ConnectedStreams.

BroadcastConnectedStream.process

flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java

@PublicEvolving
public class BroadcastConnectedStream<IN1, IN2> {

	private final StreamExecutionEnvironment environment;
	private final DataStream<IN1> inputStream1;
	private final BroadcastStream<IN2> inputStream2;
	private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;

	protected BroadcastConnectedStream(
			final StreamExecutionEnvironment env,
			final DataStream<IN1> input1,
			final BroadcastStream<IN2> input2,
			final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
		this.environment = requireNonNull(env);
		this.inputStream1 = requireNonNull(input1);
		this.inputStream2 = requireNonNull(input2);
		this.broadcastStateDescriptors = requireNonNull(broadcastStateDescriptors);
	}

	public StreamExecutionEnvironment getExecutionEnvironment() {
		return environment;
	}

	/**
	 * Returns the non-broadcast {@link DataStream}.
	 *
	 * @return The stream which, by convention, is not broadcasted.
	 */
	public DataStream<IN1> getFirstInput() {
		return inputStream1;
	}

	/**
	 * Returns the {@link BroadcastStream}.
	 *
	 * @return The stream which, by convention, is the broadcast one.
	 */
	public BroadcastStream<IN2> getSecondInput() {
		return inputStream2;
	}

	/**
	 * Gets the type of the first input.
	 *
	 * @return The type of the first input
	 */
	public TypeInformation<IN1> getType1() {
		return inputStream1.getType();
	}

	/**
	 * Gets the type of the second input.
	 *
	 * @return The type of the second input
	 */
	public TypeInformation<IN2> getType2() {
		return inputStream2.getType();
	}

	/**
	 * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
	 * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
	 *
	 * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
	 * @param <KS> The type of the keys in the keyed stream.
	 * @param <OUT> The type of the output elements.
	 * @return The transformed {@link DataStream}.
	 */
	@PublicEvolving
	public <KS, OUT> SingleOutputStreamOperator<OUT> process(final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function) {

		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
				function,
				KeyedBroadcastProcessFunction.class,
				1,
				2,
				3,
				TypeExtractor.NO_INDEX,
				getType1(),
				getType2(),
				Utils.getCallLocationName(),
				true);

		return process(function, outTypeInfo);
	}

	/**
	 * Assumes as inputs a {@link BroadcastStream} and a {@link KeyedStream} and applies the given
	 * {@link KeyedBroadcastProcessFunction} on them, thereby creating a transformed output stream.
	 *
	 * @param function The {@link KeyedBroadcastProcessFunction} that is called for each element in the stream.
	 * @param outTypeInfo The type of the output elements.
	 * @param <KS> The type of the keys in the keyed stream.
	 * @param <OUT> The type of the output elements.
	 * @return The transformed {@link DataStream}.
	 */
	@PublicEvolving
	public <KS, OUT> SingleOutputStreamOperator<OUT> process(
			final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
			final TypeInformation<OUT> outTypeInfo) {

		Preconditions.checkNotNull(function);
		Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
				"A KeyedBroadcastProcessFunction can only be used on a keyed stream.");

		TwoInputStreamOperator<IN1, IN2, OUT> operator =
				new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
		return transform("Co-Process-Broadcast-Keyed", outTypeInfo, operator);
	}

	/**
	 * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
	 * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
	 *
	 * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
	 * @param <OUT> The type of the output elements.
	 * @return The transformed {@link DataStream}.
	 */
	@PublicEvolving
	public <OUT> SingleOutputStreamOperator<OUT> process(final BroadcastProcessFunction<IN1, IN2, OUT> function) {

		TypeInformation<OUT> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(
				function,
				BroadcastProcessFunction.class,
				0,
				1,
				2,
				TypeExtractor.NO_INDEX,
				getType1(),
				getType2(),
				Utils.getCallLocationName(),
				true);

		return process(function, outTypeInfo);
	}

	/**
	 * Assumes as inputs a {@link BroadcastStream} and a non-keyed {@link DataStream} and applies the given
	 * {@link BroadcastProcessFunction} on them, thereby creating a transformed output stream.
	 *
	 * @param function The {@link BroadcastProcessFunction} that is called for each element in the stream.
	 * @param outTypeInfo The type of the output elements.
	 * @param <OUT> The type of the output elements.
	 * @return The transformed {@link DataStream}.
	 */
	@PublicEvolving
	public <OUT> SingleOutputStreamOperator<OUT> process(
			final BroadcastProcessFunction<IN1, IN2, OUT> function,
			final TypeInformation<OUT> outTypeInfo) {

		Preconditions.checkNotNull(function);
		Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream),
				"A BroadcastProcessFunction can only be used on a non-keyed stream.");

		TwoInputStreamOperator<IN1, IN2, OUT> operator =
				new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
		return transform("Co-Process-Broadcast", outTypeInfo, operator);
	}

	@Internal
	private <OUT> SingleOutputStreamOperator<OUT> transform(
			final String functionName,
			final TypeInformation<OUT> outTypeInfo,
			final TwoInputStreamOperator<IN1, IN2, OUT> operator) {

		// read the output type of the input Transforms to coax out errors about MissingTypeInfo
		inputStream1.getType();
		inputStream2.getType();

		TwoInputTransformation<IN1, IN2, OUT> transform = new TwoInputTransformation<>(
				inputStream1.getTransformation(),
				inputStream2.getTransformation(),
				functionName,
				operator,
				outTypeInfo,
				environment.getParallelism());

		if (inputStream1 instanceof KeyedStream) {
			KeyedStream<IN1, ?> keyedInput1 = (KeyedStream<IN1, ?>) inputStream1;
			TypeInformation<?> keyType1 = keyedInput1.getKeyType();
			transform.setStateKeySelectors(keyedInput1.getKeySelector(), null);
			transform.setStateKeyType(keyType1);
		}

		@SuppressWarnings({ "unchecked", "rawtypes" })
		SingleOutputStreamOperator<OUT> returnStream = new SingleOutputStreamOperator(environment, transform);

		getExecutionEnvironment().addOperator(transform);

		return returnStream;
	}

	protected <F> F clean(F f) {
		return getExecutionEnvironment().clean(f);
	}
}
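  • BroadcastConnectedStream.process accepts two kinds of function: KeyedBroadcastProcessFunction (only on a keyed stream) and BroadcastProcessFunction (only on a non-keyed stream). Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines an onTimer method, which does nothing by default and can be overridden by subclasses.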
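The shape of the two function types can be sketched in plain Java: two abstract callbacks that every implementation must provide, plus, for the keyed variant, a timer hook with an empty default body. The classes below (`NonKeyedFn`, `KeyedFn`, `EchoFn`, `TimerFn`) are hypothetical simplifications written for this sketch; their signatures are deliberately reduced and do not match Flink's real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the two function shapes; this is NOT Flink code.
final class FunctionShapeSketch {

    // Stand-in for the non-keyed variant: two abstract callbacks.
    abstract static class NonKeyedFn<IN1, IN2> {
        abstract void processElement(IN1 value, List<String> out);
        abstract void processBroadcastElement(IN2 value, List<String> out);
    }

    // Stand-in for the keyed variant: the same callbacks plus a timer hook.
    abstract static class KeyedFn<IN1, IN2> {
        abstract void processElement(IN1 value, List<String> out);
        abstract void processBroadcastElement(IN2 value, List<String> out);

        void onTimer(long timestamp, List<String> out) {
            // Default implementation does nothing; subclasses may override.
        }
    }

    // Implements only the two mandatory callbacks and inherits the no-op onTimer.
    static class EchoFn extends KeyedFn<String, String> {
        @Override
        void processElement(String value, List<String> out) {
            out.add("element:" + value);
        }

        @Override
        void processBroadcastElement(String value, List<String> out) {
            out.add("broadcast:" + value);
        }
    }

    // Additionally overrides the optional timer hook.
    static class TimerFn extends EchoFn {
        @Override
        void onTimer(long timestamp, List<String> out) {
            out.add("timer@" + timestamp);
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        new EchoFn().onTimer(42L, out);   // inherited no-op, nothing is added
        new TimerFn().onTimer(42L, out);  // overridden hook adds "timer@42"
        System.out.println(out);
    }
}
```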

Summary

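  • Using broadcast state takes three steps: (1) create a MapStateDescriptor and pass it to DataStream.broadcast to obtain a BroadcastStream; (2) connect the stream that should receive the broadcast to that BroadcastStream via DataStream.connect, which returns a BroadcastConnectedStream; (3) call BroadcastConnectedStream.process and handle the two inputs in processElement and processBroadcastElement.
  • BroadcastConnectedStream.process accepts two kinds of function: KeyedBroadcastProcessFunction (only on a keyed stream) and BroadcastProcessFunction (only on a non-keyed stream). Both declare the abstract methods processElement and processBroadcastElement; KeyedBroadcastProcessFunction additionally defines an onTimer method, which does nothing by default and can be overridden by subclasses.
  • Broadcast State uses a map format. Broadcast elements are delivered to every task, and each task keeps its own copy of the state; the state itself is not propagated across tasks, so a modification only affects the task that made it. Downstream tasks may receive broadcast events in different orders, so be careful with logic that depends on arrival order. Broadcast state is included in checkpoints. Finally, broadcast state is held in memory only; there is no RocksDB state backend for it.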
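The ordering caveat can be shown with a short plain-Java sketch. Two tasks receive the same two broadcast events in opposite order. A "last arrival wins" update leaves their local states different, while an update that compares a version number converges regardless of order. The `ConfigEvent` class and its `version` field are assumptions introduced for this illustration; the original example carries no version, and this is only one possible way to make updates order-independent.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-task state updates; this is NOT Flink code.
final class BroadcastOrderSketch {

    // A broadcast event with a version number (the version is an assumption).
    static final class ConfigEvent {
        final String key;
        final String value;
        final long version;

        ConfigEvent(String key, String value, long version) {
            this.key = key;
            this.value = value;
            this.version = version;
        }
    }

    // Order-dependent update: whichever event arrives last wins.
    static Map<String, String> applyLastArrivalWins(List<ConfigEvent> events) {
        Map<String, String> state = new HashMap<>();
        for (ConfigEvent e : events) {
            state.put(e.key, e.value);
        }
        return state;
    }

    // Order-independent update: keep the value with the highest version.
    static Map<String, String> applyHighestVersionWins(List<ConfigEvent> events) {
        Map<String, String> state = new HashMap<>();
        Map<String, Long> versions = new HashMap<>();
        for (ConfigEvent e : events) {
            Long seen = versions.get(e.key);
            if (seen == null || e.version > seen) {
                versions.put(e.key, e.version);
                state.put(e.key, e.value);
            }
        }
        return state;
    }

    public static void main(String[] args) {
        ConfigEvent v1 = new ConfigEvent("demoConfigKey", "value1", 1);
        ConfigEvent v2 = new ConfigEvent("demoConfigKey", "value2", 2);
        List<ConfigEvent> taskA = Arrays.asList(v1, v2); // arrival order on task A
        List<ConfigEvent> taskB = Arrays.asList(v2, v1); // arrival order on task B

        // prints false: the two tasks end up with different state
        System.out.println(applyLastArrivalWins(taskA).equals(applyLastArrivalWins(taskB)));
        // prints true: both tasks converge on value2
        System.out.println(applyHighestVersionWins(taskA).equals(applyHighestVersionWins(taskB)));
    }
}
```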
