First, the versions involved:
Spark 2.4.3
Scala 2.11.12
Flume 1.6.0
Flume configuration file:
simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

# Describe/configure the source
simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = centos
simple-agent.sources.netcat-source.port = 44444

# Describe the sink
simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = centos
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel
Startup command:
flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_pull.conf -Dflume.root.logger=INFO,console
Everything up to this point worked without problems. But when I started my local test code and it tried to connect to the Flume sink, it crashed:
2019-10-16 16:42:35,364 (New I/O worker #1) [WARN - org.apache.avro.ipc.Responder.respond(Responder.java:174)] system error
org.apache.avro.AvroRuntimeException: Unknown datum type: java.lang.Exception: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.streaming.flume.sink.EventBatch
    at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:593)
    at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:558)
    at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.avro.ipc.specific.SpecificResponder.writeError(SpecificResponder.java:74)
    at org.apache.avro.ipc.Responder.respond(Responder.java:169)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
2019-10-16 16:42:35,380 (New I/O worker #1) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:59)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecuto
10/16 16:56:38 ERROR Requestor: Error in callback handler: java.lang.IllegalAccessError: tried to access method org.apache.avro.specific.SpecificData.<init>()V from class org.apache.spark.streaming.flume.sink.EventBatch
java.lang.IllegalAccessError: tried to access method org.apache.avro.specific.SpecificData.<init>()V from class org.apache.spark.streaming.flume.sink.EventBatch
Since both errors point at org.apache.spark.streaming.flume.sink.EventBatch, we might as well look at its source:
package org.apache.spark.streaming.flume.sink;

import org.apache.avro.specific.SpecificData;
import org.apache.avro.message.BinaryMessageEncoder;
import org.apache.avro.message.BinaryMessageDecoder;
import org.apache.avro.message.SchemaStore;

@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class EventBatch extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
  private static final long serialVersionUID = -2739787017790252011L;
  public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"EventBatch\",\"namespace\":\"org.apache.spark.streaming.flume.sink\",\"fields\":[{\"name\":\"errorMsg\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"sequenceNumber\",\"type\":\"string\"},{\"name\":\"events\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"SparkSinkEvent\",\"fields\":[{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"bytes\"}]}}}]}");
  public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }

  // Calls the no-argument SpecificData constructor named in the IllegalAccessError above.
  private static SpecificData MODEL$ = new SpecificData();

  private static final BinaryMessageEncoder<EventBatch> ENCODER =
      new BinaryMessageEncoder<EventBatch>(MODEL$, SCHEMA$);

  private static final BinaryMessageDecoder<EventBatch> DECODER =
      new BinaryMessageDecoder<EventBatch>(MODEL$, SCHEMA$);

  // ... (rest of the generated class omitted)
}
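In IDEA, the line importing org.apache.avro.message.BinaryMessageEncoder is highlighted in red: the class cannot be resolved.
I searched around, and sure enough, the Avro version I was using was too old to contain the org.apache.avro.message package.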
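A quick way to see which Avro version Flume itself loads is to list the Avro jars in its lib directory and read the version off the file names. The snippet below is only a minimal sketch: the helper name list_avro_jars is made up for illustration, and it assumes the jars follow the usual avro-<version>.jar naming. It inspects file names only, not jar contents.

```shell
#!/bin/sh
# list_avro_jars DIR
# Hypothetical helper: prints the file name of every avro*.jar
# found directly inside DIR (no recursion, names only).
list_avro_jars() {
    dir="$1"
    for jar in "$dir"/avro*.jar; do
        # If nothing matches, the glob stays literal; skip that case.
        [ -e "$jar" ] || continue
        basename "$jar"
    done
}

# Usage (assumes FLUME_HOME is set, as in the startup command above):
# list_avro_jars "$FLUME_HOME/lib"
```

If the versions printed are older than the 1.8.2 used in the fix below, that is consistent with the errors shown above.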
1. Add the following dependencies to the project's pom.xml.
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro-ipc</artifactId>
    <version>1.8.2</version>
</dependency>
2. Upload the two jars above (avro and avro-ipc 1.8.2) to $FLUME_HOME/lib and delete the old Avro jars there.
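Step 2 could be scripted roughly as follows. This is a sketch under assumptions that are not part of the original steps: the new jars already sit in a local directory passed as the second argument, and the old jars are moved to a backup directory outside lib rather than deleted outright, so the change can be undone. The function name swap_avro_jars and all three directory arguments are invented for illustration.

```shell
#!/bin/sh
# swap_avro_jars LIB_DIR NEW_JAR_DIR BACKUP_DIR
# Hypothetical helper:
#   1. moves every avro*.jar in LIB_DIR into BACKUP_DIR,
#   2. copies every avro*.jar from NEW_JAR_DIR into LIB_DIR.
swap_avro_jars() {
    lib_dir="$1"
    new_dir="$2"
    backup_dir="$3"

    # Refuse to touch LIB_DIR unless replacement jars are actually available.
    ls "$new_dir"/avro*.jar >/dev/null 2>&1 || {
        echo "no avro*.jar found in $new_dir" >&2
        return 1
    }

    mkdir -p "$backup_dir" || return 1

    # Move the old Avro jars out of Flume's lib directory.
    for jar in "$lib_dir"/avro*.jar; do
        [ -e "$jar" ] || continue
        mv "$jar" "$backup_dir"/ || return 1
    done

    # Copy the new Avro jars in.
    for jar in "$new_dir"/avro*.jar; do
        cp "$jar" "$lib_dir"/ || return 1
    done
}

# Usage (paths other than $FLUME_HOME/lib are placeholders):
# swap_avro_jars "$FLUME_HOME/lib" ./new-avro-jars "$FLUME_HOME/avro-backup"
```

The Flume agent has to be restarted afterwards so that it picks up the new jars.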
Feel free to follow my official account, 彪悍大藍貓, where I keep sharing practical material on big data and Spring Cloud.