Flink維表關聯方式

在實際生產中,咱們常常會有這樣的需求,須要以原始數據流做爲基礎,而後關聯大量的外部表來補充一些屬性。例如,咱們在訂單數據中,但願能獲得訂單收貨人所在省的名稱,通常來講訂單中會記錄一個省的 ID,那麼須要根據 ID 去查詢外部的維度表補充省名稱屬性。mysql

在 Flink 流式計算中,咱們的一些維度屬性通常存儲在 MySQL/HBase/Redis 中,這些維表數據存在定時更新,須要咱們根據業務進行關聯。根據咱們業務對維表數據關聯的時效性要求,有如下幾種解決方案:sql

  1. 實時查詢維表關聯數據庫

  2. 預加載維表關聯json

  3. 熱存儲關聯緩存

  4. 其餘網絡

 

實時查詢維表關聯

實時查詢維表是指用戶在 Flink 算子中直接訪問外部數據庫,好比用 MySQL 來進行關聯,這種方式是同步方式,數據保證是最新的。可是,當咱們的流計算數據過大,會對外部系統帶來巨大的訪問壓力,一旦出現好比鏈接失敗、線程池滿等狀況,因爲咱們是同步調用,因此通常會致使線程阻塞、Task 等待數據返回,影響總體任務的吞吐量。並且這種方案對外部系統的 QPS 要求較高,在大數據實時計算場景下,QPS 遠遠高於普通的後臺系統,峯值高達十萬到幾十萬,總體做業瓶頸轉移到外部系統。併發

這種方式的核心是,咱們能夠在 Flink 的 Map 算子中創建訪問外部系統的鏈接。下面以訂單數據爲例,咱們根據下單用戶的城市 ID,去關聯城市名稱,核心代碼實現以下:異步

public class Order {
    private Integer cityId;
    private String userName;
    private String items;
    private String cityName;
​
    public Order(Integer cityId, String userName, String items, String cityName) {
        this.cityId = cityId;
        this.userName = userName;
        this.items = items;
        this.cityName = cityName;
    }
​
    public Order() {
    }
​
    public Integer getCityId() {
        return cityId;
    }
​
    public void setCityId(Integer cityId) {
        this.cityId = cityId;
    }
​
    public String getUserName() {
        return userName;
    }
​
    public void setUserName(String userName) {
        this.userName = userName;
    }
​
    public String getItems() {
        return items;
    }
​
    public void setItems(String items) {
        this.items = items;
    }
​
    public String getCityName() {
        return cityName;
    }
​
    public void setCityName(String cityName) {
        this.cityName = cityName;
    }
​
    @Override
    public String toString() {
        return "Order{" +
                "cityId=" + cityId +
                ", userName='" + userName + '\'' +
                ", items='" + items + '\'' +
                ", cityName='" + cityName + '\'' +
                '}';
    }
}

 

public class DimSync extends RichMapFunction<String,Order> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DimSync.class);
    private Connection conn = null;
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
    }
    public Order map(String in) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(in);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //根據city_id 查詢 city_name
        PreparedStatement pst = conn.prepareStatement("select city_name from info where city_id = ?");
        pst.setInt(1,cityId);
        ResultSet resultSet = pst.executeQuery();
        String cityName = null;
        while (resultSet.next()){
            cityName = resultSet.getString(1);
        }
        pst.close();
        return new Order(cityId,userName,items,cityName);
    }
    public void close() throws Exception {
        super.close();
        conn.close();
    }
}

 

在上面這段代碼中,RichMapFunction 中封裝了整個查詢維表,而後進行關聯這個過程。須要注意的是,通常咱們在查詢小數據量的維表狀況下才使用這種方式,而且要妥善處理鏈接外部系統的線程,通常還會用到線程池。最後,爲了保證鏈接及時關閉和釋放,必定要在最後的 close 方式釋放鏈接,不然會將 MySQL 的鏈接數打滿致使任務失敗。async

預加載維表關聯

全量預加載數據是爲了解決每條數據流經咱們的數據系統都會對外部系統發起訪問,以及對外部系統頻繁訪問而致使的鏈接和性能問題。這種思路是,每當咱們的系統啓動時,就將維度表數據所有加載到內存中,而後數據在內存中進行關聯,不須要直接訪問外部數據庫。ide

這種方式的優點是咱們只須要一次性地訪問外部數據庫,大大提升了效率。但問題在於,一旦咱們的維表數據發生更新,那麼 Flink 任務是沒法感知的,可能會出現維表數據不一致,針對這種狀況咱們能夠採起定時拉取維表數據。而且這種方式因爲是將維表數據緩存在內存中,對計算節點的內存消耗很高,因此不能適用於數量很大的維度表。

咱們仍是用上面的場景,根據下單用戶的城市 ID 去關聯城市名稱,核心代碼實現以下:

public class WholeLoad extends RichMapFunction<String,Order> {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(WholeLoad.class);
    ScheduledExecutorService executor = null;
    private Map<String,String> cache;
​
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        executor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    load();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        },5,5, TimeUnit.MINUTES);//每隔 5 分鐘拉取一次維表數據
    }
​
    @Override
    public Order map(String value) throws Exception {
        JSONObject jsonObject = JSONObject.parseObject(value);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        String cityName = cache.get(cityId);
        return new Order(cityId,userName,items,cityName);
    }
