Kafka是一個持久化消息發佈訂閱系統,經常使用於消息隊列、日誌通道等場景
1 Producer: 特指消息的生產者
2 Consumer :特指消息的消費者
3 Consumer Group :消費者組,能夠並行消費Topic中partition的消息
4 Broker:緩存代理,Kafka 集羣中的一臺或多臺服務器統稱爲 broker。
5 Topic:特指 Kafka 處理的消息源(feeds of messages)的不一樣分類。
6 Partition:Topic 物理上的分組,一個 topic 能夠分爲多個 partition,每一個 partition 是一個有序的隊列。partition 中的每條消息都會被分配一個有序的 id(offset)
7 Message:消息,是通訊的基本單位,每一個 producer 能夠向一個 topic(主題)發佈一些消息
8 稀疏索引:採用稀疏索引的方式,利用二分查找,定位消息。
@EnableBinding註解,綁定消息通道。該註解用來指定一個或者多個定義了@Input或@Output註解的接口。
@EnableBinding(Sink.class),綁定了Sink接口,Sink接口是Spring Cloud 中默認綁定輸入通道,除此以外,還有綁定輸出通道Source,還有綁定輸入輸出通道的Processor通道。除了Spring Cloud定義的接口外,咱們也能夠自定義。
@StreamListener註解是將被修飾的方法註冊爲消息中間件上數據流的事件監聽器,註解中的屬性值對應了監聽的消息通道名
zookeeper安裝
進入Zookeeper設置目錄,筆者D:\Java\Tool\zookeeper-3.4.6\conf
將「zoo_sample.cfg」重命名爲「zoo.cfg」
在任意文本編輯器(如notepad)中打開zoo.cfg
找到並編輯dataDir=D:\\Java\\Tool\\zookeeper-3.4.6\\tmp
與Java中的作法相似,咱們在系統環境變量中添加:
a. 在系統變量中添加ZOOKEEPER_HOME = D:\Java\Tool\zookeeper-3.4.6
b. 編輯path系統變量,添加爲路徑%ZOOKEEPER_HOME%\bin;
在zoo.cfg文件中修改默認的Zookeeper端口(默認端口2181)
啓動zookeeper
啓動kafka(重要:請確保在啓動Kafka服務器前,Zookeeper實例已經準備好並開始運行)
.\bin\windows\kafka-server-start.bat .\config\server.properties
--------------- 註冊中心server1啓動類
package org.eureka.server;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
/**
 * Launcher for the first registry node. The annotations enable both the
 * Eureka server and the config server in the same application.
 */
@SpringBootApplication
@EnableEurekaServer
@EnableConfigServer
public class EurekaServerStarter {

    /**
     * Builds the Spring application for this class and runs it.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplicationBuilder launcher = new SpringApplicationBuilder(EurekaServerStarter.class);
        launcher.run(args);
    }
}
--------------------------application.yml
# Registry node "peer1". Nesting restored; a flat layout would make every key
# top-level and produce duplicate "security"/"enabled" keys.
server:
  port: 8761

# Whether to enable HTTP basic authentication
security:
  basic:
    enabled: false
  user:
    name: admin
    password: 123456

management:
  security:
    enabled: false

spring:
  profiles:
    active: peer2,native
  cloud:
    config:
      server:
        native:
          search-locations: file:///D:/Users/xuzhi268/zhongchou/config_profiles

eureka:
  instance:
    hostname: peer1
    lease-renewal-interval-in-seconds: 30 # lease renewal frequency; the default is 30s
  # NOTE(review): placed directly under "eureka" (eureka.environment) - confirm against the original file
  environment: dev
  client:
    register-with-eureka: false # do not register this server with Eureka as a client
    # Whether to fetch registry information from the Eureka server. A single node has no
    # peers to sync with and may use false; this setup is a cluster, so it should be true
    # (true is the default, so the key may be omitted).
    fetch-registry: false
    serviceUrl:
      defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
      #http://admin:123456@peer1:8761/eureka/,http://admin:123456@peer2:8766/eureka/ # separate multiple URLs with commas
-------------------------- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.eureka.server</groupId>
<artifactId>eureka_server</artifactId>
<version>1.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- 開啓基本的鑑權 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
--------------------- 註冊中心server2
啓動類
package org.eureka.server;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;
/**
 * Launcher for the second registry node. The annotations enable both the
 * Eureka server and the config server in the same application.
 */
@SpringBootApplication
@EnableEurekaServer
@EnableConfigServer
public class EurekaServerStartRunner {

