spring cloud + kafka消息中間件

Kafka是一個持久化消息發佈訂閱系統,經常使用於消息隊列、日誌通道等場景java

1 Producer: 特指消息的生產者node

2 Consumer :特指消息的消費者web

3 Consumer Group :消費者組,能夠並行消費Topic中partition的消息redis

4 Broker:緩存代理,Kafa 集羣中的一臺或多臺服務器統稱爲 broker。spring

5 Topic:特指 Kafka 處理的消息源(feeds of messages)的不一樣分類。apache

6 Partition:Topic 物理上的分組,一個 topic 能夠分爲多個 partition,每一個 partition 是一個有序的隊列。partition 中的每條消息都會被分配一個有序的 id(offset)json

7 Message:消息,是通訊的基本單位,每一個 producer 能夠向一個 topic(主題)發佈一些消息bootstrap

8 稀疏索引:採用稀疏索引的方式,利用二分查找,定位消息。windows

@EnableBinding註解,綁定消息通道。該註解用來指定一個或者多個定義了@Input或@Output註解的接口。緩存

@EnableBinding(Sink.class),綁定了Sink接口,Sink接口是Spring Cloud 中默認綁定輸入通道,除此以外,還有綁定輸出通道Source,還有綁定輸入輸出通道的Processor通道。除了Spring Cloud定義的接口外,咱們也能夠自定義。

@StreamListener註解是將被修飾的方法註冊爲消息中間件上數據流的事件監聽器,註解中的屬性值對應了監聽的消息通道名

zookeeper安裝

進入Zookeeper設置目錄,筆者D:\Java\Tool\zookeeper-3.4.6\conf
將「zoo_sample.cfg」重命名爲「zoo.cfg」
在任意文本編輯器(如notepad)中打開zoo.cfg
找到並編輯dataDir=D:\\Java\\Tool\\zookeeper-3.4.6\\tmp
與Java中的作法相似,咱們在系統環境變量中添加:
  a. 在系統變量中添加ZOOKEEPER_HOME = D:\Java\Tool\zookeeper-3.4.6
  b. 編輯path系統變量,添加爲路徑%ZOOKEEPER_HOME%\bin;
在zoo.cfg文件中修改默認的Zookeeper端口(默認端口2181)

啓動zookeeper 

