版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。版權聲明:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,若有任何問題,可隨時聯繫。java
// 設置檢查點目錄
ssc.checkpoint("./streaming_checkpoint")
// 獲取Kafka配置(經過配置文件讀取,ConfigurationManager自定義方法)
val broker_list = ConfigurationManager.config.getString("kafka.broker.list")
val topics = ConfigurationManager.config.getString("kafka.topics")
// kafka消費者配置
val kafkaParam = Map(
"bootstrap.servers" -> broker_list,//用於初始化連接到集羣的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
//用於標識這個消費者屬於哪一個消費團體
"group.id" -> "commerce-consumer-group",
//若是沒有初始化偏移量或者當前的偏移量不存在任何服務器上,可使用這個配置屬性
//可使用這個配置,latest自動重置偏移量爲最新的偏移量
"auto.offset.reset" -> "latest",
//若是是true,則這個消費者的偏移量會在後臺自動提交
"enable.auto.commit" -> (false: java.lang.Boolean)
)
// 建立DStream,返回接收到的輸入數據
// LocationStrategies:根據給定的主題和集羣地址建立consumer
// LocationStrategies.PreferConsistent:持續的在全部Executor之間分配分區
// ConsumerStrategies:選擇如何在Driver和Executor上建立和配置Kafka Consumer
// ConsumerStrategies.Subscribe:訂閱一系列主題
val adRealTimeLogDStream=KafkaUtils.createDirectStream[String,String](ssc,
LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(topics),kafkaParam))
複製代碼
setStartFromGroupOffsets()【默認消費策略】apache
默認讀取上次保存的offset信息 若是是應用第一次啓動,讀取不到上次的offset信息,則會根據這個參數auto.offset.reset的值來進行消費數據bootstrap
setStartFromEarliest() 從最先的數據開始進行消費,忽略存儲的offset信息緩存
setStartFromLatest() 從最新的數據進行消費,忽略存儲的offset信息服務器
setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long>)架構
當checkpoint機制開啓的時候,KafkaConsumer會按期把kafka的offset信息還有其餘operator的狀態信息一塊保存起來。當job失敗重啓的時候,Flink會從最近一次的checkpoint中進行恢復數據,從新消費kafka中的數據。app
爲了可以使用支持容錯的kafka Consumer,須要開啓checkpoint env.enableCheckpointing(5000); // 每5s checkpoint一次運維
Kafka Consumers Offset 自動提交有如下兩種方法來設置,能夠根據job是否開啓checkpoint來區分:異步
(1) Checkpoint關閉時: 能夠經過下面兩個參數配置socket
enable.auto.commit
(2) Checkpoint開啓時:當執行checkpoint的時候纔會保存offset,這樣保證了kafka的offset和checkpoint的狀態偏移量保持一致。 能夠經過這個參數設置
setCommitOffsetsOnCheckpoints(boolean)
這個參數默認就是true。表示在checkpoint的時候提交offset, 此時,kafka中的自動提交機制就會被忽略
依賴引入:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.3</version>
</dependency>
案例實戰:
public class StreamingKafkaSource {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//設置statebackend
//env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true));
String topic = "kafkaConsumer";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers","SparkMaster:9092");
prop.setProperty("group.id","kafkaConsumerGroup");
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), prop);
myConsumer.setStartFromGroupOffsets();//默認消費策略
DataStreamSource<String> text = env.addSource(myConsumer);
text.print().setParallelism(1);
env.execute("StreamingFromCollection");
}
}
複製代碼
Kafka Producer的容錯-Kafka 0.9 and 0.10
若是Flink開啓了checkpoint,針對FlinkKafkaProducer09和FlinkKafkaProducer010 能夠提供 at-least-once的語義,還須要配置下面兩個參數:
setLogFailuresOnly(false)
setFlushOnCheckpoint(true)
注意:建議修改kafka 生產者的重試次數retries【這個參數的值默認是0】
Kafka Producer的容錯-Kafka 0.11,若是Flink開啓了checkpoint,針對FlinkKafkaProducer011 就能夠提供 exactly-once的語義,可是須要選擇具體的語義
Semantic.NONE
Semantic.AT_LEAST_ONCE【默認】
Semantic.EXACTLY_ONCE
public class StreamingKafkaSink {
public static void main(String[] args) throws Exception {
//獲取Flink的運行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//checkpoint配置
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//設置statebackend
//env.setStateBackend(new RocksDBStateBackend("hdfs://SparkMaster:9000/flink/checkpoints",true));
DataStreamSource<String> text = env.socketTextStream("SparkMaster", 9001, "\n");
String brokerList = "SparkMaster:9092";
String topic = "kafkaProducer";
Properties prop = new Properties();
prop.setProperty("bootstrap.servers",brokerList);
//第一種解決方案,設置FlinkKafkaProducer011裏面的事務超時時間
//設置事務超時時間
//prop.setProperty("transaction.timeout.ms",60000*15+"");
//第二種解決方案,設置kafka的最大事務超時時間,主要是kafka的配置文件設置。
//FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(brokerList, topic, new SimpleStringSchema());
//使用僅一次語義的kafkaProducer
FlinkKafkaProducer011<String> myProducer = new FlinkKafkaProducer011<>(topic, new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()), prop, FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);
text.addSink(myProducer);
env.execute("StreamingFromCollection");
}
}
複製代碼
kafka必不可少,關於kafka還有不少要說的內容,詳情請參考個人kafka商業環境實戰系列吧。
版權聲明:本套技術專欄是做者(秦凱新)平時工做的總結和昇華,經過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和集羣環境容量規劃等內容,請持續關注本套博客。版權聲明:禁止轉載,歡迎學習。QQ郵箱地址:1120746959@qq.com,若有任何問題,可隨時聯繫。
秦凱新 於深圳 20181127023