    /**
     * Builds the Spring application for this class and runs it.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplicationBuilder launcher = new SpringApplicationBuilder(EurekaServerStartRunner.class);
        launcher.run(args);
    }
}
--------------------- application.yml
# Registry node "peer2". Nesting restored; a flat layout would make every key
# top-level and produce duplicate "security"/"enabled" keys.
server:
  port: 8766

security:
  basic:
    enabled: false
  user:
    name: admin
    password: 123456

management:
  security:
    enabled: false

spring:
  application:
    name: eureka
  profiles:
    active: peer1,native
  cloud:
    config:
      server:
        native:
          search-locations: file:///D:/Users/xuzhi268/zhongchou/config_profiles

eureka:
  instance:
    hostname: peer2
    lease-renewal-interval-in-seconds: 30
  # NOTE(review): placed directly under "eureka" (eureka.environment) - confirm against the original file
  environment: dev
  client:
    register-with-eureka: false
    fetch-registry: false
    serviceUrl:
      defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
      #http://admin:123456@peer1:8761/eureka/,http://admin:123456@peer2:8766/eureka/
-------------------------pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.eureka.server</groupId>
<artifactId>eureka_server1</artifactId>
<version>0.0.1-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>
</project>
--------------- 服務提供者代碼
package com.cloud.eureka.client.ucenter;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
/**
 * Launcher for the ucenter service provider, registered as a Eureka client.
 */
@SpringBootApplication
@EnableEurekaClient
public class UcenterApplicationRunner {

    /**
     * Starts the application with {@code server.port=8765} supplied through the builder.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        String portProperty = "server.port=" + 8765;
        SpringApplicationBuilder launcher = new SpringApplicationBuilder(UcenterApplicationRunner.class);
        launcher.properties(portProperty).run(args);
    }
}
------------------------------------------------------
package com.cloud.eureka.client.ucenter.service;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.cloud.eureka.client.ucenter.biz.UserInfoService;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;
/**
 * REST endpoints exposed by the ucenter service.
 *
 * <p>Every handler first logs the configured port, the redis host/port and the
 * handling thread, then delegates to {@link UserInfoService}.
 */
@RestController
@RefreshScope
public class UcenterInfoService {
    private static final Logger logger = LoggerFactory.getLogger(UcenterInfoService.class);

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private UserInfoService userInfoService;

    @Value(value = "${redis.host}")
    private String redisHost;

    @Value(value = "${redis.port}")
    private String redisPort;

    @Value("${server.port}")
    private String port;

    /**
     * Health-style endpoint that reports which port answered.
     *
     * @return the text {@code "server.port : "} followed by the configured port
     */
    @RequestMapping(value = "/hello", method = RequestMethod.GET)
    public String index() {
        logAccess("index()");
        ServiceInstance instance = discoveryClient.getLocalServiceInstance();
        logger.info("/hello, host:{}, service_id:{}", instance.getHost(), instance.getServiceId());
        return "server.port : " + port;
    }

    /**
     * Returns user information.
     *
     * @param userId id supplied by the caller; it is only logged, the lookup itself takes no argument
     * @return the user returned by {@link UserInfoService#queryUserInfo()}
     */
    @RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
    public UserDto queryUserInfo(@RequestParam("userId") Long userId) {
        logAccess("queryUserInfo");
        logger.info("<<=-=-=-=>>恭喜用戶 {} 你鏈接成功<<=-=-=-=>>", userId);
        return userInfoService.queryUserInfo();
    }

    /**
     * Returns the list of users.
     *
     * @return the users returned by {@link UserInfoService#queryUserList()}
     */
    // Leading slash added so the path is written like every other mapping here.
    @RequestMapping(value = "/queryUserList", method = RequestMethod.POST)
    public List<UserDto> queryUserList() {
        logAccess("queryUserList");
        return userInfoService.queryUserList();
    }