啓動kafka(重要:請確保在啓動Kafka服務器前,Zookeeper實例已經準備好並開始運行

.\bin\windows\kafka-server-start.bat .\config\server.properties

--------------- 註冊中心server1啓動類

package org.eureka.server;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
@EnableConfigServer
public class EurekaServerStarter {
    public static void main(String[] args) {
        new SpringApplicationBuilder(EurekaServerStarter.class).run(args);
    }
}

--------------------------application.yml 

server:
  port: 8761
  
# 是否要開啓基本的鑑權  
security:
  basic:
    enabled: false
  user:
    name: admin
    password: 123456

management:
  security:
    enabled: false

spring:
  profiles:
    active: peer2,native
  cloud:
    config:
      server:
        native:
          search-locations: file:///D:/Users/xuzhi268/zhongchou/config_profiles

eureka:
  instance:
    hostname: peer1
    lease-renewal-interval-in-seconds: 30 #指定續約更新頻率,默認是 30s
  environment: dev
  client:
    register-with-eureka: false # 禁用eureka做爲客戶端註冊本身
    fetch-registry: false       # 表示是否從eureka server獲取註冊信息,若是是單一節點,不須要同步其餘eureka server節點,則能夠設置爲false,但此處爲集羣,應該設置爲true,默認爲true,可不設置
    serviceUrl:
      defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
      #http://admin:123456@peer1:8761/eureka/,http://admin:123456@peer2:8766/eureka/ #多個用逗號隔開
      -------------------------- pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.eureka.server</groupId>
    <artifactId>eureka_server</artifactId>
    <version>1.0.1-SNAPSHOT</version>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/>
    </parent>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR7</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka-server</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <!-- 開啓基本的鑑權 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-security</artifactId>
        </dependency>   
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

--------------------- 註冊中心server2

啓動類

package org.eureka.server;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
@EnableConfigServer
public class EurekaServerStartRunner {
    public static void main(String[] args) {
        new SpringApplicationBuilder(EurekaServerStartRunner.class).run(args);
    }
}
--------------------- application.yml

server:
  port: 8766
  
security:
  basic:
    enabled: false
  user:
    name: admin
    password: 123456
   
management:
  security:
    enabled: false

spring:
  application:
    name: eureka
  profiles:
    active: peer1,native
  cloud:
    config:
      server:
        native:
          search-locations: file:///D:/Users/xuzhi268/zhongchou/config_profiles
           
eureka:
  instance:
    hostname: peer2
    lease-renewal-interval-in-seconds: 30
  environment: dev
  client:
    register-with-eureka: false
    fetch-registry: false
    serviceUrl:
      defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/
  #http://admin:123456@peer1:8761/eureka/,http://admin:123456@peer2:8766/eureka/
  -------------------------pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.eureka.server</groupId>
    <artifactId>eureka_server1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.7.RELEASE</version>
    </parent>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR7</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka-server</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>    
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>
</project>

--------------- 服務提供者代碼

package com.cloud.eureka.client.ucenter;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
public class UcenterApplicationRunner {
    
    public static void main(String[] args) {
        new SpringApplicationBuilder(UcenterApplicationRunner.class).properties("server.port=" + 8765).run(args);
    }
}
------------------------------------------------------

package com.cloud.eureka.client.ucenter.service;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import com.cloud.eureka.client.ucenter.biz.UserInfoService;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;

@RestController
@RefreshScope
public class UcenterInfoService {

     private final Logger logger = LoggerFactory.getLogger(UcenterInfoService.class);
     
     @Autowired
     private DiscoveryClient discoveryClient;
     
     @Value(value = "${redis.host}")
     private String redisHost;
     @Value(value = "${redis.port}")
     private String redisPort;
     @Value("${server.port}")
     private String port;
     
     @RequestMapping(value = "/hello", method = RequestMethod.GET)
     public String index() {
        logger.info("server.port : " + port);
        logger.info("redis host " + redisHost + ":" + redisPort);
        ServiceInstance instance = discoveryClient.getLocalServiceInstance();
        logger.info("<=-=-=-= ucenter server access index() " + this.getClass().getSimpleName() + " " + Thread.currentThread().getName());
        logger.info("/hello, host:" + instance.getHost() + ", service_id:" + instance.getServiceId());
        return "server.port : " + port;
     }
     
     @Autowired
     private UserInfoService userInfoService;
     
     @RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
     public UserDto queryUserInfo (@RequestParam("userId") Long userId) {
         logger.info("server.port : " + port);
         logger.info("redis host " + redisHost + ":" + redisPort);
         logger.info("<=-=-=-= ucenter server access queryUserInfo " + this.getClass().getSimpleName()
                 + " " + Thread.currentThread().getName());
         logger.info("<<=-=-=-=>>恭喜用戶 " + userId + " 你鏈接成功<<=-=-=-=>>");
         return userInfoService.queryUserInfo();
     }
     
     @RequestMapping(value = "queryUserList", method = RequestMethod.POST)
     public List<UserDto> queryUserList() {
         logger.info("server.port : " + port);
         logger.info("redis host " + redisHost + ":" + redisPort);
         logger.info("<=-=-=-= ucenter server access queryUserList " + this.getClass().getSimpleName()
                 + " " + Thread.currentThread().getName());
         List<UserDto> ulist = userInfoService.queryUserList();
         return ulist;
     }
     
     @RequestMapping(value = "/queryUserById", method = RequestMethod.POST)
     public QueryUserByIdResult<UserDto> queryUserUserId(QueryUserByIdRequest request) {
         logger.info("server.port : " + port);
         logger.info("redis host " + redisHost + ":" + redisPort);
         logger.info("<=-=-=-= ucenter server access queryUserInfo " + this.getClass().getSimpleName()
                 + " " + Thread.currentThread().getName());
         QueryUserByIdResult<UserDto> result = new QueryUserByIdResult<UserDto>();
         UserDto dto = userInfoService.queryUserInfo();
         result.setModel(dto);
         result.setRespCode("000");
         result.setRespMsg("查詢成功 server.port : " + port);
         return result;
     }
}
---------------------------------消息消費者-------------

package com.cloud.eureka.client.ucenter.service.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;

import com.alibaba.fastjson.JSON;
import com.cloud.eureka.client.ucenter.biz.UserInfoService;
import com.cloud.eureka.client.ucenter.domain.MessageBody;
import com.cloud.eureka.client.ucenter.domain.UserDto;

@EnableBinding(Sink.class)
public class MsgSink {

    @Autowired
    private UserInfoService userInfoService;
    
    @StreamListener(Sink.INPUT)
    public void process(Message<?> message) {
        System.out.println("==== 消費者消費消息開始 : " + message.getPayload());
        String message2 = String.valueOf(message.getPayload());
        MessageBody body = JSON.parseObject(message2, MessageBody.class);
        System.out.println("json 2 message body " + body);
        if (null != body) {
            UserDto userDto = userInfoService.queryUserInfo();
            System.out.println("查詢用戶[" + body.getUserName() + "]信息 : " + JSON.toJSONString(userDto));
            System.out.println("發放指定產品【" + body.getProductId() + "】代金券數量爲:" + body.getCouponNum());
        }
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
            System.out.println("==== 消費者消費消息應答,Acknowledgment provided");
            acknowledgment.acknowledge();
        }
    }
}

