Flink異步之矛盾-鋒利的Async I/O

維表JOIN-繞不過去的業務場景

在Flink 流處理過程當中,常常須要和外部系統進行交互,用維度表補全事實表中的字段。java

例如:在電商場景中,須要一個商品的skuid去關聯商品的一些屬性,例如商品所屬行業、商品的生產廠家、生產廠家的一些狀況;在物流場景中,知道包裹id,須要去關聯包裹的行業屬性、發貨信息、收貨信息等等。mysql

默認狀況下,在Flink的MapFunction中,單個並行只能用同步方式去交互: 將請求發送到外部存儲,IO阻塞,等待請求返回,而後繼續發送下一個請求。這種同步交互的方式每每在網絡等待上就耗費了大量時間。爲了提升處理效率,能夠增長MapFunction的並行度,但增長並行度就意味着更多的資源,並非一種很是好的解決方式。面試

Async I/O異步非阻塞請求

Flink 在1.2中引入了Async I/O,在異步模式下,將IO操做異步化,單個並行能夠連續發送多個請求,哪一個請求先返回就先處理,從而在連續的請求間不須要阻塞式等待,大大提升了流處理效率。redis

Async I/O 是阿里巴巴貢獻給社區的一個呼聲很是高的特性,解決與外部系統交互時網絡延遲成爲了系統瓶頸的問題。sql

file

圖中棕色的長條表示等待時間,能夠發現網絡等待時間極大地阻礙了吞吐和延遲。爲了解決同步訪問的問題,異步模式能夠併發地處理多個請求和回覆。也就是說,你能夠連續地向數據庫發送用戶a、b、c等的請求,與此同時,哪一個請求的回覆先返回了就處理哪一個回覆,從而連續的請求之間不須要阻塞等待,如上圖右邊所示。這也正是 Async I/O 的實現原理。數據庫

詳細的原理能夠參考文末給出的第一個連接,來自阿里巴巴雲邪的分享。apache

一個簡單的例子以下:bootstrap

public class AsyncIOFunctionTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");

        DataStreamSource<String> ds = env.addSource(new FlinkKafkaConsumer010<String>("order", new SimpleStringSchema(), p));
        ds.print();

        SingleOutputStreamOperator<Order> order = ds
                .map(new MapFunction<String, Order>() {
                    @Override
                    public Order map(String value) throws Exception {
                        return new Gson().fromJson(value, Order.class);
                    }
                })
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Order>() {
                    @Override
                    public long extractAscendingTimestamp(Order element) {
                        try {
                            return element.getOrderTime();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return 0;
                    }
                })
                .keyBy(new KeySelector<Order, String>() {
                    @Override
                    public String getKey(Order value) throws Exception {
                        return value.getUserId();
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.minutes(10)))
                .maxBy("orderTime");

        SingleOutputStreamOperator<Tuple7<String, String, Integer, String, String, String, Long>> operator = AsyncDataStream
                .unorderedWait(order, new RichAsyncFunction<Order, Tuple7<String, String, Integer, String, String, String, Long>>() {

                    private Connection connection;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        Class.forName("com.mysql.jdbc.Driver");
                        connection = DriverManager.getConnection("url", "user", "pwd");
                        connection.setAutoCommit(false);
                    }

                    @Override
                    public void asyncInvoke(Order input, ResultFuture<Tuple7<String, String, Integer, String, String, String, Long>> resultFuture) throws Exception {
                        List<Tuple7<String, String, Integer, String, String, String, Long>> list = new ArrayList<>();
                        // 在 asyncInvoke 方法中異步查詢數據庫
                        String userId = input.getUserId();
                        Statement statement = connection.createStatement();
                        ResultSet resultSet = statement.executeQuery("select name,age,sex from user where userid=" + userId);
                        if (resultSet != null && resultSet.next()) {
                            String name = resultSet.getString("name");
                            int age = resultSet.getInt("age");
                            String sex = resultSet.getString("sex");
                            Tuple7<String, String, Integer, String, String, String, Long> res = Tuple7.of(userId, name, age, sex, input.getOrderId(), input.getPrice(), input.getOrderTime());
                            list.add(res);
                        }

                        // 將數據蒐集
                        resultFuture.complete(list);
                    }

                    @Override
                    public void close() throws Exception {
                        super.close();
                        if (connection != null) {
                            connection.close();
                        }
                    }
                }, 5000, TimeUnit.MILLISECONDS,100);

        operator.print();


        env.execute("AsyncIOFunctionTest");
    }
}
複製代碼