    /**
     * Looks up a user and wraps it in a result envelope with code {@code "000"}.
     *
     * @param request the lookup request; it is bound by Spring but not read by this method
     * @return a result carrying the user, response code and a message naming the serving port
     */
    @RequestMapping(value = "/queryUserById", method = RequestMethod.POST)
    public QueryUserByIdResult<UserDto> queryUserUserId(QueryUserByIdRequest request) {
        // Previously logged as "queryUserInfo", which made the two endpoints indistinguishable in logs.
        logAccess("queryUserById");
        QueryUserByIdResult<UserDto> result = new QueryUserByIdResult<UserDto>();
        result.setModel(userInfoService.queryUserInfo());
        result.setRespCode("000");
        result.setRespMsg("查詢成功 server.port : " + port);
        return result;
    }

    /** Writes the common per-request log lines: port, redis endpoint, handler name and thread. */
    private void logAccess(String handler) {
        logger.info("server.port : {}", port);
        logger.info("redis host {}:{}", redisHost, redisPort);
        logger.info("<=-=-=-= ucenter server access {} {} {}", handler,
                this.getClass().getSimpleName(), Thread.currentThread().getName());
    }
}
---------------------------------消息消費者-------------
package com.cloud.eureka.client.ucenter.service.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import com.alibaba.fastjson.JSON;
import com.cloud.eureka.client.ucenter.biz.UserInfoService;
import com.cloud.eureka.client.ucenter.domain.MessageBody;
import com.cloud.eureka.client.ucenter.domain.UserDto;
/**
 * Stream listener bound to the {@link Sink} input channel.
 *
 * <p>Each message payload is parsed as a JSON {@link MessageBody}; when one is
 * present the user is looked up and the coupon details are printed. If the
 * message carries an {@link Acknowledgment} header it is acknowledged at the end.
 */
@EnableBinding(Sink.class)
public class MsgSink {
    @Autowired
    private UserInfoService userInfoService;

    /**
     * Handles one inbound message.
     *
     * @param message the inbound message; its payload is expected to be JSON text or UTF-8 JSON bytes
     */
    @StreamListener(Sink.INPUT)
    public void process(Message<?> message) {
        String json = payloadAsText(message.getPayload());
        System.out.println("==== 消費者消費消息開始 : " + json);
        MessageBody body = JSON.parseObject(json, MessageBody.class);
        System.out.println("json 2 message body " + body);
        if (null != body) {
            UserDto userDto = userInfoService.queryUserInfo();
            System.out.println("查詢用戶[" + body.getUserName() + "]信息 : " + JSON.toJSONString(userDto));
            System.out.println("發放指定產品【" + body.getProductId() + "】代金券數量爲:" + body.getCouponNum());
        }
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("==== 消費者消費消息應答,Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    }

    /**
     * Converts a payload to text. A {@code byte[]} is decoded as UTF-8;
     * {@code String.valueOf} on an array would yield its identity string
     * (for example {@code [B@1a2b3c}) and JSON parsing would then fail.
     */
    private static String payloadAsText(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, java.nio.charset.StandardCharsets.UTF_8);
        }
        return String.valueOf(payload);
    }
}
----------------------
package com.cloud.eureka.client.ucenter.facade;
import java.util.List;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;
/**
 * Feign client declaration for the service registered as {@code ucenter}.
 * Each method maps to one HTTP endpoint of that service.
 */
@FeignClient("ucenter")
public interface UcenterCloudFacade {

    /**
     * Calls {@code GET /hello}.
     *
     * @return the response body as text
     */
    @RequestMapping(value = "/hello", method = RequestMethod.GET)
    String index();

    /**
     * Calls {@code GET /queryUserInfo}.
     *
     * @param userId sent as the {@code userId} request parameter
     * @return the user returned by the remote service
     */
    @RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
    UserDto queryUserInfo(@RequestParam("userId") Long userId);

    /**
     * Calls {@code POST /queryUserList}.
     *
     * @return the users returned by the remote service
     */
    @RequestMapping(value = "/queryUserList", method = RequestMethod.POST)
    List<UserDto> queryUserList();