----------------------

package com.cloud.eureka.client.ucenter.facade;

import java.util.List;

import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;

import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;

@FeignClient("ucenter")
public interface UcenterCloudFacade {
     @RequestMapping(value = "/hello", method = RequestMethod.GET)
     public String index();
    
     @RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
     public UserDto queryUserInfo (@RequestParam("userId") Long userId);
     
     @RequestMapping(value = "/queryUserList", method = RequestMethod.POST)
     public List<UserDto> queryUserList();
     
     @RequestMapping(value = "/queryUserById", method = RequestMethod.POST)
     public QueryUserByIdResult<UserDto> queryUserById(QueryUserByIdRequest request);
}
------------------------------

package com.cloud.eureka.client.ucenter.domain;

public class MessageBody {
    private Long userId;
    private Long productId;
    private Integer couponNum;
    private String userName;
    
    public Long getUserId() {
        return userId;
    }
    public void setUserId(Long userId) {
        this.userId = userId;
    }
    public Long getProductId() {
        return productId;
    }
    public void setProductId(Long productId) {
        this.productId = productId;
    }
    public Integer getCouponNum() {
        return couponNum;
    }
    public void setCouponNum(Integer couponNum) {
        this.couponNum = couponNum;
    }
    public String getUserName() {
        return userName;
    }
    public void setUserName(String userName) {
        this.userName = userName;
    }
}
---------------------------

package com.cloud.eureka.client.ucenter.domain;

import java.io.Serializable;

public class UserDto implements Serializable {
    /*** */
    private static final long serialVersionUID = 8541673794025166248L;
    
    private Long id;
    private String userName;
    private String address;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }
}
------------------------------

package com.cloud.eureka.client.ucenter.biz;

import java.util.ArrayList;
import java.util.List;

import org.springframework.stereotype.Service;

import com.cloud.eureka.client.ucenter.domain.UserDto;

@Service
public class UserInfoService {
    
    public UserDto queryUserInfo() {
        UserDto dto = new UserDto();
        dto.setId(1l);
        dto.setUserName("李世民");
        dto.setAddress("陝西省咸陽市紫禁城");
        System.out.println("<<=-=-=-=>>恭喜,恭喜你鏈接成功<<=-=-=-=>>");
        return dto;
    }
    
