導入兩個jar包到flume/lib下
org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink
java.lang.IllegalStateException: begin() called when transaction is OPEN!
# Flume agent "a1": one spooling-directory source, one memory channel,
# and one Spark sink that a Spark Streaming job polls for events.
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type=spooldir
a1.sources.r1.spoolDir=/home/zhuzhu/apps/flumeSpooding
a1.sources.r1.fileHeader=true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# Host and port of the current machine; the Spark job must poll this same address
a1.sinks.k1.hostname = 192.168.137.88
a1.sinks.k1.port = 9999

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
package day02

import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets

import scala.util.Try

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark Streaming job that polls a Flume SparkSink and prints a word count
 * of the event bodies received in each batch.
 *
 * Usage: `StreamingFlume [sinkHost [sinkPort]]`
 *
 * When no arguments are given, the job polls the address configured for the
 * Flume sink in the accompanying agent configuration
 * (`a1.sinks.k1.hostname` / `a1.sinks.k1.port`).
 *
 * Original author: Charon (2021/4/7), version 1.0.
 */
object StreamingFlume {

  // Must match a1.sinks.k1.hostname / a1.sinks.k1.port of the Flume agent;
  // the polling receiver connects to the SparkSink, not the other way round.
  private val DefaultSinkHost = "192.168.137.88"
  private val DefaultSinkPort = 9999

  /**
   * Entry point.
   *
   * @param args optional `sinkHost` and `sinkPort` overriding the defaults
   * @throws IllegalArgumentException if `sinkPort` is not a valid integer
   */
  def main(args: Array[String]): Unit = {
    val sinkHost: String = args.headOption.getOrElse(DefaultSinkHost)
    val sinkPort: Int = args.lift(1).fold(DefaultSinkPort) { raw =>
      Try(raw.toInt).getOrElse(
        throw new IllegalArgumentException(s"Invalid sink port: $raw")
      )
    }

    // 1. Build the Spark configuration.
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingFlume")

    // 2. Build the SparkContext.
    val sc = new SparkContext(conf)
    // The original author noted that setting the log level here had no effect,
    // so the call is left disabled.
    // sc.setLogLevel("WARN")

    // 3. Build the StreamingContext; Seconds(5) is the batch interval.
    val ssc = new StreamingContext(sc, Seconds(5))

    // 4. Addresses of the Flume sinks to poll; add more InetSocketAddress
    //    entries to the Seq to pull from several agents.
    val addresses = Seq(new InetSocketAddress(sinkHost, sinkPort))

    // 5. Pull events from the Flume SparkSink(s).
    val stream: ReceiverInputDStream[SparkFlumeEvent] =
      FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_2)

    // 6. A Flume event carries headers and a body; keep only the body text.
    //    Decode explicitly as UTF-8 instead of the platform default charset.
    val lineDstream: DStream[String] =
      stream.map(event => new String(event.event.getBody.array(), StandardCharsets.UTF_8))

    // 7. Word count per batch. Split on runs of whitespace and drop empty
    //    tokens so repeated or leading spaces are not counted as words.
    lineDstream
      .flatMap(_.split("\\s+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
向指定目錄上傳文件