    /**
     * Calls {@code POST /queryUserById}.
     *
     * @param request the lookup request passed to the remote service
     * @return the result envelope returned by the remote service
     */
    @RequestMapping(value = "/queryUserById", method = RequestMethod.POST)
    QueryUserByIdResult<UserDto> queryUserById(QueryUserByIdRequest request);
}
------------------------------
package com.cloud.eureka.client.ucenter.domain;
/**
 * Message payload describing a coupon grant: which user, which product and how many coupons.
 * A plain bean with getters and setters for each field.
 */
public class MessageBody {
    private Long userId;
    private Long productId;
    private Integer couponNum;
    private String userName;

    /** @return the user id, or {@code null} if not set */
    public Long getUserId() {
        return userId;
    }

    /** @param userId the user id */
    public void setUserId(Long userId) {
        this.userId = userId;
    }

    /** @return the product id, or {@code null} if not set */
    public Long getProductId() {
        return productId;
    }

    /** @param productId the product id */
    public void setProductId(Long productId) {
        this.productId = productId;
    }

    /** @return the number of coupons, or {@code null} if not set */
    public Integer getCouponNum() {
        return couponNum;
    }

    /** @param couponNum the number of coupons */
    public void setCouponNum(Integer couponNum) {
        this.couponNum = couponNum;
    }

    /** @return the user name, or {@code null} if not set */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /**
     * Lists every field so that concatenating this object into a log line
     * shows its content instead of an identity hash.
     */
    @Override
    public String toString() {
        return "MessageBody{userId=" + userId
                + ", productId=" + productId
                + ", couponNum=" + couponNum
                + ", userName=" + userName + "}";
    }
}
---------------------------
package com.cloud.eureka.client.ucenter.domain;
import java.io.Serializable;
/**
 * Serializable user record with an id, a user name and an address.
 */
public class UserDto implements Serializable {
    /** Serialization version identifier. */
    private static final long serialVersionUID = 8541673794025166248L;

    private Long id;
    private String userName;
    private String address;

    /** @return the id, or {@code null} if not set */
    public Long getId() {
        return this.id;
    }

    /** @return the user name, or {@code null} if not set */
    public String getUserName() {
        return this.userName;
    }

    /** @return the address, or {@code null} if not set */
    public String getAddress() {
        return this.address;
    }

    /** @param id the id to store */
    public void setId(Long id) {
        this.id = id;
    }

    /** @param userName the user name to store */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @param address the address to store */
    public void setAddress(String address) {
        this.address = address;
    }
}
------------------------------
package com.cloud.eureka.client.ucenter.biz;
import java.util.ArrayList;
import java.util.List;
import org.springframework.stereotype.Service;
import com.cloud.eureka.client.ucenter.domain.UserDto;
/**
 * Supplies fixed sample users; no data store is consulted.
 */
@Service
public class UserInfoService {

    /**
     * Returns one fixed sample user and prints a connection message to standard output.
     *
     * @return a newly created user with id 1
     */
    public UserDto queryUserInfo() {
        UserDto dto = newUser(1L, "李世民", "陝西省咸陽市紫禁城");
        System.out.println("<<=-=-=-=>>恭喜,恭喜你鏈接成功<<=-=-=-=>>");
        return dto;
    }

    /**
     * Returns three fixed sample users.
     *
     * @return a new mutable list holding the users in a fixed order
     */
    public List<UserDto> queryUserList() {
        List<UserDto> ulist = new ArrayList<>();
        ulist.add(newUser(1203586L, "李世民", "陝西省咸陽市紫禁城"));
        ulist.add(newUser(1022589L, "朱江明", "江蘇省南京市"));
        ulist.add(newUser(1022575L, "劉如海", "湖北省省武漢市市"));
        return ulist;
    }

