Flink has built-in support for many data sources, such as HDFS, Socket, Kafka, and Collections. Flink also provides the addSource method, which lets you define custom data sources.
Creating a data source by reading local or HDFS files
If the files are read from HDFS, you need to add the Hadoop dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
</dependency>
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// Operator transformations convert data into Flink's built-in data types,
// so the implicit conversions must be imported for the types to be derived automatically
import org.apache.flink.streaming.api.scala._

object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textStream = env.readTextFile("hdfs://node01:9000/flink/data/wc")
    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    // The job stops once the file has been fully read
    env.execute()
  }
}
Reading newly added file content under a given HDFS directory every 10 s and running WordCount on it
Business scenario: enterprises usually run real-time ETL. Whenever Flume collects new data, Flink performs the ETL in real time and loads the result into the warehouse.
// Read newly added file content on HDFS every 10 s
// (the interval argument of readFile is in milliseconds)
val textStream = env.readFile(textInputFormat, filePath,
  FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
Under the hood, readTextFile calls readFile; readFile is the lower-level API and gives you more flexibility.
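For reference, here is a minimal sketch of the complete continuous-read job. It assumes the same HDFS path as the earlier example and builds a TextInputFormat from that path; since the interval argument of readFile is in milliseconds, 10000 means the directory is re-scanned every 10 s.

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object ContinuousFileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Directory to monitor (same path as the readTextFile example above)
    val filePath = "hdfs://node01:9000/flink/data/wc"
    val textInputFormat = new TextInputFormat(new Path(filePath))

    // Re-scan the directory every 10000 ms and read newly added content
    val textStream = env.readFile(textInputFormat, filePath,
      FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)

    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()

    env.execute()
  }
}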
Data sources based on local collections are generally used in test scenarios and have little practical significance.
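A minimal sketch of a collection-based source, assuming a few hard-coded sample strings; it behaves like a bounded stream and is convenient for local testing.

import org.apache.flink.streaming.api.scala._

object CollectionSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Build a bounded stream from a local Scala collection
    val stream = env.fromCollection(List("hello flink", "hello kafka", "hello hdfs"))

    stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()

    env.execute()
  }
}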
Receiving data from a socket server
val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
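Putting it together, a minimal sketch of a socket-based WordCount job, assuming a server is listening on node01:8888 (for example one started with nc -lk 8888):

import org.apache.flink.streaming.api.scala._

object SocketSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Each line arriving on the socket becomes one String record
    val initStream: DataStream[String] = env.socketTextStream("node01", 8888)

    initStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()

    // Unlike the bounded file source, this job keeps running until it is cancelled
    env.execute()
  }
}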
To have Flink consume data from Kafka, first configure the Flink-Kafka connector dependency.
Official documentation: https://ci.apache.org/project...
Maven dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.2</version>
</dependency>
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val env = StreamExecutionEnvironment.getExecutionEnvironment

val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
prop.setProperty("group.id", "flink-kafka-id001")
prop.setProperty("key.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("value.deserializer", classOf[StringDeserializer].getName)
/**
 * earliest: consume from the beginning; old data is consumed again and again
 * latest: consume from the most recent data; old data is no longer consumed
 */
prop.setProperty("auto.offset.reset", "latest")

val kafkaStream = env.addSource(new FlinkKafkaConsumer[(String, String)](
  "flink-kafka",
  new KafkaDeserializationSchema[(String, String)] {
    override def isEndOfStream(t: (String, String)): Boolean = false

    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
      val key = new String(consumerRecord.key(), "UTF-8")
      val value = new String(consumerRecord.value(), "UTF-8")
      (key, value)
    }

    // Specify the type of the produced data
    override def getProducedType: TypeInformation[(String, String)] =
      createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
  },
  prop))

kafkaStream.print()
env.execute()
Consuming both the key and the value with the Kafka console consumer:
kafka-console-consumer.sh --zookeeper node01:2181 --topic flink-kafka --property print.key=true

By default, the console consumer prints only the value.
KafkaDeserializationSchema: reads both the key and the value from Kafka.
SimpleStringSchema: reads only the value from Kafka.
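For comparison, a minimal sketch of a value-only consumer built with SimpleStringSchema; the group id here is illustrative, while the brokers and topic match the example above.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object KafkaValueSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val prop = new Properties()
    prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    prop.setProperty("group.id", "flink-kafka-id002") // illustrative group id
    prop.setProperty("auto.offset.reset", "latest")

    // SimpleStringSchema deserializes only the record value as a String
    val valueStream = env.addSource(
      new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), prop))

    valueStream.print()
    env.execute()
  }
}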
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

import scala.util.Random

val env = StreamExecutionEnvironment.getExecutionEnvironment
// The parallelism of this source is 1: a single-parallelism source
val stream = env.addSource(new SourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(200)
    }
  }

  // Stop producing data
  override def cancel(): Unit = flag = false
})
stream.print()
env.execute()
To create a parallel source, implement the ParallelSourceFunction interface (or, equivalently for this purpose, extend RichParallelSourceFunction).
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sourceStream = env.addSource(new ParallelSourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }
}).setParallelism(2)
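As in the single-parallelism example, the stream still needs a sink and a call to env.execute() before the job runs; a sketch of the remaining lines:

// Both parallel source instances emit records into the same stream
sourceStream.print()
env.execute()

Note that setParallelism(2) is only legal here because the source implements ParallelSourceFunction; a plain SourceFunction source is limited to parallelism 1.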