一、所需要的依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kafkaCli</groupId>
    <artifactId>kafkaCli</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <!-- Assembles a single jar that bundles all dependencies. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <!-- Optional: enabling the archive/manifest section below produces a directly runnable jar. -->
                    <!--<archive>-->
                    <!--<manifest>-->
                    <!--<mainClass>${exec.mainClass}</mainClass>-->
                    <!--</manifest>-->
                    <!--</archive>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <!-- Copies every dependency jar into target/lib during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.0.2</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <encoding>GBK</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!-- Dependency required for WebSocket -->
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>7.0</version>
        </dependency>

        <!-- Dependencies required for Kafka -->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.1.1</version>
        </dependency>
        <!--
            Pinned to the same version as kafka_2.12 above. The former RELEASE
            meta-version made the build non-reproducible and could resolve to a
            client that does not match the broker artifact.
        -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
</project>
二、生產者代碼
package com.kafka.producer; import com.kafka.systemConfig.SystemConfig; import org.apache.kafka.clients.producer.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Properties; public class ProducerKafka { private static final Logger log = LoggerFactory.getLogger(ProducerKafka.class); public static Producer<String, String> procuder; { Properties props = new Properties(); props.put("bootstrap.servers", SystemConfig.getProperty("bootstrap.servers","10.12.1.229:9092")); props.put("acks", SystemConfig.getProperty("acks","all")); props.put("retries", SystemConfig.getProperty("retries","0")); props.put("batch.size", SystemConfig.getProperty("batch.size","16384")); props.put("linger.ms",SystemConfig.getProperty("linger.ms","1")); props.put("buffer.memory", SystemConfig.getProperty("buffer.memory","33554432")); props.put("key.serializer", SystemConfig.getProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")); props.put("value.serializer", SystemConfig.getProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")); procuder = new KafkaProducer<String,String>(props); } /** * 向kafka發送消息 * @param message * @return */ public void sendMessgae(ProducerRecord message) throws Exception{ procuder.send(message, new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { log.info("向kafka發送數據返回偏移量: {}" , recordMetadata.offset()); } }); } /** * 向kafka發送消息 * @param topic 主題 * @param value 值 * @throws Exception */ public void sendMessgae(String topic, String value) throws Exception{ sendMessgae(new ProducerRecord<String, String>(topic, value)); } /** * 向kafka發送消息 * @param topic 主題 * @param value 值 * @throws Exception */ public void sendMessgae(String topic,String key, String value) throws Exception{ sendMessgae(new ProducerRecord(topic, key, value)); } /** * 刷新緩存 */ public void flush() { procuder.flush(); } /** * 關閉鏈接 */ public void close() { procuder.close(); } /** * 
單例模式確保全局中只有一份該實例 */ private static class ProducerKafkaHolder{ private static ProducerKafka instance = new ProducerKafka(); } /** * 延遲加載,避免啓動加載 * @return */ public static ProducerKafka getInstance(){ return ProducerKafkaHolder.instance; } public static void main(String []args){ try { ProducerKafka producerKafka = ProducerKafka.getInstance(); producerKafka.sendMessgae("TEST_JAVA","key","value"); producerKafka.flush(); producerKafka.close(); } catch (Exception e) { e.printStackTrace(); } } }
三、配置項代碼
package com.kafka.systemConfig; import java.io.IOException; import java.io.InputStream; import java.util.Properties; public class SystemConfig { private static Properties properties = null; // private final static String FILE_PATH = System.getProperty("user.dir") + "/conf/kafkaProducer.properties"; private final static String FILE_PATH = "kafkaProducer.properties"; private SystemConfig() { System.out.println("FILE_PATH" + FILE_PATH); properties = getConfig(); } /** * Get property value. * * @param name * property name. * @return the value. */ public static String getProperty(String name) { return getProperty(name, null); } /** * Get property value. * * @param name * property name. * @param defaultValue * value if property not found. * @return the value. */ public static String getProperty(String name, String defaultValue) { String ret = null; if (properties == null) { properties = getConfig(); } if (properties != null) { ret = properties.getProperty(name); if (ret != null) { try { ret = new String(ret.getBytes("ISO-8859-1"), "GBK"); } catch (Exception e) { e.printStackTrace(); } return ret.trim(); } else{ return defaultValue; } } return defaultValue; } /** * @param name * @param defaultValue * @return */ public static int getIntProperty(String name, int defaultValue) { int res = Integer.parseInt(getProperty(name, defaultValue + "")); return res == 0 ? defaultValue : res; } private static Properties getConfig() { if (properties == null) { properties = new Properties(); InputStream is = null; try { is = SystemConfig.class.getClassLoader() .getResourceAsStream(FILE_PATH ); properties.load(is); } catch (IOException e) { } finally { if (is != null) { try { is.close(); } catch (IOException e) { } } } } return properties; } public static void main(String args[]){ // System.out.println(SystemConfig.getProperty("bootstrap.servers")); // System.out.println(FILE_PATH); System.out.println(SystemConfig.class.getClassLoader().getResourceAsStream(FILE_PATH )); ; } }
四、webSocket 代碼
package com.kafka.wbSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; /** * 用於 webSocket 應用相關 * **/ @ServerEndpoint("/webSocket") public class WebSocket { private static final Logger log = LoggerFactory.getLogger(WebSocket.class); private Session session; public static CopyOnWriteArraySet<WebSocket> wbSockets = new CopyOnWriteArraySet<WebSocket>(); /** * 創建鏈接。 * 創建鏈接時入參爲session */ @OnOpen public void onOpen(Session session){ this.session = session; wbSockets.add(this); log.info("New session insert,sessionId is "+ session.getId()); } /** * 關閉鏈接 */ @OnClose public void onClose(){ wbSockets.remove(this); log.info("A session insert,sessionId is "+ session.getId()); } /** * 接收數據。 * */ @OnMessage public void onMessage(String message ,Session session){ log.info(message + "from " + session.getId()); } /** * 發送數據 * @param message * @throws IOException */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } }
因平臺 JDK 只支持 1.6,而 Kafka 所需版本爲 1.8,故此消息中間件的展現被忽略。