Kafka是最初由Linkedin公司開發,是一個分佈式、支持分區的(partition)、多副本的(replica),基於zookeeper協調的分佈式消息系統,它的最大的特性就是能夠實時的處理大量數據以知足各類需求場景:好比基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,消息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成爲頂級開源 項目。具體理論性的內容我也不是特別懂,這裏有一篇文章寫的很好,轉發一下https://blog.csdn.net/YChenFeng/article/details/74980531html
下面就開始配置和使用java
一、下載mysql
下載地址:http://kafka.apache.org/downloads.html ,我用的是kafka_2.12-1.0.0.tgz版本。有興趣的能夠下載最新辦法。nginx
二、下載完成以後,直接解壓,目錄結構是這樣的git
bin目錄是各類啓動文件,config是相關參數配置文件,logs是自建的日誌文件,libs是使用到的相關jar文件github
啓動文件中有windows項,用例直接啓動相關服務。web
三、修改配置參數spring
進入config目錄,編輯 server.properties文件,找到並編輯log.dirs= D:\\Tools\\kafka_2.11-1.0.0\\logs,找到並編輯zookeeper.connect=localhost:2181。表示本地運行。sql
(Kafka會按照默認,在9092端口上運行,並鏈接zookeeper的默認端口:2181)express
四、啓動服務,在dos服務下輸入下面命令進行啓動
D:\tools\kafka_2.12-2.1.0\bin\windows\zookeeper-server-start.bat D:\tools\kafka_2.12-2.1.0\config\zookeeper.properties
首先啓動,zookeeper服務,對應加載zookeeper的配置文件,kafka依賴zookeeper監控其狀態
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-server-start.bat D:\tools\kafka_2.12-2.1.0\config\server.properties
而後啓動kafka服務,對應加載相應配置文件
五、使用idea在git 上下載kafka-monitor 代碼,地址:https://github.com/linxin26/kafka-monitor
運行Start,在瀏覽器輸入http://127.0.0.1:5050
查看現有主題 topic
查看 broker
六、編寫測試類查看相關情況
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import java.util.Arrays; import java.util.Collection; import java.util.Properties; /** * Created by songxiaofei on 2018-12-27. */ public class ConsumerMessage { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "10.1.9.3:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); final KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props); consumer.subscribe(Arrays.asList("buy"),new ConsumerRebalanceListener() { public void onPartitionsRevoked(Collection<TopicPartition> collection) { } public void onPartitionsAssigned(Collection<TopicPartition> collection) { //將偏移設置到最開始 consumer.seekToBeginning(collection); } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n,checksum=%s", record.offset(), record.key(), record.value(),record.checksum()); } } }
執行代碼,能夠查看相關日誌,用來查看kafka的配置狀況
D:\tools\Java\jdk1.8.0_31\bin\java "-javaagent:D:\tools\JetBrains\IntelliJ IDEA 2018.1\lib\idea_rt.jar=64422:D:\tools\JetBrains\IntelliJ IDEA 2018.1\bin" -Dfile.encoding=UTF-8 -classpath D:\tools\Java\jdk1.8.0_31\jre\lib\charsets.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\deploy.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\access-bridge-64.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\cldrdata.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\dnsns.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\jaccess.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\jfxrt.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\localedata.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\nashorn.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\sunec.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\sunjce_provider.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\sunmscapi.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\sunpkcs11.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\ext\zipfs.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\javaws.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\jce.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\jfr.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\jfxswt.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\jsse.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\management-agent.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\plugin.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\resources.jar;D:\tools\Java\jdk1.8.0_31\jre\lib\rt.jar;E:\jetproject\kafka-monitor-master\target\classes;E:\mavrepository\.m2\repository\org\apache\kafka\kafka-clients\0.10.0.1\kafka-clients-0.10.0.1.jar;E:\mavrepository\.m2\repository\net\jpountz\lz4\lz4\1.3.0\lz4-1.3.0.jar;E:\mavrepository\.m2\repository\org\xerial\snappy\snappy-java\1.1.2.6\snappy-java-1.1.2.6.jar;E:\mavrepository\.m2\repository\org\apache\commons\commons-lang3\3.5\commons-lang3-3.5.jar;E:\mavrepository\.m2\repository\org\apache\kafka\kafka_2.10\0.10.0.1\kafka_2.10-0.10.0.1.jar;E:\mavrepository\.m2\repository\com\101tec\zkclient\0.8\zkclient-0.8.jar;E:\mavrepository\.m2\repository\org\scala-lang\scala-library\2.10.6\scala-library-2.10.6.jar;E:\mavrepository\.m2\repository\com\yammer\metrics\metrics-core\2.2.0\metrics-core-2.2.0.jar;E:\mavrepository\.m2\repository\net\sf\jopt-simple\jopt-simple\4.9\jopt-simple-4.9.jar;E:\mavrepository\.m2\repository\com\alibaba\fastjson\1.2.22\fastjson-1.2.22.jar;E:\mavrepository\.m2\repository\org\slf4j\slf4j-api\1.6.2\slf4j-api-1.6.2.jar;E:\mavrepository\.m2\repository\org\slf4j\slf4j-log4j12\1.6.2\slf4j-log4j12-1.6.2.jar;E:\mavrepository\.m2\repository\commons-logging\commons-logging-api\1.1\commons-logging-api-1.1.jar;E:\mavrepository\.m2\repository\log4j\log4j\1.2.16\log4j-1.2.16.jar;E:\mavrepository\.m2\repository\mysql\mysql-connector-java\5.1.40\mysql-connector-java-5.1.40.jar;E:\mavrepository\.m2\repository\org\apache\curator\curator-framework\2.10.0\curator-framework-2.10.0.jar;E:\mavrepository\.m2\repository\org\apache\curator\curator-client\2.10.0\curator-client-2.10.0.jar;E:\mavrepository\.m2\repository\com\google\guava\guava\16.0.1\guava-16.0.1.jar;E:\mavrepository\.m2\repository\org\apache\zookeeper\zookeeper\3.4.8\zookeeper-3.4.8.jar;E:\mavrepository\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;E:\mavrepository\.m2\repository\io\netty\netty\3.7.0.Final\netty-3.7.0.Final.jar;E:\mavrepository\.m2\repository\org\apache\curator\curator-recipes\2.10.0\curator-recipes-2.10.0.jar;E:\mavrepository\.m2\repository\org\jolokia\jolokia-jvm\1.3.5\jolokia-jvm-1.3.5.jar;E:\mavrepository\.m2\repository\org\jolokia\jolokia-core\1.5.0\jolokia-core-1.5.0.jar;E:\mavrepository\.m2\repository\com\googlecode\json-simple\json-simple\1.1.1\json-simple-1.1.1.jar;D:\tools\Java\jdk1.8.0_31\lib\tools.jar;E:\mavrepository\.m2\repository\org\projectlombok\lombok\1.16.18\lombok-1.16.18.jar;E:\mavrepository\.m2\repository\org\springframework\boot\spring-boot-starter-web\2.0.0.RELEASE\spring-boot-starter-web-2.0.0.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\boot\spring-boot-starter\2.0.0.RELEASE\spring-boot-starter-2.0.0.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\boot\spring-boot\2.0.0.RELEASE\spring-boot-2.0.0.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\boot\spring-boot-autoconfigure\2.0.0.RELEASE\spring-boot-autoconfigure-2.0.0.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\boot\spring-boot-starter-logging\2.0.0.RELEASE\spring-boot-starter-logging-2.0.0.RELEASE.jar;E:\mavrepository\.m2\repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\mavrepository\.m2\repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\mavrepository\.m2\repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\mavrepository\.m2\repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\mavrepository\.m2\repository\org\springframework\spring-core\5.0.4.RELEASE\spring-core-5.0.4.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\spring-jcl\5.0.4.RELEASE\spring-jcl-5.0.4.RELEASE.jar;E:\mavrepository\.m2\repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\mavrepository\.m2\repository\org\springframework\boot\spring-boot-starter-json\2.0.0.RELEASE\spring-boot-starter-json-2.0.0.RELEASE.jar;E:\mavrepository\.m2\repository\com\fasterxml\jackson\core\jackson-databind\2.9.4\jackson-databind-2.9.4.jar;E:\mavrepository\.m2\repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\mavrepository\.m2\repository\com\fasterxml\jackson\core\jackson-core\2.9.4\jackson-core-2.9.4.jar;E:\mavrepository\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.4\jackson-datatype-jdk8-2.9.4.jar;E:\mavrepository\.m2\repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.4\jackson-datatype-jsr310-2.9.4.jar;E:\mavrepository\.m2\repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.4\jackson-module-parameter-names-2.9.4.jar;E:\mavrepository\.m2\repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.0.RELEASE\spring-boot-starter-tomcat-2.0.0.RELEASE.jar;E:\mavrepository\.m2\repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.28\tomcat-embed-core-8.5.28.jar;E:\mavrepository\.m2\repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.28\tomcat-embed-el-8.5.28.jar;E:\mavrepository\.m2\repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.28\tomcat-embed-websocket-8.5.28.jar;E:\mavrepository\.m2\repository\org\hibernate\validator\hibernate-validator\6.0.7.Final\hibernate-validator-6.0.7.Final.jar;E:\mavrepository\.m2\repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\mavrepository\.m2\repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\mavrepository\.m2\repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\mavrepository\.m2\repository\org\springframework\spring-web\5.0.4.RELEASE\spring-web-5.0.4.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\spring-beans\5.0.4.RELEASE\spring-beans-5.0.4.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\spring-webmvc\5.0.4.RELEASE\spring-webmvc-5.0.4.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\spring-aop\5.0.4.RELEASE\spring-aop-5.0.4.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\spring-context\5.0.4.RELEASE\spring-context-5.0.4.RELEASE.jar;E:\mavrepository\.m2\repository\org\springframework\spring-expression\5.0.4.RELEASE\spring-expression-5.0.4.RELEASE.jar co.solinx.kafka.monitor.ConsumerMessage log4j:WARN No such property [maxFileSize] in org.apache.log4j.DailyRollingFileAppender. log4j:WARN No such property [maxBackupIndex] in org.apache.log4j.DailyRollingFileAppender. 2019-01-16 15:56:36,600 [main] [org.apache.kafka.clients.consumer.ConsumerConfig]-[INFO] ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [10.1.9.3:9092] ssl.keystore.type = JKS enable.auto.commit = true sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = ssl.endpoint.identification.algorithm = null max.poll.records = 2147483647 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 1000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer group.id = test retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = latest
七、使用命令行建立topic
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-topics.bat --create --zookeeper localhost:12181 --replication-factor 1 --partitions 1 --topic hello
查看已有的topic
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-topics.bat --list --zookeeper 127.0.0.1:12181
建立一個消息生產者,同時建立一個消息消費者,去接收消息生產者發來的消息
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic hello 建立一個消息生產者
D:\tools\kafka_2.12-2.1.0\bin\windows\kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic hello --from-beginning 建立一個消息消費者