​
    public void load() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim?characterEncoding=UTF-8", "admin", "admin");
        PreparedStatement statement = con.prepareStatement("select city_id,city_name from info");
        ResultSet rs = statement.executeQuery();
        //全量更新維度數據到內存
        while (rs.next()) {
            String cityId = rs.getString("city_id");
            String cityName = rs.getString("city_name");
            cache.put(cityId, cityName);
        }
        con.close();
    }
}

 

在上面的例子中,咱們使用 ScheduledExecutorService 每隔 5 分鐘拉取一次維表數據。這種方式適用於那些實時場景不是很高,維表數據較小的場景。

優勢:實現簡單

缺點:僅支持小數據量維表

適用場景:維表小,變動頻率低,對變動及時性要求低

方案2:

經過Distributed Cache 分發本地維度文件到task manager後加載到內存關聯。

實現方式:

經過env.registerCachedFile註冊文件。

實現RichFunction,在open()中經過RuntimeContext獲取cache文件。

解析和使用文件數據。

優勢:不須要外部數據庫

缺點:支持維度數據量比較小,更新須要更改文件並重啓做業

適用場景:維度數據是以文件形式,數據量小,更新頻率低。好比:靜態碼錶,配置文件。

 

熱存儲關聯

 

 

 

在這裏推薦使用 Guava 庫提供的 CacheBuilder 來建立咱們的緩存:

CacheBuilder.newBuilder()
        //最多存儲10000條
        .maximumSize(10000)
        //過時時間爲1分鐘
        .expireAfterWrite(60, TimeUnit.SECONDS)
        .build();

總體的實現思路是:咱們利用 Flink 的 RichAsyncFunction 讀取 Hbase 的數據到緩存中,咱們在關聯維度表時先去查詢緩存,若是緩存中不存在這條數據,就利用客戶端去查詢 Hbase,而後插入到緩存中。

首先咱們須要一個 Hbase 的異步客戶端:

<dependency>
    <groupId>org.hbase</groupId>
    <artifactId>asynchbase</artifactId>
    <version>1.8.2</version>
</dependency>

核心的代碼實現以下:

public class LRU extends RichAsyncFunction<String,Order> {
​
    private static final Logger LOGGER = LoggerFactory.getLogger(LRU.class);
    String table = "info";
    Cache<String, String> cache = null;
    private HBaseClient client = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //建立hbase客戶端
        client = new HBaseClient("127.0.0.1","7071");
        cache = CacheBuilder.newBuilder()
                //最多存儲10000條
                .maximumSize(10000)
                //過時時間爲1分鐘
                .expireAfterWrite(60, TimeUnit.SECONDS)
                .build();
    }
​
    @Override
    public void asyncInvoke(String input, ResultFuture<Order> resultFuture) throws Exception {
​
        JSONObject jsonObject = JSONObject.parseObject(input);
        Integer cityId = jsonObject.getInteger("city_id");
        String userName = jsonObject.getString("user_name");
        String items = jsonObject.getString("items");
        //讀緩存
        String cacheCityName = cache.getIfPresent(cityId);
​
        if(cacheCityName != null){
            Order order = new Order();
            order.setCityId(cityId);
            order.setItems(items);
            order.setUserName(userName);
            order.setCityName(cacheCityName);
            resultFuture.complete(Collections.singleton(order));
        }else {
            //若是緩存獲取失敗再從hbase獲取維度數據
            client.get(new GetRequest(table,String.valueOf(cityId))).addCallback((Callback<String, ArrayList<KeyValue>>) arg -> {
                for (KeyValue kv : arg) {
                    String value = new String(kv.value());
                    Order order = new Order();
                    order.setCityId(cityId);
                    order.setItems(items);
                    order.setUserName(userName);
                    order.setCityName(value);
                    resultFuture.complete(Collections.singleton(order));
                    cache.put(String.valueOf(cityId), value);
                }
                return null;
            });
​
        }
    }
​
}

 

這裏須要特別注意的是,咱們用到了異步 IO (RichAsyncFunction),這個功能的出現就是爲了解決與外部系統交互時網絡延遲成爲系統瓶頸的問題。

咱們在流計算環境中,在查詢外部維表時,假如訪問是同步進行的,那麼總體能力勢必受限於外部系統。正是由於異步 IO 的出現使得訪問外部系統能夠併發的進行,而且不須要同步等待返回,大大減輕了由於網絡等待時間等引發的系統吞吐和延遲問題。

咱們在使用異步 IO 時,必定要使用異步客戶端,若是沒有異步客戶端咱們能夠本身建立線程池模擬異步請求。

優勢:維度數據不受限於內存,支持較多維度數據

缺點:須要熱存儲資源,維度更新反饋到結果有延遲(熱存儲導入,cache)

適用場景:維度數據量大,可接受維度更新有必定的延遲。

其餘

除了上述常見的處理方式,咱們還能夠經過將維表消息廣播出去,或者自定義異步線程池訪問維表,甚至還能夠本身擴展 Flink SQL 中關聯維表的方式直接使用 SQL Join 方法關聯查詢結果。

整體來說,關聯維表的方式就以上幾種方式,而且基於這幾種方式還會衍生出各類各樣的解決方案。咱們在評價一個方案的優劣時,應該從業務自己出發,不一樣的業務場景下使用不一樣的方式。

相關文章
相關標籤/搜索