kafka下載地址 :http://kafka.apache.org/downloads
zookeeper下載地址:https://zookeeper.apache.org/
jdk下載地址:https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html
html
一、首先 使用tar命令對jdk進行解壓 tar -zxvf tar -zxvf jdk-8u181-linux-x64.tar.gz 目錄下面會多出一個jdk1.8.0_181 進入裏面去 使用pwd命令查看絕對路徑 而且複製找個路徑 最後進行jdk環境變量的配置 編輯 vim /etc/profile文件 在文件後面加上: export JAVA_HOME=(剛纔pwd命令看到的路徑) export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/ lib/ tools.jar export PATH=$PATH:${JAVA_HOME}/bin 最後使用source /etc/profile 刷新文件 使用java -version 查看環境變量是否配置成功 二、成功以後進行zookeeper的安裝 使用 tar -zxvf zookeeper-3.4.12.tar.gz 接下下載好的zookeeper安裝包 將zookeeper下的/conf/zookeeper.example更名成zoo.cfg 使用mv 和cp命令均可以 而後vim這個文件 加上下面兩行 dataLogDir=/tmp/zookeeper-log #日誌路徑 quorumListenOnAllIPs=true #在阿里雲的服務器上保證外網能夠訪問到 剛開始沒設置這個折騰了很久 三、最後,安裝kafka 使用 tar -zxvf kafka_2.12-1.1.1.tgz 解壓下載好的kafka cd 到解壓後的文件裏面去 編輯配置文件 vim config/server.properties 加上下面幾行 listeners=PLAINTEXT://:9092 advertised.host.name=阿里雲服務器公網ip # advertised.port=9092 將zookeeper.connect的值改成阿里雲的公網ip
首先cd到zookeeper的bin目錄下 使用 ./zkServer.sh start 啓動zookeeper 再cd到kafka的bin目錄下 使用 ./kafka-server-start.sh ../config/server.properties 啓動kafka 新建一個會話或者打開一個新的終端 這時候使用jps命令 能夠看到 Kafka和QuorumPeerMain表示啓動所有成功,下面建立一個主題 cd到kafka的bin目錄下面,執行 ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -- partitions 1 --topic Hello-world 輸出Created topic "Hello-world". 表示topic建立成功 使用./kafka-topics.sh --list --zookeeper localhost:2181 查看主題的列表 輸出裏面會含有Hello-world 下面進行消息的生產和消費 先啓動生產者 ./kafka-console-producer.sh --broker-list 阿里雲公網ip:9092 --topic Hello- world 會出現一個 > 相似於交互界面 這時候就能夠生產消息了 啓動消費者 ./kafka-console-consumer.sh --zookeeper 阿里雲公網ip:2181 --topic Hello- world --from-beginning 這時候當生產者生產消息的時候 消費者這邊就能夠看到了
首先、新建一個Maven工程(此處再也不多描述),在pom文件中加入kafka的依賴 <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.0</version> </dependency> 新建一個KafkaProducerDemo和KafkaConsumerDemo類(名字能夠自定義): 話很少說 上代碼 KafkaProducerDemo類: public class KafkaProducerDemo { public static void main(String[] args) { //建立properties文件 Properties properties = new Properties(); //設置kafka服務器地址 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "阿里雲公網ip:9092"); //設置key進行序列化 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //設置value進行序列化 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //建立消息生產者 KafkaProducer<String,String> producer = new KafkaProducer<>(properties); //建立消息實體 制定主題、key、value ProducerRecord<String,String> record = new ProducerRecord<>("Hello-world","haha","from java client"); //發送消息 producer.send(record); System.out.println("消息發送成功"); //關閉生產者 producer.close(); } } KafkaConsumerDemo類: public class KafkaConsumerDemo { public static void main(String[] args) { //新建配置文件 Properties properties = new Properties(); //設置kafka服務器地址 properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"阿里雲公網ip:9092"); //設置key的反序列化 properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //設置value的反序列化 properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); //設置groupid properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test"); //建立消費者對象 KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties); //訂閱主題 consumer.subscribe(Arrays.asList("Hello-world")); while (true) { //消費消息 ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.println("消息的主題是:" + record.topic()+",消息的key是:" + record.key()+",消息的value是:"+record.value()); } } } 上面就是鏈接kafka遠程服務器代碼
可是上述過程作完以後仍是不能正確運行、這個地方折騰了很久、最後在哪裏看到解決的辦法記不大清了
就是要阿里雲服務器服務安全設置裏面加個規則 將2181和9092端口開放就能夠,可是我中間也使用命令的方式
關閉了防火牆、沒什麼用,不知道什麼鬼。 搞得我頭皮發麻java