上述代碼中,原始訂單流來自Kafka,去關聯維度表將訂單的用戶信息取出來。從上面示例中可看到,咱們在open()中建立鏈接對象,在close()方法中關閉鏈接,在RichAsyncFunction的asyncInvoke()方法中,直接查詢數據庫操做,並將數據返回出去。這樣一個簡單異步請求就完成了。api

Async I/O的原理和基本用法

簡單的來講,使用 Async I/O 對應到 Flink 的 API 就是 RichAsyncFunction 這個抽象類,繼層這個抽象類實現裏面的open(初始化),asyncInvoke(數據異步調用),close(中止的一些操做)方法,最主要的是實現asyncInvoke 裏面的方法。緩存

咱們先來看一個使用Async I/O的模板方法:

// This example implements the asynchronous request and callback with Futures that have the
// interface of Java 8's futures (which is the same one followed by Flink's Future)

/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** The database specific client that can issue concurrent requests with callbacks */
    private transient DatabaseClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // issue the asynchronous request, receive a future for result
        final Future<String> result = client.query(key);

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // Normally handled explicitly.
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// create the original stream
DataStream<String> stream = ...;

// apply the async I/O transformation
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
複製代碼

假設咱們一個場景是須要進行異步請求其餘數據庫,那麼要實現一個經過異步I/O來操做數據庫還須要三個步驟:  一、實現用來分發請求的AsyncFunction  二、獲取操做結果的callback,並將它提交到AsyncCollector中  三、將異步I/O操做轉換成DataStream

其中的兩個重要的參數:複製代碼

Timeouttimeout 定義了異步操做過了多長時間後會被丟棄,這個參數是防止了死的或者失敗的請求Capacity 這個參數定義了能夠同時處理多少個異步請求。雖然異步I/O方法會帶來更好的吞吐量,可是算子仍然會成爲流應用的瓶頸。超過限制的併發請求數量會產生背壓。

幾個須要注意的點:

  • 使用Async I/O,須要外部存儲有支持異步請求的客戶端。
  • 使用Async I/O,繼承RichAsyncFunction(接口AsyncFunction 的抽象類),重寫或實現open(創建鏈接)、close(關閉鏈接)、asyncInvoke(異步調用)3個方法便可。
  • 使用Async I/O, 最好結合緩存一塊兒使用,可減小請求外部存儲的次數,提升效率。
  • Async I/O 提供了Timeout參數來控制請求最長等待時間。默認,異步I/O請求超時時,會引起異常並重啓或中止做業。 若是要處理超時,能夠重寫AsyncFunction#timeout方法。
  • Async I/O 提供了Capacity參數控制請求併發數,一旦Capacity被耗盡,會觸發反壓機制來抑制上游數據的攝入。
  • Async I/O 輸出提供亂序和順序兩種模式。
亂序, 用AsyncDataStream.unorderedWait(...) API,每一個並行的輸出順序和輸入順序可能不一致。複製代碼
順序, 用AsyncDataStream.orderedWait(...) API,每一個並行的輸出順序和輸入順序一致。爲保證順序,須要在輸出的Buffer中排序,該方式效率會低一些。
複製代碼

Flink 1.9 的優化

因爲新合入的 Blink 相關功能,使得 Flink 1.9 實現維表功能很簡單。若是你要使用該功能,那就須要本身引入 Blink 的 Planner。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
複製代碼

而後咱們只要自定義實現 LookupableTableSource 接口,同時實現裏面的方法就能夠進行,下面來分析一下 LookupableTableSource的代碼:

public interface LookupableTableSource<T> extends TableSource<T> {
     TableFunction<T> getLookupFunction(String[] lookupKeys);
     AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
     boolean isAsyncEnabled();
}
複製代碼

這三個方法分別是:

  • isAsyncEnabled 方法主要表示該表是否支持異步訪問外部數據源獲取數據,當返回 true 時,那麼在註冊到 TableEnvironment 後,使用時會返回異步函數進行調用,當返回 false 時,則使同步訪問函數。
  • getLookupFunction 方法返回一個同步訪問外部數據系統的函數,什麼意思呢,就是你經過 Key 去查詢外部數據庫,須要等到返回數據後才繼續處理數據,這會對系統處理的吞吐率有影響。
  • getAsyncLookupFunction 方法則是返回一個異步的函數,異步訪問外部數據系統,獲取數據,這能極大的提高系統吞吐率。