    /** Creates a user populated with the given id, name and address. */
    private static UserDto newUser(long id, String userName, String address) {
        UserDto dto = new UserDto();
        dto.setId(id);
        dto.setUserName(userName);
        dto.setAddress(address);
        return dto;
    }
}
------------------- application.yml
# ucenter service configuration. Nesting restored from the flattened listing.
#server:
#  # port of this service
#  port: 8762
spring:
  application:
    # name of this application
    name: ucenter
  cloud:
    stream:
      # instance-count / instance-index are spring.cloud.stream.* properties,
      # so they are nested under "stream" rather than directly under "cloud".
      instance-count: 1
      instance-index: 0
      # NOTE(review): confirm this key; the documented property for choosing a binder is "default-binder".
      binder: kafka
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          auto-add-partitions: false
          auto-create-topics: true
          min-partition-count: 1
      bindings:
        input:
          destination: event_demo
          group: s1
          consumer:
            concurrency: 1
            partitioned: false
eureka:
  client:
    serviceUrl:
      # addresses of the registry nodes
      defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
--------------------- bootstrap.yml
# Disable permission checks for the config centre
management:
  security:
    enabled: false
spring:
  cloud:
    config:
      uri: http://localhost:8761/
feign:
  httpclient:
    enabled: true
    max-connections: 200 # default value
    max-connections-per-route: 50 # default value
---------------------------- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloud.eureka.client</groupId>
<artifactId>ucenter</artifactId>
<packaging>war</packaging>
<version>1.0.1-SNAPSHOT</version>
<name>ucenter Maven Webapp</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency> -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
<build>
<finalName>ucenter</finalName>
</build>
</project>
-------------------------- 客戶端代碼
package com.cloud.profile.third;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.client.RestTemplate;
/**
 * Boot class of the client application.
 *
 * <p>{@code @EnableEurekaClient} enables service registration and discovery;
 * {@code @EnableFeignClients} enables Feign for remote calls.
 *
 * <p>Feign is a declarative web-service client that makes writing such clients
 * simpler: define an interface and annotate it. JAX-RS annotations are also
 * supported, as are pluggable encoders and decoders. Spring Cloud wraps Feign
 * so that it understands Spring MVC annotations and
 * {@code HttpMessageConverters}, and Feign can be combined with Eureka and
 * Ribbon to provide load balancing.
 */
@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@ComponentScan(basePackages = {"com.cloud.profile.controller", "com.cloud.profile.third.kafka"})
public class ComsumerAppliactionRunner {

    @Autowired
    private RestTemplateBuilder builder;

    /**
     * Declares the load-balanced {@link RestTemplate} bean.
     *
     * @return a template built from the injected {@link RestTemplateBuilder}
     */
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        RestTemplate template = builder.build();
        return template;
    }

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, handed to Spring Boot unchanged
     */
    public static void main(String[] args) {
        SpringApplication.run(ComsumerAppliactionRunner.class, args);
    }
}
----------------------- 消息發送端
package com.cloud.profile.third.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSON;
import com.cloud.profile.dto.MessageBody;
/**
 * HTTP entry point that publishes a fixed sample {@link MessageBody} through {@link SendService}.
 */
@RestController
public class ProducerController {
    @Autowired
    private SendService service;

    /**
     * Serializes a hard-coded sample message to JSON and sends it.
     *
     * @param msg path segment of the request; it is bound but not used, the payload is always the sample
     */
    @RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
    public void send(@PathVariable("msg") String msg) {
        MessageBody body = new MessageBody();
        body.setCouponNum(5);
        body.setProductId(10023L);
        body.setUserId(13809825L);
        body.setUserName("趙敏");
        // Serialize once so the logged text is exactly what gets sent.
        String json = JSON.toJSONString(body);
        System.out.println("==== 生產者, 開始發送消息:" + json);
        service.sendMessage(json);
        System.out.println("==== 生產者,發送消息結束。");
    }
}
package com.cloud.profile.third.kafka;
import java.util.HashMap;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
/**
 * Publishes text messages on the {@link Source} output channel.
 */
@EnableBinding(Source.class)
public class SendService {
    @Autowired
    private Source source;

