Flink Operators - Dataflows DataSource

Dataflows DataSource

Flink has built-in support for many data sources, such as HDFS, Socket, Kafka, and Collections. Flink also provides the addSource method, which lets you define custom data sources.

File Source

Create a data source by reading a local or HDFS file.
If the file being read is on HDFS, the Hadoop dependency needs to be imported:

<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-client</artifactId>              
    <version>2.6.5</version> 
</dependency>
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
// operators convert data into Flink's built-in data types, so the implicit
// conversions must be imported for the type conversion to happen automatically
import org.apache.flink.streaming.api.scala._

object FileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val textStream = env.readTextFile("hdfs://node01:9000/flink/data/wc")
    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    // the job stops once the file has been read
    env.execute()
  }
}

Every 10 seconds, read the content of newly added files in a given HDFS directory and run a WordCount on it.
Business scenario: companies commonly run real-time ETL. When Flume collects new data, Flink performs the ETL in real time and loads the result into the warehouse.

// read newly added files on HDFS every 10 seconds
// (the interval argument is in milliseconds, so 10 seconds is 10000)
val textStream =
  env.readFile(textInputFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)

Under the hood, readTextFile calls readFile. readFile is a lower-level method and is more flexible to use.
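
A minimal runnable version of the fragment above, assuming the monitored directory is hdfs://node01:9000/flink/data (a hypothetical path) and using TextInputFormat as the input format:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

object ContinuousFileSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // hypothetical HDFS directory to watch for newly added files
    val filePath = "hdfs://node01:9000/flink/data"
    val textInputFormat = new TextInputFormat(new Path(filePath))
    // re-scan the directory every 10 seconds (interval is in milliseconds)
    val textStream = env.readFile(textInputFormat, filePath,
      FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)
    textStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute("ContinuousFileSource")
  }
}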

Collection Source

A data source based on a local collection. It is generally used for testing and has little practical value beyond that.
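
As a quick sketch, a collection-based source can be created with fromCollection (part of the standard StreamExecutionEnvironment API); the sample data here is only illustrative:

import org.apache.flink.streaming.api.scala._

object CollectionSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // build a bounded stream from a local Scala collection, handy for tests
    val stream = env.fromCollection(List("hello flink", "hello spark", "hello scala"))
    stream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute("CollectionSource")
  }
}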

Socket Source

Receives data from a socket server.

val initStream:DataStream[String] = env.socketTextStream("node01",8888)
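
For a quick local test, you can start a socket server with nc -lk 8888 and run a word count against it; a minimal sketch reusing the host and port above:

import org.apache.flink.streaming.api.scala._

object SocketSource {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // start a socket server first, e.g.: nc -lk 8888
    val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
    initStream.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1).print()
    env.execute("SocketSource")
  }
}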

Kafka Source

Flink receives data from Kafka. First, configure the Flink-Kafka connector dependency.
Official documentation: https://ci.apache.org/project...
Maven dependency:

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-connector-kafka_2.11</artifactId> 
    <version>1.9.2</version> 
</dependency>
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val env = StreamExecutionEnvironment.getExecutionEnvironment
val prop = new Properties()
prop.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
prop.setProperty("group.id", "flink-kafka-id001")
prop.setProperty("key.deserializer", classOf[StringDeserializer].getName)
prop.setProperty("value.deserializer", classOf[StringDeserializer].getName)
/**
 * earliest: consume from the beginning; old data will be consumed again
 * latest: consume from the most recent data; old data is no longer consumed
 */
prop.setProperty("auto.offset.reset", "latest")
val kafkaStream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka",
  new KafkaDeserializationSchema[(String, String)] {
    override def isEndOfStream(t: (String, String)): Boolean = false

    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
      val key = new String(consumerRecord.key(), "UTF-8")
      val value = new String(consumerRecord.value(), "UTF-8")
      (key, value)
    }

    // specify the type of the returned data
    override def getProducedType: TypeInformation[(String, String)] =
      createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
  }, prop))
kafkaStream.print()
env.execute()

Consuming both the key and the value with the Kafka console consumer:
kafka-console-consumer.sh --zookeeper node01:2181 --topic flink-kafka --property print.key=true
By default the console consumer only prints the value.
KafkaDeserializationSchema: reads both the key and the value from Kafka.
SimpleStringSchema: reads only the value from Kafka.
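
If only the value is needed, a simpler variant uses SimpleStringSchema instead of a custom KafkaDeserializationSchema; this sketch assumes the same topic and properties as above:

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

// reads only the Kafka record value as a String
val valueStream = env.addSource(
  new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), prop))
valueStream.print()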

Custom Source

Implementing a single-parallelism data source based on the SourceFunction interface:
import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// the parallelism of this source is 1 (a single-parallelism source)
val stream = env.addSource(new SourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(200)
    }
  }

  // stop producing data
  override def cancel(): Unit = flag = false
})
stream.print()
env.execute()
Implementing a multi-parallelism data source based on the ParallelSourceFunction interface:

You can either implement the ParallelSourceFunction interface or extend RichParallelSourceFunction (which additionally provides the rich-function lifecycle methods).

import java.util.Random

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val sourceStream = env.addSource(new ParallelSourceFunction[String] {
  var flag = true

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    val random = new Random()
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = {
    flag = false
  }
}).setParallelism(2)
sourceStream.print()
env.execute()
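
For comparison, a sketch of the same source written by extending RichParallelSourceFunction, which additionally exposes the open/close lifecycle methods and the runtime context; the per-subtask initialization shown here is only an illustrative assumption:

import java.util.Random

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class RandomWordSource extends RichParallelSourceFunction[String] {
  @volatile private var flag = true
  private var random: Random = _

  // called once per parallel subtask before run(), e.g. to set up resources
  override def open(parameters: Configuration): Unit = {
    random = new Random()
  }

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    while (flag) {
      ctx.collect("hello" + random.nextInt(1000))
      Thread.sleep(500)
    }
  }

  override def cancel(): Unit = flag = false
}

// usage: env.addSource(new RandomWordSource).setParallelism(2)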