咱們拋開同步訪問函數無論,對於getAsyncLookupFunction會返回異步訪問外部數據源的函數,若是你想使用異步函數,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。使用異步函數訪問外部數據系統,通常是外部系統有異步訪問客戶端,若是沒有的話,能夠本身使用線程池異步訪問外部系統。例如:

public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
    private transient RedisAsyncCommands<String, String> async;
    @Override
    public void open(FunctionContext context) throws Exception {
        RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        async = connection.async();
    }
    public void eval(CompletableFuture<Collection<Row>> future, Object... params) {
        redisFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String value) {
                future.complete(Collections.singletonList(Row.of(key, value)));
            }
        });
    }
}

複製代碼

一個完整的例子以下:

Main方法:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.junit.Test;
 
import java.util.Properties;
 
public class LookUpAsyncTest {
 
    @Test
    public void test() throws Exception {
        LookUpAsyncTest.main(new String[]{});
    }
 
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setParallelism(1);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
 
        final ParameterTool params = ParameterTool.fromArgs(args);
        String fileName = params.get("f");
        DataStream<String> source = env.readTextFile("hdfs://172.16.44.28:8020" + fileName, "UTF-8");
 
        TypeInformation[] types = new TypeInformation[]{Types.STRING, Types.STRING, Types.LONG};
        String[] fields = new String[]{"id", "user_click", "time"};
        RowTypeInfo typeInformation = new RowTypeInfo(types, fields);
 
        DataStream<Row> stream = source.map(new MapFunction<String, Row>() {
            private static final long serialVersionUID = 2349572543469673349L;
 
            @Override
            public Row map(String s) {
                String[] split = s.split(",");
                Row row = new Row(split.length);
                for (int i = 0; i < split.length; i++) {
                            
                    Object value = split[i];
                    if (types[i].equals(Types.STRING)) {
                        value = split[i];
                    }
                    if (types[i].equals(Types.LONG)) {
                        value = Long.valueOf(split[i]);
                    }
                    row.setField(i, value);
                }
                return row;
            }
        }).returns(typeInformation);
 
        tableEnv.registerDataStream("user_click_name", stream, String.join(",", typeInformation.getFieldNames()) + ",proctime.proctime");
 
        RedisAsyncLookupTableSource tableSource = RedisAsyncLookupTableSource.Builder.newBuilder()
                .withFieldNames(new String[]{"id", "name"})
                .withFieldTypes(new TypeInformation[]{Types.STRING, Types.STRING})
                .build();
        tableEnv.registerTableSource("info", tableSource);
 
        String sql = "select t1.id,t1.user_click,t2.name" +
                " from user_click_name as t1" +
                " join info FOR SYSTEM_TIME AS OF t1.proctime as t2" +
                " on t1.id = t2.id";
 
        Table table = tableEnv.sqlQuery(sql);
 
        DataStream<Row> result = tableEnv.toAppendStream(table, Row.class);
 
        DataStream<String> printStream = result.map(new MapFunction<Row, String>() {
            @Override
            public String map(Row value) throws Exception {
                return value.toString();
            }
        });
 
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9094");
        FlinkKafkaProducer011<String> kafkaProducer = new FlinkKafkaProducer011<>(
                "user_click_name",  
                new SimpleStringSchema(),
                properties);
        printStream.addSink(kafkaProducer);
 
        tableEnv.execute(Thread.currentThread().getStackTrace()[1].getClassName());
    }
}
複製代碼

RedisAsyncLookupTableSource方法:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
 
public class RedisAsyncLookupTableSource implements StreamTableSource<Row>, LookupableTableSource<Row> {
 
    private final String[] fieldNames;
    private final TypeInformation[] fieldTypes;
 
