kafka系列之camel-kafka

概述

首先關於 camel 的基本概念和用法,以及 kafka 的基本概念和用法,這裏就不囉嗦了。這篇文章假設你對兩者都有基本的認識。html

camel 自己是一個路由引擎,經過 camel 你能夠定義路由規則,指定從哪裏(源)接收消息,如何處理這些消息,以及發往哪裏(目標)。camel-kafka 就是 camel 的其中一個組件,它從指定的 kafka topic 獲取消息來源進行處理。java

有些小夥伴可能有疑問了,kafka 自己不就是生產者-消費者模式嗎?原生 kafka 發佈消息,而後消費進行消息處理不就好了,爲啥還用 camel-kafka 呢?git

首先恭喜你是一個愛思考的小夥伴!這個問題的答案是這樣,camel 自己提供的是高層次的抽象,你能夠選擇從 kafka 做爲源接收數據,也可使用其它組件,好比mq,文件等。camel 讓你能使用相同的api和處理流程,處理不一樣協議和數據類型的系統。github

全部總結下,(下面這句話很重要,讀三遍)apache

camel實現了客戶端與服務端的解耦, 生產者和消費者的解耦。api

好比咱們能夠選擇從kafka獲取消息,而後發送到jms(activemq)。bash

from("kafka:test?brokers=localhost:9092")
.to("jms:queue:test.mq.queue")

在這裏插入圖片描述

詳解camel-kafka

camel對每一個組件約定一個發送和接受的 endpoint uri,kafka 的uri格式是,app

kafka:topic[?options]

option中有不少選項,好比最重要的 brokers 指定 kafka 服務的地址。而後是uri的參數,相似http uri的參數格式。下面是個示例:ui

from("kafka:test?brokers=localhost:9200"
                        + "&maxPollRecords=5000"
                        + "&consumersCount=1"
                        + "&seekTo=beginning"
                        + "&groupId=kafkaGroup")
                        .routeId("FromKafka")
                    .to("stream:out");

每一個參數的意思我就不一一解釋了,在文章最後我會給出官方的連接,你們能夠本身去查閱。spa

說了這麼多,咱們仍是運行一個程序看看效果。這個程序來自 apache camel 官方example,完整的代碼在文章的最後有連接。

首先,pom引入依賴,

<dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-kafka</artifactId>
            <version>2.24.1</version>
        </dependency>

我須要在本地啓動一個 kafka 的server,具體過程網上不少,這裏不囉嗦了。惟一要注意的是 kafka server 的版本最好跟 camel-kafka 引入的 kafka-client 版本一致,以避免踩坑。

kafka環境安裝好以後,建立兩個topic,

bogon:kafka_2.11-2.2.0 ponyma$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic TestLog
Created topic TestLog.
bogon:kafka_2.11-2.2.0 ponyma$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic AccessLog
Created topic AccessLog.

先來看看消費者部分的代碼,

public static void main(String[] args) throws Exception {

        System.out.println("About to run Kafka-camel integration2...");

        CamelContext camelContext = new DefaultCamelContext();

        // Add route to send messages to Kafka

        camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
                pc.setLocation("classpath:application.properties");

                System.out.println("About to start route: Kafka Server -> Log ");

                from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}"
                        + "&maxPollRecords={{consumer.maxPollRecords}}"
                        + "&consumersCount={{consumer.consumersCount}}"
                        + "&seekTo={{consumer.seekTo}}"
                        + "&groupId={{consumer.group}}")
                        .routeId("FromKafka")
                    .to("stream:out");
            }
        });
        camelContext.start();

        // let it run for 5 minutes before shutting down
        Thread.sleep(5 * 60 * 1000);

        camelContext.stop();
    }

這個代碼的核心就是camel的路由配置,也很簡單,當前這個路由的意思是,從 kafka 某個 topic 讀取數據,不作任何處理直接發送到標準輸出。

再來看看生產者,

camelContext.addRoutes(new RouteBuilder() {
            public void configure() {
                PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class);
                pc.setLocation("classpath:application.properties");

                // setup kafka component with the brokers
                KafkaComponent kafka = new KafkaComponent();
                kafka.setBrokers("{{kafka.host}}:{{kafka.port}}");
                camelContext.addComponent("kafka", kafka);

                from("direct:kafkaStart").routeId("DirectToKafka")
                    .to("kafka:{{producer.topic}}").log(LoggingLevel.INFO, "${headers}");

                // Topic can be set in header as well.

                from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic")
                    .to("kafka:dummy")
                    .log(LoggingLevel.INFO, "${headers}");

                // Use custom partitioner based on the key.
                /**
                 * partitioner指定分區發送
                 */

                from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner")
                        .to("kafka:{{producer.topic}}?partitioner={{producer.partitioner}}")
                        .log("${headers}");


                // Takes input from the command line.

                from("stream:in").setHeader(KafkaConstants.PARTITION_KEY, simple("0"))
                        .setHeader(KafkaConstants.KEY, simple("1")).to("direct:kafkaStart");

            }

        });
  • 第一個 from to 意思是監聽 direct:kafkaStart ,發送到指定的 topic。
  • 第二個 from to 也是監聽某個 direct,可是沒有發送的 kafka的topic上。
  • 第三個 from to 是監聽 direct:kafkaStartWithPartitioner,發送到特定 topic 的特定分區上。
  • 第四個 from to 是監聽咱們在控制檯的輸入,發送到 direct:kafkaStart。

上面四個 from to 對應 下面四個發送的示例,經過日誌打印咱們能夠看看數據是否被正確的進行路由了。

headers.put(KafkaConstants.PARTITION_KEY, 0);
headers.put(KafkaConstants.KEY, "1");
producerTemplate.sendBodyAndHeaders("direct:kafkaStart", testKafkaMessage, headers);

這段代碼的意思是,生產者發送數據到 direct:kafkaStart 這個endpoint上, headers指定了全部的消息都會發送到 kafka topic 的第一個分區。

headers.put(KafkaConstants.KEY, "2");
headers.put(KafkaConstants.TOPIC, "TestLog");
producerTemplate.sendBodyAndHeaders("direct:kafkaStartNoTopic", testKafkaMessage, headers);

生產者發送數據到 direct:kafkaStartNoTopic 這個endpoint上,對應上面第二個 from to ,雖然沒有指定發送目標的 kafka topic,可是咱們在 header 裏指定了 topic,因此跟第一個 from to 其實能夠達到一樣的效果。

後面兩個就不貼出代碼了,一個是發送到分區0,一個發送到分區1。分區的原則是 header 裏指定的key,分區器是自定義的,在源碼 stringPartitioner.java 中。這裏不表。

先啓動消費者端,而後啓動生產者端,結果以下:

在這裏插入圖片描述

在這裏插入圖片描述

能夠看到,運行的結果跟咱們分析的是一致的。


本文所用的示例源碼地址:

https://camel.apache.org/comp...

參考:

https://github.com/apache/cam...

在這裏插入圖片描述歡迎你們關注個人公衆號

相關文章
相關標籤/搜索