首先關於 camel 的基本概念和用法,以及 kafka 的基本概念和用法,這裏就不囉嗦了。這篇文章假設你對兩者都有基本的認識。html
camel 自己是一個路由引擎,經過 camel 你能夠定義路由規則,指定從哪裏(源)接收消息,如何處理這些消息,以及發往哪裏(目標)。camel-kafka 就是 camel 的其中一個組件,它從指定的 kafka topic 獲取消息來源進行處理。java
有些小夥伴可能有疑問了,kafka 自己不就是生產者-消費者模式嗎?原生 kafka 發佈消息,而後消費進行消息處理不就好了,爲啥還用 camel-kafka 呢?git
首先恭喜你是一個愛思考的小夥伴!這個問題的答案是這樣,camel 自己提供的是高層次的抽象,你能夠選擇從 kafka 做爲源接收數據,也可使用其它組件,好比mq,文件等。camel 讓你能使用相同的api和處理流程,處理不一樣協議和數據類型的系統。github
全部總結下,(下面這句話很重要,讀三遍)apache
camel實現了客戶端與服務端的解耦, 生產者和消費者的解耦。api
好比咱們能夠選擇從kafka獲取消息,而後發送到jms(activemq)。bash
from("kafka:test?brokers=localhost:9092") .to("jms:queue:test.mq.queue")
camel對每一個組件約定一個發送和接受的 endpoint uri,kafka 的uri格式是,app
kafka:topic[?options]
option中有不少選項,好比最重要的 brokers 指定 kafka 服務的地址。而後是uri的參數,相似http uri的參數格式。下面是個示例:ui
from("kafka:test?brokers=localhost:9200" + "&maxPollRecords=5000" + "&consumersCount=1" + "&seekTo=beginning" + "&groupId=kafkaGroup") .routeId("FromKafka") .to("stream:out");
每一個參數的意思我就不一一解釋了,在文章最後我會給出官方的連接,你們能夠本身去查閱。spa
說了這麼多,咱們仍是運行一個程序看看效果。這個程序來自 apache camel 官方example,完整的代碼在文章的最後有連接。
首先,pom引入依賴,
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-kafka</artifactId> <version>2.24.1</version> </dependency>
我須要在本地啓動一個 kafka 的server,具體過程網上不少,這裏不囉嗦了。惟一要注意的是 kafka server 的版本最好跟 camel-kafka 引入的 kafka-client 版本一致,以避免踩坑。
kafka環境安裝好以後,建立兩個topic,
bogon:kafka_2.11-2.2.0 ponyma$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic TestLog Created topic TestLog. bogon:kafka_2.11-2.2.0 ponyma$ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic AccessLog Created topic AccessLog.
先來看看消費者部分的代碼,
public static void main(String[] args) throws Exception { System.out.println("About to run Kafka-camel integration2..."); CamelContext camelContext = new DefaultCamelContext(); // Add route to send messages to Kafka camelContext.addRoutes(new RouteBuilder() { public void configure() { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); System.out.println("About to start route: Kafka Server -> Log "); from("kafka:{{consumer.topic}}?brokers={{kafka.host}}:{{kafka.port}}" + "&maxPollRecords={{consumer.maxPollRecords}}" + "&consumersCount={{consumer.consumersCount}}" + "&seekTo={{consumer.seekTo}}" + "&groupId={{consumer.group}}") .routeId("FromKafka") .to("stream:out"); } }); camelContext.start(); // let it run for 5 minutes before shutting down Thread.sleep(5 * 60 * 1000); camelContext.stop(); }
這個代碼的核心就是camel的路由配置,也很簡單,當前這個路由的意思是,從 kafka 某個 topic 讀取數據,不作任何處理直接發送到標準輸出。
再來看看生產者,
camelContext.addRoutes(new RouteBuilder() { public void configure() { PropertiesComponent pc = getContext().getComponent("properties", PropertiesComponent.class); pc.setLocation("classpath:application.properties"); // setup kafka component with the brokers KafkaComponent kafka = new KafkaComponent(); kafka.setBrokers("{{kafka.host}}:{{kafka.port}}"); camelContext.addComponent("kafka", kafka); from("direct:kafkaStart").routeId("DirectToKafka") .to("kafka:{{producer.topic}}").log(LoggingLevel.INFO, "${headers}"); // Topic can be set in header as well. from("direct:kafkaStartNoTopic").routeId("kafkaStartNoTopic") .to("kafka:dummy") .log(LoggingLevel.INFO, "${headers}"); // Use custom partitioner based on the key. /** * partitioner指定分區發送 */ from("direct:kafkaStartWithPartitioner").routeId("kafkaStartWithPartitioner") .to("kafka:{{producer.topic}}?partitioner={{producer.partitioner}}") .log("${headers}"); // Takes input from the command line. from("stream:in").setHeader(KafkaConstants.PARTITION_KEY, simple("0")) .setHeader(KafkaConstants.KEY, simple("1")).to("direct:kafkaStart"); } });
上面四個 from to 對應 下面四個發送的示例,經過日誌打印咱們能夠看看數據是否被正確的進行路由了。
headers.put(KafkaConstants.PARTITION_KEY, 0); headers.put(KafkaConstants.KEY, "1"); producerTemplate.sendBodyAndHeaders("direct:kafkaStart", testKafkaMessage, headers);
這段代碼的意思是,生產者發送數據到 direct:kafkaStart 這個endpoint上, headers指定了全部的消息都會發送到 kafka topic 的第一個分區。
headers.put(KafkaConstants.KEY, "2"); headers.put(KafkaConstants.TOPIC, "TestLog"); producerTemplate.sendBodyAndHeaders("direct:kafkaStartNoTopic", testKafkaMessage, headers);
生產者發送數據到 direct:kafkaStartNoTopic 這個endpoint上,對應上面第二個 from to ,雖然沒有指定發送目標的 kafka topic,可是咱們在 header 裏指定了 topic,因此跟第一個 from to 其實能夠達到一樣的效果。
後面兩個就不貼出代碼了,一個是發送到分區0,一個發送到分區1。分區的原則是 header 裏指定的key,分區器是自定義的,在源碼 stringPartitioner.java 中。這裏不表。
先啓動消費者端,而後啓動生產者端,結果以下:
能夠看到,運行的結果跟咱們分析的是一致的。
本文所用的示例源碼地址:
https://camel.apache.org/comp...
參考:
https://github.com/apache/cam...
歡迎你們關注個人公衆號