    public RedisAsyncLookupTableSource(String[] fieldNames, TypeInformation[] fieldTypes) {
       this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
 
    //同步方法
    @Override
    public TableFunction<Row> getLookupFunction(String[] strings) {
        return null;
    }
 
    //異步方法
    @Override
    public AsyncTableFunction<Row> getAsyncLookupFunction(String[] strings) {
        return MyAsyncLookupFunction.Builder.getBuilder()
                .withFieldNames(fieldNames)
                .withFieldTypes(fieldTypes)
                .build();
    }
 
    //開啓異步
    @Override
    public boolean isAsyncEnabled() {
        return true;
    }
 
    @Override
    public DataType getProducedDataType() {
        return TypeConversions.fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames));
    }
 
    @Override
    public TableSchema getTableSchema() {
        return TableSchema.builder()
                .fields(fieldNames, TypeConversions.fromLegacyInfoToDataType(fieldTypes))
                .build();
    }
 
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment environment) {
        throw new UnsupportedOperationException("do not support getDataStream");
    }
 
    public static final class Builder {
        private String[] fieldNames;
        private TypeInformation[] fieldTypes;
 
        private Builder() {
        }
 
        public static Builder newBuilder() {
            return new Builder();
        }
 
        public Builder withFieldNames(String[] fieldNames) {
            this.fieldNames = fieldNames;
            return this;
        }
 
        public Builder withFieldTypes(TypeInformation[] fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }
 
        public RedisAsyncLookupTableSource build() {
            return new RedisAsyncLookupTableSource(fieldNames, fieldTypes);
        }
    }
}
複製代碼

MyAsyncLookupFunction

import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;
 
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
 
public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
 
    private final String[] fieldNames;
    private final TypeInformation[] fieldTypes;
 
    private transient RedisAsyncCommands<String, String> async;
 
    public MyAsyncLookupFunction(String[] fieldNames, TypeInformation[] fieldTypes) {
        this.fieldNames = fieldNames;
        this.fieldTypes = fieldTypes;
    }
 
    @Override
    public void open(FunctionContext context) {
        //配置redis異步鏈接
        RedisClient redisClient = RedisClient.create("redis://127.0.0.1:6379");
        StatefulRedisConnection<String, String> connection = redisClient.connect();
        async = connection.async();
    }
 
    //每一條流數據都會調用此方法進行join
    public void eval(CompletableFuture<Collection<Row>> future, Object... paramas) {
        //表名、主鍵名、主鍵值、列名
        String[] info = {"userInfo", "userId", paramas[0].toString(), "userName"};
        String key = String.join(":", info);
        RedisFuture<String> redisFuture = async.get(key);
 
        redisFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String value) {
                future.complete(Collections.singletonList(Row.of(key, value)));
                //todo
//                BinaryRow row = new BinaryRow(2);
            }
        });
    }
 
    @Override
    public TypeInformation<Row> getResultType() {
        return new RowTypeInfo(fieldTypes, fieldNames);
    }
 
    public static final class Builder {
        private String[] fieldNames;
        private TypeInformation[] fieldTypes;
 
        private Builder() {
        }
 
        public static Builder getBuilder() {
            return new Builder();
        }
 
        public Builder withFieldNames(String[] fieldNames) {
            this.fieldNames = fieldNames;
            return this;
        }
 
        public Builder withFieldTypes(TypeInformation[] fieldTypes) {
            this.fieldTypes = fieldTypes;
            return this;
        }
 
        public MyAsyncLookupFunction build() {
            return new MyAsyncLookupFunction(fieldNames, fieldTypes);
        }
    }
}
複製代碼

十分須要注意的幾個點:

一、 外部數據源必須是異步客戶端:若是是線程安全的(多個客戶端一塊兒使用),你能夠不加 transient 關鍵字,初始化一次。不然,你須要加上 transient,不對其進行初始化,而在 open 方法中,爲每一個 Task 實例初始化一個。

二、eval 方法中多了一個 CompletableFuture,當異步訪問完成時,須要調用其方法進行處理。好比上面例子中的:

redisFuture.thenAccept(new Consumer<String>() {
            @Override
            public void accept(String value) {
                future.complete(Collections.singletonList(Row.of(key, value)));
            }
        });
複製代碼

三、社區雖然提供異步關聯維度表的功能,但事實上大數據量下關聯外部系統維表仍然會成爲系統的瓶頸,因此通常咱們會在同步函數和異步函數中加入緩存。綜合併發、易用、實時更新和多版本等因素考慮,Hbase是最理想的外部維表。

參考文章:http://wuchong.me/blog/2017/05/17/flink-internals-async-io/#https://www.jianshu.com/p/d8f99d94b761https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673https://www.jianshu.com/p/7ce84f978ae0

關注個人公衆號,後臺回覆【JAVAPDF】獲取200頁面試題!5萬人關注的大數據成神之路,不來了解一下嗎?5萬人關注的大數據成神之路,真的不來了解一下嗎?5萬人關注的大數據成神之路,肯定真的不來了解一下嗎?

歡迎您關注《大數據成神之路》

大數據技術與架構

相關文章
相關標籤/搜索