主要的 pom 依賴以下:
<kafka.version>0.10.2.0</kafka.version>
<slf4j.version>1.7.10</slf4j.version>
<spark.version>2.1.0</spark.version>
...
<!-- kafka -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>${kafka.version}</version>
</dependency>
<!-- spark -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- spark streaming -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- spark streaming kafka -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
主要功能:打印出現 ERROR 和 WARN 的 log。
import com.boyoi.kafka.KafkaParams; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import java.io.IOException; import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collection; /** * 基礎測試 */ public class Kafka2Error { public static void main(String[] avgs) throws URISyntaxException, IOException, InterruptedException { Collection<String> topics = Arrays.asList("test"); JavaSparkContext sc = new JavaSparkContext("local","error"); JavaStreamingContext jssc = new JavaStreamingContext(sc, Duration.apply(1000)); final JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream( jssc, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String>Subscribe(topics, KafkaParams.getSer9Params()) ); JavaDStream<String> map = directStream.map(a -> a.value()); JavaDStream<String> filter = map.filter(s -> { if ( ( s.charAt(24) == '[' && s.charAt(25) == ' ' && s.charAt(26) == 'W' && s.charAt(27) == 'A' && s.charAt(28) == 'R' && s.charAt(29) == 'N' && s.charAt(30) == ']' ) || ( s.charAt(24) == '[' && s.charAt(25) == 'E' && s.charAt(26) == 'R' && s.charAt(27) == 'R' && s.charAt(28) == 'O' && s.charAt(29) == 'R' && s.charAt(30) == ']' ) ) { return true; } return false; }); filter.foreachRDD(each -> each.foreach(each2-> System.out.println(each2))); jssc.start(); jssc.awaitTermination(); } }
kafka 默認參數以下。value.deserializer 使用了自定義的反序列化類,因爲數據源使用的是 GBK 字符集,而 kafka 默認使用 UTF-8 來反序列化。
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Default Kafka consumer parameters.
 *
 * value.deserializer uses the custom {@code StringDeserializerGBK} because the
 * source emits GBK-encoded bytes, while Kafka's default deserializer assumes UTF-8.
 */
public class KafkaParams {

    private static final Map<String, Object> KAFKA_PARAMS = new HashMap<String, Object>();

    static {
        KAFKA_PARAMS.put("bootstrap.servers", "192.168.1.9:9092");
        KAFKA_PARAMS.put("key.deserializer", StringDeserializer.class);
        KAFKA_PARAMS.put("value.deserializer", StringDeserializerGBK.class);
        KAFKA_PARAMS.put("group.id", "use_a_separate_group_id_for_each_stream");
        KAFKA_PARAMS.put("auto.offset.reset", "latest");
        KAFKA_PARAMS.put("enable.auto.commit", false);
    }

    /**
     * Returns the default consumer parameters for the 192.168.1.9 broker.
     *
     * Returns a fresh copy on every call: Spark's kafka-0-10 integration mutates
     * the map it is handed (fixKafkaParams), and callers may tweak entries too —
     * handing out the shared static map would let one stream corrupt the
     * defaults seen by every other stream.
     */
    public static Map<String, Object> getSer9Params() {
        return new HashMap<String, Object>(KAFKA_PARAMS);
    }
}
自定義的反序列化類代碼以下。
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.UnsupportedEncodingException;

/**
 * Custom string deserializer that decodes message bytes as GBK instead of
 * Kafka's default UTF-8.
 */
public class StringDeserializerGBK extends StringDeserializer {

    /**
     * Decodes {@code data} as a GBK string.
     *
     * @param topic the topic the record came from (unused)
     * @param data  raw message bytes; may be null
     * @return the decoded string, or null when {@code data} is null
     * @throws SerializationException if the GBK charset is unavailable; the
     *         original exception is chained as the cause (the previous version
     *         dropped it, destroying the stack trace)
     */
    @Override
    public String deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return new String(data, "GBK");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException(
                    "Error when deserializing byte[] to string due to unsupported encoding gbk", e);
        }
    }
}