    /**
     * Sends {@code msg} as the payload of a new message and prints the outcome.
     * Any exception raised while sending is printed and not rethrown.
     *
     * @param msg the text payload to send
     */
    public void sendMessage(final String msg) {
        try {
            System.out.println("==== 開始發送消息:" + msg);
            // MessageBuilder yields an immutable message whose headers (id, timestamp) are
            // created once. The previous anonymous Message built new MessageHeaders on every
            // getHeaders() call and stored the String "yes" under KafkaHeaders.ACKNOWLEDGMENT,
            // a key that the consumer in this project reads as an Acknowledgment object.
            Message<String> message = MessageBuilder.withPayload(msg).build();
            boolean ret = source.output().send(message);
            System.out.println("==== 發送消息結束。" + (ret ? "發送成功" : "發送失敗"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
------------------------------------
package com.cloud.profile.factory;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.RestTemplate;
import com.alibaba.fastjson.JSON;
/**
 * Static helpers that call {@code http://<serviceName>/<serverUrl>} through a
 * {@link RestTemplate} and return the converted response body.
 *
 * <p>All helpers log failures and return {@code null} instead of throwing.
 * Responses are not cached: the former cache stored the {@code Class} object
 * rather than the response and was keyed only by the class's simple name, so a
 * second call returned a {@code Class} cast to {@code T} regardless of URL or
 * request body.
 */
public class CloudServiceFactory {
    private static final Logger logger = LoggerFactory.getLogger(CloudServiceFactory.class);

    /**
     * Performs a GET with optional query parameters.
     *
     * @param restTemplate template used to perform the request
     * @param serviceName  host part of the URI (the logical service name)
     * @param serverUrl    path below the service root, without a leading slash; may be empty
     * @param resultObject type the response body is converted to
     * @param paramMap     query parameters appended as {@code key=value} pairs; may be {@code null} or empty
     * @param <T>          response type
     * @return the converted response, or {@code null} if the call failed
     */
    public static <T> T createGetServerFactory(RestTemplate restTemplate,
            String serviceName, String serverUrl, Class<T> resultObject, Map<String, Object> paramMap) {
        T returnObject = null;
        try {
            String requestUri = buildQueryUri(serviceName, serverUrl, paramMap);
            logger.info("create cloud server request Uri :{}", requestUri);
            returnObject = restTemplate.getForObject(requestUri, resultObject);
            logger.info("create cloud server result :{}", JSON.toJSONString(returnObject));
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is kept.
            logger.error("create cloud server instance exception " + e.getMessage(), e);
        }
        return returnObject;
    }

    /**
     * Performs a GET without query parameters.
     *
     * @param restTemplate template used to perform the request
     * @param serviceName  host part of the URI (the logical service name)
     * @param serverUrl    path below the service root, without a leading slash; may be empty
     * @param resultObject type the response body is converted to
     * @param <T>          response type
     * @return the converted response, or {@code null} if the call failed
     */
    public static <T> T createGetFactory(RestTemplate restTemplate, String serviceName, String serverUrl,
            Class<T> resultObject) {
        T returnObject = null;
        try {
            // serverUrl was previously ignored, so only the service root could ever be requested.
            returnObject = restTemplate.getForObject(buildServiceUri(serviceName, serverUrl), resultObject);
        } catch (Exception e) {
            logger.error("create cloud server instance exception " + e.getMessage(), e);
        }
        return returnObject;
    }

    /**
     * Performs a POST and returns the response body.
     *
     * @param restTemplate template used to perform the request
     * @param serviceName  host part of the URI (the logical service name)
     * @param serverUrl    path below the service root, without a leading slash; may be empty
     * @param request      object sent as the request body; may be {@code null}
     * @param resultObject type the response body is converted to
     * @param params       values for URI template variables, passed through to the template
     * @param <T>          response type
     * @return the converted response body, or {@code null} if the call failed
     */
    public static <T> T createPostServerFactory(RestTemplate restTemplate, String serviceName,
            String serverUrl, Object request, Class<T> resultObject, Object... params) {
        T returnObject = null;
        try {
            String requestUri = buildServiceUri(serviceName, serverUrl);
            returnObject = restTemplate.postForEntity(requestUri, request, resultObject, params).getBody();
        } catch (Exception e) {
            logger.error("create cloud server instance exception " + e.getMessage(), e);
        }
        return returnObject;
    }

    /** Builds {@code http://<serviceName>[/<serverUrl>]}; the path is omitted when null or empty. */
    private static String buildServiceUri(String serviceName, String serverUrl) {
        StringBuilder uri = new StringBuilder("http://").append(serviceName);
        if (serverUrl != null && !serverUrl.isEmpty()) {
            uri.append('/').append(serverUrl);
        }
        return uri.toString();
    }

    /**
     * Builds the service URI followed by {@code ?k1=v1&k2=v2...} for every map entry.
     * Keys and values are appended as-is.
     * NOTE(review): RestTemplate treats a String URL as a URI template, so values
     * containing '{' or '}' would be read as template variables - confirm callers never pass such values.
     */
    private static String buildQueryUri(String serviceName, String serverUrl, Map<String, Object> paramMap) {
        StringBuilder uri = new StringBuilder(buildServiceUri(serviceName, serverUrl));
        if (paramMap != null && !paramMap.isEmpty()) {
            char separator = '?';
            for (Map.Entry<String, Object> reqParam : paramMap.entrySet()) {
                uri.append(separator).append(reqParam.getKey()).append('=').append(reqParam.getValue());
                separator = '&';
            }
        }
        return uri.toString();
    }
}
package com.cloud.profile.controller;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import com.alibaba.fastjson.JSON;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;
import com.cloud.profile.factory.CloudServiceFactory;
import com.cloud.profile.third.UcenterThirdService;
/**
 * REST endpoints of the client application. They reach the {@code ucenter}
 * service through a load-balanced {@link RestTemplate}, the
 * {@link CloudServiceFactory} helper and the {@link UcenterThirdService} client.
 */
@RestController
@RefreshScope
public class SystemController {
    private static final Logger logger = LoggerFactory.getLogger(SystemController.class);

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private LoadBalancerClient loadBalancerClient;

    @Autowired
    private UcenterThirdService ucenterThirdService;

    /**
     * Logs the ucenter instance picked by the load balancer, calls {@code /hello},
     * then issues a GET to {@code queryUserById} with the user id as a query parameter.
     *
     * @param userId id of the user to look up
     * @return the JSON form of the result; {@code "null"} when the remote call failed
     */
    @RequestMapping(value = "/queryUser", method = RequestMethod.GET)
    public String queryUser(@RequestParam("userId") Long userId) {
        // A former call to CloudServiceFactory.createGetFactory(restTemplate, "", "", ...) was removed:
        // it requested the URI "http://", could only fail, and its result was discarded.
        ServiceInstance serviceInstance = this.loadBalancerClient.choose("ucenter");
        if (serviceInstance == null) {
            // choose() yields null when no instance is available; avoid dereferencing it.
            logger.warn("no instance available for service ucenter");
        } else {
            // Print the details of the instance being called.
            System.out.println("===" + ":" + serviceInstance.getServiceId() + ":" + serviceInstance.getHost() + ":"
                    + serviceInstance.getPort());
        }
        String ret = this.restTemplate.getForObject("http://ucenter/hello", String.class);
        logger.info("ucenter /hello response : {}", ret);

        QueryUserByIdRequest queryRequest = new QueryUserByIdRequest();
        queryRequest.setUserId(userId);
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("userId", userId);
        logger.info("<== 根據userId查詢用戶 queryRequest : {}", JSON.toJSONString(queryRequest));
        @SuppressWarnings("unchecked")
        QueryUserByIdResult<UserDto> rest = CloudServiceFactory.createGetServerFactory(restTemplate,
                "ucenter", "queryUserById", QueryUserByIdResult.class, paramMap);
        String json = JSON.toJSONString(rest);
        logger.info("query Result : {}", json);
        return json;
    }

    /**
     * Exercises every operation of {@link UcenterThirdService}, logging each response,
     * then looks the user up by id.
     *
     * @param userId id of the user to look up
     * @return the JSON form of the lookup result when its code is {@code "000"}, otherwise {@code "hello success"}
     */
    @RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
    public String queryUserInfo(@RequestParam("userId") Long userId) {
        String prefix = "profile response " + Thread.currentThread().getName() + " " + this.getClass().getSimpleName();
        logger.info(prefix + JSON.toJSONString(ucenterThirdService.queryUserInfo(userId)));
        logger.info(ucenterThirdService.index());
        logger.info(prefix + "<<=-=-=-=>>恭喜用戶 " + userId + " 你鏈接成功<<=-=-=-=>>");
        logger.info(prefix + JSON.toJSONString(ucenterThirdService.queryUserList()));
        String json = lookupUserJson(userId);
        return json != null ? json : "hello success";
    }

    /**
     * Looks a user up by id through {@link UcenterThirdService}.
     *
     * @param userId id of the user to look up
     * @return the JSON form of the lookup result when its code is {@code "000"}, otherwise {@code "error page"}
     */
    @RequestMapping(value = "/queryUserById.do", method = RequestMethod.GET)
    public String queryUserById(@RequestParam("userId") Long userId) {
        String json = lookupUserJson(userId);
        return json != null ? json : "error page";
    }

    /**
     * Shared lookup used by both handlers above: sends the request, logs request
     * and response, and returns the response as JSON only when the code is {@code "000"}.
     *
     * @return the JSON text on success, or {@code null} when the result is missing or has another code
     */
    private String lookupUserJson(Long userId) {
        QueryUserByIdRequest queryRequest = new QueryUserByIdRequest();
        queryRequest.setUserId(userId);
        logger.info("<== 根據userId查詢用戶 queryRequest : {}", JSON.toJSONString(queryRequest));
        QueryUserByIdResult<UserDto> result = ucenterThirdService.queryUserById(queryRequest);
        logger.info("<== 根據userId查詢用戶 result : {}", JSON.toJSONString(result));
        if (null != result && "000".equals(result.getRespCode())) {
            logger.info("根據userId查詢用戶信息成功");
            return JSON.toJSONString(result);
        }
        return null;
    }
}
-------------- application.properties
server.port=8673
spring.application.name=profile
#spring.cloud.stream.instance-count=1
#spring.cloud.stream.instance-index=0
eureka.client.fetchRegistry=true
eureka.client.serviceUrl.defaultZone: http://localhost:8761/eureka/,http://localhost:8766/eureka/
eureka.client.registry-fetch-interval-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=45
profile.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule
------------------bootstrap.yml
# Producer-side stream configuration. Nesting restored from the flattened listing.
spring:
  cloud:
    stream:
      # instance-count / instance-index are spring.cloud.stream.* properties,
      # so they are nested under "stream" rather than directly under "cloud".
      instance-count: 1
      instance-index: 0
      # NOTE(review): confirm this key; the documented property for choosing a binder is "default-binder".
      binder: kafka
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          min-partition-count: 1
          auto-create-topics: true
          auto-add-partitions: false
      bindings:
        output:
          destination: event_demo
          content-type: application/json #text/plain;charset=UTF-8
          producer:
            partition-count: 1
# custom binder parameters
#spring.cloud.stream.bindings.<channelName>.binder=<binderName>
#spring.cloud.stream.binders.<binderName>.type=kafka
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.brokers=10.79.96.52:9092
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.zk-nodes=10.79.96.52:2182
----------------------- pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cloud.profile</groupId>
<artifactId>profile</artifactId>
<packaging>war</packaging>
<version>1.0.1-SNAPSHOT</version>
<name>profile Maven Webapp</name>
<url>http://maven.apache.org</url>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.7.RELEASE</version>
<relativePath/>
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Camden.SR7</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-feign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-ribbon</artifactId>
</dependency>
<!-- <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency> -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
</dependencies>
<build>
<finalName>profile</finalName>
</build>
</project>
#方式1
#spring:
# cloud:
# instance-count: 1
# instance-index: 0
# stream:
# binder: kafka
# kafka:
# binder:
# brokers: localhost:9092
# zk-nodes: localhost:2181
# min-partition-count: 1
# auto-create-topics: true
# auto-add-partitions: false
# bindings:
# output: # sourceA: 這裏指的是通道名字 ; output 是 Source 默認的通道名字
# destination: mess_data # 目標主題
# content-type: application/json # text/plain;charset=UTF-8
# producer:
# partition-count: 1
#zi ding yi binder can shu
#spring.cloud.stream.bindings.<channelName>.binder=<binderName>
#spring.cloud.stream.binders.<binderName>.type=kafka
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.zk-nodes=localhost:2182
spring.cloud.stream.bindings.sourceA.binder=messA
spring.cloud.stream.bindings.sourceA.destination=messA_data
spring.cloud.stream.bindings.sourceA.producer.partition-count=1
spring.cloud.stream.bindings.sourceA.producer.partitioned=false
spring.cloud.stream.binders.messA.type=kafka
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.auto-add-partitions=false
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.min-partition-count: 1
自定義渠道
package com.cloud.profile.third.kafka;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
/**
 * Custom output binding that declares a single channel named {@code sourceA}.
 */
public interface SourceOutput {

    /** Name under which the output channel is bound. */
    String OUT_PUT = "sourceA";

    /**
     * @return the message channel bound under {@link #OUT_PUT}
     */
    @Output(SourceOutput.OUT_PUT)
    MessageChannel output();
}