Kafka + Spark Streaming Integration

The key pom.xml dependencies (Kafka 0.10.2.0, Spark 2.1.0, Scala 2.11 artifacts throughout):

        <kafka.version>0.10.2.0</kafka.version>
        <slf4j.version>1.7.10</slf4j.version>
        <spark.version>2.1.0</spark.version>
...
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <!-- spark -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- spark streaming kafka-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

 

Main functionality: print the log lines that contain ERROR or WARN.

import com.boyoi.kafka.KafkaParams;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;

/**
 * Basic test: consume log lines from Kafka and print the WARN and ERROR ones.
 */
public class Kafka2Error {

    public static void main(String[] args) throws InterruptedException {

        Collection<String> topics = Arrays.asList("test");
        // local[2]: give the local run more than one core to work with.
        JavaSparkContext sc = new JavaSparkContext("local[2]", "error");

        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(1));
        final JavaInputDStream<ConsumerRecord<String, String>> directStream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, KafkaParams.getSer9Params())
        );

        JavaDStream<String> lines = directStream.map(ConsumerRecord::value);

        // Keep only lines whose level field sits at the fixed offset used by the log
        // layout: columns 24-30 hold either "[ WARN]" or "[ERROR]". Unlike charAt,
        // regionMatches simply returns false on lines shorter than the layout.
        JavaDStream<String> filtered = lines.filter(s ->
                s.regionMatches(24, "[ WARN]", 0, 7) ||
                        s.regionMatches(24, "[ERROR]", 0, 7));

        filtered.foreachRDD(rdd -> rdd.foreach(line -> System.out.println(line)));

        jssc.start();
        jssc.awaitTermination();
    }

}
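
To exercise the pipeline end to end, a small producer can push GBK-encoded log lines into the "test" topic. This is a minimal sketch, not part of the original post: the class name TestLogProducer and the sample log line are made up, the broker address is taken from KafkaParams below, and the message layout (a 23-character timestamp plus a space, so the level marker lands at index 24) is an assumption derived from the filter above.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class TestLogProducer {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.9:9092");  // assumed broker, as in KafkaParams
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Send raw bytes so the charset (GBK) stays under our control.
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            // 23 characters of timestamp plus one space put '[' at index 24.
            String line = "2017-04-01 12:00:00,000 [ERROR] 數據庫鏈接失敗";
            producer.send(new ProducerRecord<>("test", line.getBytes("GBK")));
            producer.flush();
        }
    }
}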

 

The default Kafka parameters are below. value.deserializer uses a custom deserializer class: the source data is in the GBK charset, while Kafka's StringDeserializer decodes as UTF-8 by default.

import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Default Kafka parameters.
 */
public class KafkaParams {
    private static Map<String, Object> kafkaParams = new HashMap<String, Object>();
    static {
        kafkaParams.put("bootstrap.servers", "192.168.1.9:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializerGBK.class);
        kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
    }

    public static Map<String, Object> getSer9Params(){
        return kafkaParams;
    }


}
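
Because enable.auto.commit is false, the sample job above never commits offsets back to Kafka. Below is a minimal sketch of manual commits with the 0-10 integration API, to be placed in Kafka2Error before jssc.start(); this block is an addition, not part of the original code, and needs imports for CanCommitOffsets, HasOffsetRanges, and OffsetRange from org.apache.spark.streaming.kafka010.

        directStream.foreachRDD(rdd -> {
            // The offset ranges of the batch that was just received.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // Commit asynchronously once the batch's work is done.
            ((CanCommitOffsets) directStream.inputDStream()).commitAsync(offsetRanges);
        });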

The code of the custom deserializer class:

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.UnsupportedEncodingException;

/**
 * Custom string deserializer that decodes bytes as GBK instead of the UTF-8 default.
 */
public class StringDeserializerGBK extends StringDeserializer {

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : new String(data, "GBK");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding GBK", e);
        }
    }
}
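
A quick way to sanity-check the deserializer (a standalone sketch; the class name and sample text are made up): encode a string to GBK bytes by hand and confirm it round-trips through StringDeserializerGBK.

import java.nio.charset.Charset;

public class StringDeserializerGBKCheck {

    public static void main(String[] args) {
        byte[] gbkBytes = "[ERROR] 編碼測試".getBytes(Charset.forName("GBK"));
        String decoded = new StringDeserializerGBK().deserialize("test", gbkBytes);
        // Prints "[ERROR] 編碼測試"; decoding the same bytes as UTF-8 would produce mojibake.
        System.out.println(decoded);
    }
}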