    public List<UserDto> queryUserList() {
        UserDto dto = new UserDto();
        dto.setId(1203586l);
        dto.setUserName("李世民");
        dto.setAddress("陝西省咸陽市紫禁城");
        
        UserDto dto1 = new UserDto();
        dto1.setId(1022589l);
        dto1.setUserName("朱江明");
        dto1.setAddress("江蘇省南京市");
        
        UserDto dto2 = new UserDto();
        dto2.setId(1022575l);
        dto2.setUserName("劉如海");
        dto2.setAddress("湖北省省武漢市市");
        List<UserDto> ulist = new ArrayList<>();
        
        ulist.add(dto);
        ulist.add(dto1);
        ulist.add(dto2);
        return ulist;
    }
}
------------------- application.yml


#server:
  #當前服務端口號
 # port: 8762
spring:
  application:
    #當前應用名稱
    name: ucenter
  cloud:
     instance-count: 1
     instance-index: 0
     stream:
        binder: kafka
        kafka:
          binder:
            brokers: localhost:9092
            zk-nodes: localhost:2181
            auto-add-partitions: false
            auto-create-topics: true
            min-partition-count: 1
        bindings:
          input:
            destination: event_demo
            group: s1
            consumer:
              concurrency: 1
              partitioned: false       
        
    
    
eureka:
  client:
    serviceUrl:
      #註冊中心的地址
      defaultZone: http://peer1:8761/eureka/,http://peer2:8766/eureka/   

--------------------- bootstrap.yml


#禁用配置中心權限驗證
management:
  security:
    enabled: false

spring:
  cloud:
    config:
      uri: http://localhost:8761/       
        
        
feign:
  httpclient:
    enabled: true
    max-connections: 200 # 默認值
    max-connections-per-route: 50 # 默認值
---------------------------- pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cloud.eureka.client</groupId>
    <artifactId>ucenter</artifactId>
    <packaging>war</packaging>
    <version>1.0.1-SNAPSHOT</version>
    <name>ucenter Maven Webapp</name>
    <url>http://maven.apache.org</url>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.7.RELEASE</version>
        <relativePath/>
    </parent>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR7</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>        
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>       
        <!-- <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency> -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>    
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.47</version>
        </dependency>          
    </dependencies>
    <build>
        <finalName>ucenter</finalName>
    </build>
</project>
-------------------------- 客戶端代碼

package com.cloud.profile.third;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.feign.EnableFeignClients;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.client.RestTemplate;

/**
 * @EnableDiscoveryClient :啓用服務註冊與發現
 * @EnableFeignClients:啓用feign進行遠程調用
 * Feign是一個聲明式Web Service客戶端。使用Feign能讓編寫Web Service客戶端更加簡單, 
 * 它的使用方法是定義一個接口,而後在上面添加註解,同時也支持JAX-RS標準的註解。Feign也支持可拔插式的編碼器和解碼器。
 * Spring Cloud對Feign進行了封裝,使其支持了Spring MVC標準註解和HttpMessageConverters。
 * Feign能夠與Eureka和Ribbon組合使用以支持負載均衡。
 */

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@ComponentScan(basePackages = {"com.cloud.profile.controller", "com.cloud.profile.third.kafka"})
public class ComsumerAppliactionRunner {
    public static void main(String[] args) {
        SpringApplication.run(ComsumerAppliactionRunner.class, args);
    }
    
    @Autowired  
    private RestTemplateBuilder builder;  
    
    @Bean  
    @LoadBalanced
    public RestTemplate restTemplate() {  
        return builder.build();  
    } 
}
----------------------- 消息發送端

package com.cloud.profile.third.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSON;
import com.cloud.profile.dto.MessageBody;

@RestController
public class ProducerController {
    @Autowired
    private SendService service;
    
    @RequestMapping(value = "/send/{msg}", method = RequestMethod.GET)
    public void send(@PathVariable("msg") String msg){
        
        MessageBody body = new MessageBody();
        body.setCouponNum(5);
        body.setProductId(10023l);
        body.setUserId(13809825l);
        body.setUserName("趙敏");
        System.out.println("==== 生產者, 開始發送消息:" + JSON.toJSONString(body));
        service.sendMessage(JSON.toJSONString(body));
        System.out.println("==== 生產者,發送消息結束。");
    }
}
 

