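http://training.data-artisans.com/ is a Flink learning platform provided by DataArtisans, the commercial company behind Apache Flink. It mainly offers cases that combine business scenarios with the Flink API. This article takes one demo from it, which computes the hot-spot areas where taxis pick up and drop off passengers, and analyzes it.
1. Data preparation
Most of the flink-training examples use a historical data set provided by the New York City Taxi & Limousine Commission as their practice data source. The most commonly used event type in it, the taxi ride, is defined as: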
rideId       : Long     // a unique id for each ride
taxiId       : Long     // a unique id for each taxi
driverId     : Long     // a unique id for each driver
isStart      : Boolean  // TRUE for ride start events, FALSE for ride end events
startTime    : DateTime // the start time of a ride
endTime      : DateTime // the end time of a ride,
                        // "1970-01-01 00:00:00" for start events
startLon     : Float    // the longitude of the ride start location
startLat     : Float    // the latitude of the ride start location
endLon       : Float    // the longitude of the ride end location
endLat       : Float    // the latitude of the ride end location
passengerCnt : Short    // number of passengers on the ride
Download the data set:
wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
Convert the data source into a Flink stream source:
// get an ExecutionEnvironment
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// configure event-time processing
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

// get the taxi ride data stream
DataStream<TaxiRide> rides = env.addSource(
    new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));
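2. Coordinate grid
As shown in the figure below, the program divides the coordinate range of the whole city, from the northwest corner to the southeast corner, into a grid of roughly 250 x 400 cells.
3. Computing coordinate values from grid cells
Base coordinate data: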
// geo boundaries of the area of NYC
public static double LON_EAST = -73.7;
public static double LON_WEST = -74.05;
public static double LAT_NORTH = 41.0;
public static double LAT_SOUTH = 40.5;

// area width and height
public static double LON_WIDTH = 74.05 - 73.7;
public static double LAT_HEIGHT = 41.0 - 40.5;

// delta step to create artificial grid overlay of NYC
public static double DELTA_LON = 0.0014;
public static double DELTA_LAT = 0.00125;

// ( |LON_WEST| - |LON_EAST| ) / DELTA_LON
public static int NUMBER_OF_GRID_X = 250;
// ( LAT_NORTH - LAT_SOUTH ) / DELTA_LAT
public static int NUMBER_OF_GRID_Y = 400;
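The two grid dimensions follow directly from the boundaries and step sizes. As a quick check, the sketch below recomputes them in a standalone class; the class name GridSize and its helper methods are illustrative assumptions and do not appear in the training code. It rounds rather than truncates because the double quotients can land a hair below the exact integer.

```java
public class GridSize {
    // Values copied from the constants listed above.
    static final double LON_EAST = -73.7;
    static final double LON_WEST = -74.05;
    static final double LAT_NORTH = 41.0;
    static final double LAT_SOUTH = 40.5;
    static final double DELTA_LON = 0.0014;
    static final double DELTA_LAT = 0.00125;

    /** Number of cells from west to east: (|LON_WEST| - |LON_EAST|) / DELTA_LON. */
    static long cellsAlongLon() {
        return Math.round((Math.abs(LON_WEST) - Math.abs(LON_EAST)) / DELTA_LON);
    }

    /** Number of cells from north to south: (LAT_NORTH - LAT_SOUTH) / DELTA_LAT. */
    static long cellsAlongLat() {
        return Math.round((LAT_NORTH - LAT_SOUTH) / DELTA_LAT);
    }

    public static void main(String[] args) {
        // prints "250 x 400 = 100000 cells"
        System.out.println(cellsAlongLon() + " x " + cellsAlongLat()
            + " = " + (cellsAlongLon() * cellsAlongLat()) + " cells");
    }
}
```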
Compute the unique cell id from a longitude and latitude:
public static int mapToGridCell(float lon, float lat) {
    int xIndex = (int)Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / DELTA_LON);
    int yIndex = (int)Math.floor((LAT_NORTH - lat) / DELTA_LAT);

    return xIndex + (yIndex * NUMBER_OF_GRID_X);
}
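To make the index arithmetic concrete, the following sketch copies the relevant constants and mapToGridCell into a standalone class and maps a single point. The class name GridCellExample and the sample coordinates (a point in midtown Manhattan) are assumptions chosen for illustration; they are not part of the training code.

```java
public class GridCellExample {
    // Constants copied from the listing above.
    static final double LON_WEST = -74.05;
    static final double LAT_NORTH = 41.0;
    static final double DELTA_LON = 0.0014;
    static final double DELTA_LAT = 0.00125;
    static final int NUMBER_OF_GRID_X = 250;

    /** Same logic as the mapToGridCell shown above. */
    static int mapToGridCell(float lon, float lat) {
        int xIndex = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / DELTA_LON);
        int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
        return xIndex + (yIndex * NUMBER_OF_GRID_X);
    }

    public static void main(String[] args) {
        // Illustrative sample point (assumption): lon -73.9855, lat 40.758.
        // xIndex = floor((74.05 - 73.9855) / 0.0014)  = floor(46.07) = 46
        // yIndex = floor((41.0 - 40.758) / 0.00125)   = floor(193.6) = 193
        // id     = 46 + 193 * 250                     = 48296
        System.out.println(mapToGridCell(-73.9855f, 40.758f)); // prints 48296
    }
}
```

The x index counts cells eastward from LON_WEST and the y index counts cells southward from LAT_NORTH, so ids grow row by row from the northwest corner. A point west of LON_WEST or north of LAT_NORTH would produce a negative index, which is presumably one reason the job filters out rides outside NYC before mapping them.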
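4. Program implementation
Once coordinates are mapped to a gridId, all that remains is to use windows to find the grid cells whose number of events per unit of time exceeds a given threshold.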
// find popular places
DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularSpots = rides
    // remove all rides which are not within NYC
    .filter(new RideCleansing.NYCFilter())
    // match ride to grid cell and event type (start or end)
    .map(new GridCellMatcher())
    // partition by cell id and event type
    .<KeyedStream<Tuple2<Integer, Boolean>, Tuple2<Integer, Boolean>>>keyBy(0, 1)
    // build sliding window
    .timeWindow(Time.minutes(15), Time.minutes(5))
    // count ride events in window
    .apply(new RideCounter())
    // filter by popularity threshold
    .filter((Tuple4<Integer, Long, Boolean, Integer> count) -> (count.f3 >= popThreshold))
    // map grid cell to coordinates
    .map(new GridToCoordinates());

// print result on stdout
popularSpots.print();
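The call timeWindow(Time.minutes(15), Time.minutes(5)) creates 15-minute windows that start every 5 minutes, so each event is counted in three overlapping windows. The sketch below illustrates that assignment in plain Java without any Flink dependency. The class SlidingWindowSketch and its method are hypothetical names, and the code assumes non-negative timestamps and windows aligned to the epoch; it is not Flink's own implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    /**
     * Returns the start times (in ms) of all sliding windows of the given
     * size and slide that contain the timestamp, newest window first.
     * Assumes timestampMs >= 0 and windows aligned to the epoch.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that began at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards while the window [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        // An event at minute 17 falls into three windows. Prints:
        // [15, 30) min
        // [10, 25) min
        // [5, 20) min
        for (long start : windowStarts(17 * minute, 15 * minute, 5 * minute)) {
            System.out.println("[" + start / minute + ", "
                + (start + 15 * minute) / minute + ") min");
        }
    }
}
```

Because the windows overlap, the same cell can be reported as popular in several consecutive results, with a fresh result every 5 minutes.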
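After finding the hot-spot areas, the Flink job above maps each gridId back to the longitude and latitude of the center of its cell. The implementation is: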
/**
 * Maps the grid cell id back to longitude and latitude coordinates.
 */
public static class GridToCoordinates implements
        MapFunction<Tuple4<Integer, Long, Boolean, Integer>, Tuple5<Float, Float, Long, Boolean, Integer>> {

    @Override
    public Tuple5<Float, Float, Long, Boolean, Integer> map(
            Tuple4<Integer, Long, Boolean, Integer> cellCount) throws Exception {

        return new Tuple5<>(
            GeoUtils.getGridCellCenterLon(cellCount.f0),
            GeoUtils.getGridCellCenterLat(cellCount.f0),
            cellCount.f1,
            cellCount.f2,
            cellCount.f3);
    }
}

/**
 * Returns the longitude of the center of a grid cell.
 *
 * @param gridCellId The grid cell.
 *
 * @return The longitude value of the cell's center.
 */
public static float getGridCellCenterLon(int gridCellId) {
    int xIndex = gridCellId % NUMBER_OF_GRID_X;

    return (float)(Math.abs(LON_WEST) - (xIndex * DELTA_LON) - (DELTA_LON / 2)) * -1.0f;
}

/**
 * Returns the latitude of the center of a grid cell.
 *
 * @param gridCellId The grid cell.
 *
 * @return The latitude value of the cell's center.
 */
public static float getGridCellCenterLat(int gridCellId) {
    int xIndex = gridCellId % NUMBER_OF_GRID_X;
    int yIndex = (gridCellId - xIndex) / NUMBER_OF_GRID_X;

    return (float)(LAT_NORTH - (yIndex * DELTA_LAT) - (DELTA_LAT / 2));
}
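A useful sanity check on the two center functions is that they invert mapToGridCell: mapping a cell id to its center and then back should return the same id. The sketch below copies the three functions into a standalone class to demonstrate this. The class name GridRoundTrip and the sample id 48296 are assumptions made for illustration.

```java
public class GridRoundTrip {
    // Constants copied from the listing of base coordinate data.
    static final double LON_WEST = -74.05;
    static final double LAT_NORTH = 41.0;
    static final double DELTA_LON = 0.0014;
    static final double DELTA_LAT = 0.00125;
    static final int NUMBER_OF_GRID_X = 250;

    /** Maps a longitude/latitude pair to its grid cell id. */
    static int mapToGridCell(float lon, float lat) {
        int xIndex = (int) Math.floor((Math.abs(LON_WEST) - Math.abs(lon)) / DELTA_LON);
        int yIndex = (int) Math.floor((LAT_NORTH - lat) / DELTA_LAT);
        return xIndex + (yIndex * NUMBER_OF_GRID_X);
    }

    /** Longitude of the center of a grid cell. */
    static float getGridCellCenterLon(int gridCellId) {
        int xIndex = gridCellId % NUMBER_OF_GRID_X;
        return (float) (Math.abs(LON_WEST) - (xIndex * DELTA_LON) - (DELTA_LON / 2)) * -1.0f;
    }

    /** Latitude of the center of a grid cell. */
    static float getGridCellCenterLat(int gridCellId) {
        int xIndex = gridCellId % NUMBER_OF_GRID_X;
        int yIndex = (gridCellId - xIndex) / NUMBER_OF_GRID_X;
        return (float) (LAT_NORTH - (yIndex * DELTA_LAT) - (DELTA_LAT / 2));
    }

    public static void main(String[] args) {
        int id = 48296; // illustrative cell id (assumption): xIndex 46, yIndex 193
        float lon = getGridCellCenterLon(id);
        float lat = getGridCellCenterLat(id);
        System.out.println("center of cell " + id + ": lon=" + lon + ", lat=" + lat);
        // The center lies half a cell away from every cell border,
        // so mapping it back yields the original id.
        System.out.println(mapToGridCell(lon, lat) == id); // prints true
    }
}
```

The half-step offsets (DELTA_LON / 2 and DELTA_LAT / 2) are what move the result from the cell's northwest corner to its center, which also keeps the round trip robust against float rounding.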
Conclusion: by dividing the area into grid cells, a Flink program can conveniently handle problems such as computing hot geographic areas in real time.
Code: https://github.com/dataArtisans/flink-training-exercises/blob/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/windows/PopularPlaces.java