https://zhuanlan.zhihu.com/p/104155572
Streaming systems often talk about exactly-once processing; that is, ensuring that every record is processed exactly one time.
It almost goes without saying that for many users, any risk of dropped records or data loss in their data processing pipelines is unacceptable. Some systems provided at-least-once guarantees, ensuring that records were always processed at least once, but records might be duplicated (and thus result in inaccurate aggregations). These systems were used for low-latency, speculative results but generally could guarantee nothing about the veracity of those results. This led to a strategy that was coined the Lambda Architecture: run a streaming system to produce fast but inexact results, alongside a batch system that later produces the correct answer.
The nature of streaming pipelines is such that records sometimes show up late, after aggregates for their time windows have already been processed. The Beam SDK allows the user to configure how long the system should wait for late data to arrive; any (and only) records arriving later than this deadline are dropped. This feature contributes to completeness, not to accuracy: all records that showed up in time for processing are accurately processed exactly once, whereas these late records are explicitly dropped.
If you write code that has side effects external to the pipeline, such as contacting an outside service, these effects might be executed more than once for a given record. This situation is usually unavoidable because there is no way to atomically commit Dataflow’s processing with the side effect on the external service. Pipelines do need to eventually send results to the outside world, and such calls might not be idempotent.
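To make this failure mode concrete, here is a minimal sketch (not taken from the original text) of a transform with a non-idempotent side effect; CounterServiceClient is a hypothetical external-service client used only for illustration:

import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical DoFn whose side effect is not idempotent: if the runner retries
// this bundle, the external counter is incremented twice for the same record.
class IncrementExternalCounterFn extends DoFn<String, String> {
  private transient CounterServiceClient client;  // hypothetical client, not a real API

  @Setup
  public void setup() {
    client = CounterServiceClient.connect();      // hypothetical call
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    client.increment(c.element());                // may execute more than once on retry
    c.output(c.element());
  }
}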
Example 5-1: A simple streaming pipeline
Pipeline p = Pipeline.create(options);

// Calculate 1-minute counts of events per user.
PCollection<..> perUserCounts =
    p.apply(ReadFromUnboundedSource.read())
     .apply(new KeyByUser())
     .apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
     .apply(Count.perKey());

// Process these per-user counts, and write the output somewhere.
perUserCounts.apply(new ProcessPerUserCountsAndWriteToSink());

// Add up all these per-user counts to get 1-minute counts of all events.
perUserCounts.apply(Values.<..>create())
             .apply(Count.globally())
             .apply(new ProcessGlobalCountAndWriteToSink());
p.run();
This pipeline both reads and writes data from and to the outside world, so Dataflow must ensure that this interaction does not introduce any inaccuracies. Dataflow has always supported this task—what Apache Spark and Apache Flink call end-to-end exactly once—for sources and sinks whenever technically feasible. The focus of this chapter is on three things: exactly-once processing within the shuffle, exactly-once handling of sources, and exactly-once delivery to sinks.
As just explained, Dataflow’s streaming shuffle uses RPCs. Now, any time you have two machines communicating via RPC, you should think long and hard about data integrity. To guarantee that records are not lost in shuffle, Dataflow employs upstream backup. This simply means that the sender will retry RPCs until it receives positive acknowledgment of receipt. Dataflow also ensures that it will continue retrying these RPCs even if the sender crashes. This guarantees that every record is delivered at least once.
The problem is that these retries might themselves create duplicates. Most RPC frameworks, including the one Dataflow uses, provide the sender with a status indicating success or failure.
In a distributed system, you need to be aware that RPCs can sometimes succeed even when they have appeared to fail. There are many reasons for this: race conditions with the RPC timeout, positive acknowledgment from the server failing to transfer even though the RPC succeeded, and so on. The only status that a sender can really trust is a successful one.
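A minimal sketch of upstream backup, using hypothetical helper types rather than Dataflow’s internal code: the sender retries until it sees a positive acknowledgment, and because an apparently failed RPC may actually have landed, the same record can be delivered more than once.

// Hypothetical types (ShuffleRecord, ShuffleReceiver, Ack, RpcException) for illustration only.
void sendWithUpstreamBackup(ShuffleRecord record, ShuffleReceiver receiver) {
  while (true) {
    try {
      Ack ack = receiver.deliver(record);  // the shuffle RPC
      if (ack.isPositive()) {
        return;                            // the only status the sender can trust
      }
    } catch (RpcException e) {
      // Timeout or transport error: the RPC may or may not have succeeded.
    }
    // Retry; a duplicate delivery is possible here and is handled downstream.
  }
}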
At a high level, the algorithm for this task is quite simple (see Figure 5-2): every message sent is tagged with a unique identifier. Each receiver stores a catalog of all identifiers that have already been seen and processed. Every time a record is received, its identifier is looked up in this catalog. If it is found, the record is dropped as a duplicate. Because Dataflow is built on top of a scalable key/value store, this store is used to hold the deduplication catalog:
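A minimal sketch of this receiver-side check, assuming an illustrative RecordIdCatalog backed by that key/value store (the names are not Dataflow’s):

// Returns true if the record should be processed, false if it is a duplicate.
boolean shouldProcess(ShuffleRecord record, RecordIdCatalog catalog) {
  String recordId = record.getUniqueId();
  if (catalog.contains(recordId)) {
    return false;             // ID already seen: drop the record as a duplicate
  }
  catalog.insert(recordId);   // remember the ID so later retries are dropped
  return true;
}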

A ParDo can execute twice on the same input record (due to a retry), yet produce different output on each retry. The desired behavior is that only one of those outputs will commit into the pipeline; however, the nondeterminism involved makes it difficult to guarantee that both outputs have the same deterministic ID. Even trickier, a ParDo can output multiple records, so each of these retries might produce a different number of outputs!
Many pipelines require nondeterministic transforms. And all too often, pipeline authors do not realize that the code they wrote is nondeterministic. For example, consider a transform that looks up supplemental data in Cloud Bigtable in order to enrich its input data. This is a nondeterministic task, as the external value might change in between retries of the transform.
Dataflow addresses this issue by using checkpointing to make nondeterministic processing effectively deterministic. Each output from a transform is checkpointed, together with its unique ID, to stable storage before being delivered to the next stage.
To implement exactly-once shuffle delivery, a catalog of record IDs is stored in each receiver key. For every record that arrives, Dataflow looks up the catalog of IDs already seen to determine whether this record is a duplicate. Every output from step to step is checkpointed to storage to ensure that the generated record IDs are stable.
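Putting the two ideas together, a rough sketch of the sender-side flow might look as follows (illustrative names only, not Dataflow’s implementation):

// Checkpoint each output with its ID before delivering it downstream, so a retry
// replays the checkpointed outputs instead of re-running nondeterministic user code.
void emit(TransformOutput output, StableStore store, ShuffleSender sender) {
  store.write(output.getUniqueId(), output);  // commit to stable storage first
  sender.send(output);                        // then deliver to the next stage
}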
However, unless implemented carefully, this process would significantly degrade pipeline performance for customers by creating a huge increase in reads and writes.
Dataflow achieves this goal via two key techniques: graph optimization and Bloom filters.
The Dataflow service runs a series of optimizations on the pipeline graph before executing it.
One such optimization is fusion, in which the service fuses many logical steps into a single execution stage:

Dataflow also optimizes associative and commutative Combine operations (such as Count and Sum) by performing partial combining locally before sending the data to the main grouping operation, as illustrated in the following figure.
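In code, this simply means using the built-in combining transforms. For instance, the per-user count from Example 5-1 is an associative and commutative combine, so the runner is free to pre-combine partial counts on each worker before the shuffle (the input collection below is an assumption for illustration):

// Assumed input: PCollection<KV<String, String>> keyedEvents (events keyed by user).
PCollection<KV<String, Long>> perUserCounts =
    keyedEvents.apply(Count.<String, String>perKey());  // eligible for partial combining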
Bloom filters have a very interesting property: they can return false positives but never false negatives. If the filter says "Yes, the element is in the set," we know that the element is probably in the set (with a probability that can be calculated).
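A small illustrative sketch of how such a filter can guard the catalog lookup, here using Guava’s BloomFilter (the capacity and false-positive rate are arbitrary assumptions):

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import java.nio.charset.StandardCharsets;

class RecordIdFilter {
  private final BloomFilter<String> maybeSeen =
      BloomFilter.create(Funnels.stringFunnel(StandardCharsets.UTF_8), 1_000_000, 0.01);

  // Returns true if the (expensive) catalog lookup in stable storage is still required.
  boolean needsCatalogLookup(String recordId) {
    if (!maybeSeen.mightContain(recordId)) {
      maybeSeen.put(recordId);  // definitely not seen before: remember it, skip the lookup
      return false;
    }
    return true;                // possibly seen (or a false positive): consult the catalog
  }
}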
When a record arrives, Dataflow queries the appropriate filter based on the system timestamp. This step prevents the Bloom filters from saturating because filters are garbage-collected over time, and it also bounds the amount of data that needs to be scanned at startup.
The following figure illustrates this process: records arrive in the system and are delegated to a Bloom filter based on their arrival time. None of the records hitting the first filter are duplicates, and all of their catalog lookups are filtered. Record r1 is delivered a second time, so a catalog lookup is needed to verify that it is indeed a duplicate; the same is true for records r4 and r6. Record r8 is not a duplicate; however, due to a false positive in its Bloom filter, a catalog lookup is generated (which will determine that r8 is not a duplicate and should be processed).
As Dataflow’s state and consistency model is per-key, in reality each key stores a catalog of records that have been delivered to that key. We can’t store these identifiers forever, or all available storage will eventually fill up. To avoid that issue, you need garbage collection of acknowledged record IDs.
One strategy for accomplishing this goal would be for senders to tag each record with a strictly increasing sequence number in order to track the earliest sequence number still in flight (corresponding to an unacknowledged record delivery).
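A minimal sketch of that bookkeeping, with illustrative names: track the lowest sequence number still in flight, and garbage-collect any catalog entry for an ID with a lower sequence number.

import java.util.NavigableSet;
import java.util.TreeSet;

// Illustrative tracker of in-flight sequence numbers (not Dataflow's internal code).
class InFlightTracker {
  private final NavigableSet<Long> inFlight = new TreeSet<>();

  synchronized void recordSent(long seq)  { inFlight.add(seq); }
  synchronized void recordAcked(long seq) { inFlight.remove(seq); }

  // Catalog entries with sequence numbers below this bound are safe to garbage-collect.
  synchronized long gcBound() {
    return inFlight.isEmpty() ? Long.MAX_VALUE : inFlight.first();
  }
}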
Many sources are deterministic: for example, consider a source that reads data out of files. The records in a file will always be in a deterministic order and at deterministic byte locations, no matter how many times the file is read. Another source that provides similar determinism guarantees is Apache Kafka; each Kafka topic is divided into a static set of partitions, and records in a partition always have a deterministic order.
However, not all sources are so simple. For example, one common source for Dataflow pipelines is Google Cloud Pub/Sub. Pub/Sub is a nondeterministic source: multiple subscribers can pull from a Pub/Sub topic, but which subscribers receive a given message is unpredictable.
For sinks, the simplest answer is that a number of built-in sinks are provided as part of the Beam SDK. These sinks are carefully designed to ensure that they do not produce duplicates, even if executed multiple times.
However, sometimes the built-ins are insufficient and you need to write your own. The best approach is to ensure that your side-effect operation is idempotent and therefore robust in the face of replay.
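As a sketch of what "idempotent and robust in the face of replay" can look like, the following DoFn derives a deterministic row ID from the key and window, so a retried write overwrites the same row instead of adding a duplicate (HypotheticalKeyValueSink is an assumed external client, not a real API):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

class IdempotentWriteFn extends DoFn<KV<String, Long>, Void> {
  private transient HypotheticalKeyValueSink sink;  // assumed external sink client

  @Setup
  public void setup() {
    sink = HypotheticalKeyValueSink.connect();      // hypothetical call
  }

  @ProcessElement
  public void processElement(ProcessContext c, BoundedWindow window) {
    // Deterministic row ID: same key and window always produce the same ID.
    String rowId = c.element().getKey() + ":" + window.maxTimestamp().getMillis();
    sink.upsert(rowId, c.element().getValue());     // overwriting is safe on retry
  }
}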
Example 5-2: Reshuffle example
c.apply(Window.<..>into(FixedWindows.of(Duration.standardMinutes(1))))
.apply(GroupByKey.<..>create())
.apply(new PrepareOutputData())
.apply(Reshuffle.<..>of())
.apply(WriteToSideEffect());
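The Reshuffle step here plays the same role as the checkpointing described earlier: it makes the (possibly nondeterministic) output of PrepareOutputData stable before the side-effect write runs, so a retry of the final step sees exactly the same records.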