Getting Started with Flink: the New York City Taxi Exercises

I have been learning Flink recently, with the goal of building a real-time recommender system on top of it. I found a good website (run by the creators of Flink, so more or less official) with hands-on Flink tutorials, and worked through them to get familiar with the framework. What follows are just my personal notes.

1. The Datasets

The New York City Taxi & Limousine Commission provides a public dataset about taxi rides in New York City from 2009 to 2015.

See the Taxi Data Streams instructions for how to download the data. Once downloaded, do not decompress the files.

Our first dataset contains information about taxi rides in New York City. Each ride is represented by two events, START and END, marking the beginning and the end of the trip. Each event consists of eleven attributes:

rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
startTime      : DateTime  // the start time of a ride
endTime        : DateTime  // the end time of a ride,
                           //   "1970-01-01 00:00:00" for start events
startLon       : Float     // the longitude of the ride start location
startLat       : Float     // the latitude of the ride start location
endLon         : Float     // the longitude of the ride end location
endLat         : Float     // the latitude of the ride end location
passengerCnt   : Short     // number of passengers on the ride

The other dataset contains the fare information, one record per ride:

rideId         : Long      // a unique id for each ride
taxiId         : Long      // a unique id for each taxi
driverId       : Long      // a unique id for each driver
startTime      : DateTime  // the start time of a ride
paymentType    : String    // CSH or CRD
tip            : Float     // tip for this ride
tolls          : Float     // tolls for this ride
totalFare      : Float     // total fare collected

2. Generating the Data Streams

First we define the TaxiRide event, i.e., one record of the dataset.
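As a rough illustration, the event type is essentially a POJO mirroring the schema above. This is only a sketch; the actual class in the training project additionally implements parsing (fromString), a getEventTime() accessor, and comparison.

import org.joda.time.DateTime;

// Sketch of the TaxiRide event type, following the schema above.
// The real class in the training project also provides fromString(String)
// for parsing one line of the file and a getEventTime() accessor.
public class TaxiRide {
    public long rideId;        // a unique id for each ride
    public long taxiId;        // a unique id for each taxi
    public long driverId;      // a unique id for each driver
    public boolean isStart;    // TRUE for ride start events, FALSE for ride end events
    public DateTime startTime; // the start time of a ride
    public DateTime endTime;   // the end time of a ride, "1970-01-01 00:00:00" for start events
    public float startLon;     // the longitude of the ride start location
    public float startLat;     // the latitude of the ride start location
    public float endLon;       // the longitude of the ride end location
    public float endLat;       // the latitude of the ride end location
    public short passengerCnt; // number of passengers on the ride
}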

We use a Flink source function (TaxiRideSource) to read the TaxiRide stream; this source operates on event time. The fare events (TaxiFare) are served the same way by TaxiFareSource. To make the generated streams more realistic, events are emitted in proportion to their timestamps: two events that actually happened ten minutes apart are also served ten minutes apart in the stream. On top of that, a speed-up factor can be configured; with a factor of 60, one minute of real time plays back as just one second in the stream, compressed sixty-fold. The source can also apply a maximum serving delay, which makes each event appear at a random time within that bound, so that the stream reproduces the out-of-orderness found in the real world.

For this application we set the speed-up factor to 600 (ten minutes of event time per second) and the maximum delay to 60 seconds.

All exercises should be implemented using event time (as opposed to processing time).

Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.

In other words, the time that processing depends on is the time at which an event occurred, not the time at which it happens to show up in the stream.
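To see how event time is mapped onto wall-clock serving time, here is a minimal sketch of the toServingTime helper called in the source below, reconstructed from the formula quoted in its comment (servingSpeed is the speed-up factor):

// Sketch: map a (delayed) event timestamp to the wall-clock time at which
// it should be served. An event occurring d ms after the first event is
// served d / servingSpeed ms after serving started.
public long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
    long dataDiff = eventTime - dataStartTime; // event-time offset from the first event
    return servingStartTime + (dataDiff / this.servingSpeed);
}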

private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {

    // wall-clock time at which serving starts
    long servingStartTime = Calendar.getInstance().getTimeInMillis();

    // event time of the first ride in the data
    long dataStartTime;

    Random rand = new Random(7452);

    // priority queue from which events are emitted, ordered by their delayed event time
    PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
            32,
            new Comparator<Tuple2<Long, Object>>() {
                @Override
                public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
                    return o1.f0.compareTo(o2.f0);
                }
            });

    // read the first ride and insert it into the schedule
    String line;
    TaxiRide ride;
    if (reader.ready() && (line = reader.readLine()) != null) {
        // read first ride
        ride = TaxiRide.fromString(line);
        // extract starting timestamp
        dataStartTime = getEventTime(ride);
        // the delayed time is dataStartTime plus a bounded random delay,
        // which simulates the out-of-orderness of the real world
        long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);

        // schedule the first ride
        emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
        // schedule the first watermark
        long watermarkTime = dataStartTime + watermarkDelayMSecs;
        // the watermark's timestamp trails watermarkTime by maxDelayMsecs + 1,
        // so no event emitted after it can carry a smaller timestamp
        Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
        // the priority queue orders this watermark ahead of the ride
        emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));

    } else {
        return;
    }

    // peek at the next ride from the file
    if (reader.ready() && (line = reader.readLine()) != null) {
        ride = TaxiRide.fromString(line);
    }

    // read rides one-by-one and emit a random ride from the buffer each time
    while (emitSchedule.size() > 0 || reader.ready()) {

        // insert all events into schedule that might be emitted next:
        // delayed timestamp of the next event in the schedule
        long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
        // event time of the ride just read from the file
        long rideEventTime = ride != null ? getEventTime(ride) : -1;
        // fill the schedule until it holds every event that might be emitted next
        while (
                ride != null && ( // while there is a ride AND
                emitSchedule.isEmpty() || // no ride is scheduled yet OR
                rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
                ) {
            // insert event into emit schedule
            long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
            emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));

            // read next ride
            if (reader.ready() && (line = reader.readLine()) != null) {
                ride = TaxiRide.fromString(line);
                rideEventTime = getEventTime(ride);
            } else {
                ride = null;
                rideEventTime = -1;
            }
        }

        // poll the head of the schedule
        Tuple2<Long, Object> head = emitSchedule.poll();
        // the time at which the head is due
        long delayedEventTime = head.f0;
        long now = Calendar.getInstance().getTimeInMillis();

        // servingTime = servingStartTime + (delayedEventTime - dataStartTime) / this.servingSpeed
        long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
        // how much longer do we have to wait before this ride may happen?
        long waitTime = servingTime - now;
        // if we have to wait, we might as well sleep
        Thread.sleep((waitTime > 0) ? waitTime : 0);

        // if the head is a TaxiRide
        if (head.f1 instanceof TaxiRide) {
            TaxiRide emitRide = (TaxiRide) head.f1;
            // emit ride
            sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
        }
        // if the head is a watermark
        else if (head.f1 instanceof Watermark) {
            Watermark emitWatermark = (Watermark) head.f1;
            // emit watermark
            sourceContext.emitWatermark(emitWatermark);
            // and schedule the next watermark
            long watermarkTime = delayedEventTime + watermarkDelayMSecs;
            // again, keep the watermark's timestamp behind the timestamps of upcoming rides
            Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
            emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
        }
    }
}

Here is an example of how to run these sources in Java:

// get an ExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
	new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));

Some applications also require checkpointing. A checkpoint is Flink's mechanism for recovering from failures, and using it here requires a CheckpointedTaxiRideSource to drive the stream.
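A minimal sketch of how that might be wired up (the 10-second checkpoint interval is an arbitrary choice for illustration; as used later in this post, CheckpointedTaxiRideSource takes only a file path and a serving-speed factor, since a checkpointed source must replay events deterministically and therefore applies no random delay):

// take a checkpoint every 10 seconds (interval chosen arbitrarily)
env.enableCheckpointing(10_000L);

// the checkpointed source has no max-delay parameter: it serves events in
// order so that its read position can be restored exactly after a failure
DataStream<TaxiRide> rides = env.addSource(
        new CheckpointedTaxiRideSource("/path/to/nycTaxiRides.gz", servingSpeed));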

3. Cleaning the Data 🛀

3.1 Joining the Data 🔗

Since our application studies taxis within New York City, we exclude locations outside the city with this filter:

private static class NYCFilter implements FilterFunction<TaxiRide> {  
  @Override  
  public boolean filter(TaxiRide taxiRide) throws Exception {  
      return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&  
             GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);  
  }  
}

Applying the filter:

// start the data generator  
DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));  
  
DataStream<TaxiRide> filteredRides = rides  
  // filter out rides that do not start or stop in NYC  
  .filter(new NYCFilter());

Now we need to join the TaxiRide and TaxiFare records, which means processing the streams from both sources at the same time. A few of the transformation functions used here:

  • FlatMap: maps one input record to zero, one, or more output records
  • Filter: evaluates a predicate and forwards the record only if the result is true
  • KeyBy: groups the records by a key; the data is repartitioned by that key and then handed on to the next operator

Since we cannot control whether a ride or its fare arrives first, we store whichever arrives first until its match shows up. This calls for stateful computation.

public class RidesAndFaresExercise extends ExerciseBase {  
   public static void main(String[] args) throws Exception {  
  
  ParameterTool params = ParameterTool.fromArgs(args);  
  final String ridesFile = params.get("rides", pathToRideData);  
  final String faresFile = params.get("fares", pathToFareData);  
 
  final int delay = 60; // at most 60 seconds of delay  
  final int servingSpeedFactor = 1800; // 30 minutes worth of events are served every second  
 
  // set up streaming execution environment  
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
  env.setParallelism(ExerciseBase.parallelism);  
  
  DataStream<TaxiRide> rides = env  
            .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))  
            .filter((TaxiRide ride) -> ride.isStart)  
            .keyBy("rideId");  
  
  DataStream<TaxiFare> fares = env  
            .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))  
            .keyBy("rideId");  
  
  DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides  
            .connect(fares)  
            .flatMap(new EnrichmentFunction());  
  
  printOrTest(enrichedRides);  
  
  env.execute("Join Rides with Fares (java RichCoFlatMap)");  
  }  
  
   public static class EnrichmentFunction extends RichCoFlatMapFunction<TaxiRide, TaxiFare, Tuple2<TaxiRide, TaxiFare>> {  
  
   // keyed, managed state  
   private ValueState<TaxiRide> rideState;  
   private ValueState<TaxiFare> fareState;  
  
   @Override  
   public void open(Configuration config) throws Exception {  
         rideState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved ride", TaxiRide.class));  
		 fareState = getRuntimeContext().getState(new ValueStateDescriptor<>("saved fare", TaxiFare.class));  
  }  
  
   @Override  
   public void flatMap1(TaxiRide ride, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
         TaxiFare fare = fareState.value();  
		 if (fare != null) {  
            fareState.clear();  
			out.collect(new Tuple2(ride, fare));  
		  } else {  
            rideState.update(ride);  
		  }  
      }  
  
    @Override  
    public void flatMap2(TaxiFare fare, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {  
         TaxiRide ride = rideState.value();  
		 if (ride != null){  
            rideState.clear();  
			out.collect(new Tuple2(ride, fare));  
		  } else {  
            fareState.update(fare);  
		  }  
      }  
   }  
}

Running this, we can see that the generated records look like the following, with each ride and its fare joined together:

3> (196965,START,2013-01-01 11:54:08,1970-01-01 00:00:00,-73.99048,40.75611,-73.98388,40.767143,2,2013007021,2013014447,196965,2013007021,2013014447,2013-01-01 11:54:08,CSH,0.0,0.0,6.5)
1> (197311,START,2013-01-01 11:55:44,1970-01-01 00:00:00,-73.98894,40.72127,-73.95267,40.771126,1,2013008802,2013012009,197311,2013008802,2013012009,2013-01-01 11:55:44,CRD,2.7,0.0,16.2)
2> (196608,START,2013-01-01 11:53:00,1970-01-01 00:00:00,-73.97817,40.761055,-73.98574,40.75613,2,2013004060,2013014162,196608,2013004060,2013014162,2013-01-01 11:53:00,CSH,0.0,0.0,5.5)

3.2 Cleaning Up Buffered State

Now we want to make the join above more robust. With real-world data some records get lost, meaning we may receive only one of TaxiRide and TaxiFare while the other never arrives; the record that arrived first would then occupy memory forever. To solve this, we clear unmatched state inside a CoProcessFunction.

This is implemented in the class ExpiringStateExercise.

First we simulate missing data on the input side: we drop all END events, and one out of every 1000 START events. 😯

DataStream<TaxiRide> rides = env  
      .addSource(rideSourceOrTest(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor)))  
      .filter((TaxiRide ride) -> (ride.isStart && (ride.rideId % 1000 != 0)))  
      .keyBy(ride -> ride.rideId);
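The fares stream used below is set up analogously; here is a sketch, assuming a CheckpointedTaxiFareSource that parallels the ride source:

DataStream<TaxiFare> fares = env
      .addSource(fareSourceOrTest(new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor)))
      .keyBy(fare -> fare.rideId);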

SingleOutputStreamOperator processed = rides  
      .connect(fares)  
      // Applies the given ProcessFunction on the input stream, thereby creating a transformed output stream.  
      // The function will be called for every element in the input streams and can produce zero or more output elements.  
	  .process(new EnrichmentFunction());

We use a CoProcessFunction for the operation described above. For a stream with two inputs, the following description gives a vivid picture of the three methods we need to override:

For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.

processElement1(...) and processElement2(...) are called for elements of the two input streams; onTimer() defines what to do with records that never found a match.

@Override
// Called when a timer set using TimerService fires.
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    if (fareState.value() != null) {
        ctx.output(unmatchedFares, fareState.value());
        fareState.clear();
    }
    if (rideState.value() != null) {
        ctx.output(unmatchedRides, rideState.value());
        rideState.clear();
    }
}

@Override
// The Context allows querying the timestamp of the element,
// querying the TimeDomain of the firing timer and getting a TimerService for registering timers and querying the time.
// The context is only valid during the invocation of this method, do not store it.
public void processElement1(TaxiRide ride, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    // the current element is a ride; if the matching fare is already in state, emit the pair
    // (the streams were keyBy'ed on rideId, so only the fare with the same rideId reaches this ride)
    TaxiFare fare = fareState.value();
    if (fare != null) {
        fareState.clear();
        out.collect(new Tuple2(ride, fare));
    } else {
        // otherwise, buffer the ride
        rideState.update(ride);
        // as soon as the watermark arrives, we stop waiting for the corresponding fare:
        // registers a timer to be fired when the event time watermark passes the given time
        context.timerService().registerEventTimeTimer(ride.getEventTime());
    }
}
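For completeness: processElement2 mirrors processElement1 with the roles of ride and fare swapped, and the unmatchedRides/unmatchedFares referenced in onTimer are side-output tags. A sketch of both, following the pattern above:

// side-output tags for records whose match never arrived
private static final OutputTag<TaxiRide> unmatchedRides = new OutputTag<TaxiRide>("unmatchedRides") {};
private static final OutputTag<TaxiFare> unmatchedFares = new OutputTag<TaxiFare>("unmatchedFares") {};

@Override
public void processElement2(TaxiFare fare, Context context, Collector<Tuple2<TaxiRide, TaxiFare>> out) throws Exception {
    TaxiRide ride = rideState.value();
    if (ride != null) {
        // the matching ride is already buffered: join and clear the state
        rideState.clear();
        out.collect(new Tuple2(ride, fare));
    } else {
        // buffer the fare and stop waiting once the watermark passes its event time
        fareState.update(fare);
        context.timerService().registerEventTimeTimer(fare.getEventTime());
    }
}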

The output is shown below; the rideIds of the unmatched records differ by exactly 1000, consistent with how we dropped the data above.

1> 1000,2013000992,2013000989,2013-01-01 00:05:38,CSH,0.0,4.8,18.3
3> 2000,2013001967,2013001964,2013-01-01 00:08:25,CSH,0.0,0.0,17.5
4> 3000,2013002904,2013002901,2013-01-01 00:11:00,CRD,4.38,0.0,22.38

3.3 Windows

Now we want to determine the driver who earned the most tips in each hour (each fare record has a tip field). The simplest approach takes two steps: first use hour-long time windows to compute each driver's total tips per hour, then find, in the result stream of those windows, the driver with the highest total for each hour.

Two interfaces we will encounter in the code below:

AggregateFunction: this interface has a method for adding an input element to an accumulator. It also has methods for creating an initial accumulator, for merging two accumulators into one, and for extracting the output from an accumulator.

ProcessWindowFunction: this function receives an Iterable containing all elements of the window, together with a Context object that gives access to time and state; these inputs allow it to provide richer functionality than other window functions.

public class HourlyTipsExercise extends ExerciseBase {  
  
   public static void main(String[] args) throws Exception {  
  
   // read parameters  
   ParameterTool params = ParameterTool.fromArgs(args);  
   final String input = params.get("input", ExerciseBase.pathToFareData);  
  
   final int maxEventDelay = 60; // events are out of order by max 60 seconds  
   final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second  
  
   // set up streaming execution environment  
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
   env.setParallelism(ExerciseBase.parallelism);  
  
   // start the data generator  
   DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));  
  
   // compute tips per hour for each driver  
   DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares  
            // key the stream by driverId
            .keyBy((TaxiFare fare) -> fare.driverId)
            // one-hour time windows
            .timeWindow(Time.hours(1))
            // AddTips() is the aggregate function, WrapWithWindowInfo() the window function
            .aggregate(new AddTips(), new WrapWithWindowInfo());
  
   // find the highest total tips in each hour  
   DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips  
            .timeWindowAll(Time.hours(1))  
            .maxBy(2);  
  
   printOrTest(hourlyMax);  
  
   // execute the transformation pipeline  
   env.execute("Hourly Tips (java)");  
  }  
    
   /* Adds up the tips. */
   public static class AddTips implements AggregateFunction<
            TaxiFare, // input type
            Float,    // accumulator type
            Float     // output type
            > {
      @Override  
	  public Float createAccumulator() {  
	         return 0F;  
	  }  
  
      @Override  
	  public Float add(TaxiFare fare, Float aFloat) {  
	         return fare.tip + aFloat;  
	  }  
  
      @Override  
	  public Float getResult(Float aFloat) {  
	         return aFloat;  
	  }  
  
      @Override  
	  public Float merge(Float aFloat, Float accumulator) {  
	         return aFloat + accumulator;  
	  }  
   }  
  
   /* Wraps the pre-aggregated result into a tuple along with the window's timestamp and key. */
   public static class WrapWithWindowInfo extends ProcessWindowFunction<
            Float, Tuple3<Long, Long, Float>, Long, TimeWindow> {
      @Override
      public void process(Long key, Context context, Iterable<Float> elements, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
         Float sumOfTips = elements.iterator().next();
         out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
      }
   }
}

The output:

1> (1357002000000,2013000493,54.45)
2> (1357005600000,2013010467,64.53)
3> (1357009200000,2013010589,104.75)

4. Broadcast State

Broadcast state is a mechanism for broadcasting data from an upstream task to all parallel instances of a downstream task.

This article covers broadcast state in detail: A Practical Guide to Broadcast State in Apache Flink.

In this setup the system is split into an action stream and a pattern stream. The action stream is the normal data stream, the rides in our example. The pattern stream is the one we broadcast: think of it as a monitoring channel over the rides. We send a pattern into the broadcast state, and the operator prints the records of the action stream that match that pattern.

Here the pattern is an integer n, a number of minutes. At the moment the pattern is sent, we want to print all rides that started at least n minutes ago and have not yet ended.

The application code follows.

First, in this simple example we need a broadcast state descriptor, even though we never store anything in it:

final MapStateDescriptor<Long, Long> dummyBroadcastState = new MapStateDescriptor<>(
      "dummy",
      BasicTypeInfo.LONG_TYPE_INFO,
      BasicTypeInfo.LONG_TYPE_INFO
);

Next, set up a socket to receive the pattern:

BroadcastStream<String> queryStream = env.socketTextStream("localhost", 9999)  
      .assignTimestampsAndWatermarks(new QueryStreamAssigner())  
      .broadcast(dummyBroadcastState);
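QueryStreamAssigner is not shown above; its job is simply to keep the query stream from holding back the event-time clock of the connected stream. A plausible minimal implementation under that assumption:

// The query stream carries no meaningful event times, so it always reports
// the maximum watermark; the watermark of the connected stream then depends
// only on the rides.
public static class QueryStreamAssigner implements AssignerWithPeriodicWatermarks<String> {
    @Override
    public Watermark getCurrentWatermark() {
        return Watermark.MAX_WATERMARK;
    }

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        return 0;
    }
}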

Once we have the rides stream, keyed by taxiId, and the broadcast stream carrying the minute count n received on the socket, we connect the two streams and pass them to a QueryFunction. The QueryFunction matches the pattern (the minute count n from the socket) against the rides and emits the rides that match.

DataStream<TaxiRide> reports = rides  
      .keyBy((TaxiRide ride) -> ride.taxiId)  
      .connect(queryStream)  
      .process(new QueryFunction());

public static class QueryFunction extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {  
	private ValueStateDescriptor<TaxiRide> taxiDescriptor =  new ValueStateDescriptor<>("saved ride", TaxiRide.class);  
	private ValueState<TaxiRide> taxiState;  
  
  @Override  
  public void open(Configuration config) {  
      // state holding the most recent event seen for each taxi
	  taxiState = getRuntimeContext().getState(taxiDescriptor);  
  }  
  
  @Override  
  public void processElement(TaxiRide ride, ReadOnlyContext ctx, Collector<TaxiRide> out) throws Exception {
     // For every taxi, let's store the most up-to-date information.
     // TaxiRide implements Comparable to make this easy.
     TaxiRide savedRide = taxiState.value();
     if (ride.compareTo(savedRide) > 0) {
         taxiState.update(ride);
     }
  }
  
  @Override  
  public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) throws Exception {  
      DateTimeFormatter timeFormatter =  
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();  
  
	  Long thresholdInMinutes = Long.valueOf(msg);  
	  Long wm = ctx.currentWatermark();  
	  System.out.println("QUERY: " + thresholdInMinutes + " minutes at " + timeFormatter.print(wm));  
  
	  // Collect to the output all ongoing rides that started at least thresholdInMinutes ago.  
	  ctx.applyToKeyedState(taxiDescriptor, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {  
         @Override  
		 public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception {  
	         TaxiRide ride = taxiState.value();  
			 if (ride.isStart) {  
	             long minutes = (wm - ride.getEventTime()) / 60000;  
				 if (ride.isStart && (minutes >= thresholdInMinutes)) {  
	                 out.collect(ride);  
				  }  
	         }  
         }  
      });  
   }  
}
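To try it out, start a socket server on port 9999 before launching the job (netcat works: `nc -lk 9999`), then type a number of minutes and press enter; the job prints every ride that, in event time, started at least that many minutes ago and has not yet ended.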

References:

  1. data Artisans
  2. 《Flink基礎教程》
  3. 《Learning Apache Flink》