package com.cloud.profile.third.kafka;

import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class SendService {
    
    @Autowired
    private Source source;

    public void sendMessage(final String msg) {
        try {
            System.out.println("==== 開始發送消息:" + msg);
            Message<String> message = new Message<String>() {

                @Override
                public String getPayload() {
                    return msg;
                }

                @Override
                public MessageHeaders getHeaders() {
                    Map<String, Object> headers = new HashMap<>();
                    headers.put(KafkaHeaders.ACKNOWLEDGMENT, "yes");
                    MessageHeaders header = new MessageHeaders(headers);
                    return header;
                }
            };
            boolean ret = source.output().send(message);
            //boolean ret = source.output().send(MessageBuilder.withPayload(msg).build());
            System.out.println("==== 發送消息結束。" + (ret ? "發送成功":"發送失敗"));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
------------------------------------

package com.cloud.profile.factory;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.client.RestTemplate;

import com.alibaba.fastjson.JSON;

public class CloudServiceFactory {
    
    private static final Logger logger = LoggerFactory.getLogger(CloudServiceFactory.class);
    
    private static Map<String, Object> claxxCash = new ConcurrentHashMap<>();
    
    public static <T> T createGetServerFactory (RestTemplate restTemplate, 
            String serviceName, String serverUrl, Class<T> resultObject, Map<String, Object> paramMap) {
        T returnObject = null;
        try {
            StringBuffer serverRequestUri = new StringBuffer("http://" + serviceName);
            serverRequestUri.append("/" + serverUrl);
            if (null != paramMap && paramMap.entrySet().size() > 0) {
                serverRequestUri.append("?");
                for (Map.Entry<String, Object> reqParam : paramMap.entrySet()) {
                    serverRequestUri.append(reqParam.getKey() + "=" + reqParam.getValue() + "&");
                }
                serverRequestUri.deleteCharAt(serverRequestUri.length() - 1);
            }
            
            logger.info("create cloud server request Uri :" + serverRequestUri.toString());
            returnObject = restTemplate.getForObject(serverRequestUri.toString(), resultObject);
            logger.info("create cloud server result :" + JSON.toJSONString(returnObject));
        } catch (Exception e) {
            logger.error("create cloud server instance exception " + e.getMessage());
        }
        
        return returnObject;
    }
    
    @SuppressWarnings("unchecked")
    public static <T> T createGetFactory (RestTemplate restTemplate, String serviceName, String serverUrl, Class<T> resultObject) {
        T returnObject = null;
        try {
            if (null != claxxCash) {
                Object object = claxxCash.get(resultObject.getSimpleName());
                if (null != object) {
                    return (T) object;
                }
            }
            returnObject = resultObject.newInstance();
            StringBuffer serverRequestUri = new StringBuffer("http://" + serviceName);
            
            returnObject = restTemplate.getForObject(serverRequestUri.toString(), resultObject);
            
        } catch (Exception e) {
            logger.error("create cloud server instance exception " + e.getMessage());
        }
        if (null != returnObject) {
            claxxCash.put(resultObject.getSimpleName(), resultObject);
        }
        return returnObject;
    }
    
    @SuppressWarnings("unchecked")
    public static <T> T createPostServerFactory (RestTemplate restTemplate, String serviceName, 
            String serverUrl, Object request, Class<T> resultObject, Object... params) {
        T returnObject = null;
        try {
            if (null != claxxCash) {
                Object object = claxxCash.get(resultObject.getSimpleName());
                if (null != object) {
                    return (T) object;
                }
            }
            returnObject = resultObject.newInstance();
            StringBuffer serverRequestUri = new StringBuffer("http://" + serviceName).append("/" + serverUrl);
            returnObject = restTemplate.postForEntity(serverRequestUri.toString(), request, resultObject, params).getBody();
        } catch (Exception e) {
            logger.error("create cloud server instance exception " + e.getMessage());
        }
        if (null != returnObject) {
            claxxCash.put(resultObject.getSimpleName(), resultObject);
        }
        return returnObject;
    }
}
 

package com.cloud.profile.controller;

import java.util.HashMap;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.LoadBalancerClient;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import com.alibaba.fastjson.JSON;
import com.cloud.eureka.client.ucenter.domain.UserDto;
import com.cloud.eureka.client.ucenter.network.request.QueryUserByIdRequest;
import com.cloud.eureka.client.ucenter.network.response.QueryUserByIdResult;
import com.cloud.profile.factory.CloudServiceFactory;
import com.cloud.profile.third.UcenterThirdService;

@RestController
@RefreshScope
public class SystemController {
    
    private static Logger logger = LoggerFactory.getLogger(SystemController.class);
     
    @Autowired
    private RestTemplate restTemplate;
    @Autowired  
    private LoadBalancerClient loadBalancerClient;  
    
    @RequestMapping(value = "/queryUser", method = RequestMethod.GET)
    public String queryUser(@RequestParam("userId") Long userId) {
        
        CloudServiceFactory.createGetFactory(restTemplate, "", "", QueryUserByIdResult.class);
        
        ServiceInstance serviceInstance = this.loadBalancerClient.choose("ucenter");
        
        System.out.println("===" + ":" + serviceInstance.getServiceId() + ":" + serviceInstance.getHost() + ":"  
                    + serviceInstance.getPort());// 打印當前調用服務的信息 
        String ret = this.restTemplate.getForObject("http://ucenter/hello", String.class); 
        
                //("http://ucenter/queryUserById?userId=" +userId, QueryUserByIdResult.class);
        QueryUserByIdRequest queryRequest = new QueryUserByIdRequest(); 
        queryRequest.setUserId(userId);
        Map<String, Object> paramMap = new HashMap<>();
        paramMap.put("userId", userId);
        logger.info("<== 根據userId查詢用戶 queryRequest : " + JSON.toJSONString(queryRequest));
        @SuppressWarnings("unchecked")
        QueryUserByIdResult<UserDto> rest = CloudServiceFactory.createGetServerFactory(restTemplate, 
                    "ucenter", "queryUserById", QueryUserByIdResult.class, paramMap);
        logger.info("query Result : " + JSON.toJSONString(rest));
        return JSON.toJSONString(rest);
    }
    
    @Autowired
    private UcenterThirdService ucenterThirdService;
    
    @RequestMapping(value = "/queryUserInfo", method = RequestMethod.GET)
    public String queryUserInfo (@RequestParam("userId") Long userId) {
    
        logger.info("profile response " + Thread.currentThread().getName() + " " + 
                 this.getClass().getSimpleName() + JSON.toJSONString(ucenterThirdService.queryUserInfo(userId)));
        logger.info(ucenterThirdService.index());
        logger.info("profile response " + Thread.currentThread().getName() + " " + this.getClass().getSimpleName() + "<<=-=-=-=>>恭喜用戶 " + userId + " 你鏈接成功<<=-=-=-=>>");
        logger.info("profile response " + Thread.currentThread().getName() + " " + 
                 this.getClass().getSimpleName() + JSON.toJSONString(ucenterThirdService.queryUserList()));
        
        QueryUserByIdRequest queryRequest = new QueryUserByIdRequest(); 
        queryRequest.setUserId(userId);
        logger.info("<== 根據userId查詢用戶 queryRequest : " + JSON.toJSONString(queryRequest));
        QueryUserByIdResult<UserDto> result = ucenterThirdService.queryUserById(queryRequest);
        logger.info("<== 根據userId查詢用戶 result : " + JSON.toJSONString(result));
        if (null != result && "000".equals(result.getRespCode())) {
             logger.info("根據userId查詢用戶信息成功");
             return JSON.toJSONString(result);
        }
        
        return "hello success";
    }
    
    @RequestMapping(value = "/queryUserById.do", method = RequestMethod.GET)
    public String queryUserById(@RequestParam("userId") Long userId) {
        QueryUserByIdRequest queryRequest = new QueryUserByIdRequest(); 
        queryRequest.setUserId(userId);
        logger.info("<== 根據userId查詢用戶 queryRequest : " + JSON.toJSONString(queryRequest));
        QueryUserByIdResult<UserDto> result = ucenterThirdService.queryUserById(queryRequest);
        logger.info("<== 根據userId查詢用戶 result : " + JSON.toJSONString(result));
        if (null != result && "000".equals(result.getRespCode())) {
             logger.info("根據userId查詢用戶信息成功");
             return JSON.toJSONString(result);
        }
        return "error page";
    }  
}
-------------- application.porperties

server.port=8673
spring.application.name=profile
#spring.cloud.stream.instance-count=1
#spring.cloud.stream.instance-index=0

eureka.client.fetchRegistry=true
eureka.client.serviceUrl.defaultZone: http://localhost:8761/eureka/,http://localhost:8766/eureka/

eureka.client.registry-fetch-interval-seconds=30
eureka.instance.lease-expiration-duration-in-seconds=45

profile.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RandomRule

------------------bootstrap.yml


spring:
  cloud:
    instance-count: 1
    instance-index: 0
    stream:
      binder: kafka
      kafka:
        binder:
          brokers: localhost:9092
          zk-nodes: localhost:2181
          min-partition-count: 1
          auto-create-topics: true
          auto-add-partitions: false
      bindings:
        output:
          destination: event_demo
          content-type: application/json   #text/plain;charset=UTF-8
          producer:
            partition-count: 1
        

#zi ding yi binder can shu          
#spring.cloud.stream.bindings.<channelName>.binder=<binderName>
#spring.cloud.stream.binders.<binderName>.type=kafka
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.brokers=10.79.96.52:9092
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.zk-nodes=10.79.96.52:2182
            
----------------------- pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cloud.profile</groupId>
    <artifactId>profile</artifactId>
    <packaging>war</packaging>
    <version>1.0.1-SNAPSHOT</version>
    <name>profile Maven Webapp</name>
    <url>http://maven.apache.org</url>
    
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.7.RELEASE</version>
        <relativePath/>
    </parent>
    
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Camden.SR7</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>
        <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-feign</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-ribbon</artifactId>
        </dependency>
        
        <!-- <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
        </dependency> -->
        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        
    </dependencies>
    <build>
        <finalName>profile</finalName>
    </build>
</project>
              
 

#方式1
#spring:
#  cloud:
#    instance-count: 1
#    instance-index: 0
#    stream:
#      binder: kafka
#      kafka:
#        binder:
#          brokers: localhost:9092
#          zk-nodes: localhost:2181
#          min-partition-count: 1
#          auto-create-topics: true
#          auto-add-partitions: false
#      bindings:
#        output:                            # sourceA: 這裏指的是通道名字 ; output 是 Source 默認的通道名字
#          destination: mess_data           # 目標主題
#          content-type: application/json   # text/plain;charset=UTF-8
#          producer:
#            partition-count: 1
      

#zi ding yi binder can shu          
#spring.cloud.stream.bindings.<channelName>.binder=<binderName>
#spring.cloud.stream.binders.<binderName>.type=kafka
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
#spring.cloud.stream.binders.<binderName>.environment.spring.cloud.stream.kafka.binder.zk-nodes=localhost:2182

spring.cloud.stream.bindings.sourceA.binder=messA
spring.cloud.stream.bindings.sourceA.destination=messA_data
spring.cloud.stream.bindings.sourceA.producer.partition-count=1
spring.cloud.stream.bindings.sourceA.producer.partitioned=false
spring.cloud.stream.binders.messA.type=kafka
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.brokers=localhost:9092
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.zk-nodes=localhost:2181
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.auto-add-partitions=false
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.binders.messA.environment.spring.cloud.stream.kafka.binder.min-partition-count: 1

自定義渠道

package com.cloud.profile.third.kafka;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface SourceOutput {          String OUT_PUT = "sourceA";          @Output(SourceOutput.OUT_PUT)     MessageChannel output(); }

相關文章
相關標籤/搜索