目錄html
部署環境:阿里雲ECS服務器java
端口映射信息:python
eureka1:8761 | eureka2:8762mysql
config-server:8888git
shopping-product:11100github
shopping-order:11110web
api-gateway:8080redis
open-api:8081算法
https://github.com/lizzie2008/spring-cloud-app.gitspring
<groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-cloud-app</name> <description>Demo project for Spring Cloud</description> <packaging>pom</packaging>
由於Module做爲子項目,咱們改寫下對應的POM文件。
<parent> <groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>tech.lancelot</groupId> <artifactId>eureka-server</artifactId> <version>0.0.1-SNAPSHOT</version> <name>eureka-server</name> <description>Registry Center</description> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency> </dependencies>
從新Build一下項目,能正常編譯。可是此時Eureka Server是不能正常啓動工做的,須要在application類增長
@EnableEurekaServer
。
此時,咱們再運行Eureka Server,發現能夠正常啓動服務註冊服務器,服務端口8080,註冊地址:http://localhost:8761/eureka/。
eureka: client: fetch-registry: false #設置不從註冊中心獲取註冊信息 register-with-eureka: false #設置自身不做爲客戶端註冊到註冊中心 spring: application: name: eureka-server #應用名稱 server: port: 8761 #應用服務端口
一樣,咱們修改POM文件,依賴於父項目,注意這裏須要引入eureka-client
和spring-boot-starter-web
依賴。
<parent> <groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <groupId>tech.lancelot</groupId> <artifactId>shopping-provider</artifactId> <version>0.0.1-SNAPSHOT</version> <name>shopping-provider</name> <description>shopping service provider</description> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> </dependencies>
須要在application類增長@EnableDiscoveryClient
,同時修改配置文件。
eureka: client: serviceUrl: defaultZone: http://localhost:8761/eureka/ #指定服務註冊地址 spring: application: name: shopping-provider #應用名稱
重啓Eureka Client,啓動後再次訪問Eureka Server管理界面,能夠發現order-provider服務已註冊。
以前咱們的Eureka Server是單點服務,實際生產中,常常是多臺註冊中心,所以咱們嘗試下配置2臺註冊中心。
啓動服務器實例1:
eureka: client: # fetch-registry: false #設置不從註冊中心獲取註冊信息 # register-with-eureka: false #設置自身不做爲客戶端註冊到註冊中心 defaultZone: http://localhost:8762/eureka/ #指定服務註冊地址 spring: application: name: eureka-server1 #應用名稱 server: port: 8761 #應用服務端口
啓動服務器實例2:
eureka: client: # fetch-registry: false #設置不從註冊中心獲取註冊信息 # register-with-eureka: false #設置自身不做爲客戶端註冊到註冊中心 defaultZone: http://localhost:8761/eureka/ #指定服務註冊地址 spring: application: name: eureka-server2 #應用名稱 server: port: 8762 #應用服務端口
重啓2臺註冊中心,啓動後分別訪問2臺的管理界面,能夠看到2臺註冊中心已經相互註冊。
項目增長2個服務模塊,並向Eureka Server註冊:shopping-product(商品服務)、shopping-order(訂單服務),實現相應業務邏輯,這部分詳細實現再也不闡述。
總體項目結構以下:
spring-cloud-app
--eureka-server(服務註冊中心)
--shopping-common(購物公共模塊)
--shopping-product(商品服務模塊)
--shopping-order(訂單服務模塊)
系統架構如圖,比較簡單,一個集羣服務中心,目前有2個服務提供並註冊:
Spring Cloud Ribbon 是一個客戶端的負載均衡器,它提供對大量的HTTP和TCP客戶端的訪問控制。
客戶端負載均衡便是當瀏覽器向後臺發出請求的時候,客戶端會向 Eureka Server 讀取註冊到服務器的可用服務信息列表,而後根據設定的負載均衡策略(沒有設置即用默認的),抉擇出向哪臺服務器發送請求。
假設有如下業務場景,shopping-order模塊須要調用shopping-product提供的API接口。咱們看如何實現。
第一種方法使用構造RestTemplate,調用遠程API,這種方法url是寫死,若是啓動多臺shopping-product服務的話,那又該如何?
@Test void getProductByRestTemplate() { //1.第一種方法 RestTemplate restTemplate = new RestTemplate(); String response = restTemplate.getForObject("http://localhost:11100/api/products", String.class); Assert.hasLength(response,"未獲取內容"); }
第二種方法:咱們啓動2個shopping-product服務實例,分別是11100端口和9001端口,運行測試發現,會根據loadBalancerClient負載均衡機制幫咱們選擇一個服務地址,進行訪問調用。
@Autowired private LoadBalancerClient loadBalancerClient; @Test void getProductByLoadBalance(){ //2.第二種方法,先獲取負載均衡的地址再調用API ServiceInstance instance = loadBalancerClient.choose("shopping-product"); String url=String.format("http://%s:%s/api/products",instance.getHost(),instance.getPort()); RestTemplate restTemplate = new RestTemplate(); String response = restTemplate.getForObject(url, String.class); log.info("port:"+instance.getPort()+response); }
但這樣依舊非常麻煩,接下來看第三種方法。第三種方法屏蔽了API的具體url信息,只用ServerId,並根據負載均衡規則,自動路由到對應的地址。
由於eureka包中已經添加了對Ribbon的依賴,咱們能夠增長斷點,調試程序,發現進入RibbonLoadBalancerClient-->choose方法,返回負載均衡策略選擇的ServiceInstance。
@Component public class RestTemplateConfiguration { @Bean @LoadBalanced public RestTemplate restTemplate() { return new RestTemplate(); } } @SpringBootTest @Slf4j class OrderServiceTest { @Autowired private RestTemplate restTemplate; @Test void getProductByServerId() { String response = restTemplate.getForObject("http://shopping-product/api/products", String.class); log.info(response); } }
固然,咱們也能夠指定應用服務的負載均衡策略:
shopping-order: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
目前系統架構如圖,實現shopping-product和shopping-order集羣化部署,調用方式經過客戶端負載均衡,來路由消費端的請求。
Feign是一個聲明式的Web Service客戶端,它的目的就是讓Web Service調用更加簡單。Feign提供了HTTP請求的模板,經過編寫簡單的接口和插入註解,就能夠定義好HTTP請求的參數、格式、地址等信息。
而Feign則會徹底代理HTTP請求,咱們只須要像調用方法同樣調用它就能夠完成服務請求及相關處理。Feign整合了Ribbon和Hystrix(關於Hystrix咱們後面再講),可讓咱們再也不須要顯式地使用這兩個組件。
總起來講,Feign具備以下特性:
shopping-product服務提供端暴露API。
@GetMapping("/productInfos") public List<ProductInfoOutput> findProductInfosByIds(@RequestParam(required = false) String productIds) throws Exception { //若是傳入商品id參數 if (StringUtils.isNotEmpty(productIds)) { List<String> ids = Arrays.asList(productIds.split(",")); List<ProductInfo> productInfos = productService.findProductInfosByIds(ids); List<ProductInfoOutput> productInfoOutputs = ListUtils.copyProperties(productInfos, ProductInfoOutput.class); return productInfoOutputs; }else{ List<ProductInfo> productInfos = productService.findProductInfos(); List<ProductInfoOutput> productInfoOutputs = ListUtils.copyProperties(productInfos, ProductInfoOutput.class); return productInfoOutputs; } }
shopping-order模塊須要調用shopping-product接口,首先咱們在服務調用端增長Maven依賴
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-feign</artifactId> </dependency>
啓動類標註開啓Feign服務
@SpringBootApplication @EnableDiscoveryClient @EnableFeignClients public class ShoppingOrderApplication { public static void main(String[] args) { SpringApplication.run(ShoppingOrderApplication.class,args); } }
/** * 聲明式服務 */ @FeignClient("shopping-product/api/v1") public interface ProductClient { @GetMapping("/productInfos") List<ProductInfoOutput> findProductInfosByIds(@RequestParam(required = false) String productIds); }
@FeignClient(「服務名稱」)映射服務調用,本質仍是http請求,只不過Feign幫咱們屏蔽了底層的請求路由,對開發者徹底透明,使得調用遠程服務感受跟調用本地服務一致的編碼體驗。
本地調用測試,能夠正常返回接口數據。
@GetMapping("/orders/findProductInfosByIds") public List<ProductInfoOutput> findProductInfosByIds(){ List<ProductInfoOutput> productInfoOutputs = productClient .findProductInfosByIds("157875196366160022, 157875227953464068"); return productInfoOutputs; }
在實現負載均衡基礎上,封裝聲明式服務調用。實現shopping-order對shopping-product的透明調用,系統架構如圖以下。
上個環境中,咱們有2個服務提供者,首先看下各自的配置,能夠發現很大一部分都是重複的。
若是微服務架構中沒有使用統一配置中心時,所存在的問題:
eureka: client: serviceUrl: defaultZone: http://localhost:8761/eureka/ #指定服務註冊地址 spring: application: name: shopping-order #應用名稱 datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 url: jdbc:mysql://localhost:3306/spring_cloud_app?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC jpa: show-sql: true database-platform: org.hibernate.dialect.MySQLDialect server: port: 11110
對於一些簡單的項目來講,咱們通常都是直接把相關配置放在單獨的配置文件中,以 properties 或者 yml 的格式出現,更省事兒的方式是直接放到 application.properties 或 application.yml 中。在集羣部署狀況下,咱們嘗試來實現配置的集中管理,並支持配置的動態刷新。
一樣,咱們做爲子項目,修改相關依賴,加入對spring-cloud-config-server依賴
<modelVersion>4.0.0</modelVersion> <parent> <groupId>tech.lancelot</groupId> <artifactId>spring-cloud-app</artifactId> <version>0.0.1-SNAPSHOT</version> </parent> <artifactId>config-server</artifactId> <packaging>jar</packaging> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- spring cloud config 服務端包 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId> </dependency> </dependencies>
spring: application: name: config-server # 應用名稱 cloud: config: server: git: uri: https://github.com/lizzie2008/Central-Configuration.git #配置文件所在倉庫 username: 'Github username' password: 'Github password' default-label: master #配置文件分支 search-paths: spring-cloud-app #配置文件所在根目錄 server: port: 8888
@EnableConfigServer
@EnableConfigServer @SpringBootApplication public class ConfigServerApplication { public static void main(String[] args) { SpringApplication.run(ConfigServerApplication.class, args); } }
Spring Cloud Config 有它的一套訪問規則,咱們經過這套規則在瀏覽器上直接訪問就能夠。
/{application}/{profile}[/{label}] /{application}-{profile}.yml /{label}/{application}-{profile}.yml /{application}-{profile}.properties /{label}/{application}-{profile}.properties
{application} 就是應用名稱,對應到配置文件上來,就是配置文件的名稱部分,例如我上面建立的配置文件。
{profile} 就是配置文件的版本,咱們的項目有開發版本、測試環境版本、生產環境版本,對應到配置文件上來就是以 application-{profile}.yml 加以區分,例如application-dev.yml、application-sit.yml、application-prod.yml。
{label} 表示 git 分支,默認是 master 分支,若是項目是以分支作區分也是能夠的,那就能夠經過不一樣的 label 來控制訪問不一樣的配置文件了。
咱們在git項目中,新建spring-cloud-app/config-eureka-server.yml配置文件,而後訪問配置中心服務器,看看能正常獲取配置文件。
config-server自己做爲一個服務,也能夠做爲服務提供方,向服務中心註冊,其餘的服務想要獲取配置文件,只須要經過服務名稱就會訪問。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency>
@EnableDiscoveryClient
註解@EnableConfigServer @EnableDiscoveryClient @SpringBootApplication public class ConfigServerApplication { public static void main(String[] args) { SpringApplication.run(ConfigServerApplication.class, args); } }
eureka: client: serviceUrl: defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服務註冊地址
eureka: client: serviceUrl: defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服務註冊地址 spring: application: name: shopping-product #應用名稱 cloud: config: discovery: enabled: true service-id: config-server
spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver username: root password: 123456 url: jdbc:mysql://localhost:3306/spring_cloud_app?characterEncoding=utf-8&useSSL=false&serverTimezone=UTC jpa: show-sql: true database-platform: org.hibernate.dialect.MySQLDialect server: port: 11100
shopping-product.yml
增長一個配置屬性來進行測試env: dev
@RestController @RefreshScope @RequestMapping("api/env") public class EnvController { @Value("${env}") private String env; @RequestMapping public String printEnv() { return env; } }
訪問http://localhost:11100/api/env,返回當前的值dev。
Spring Cloud Config 在項目啓動時加載配置內容這一機制,可是若是咱們修改配置文件內容後,不會自動刷新。例如咱們上面的項目,當服務已經啓動的時候,去修改 github 上的配置文件內容,這時候,再次刷新頁面,對不起,仍是舊的配置內容,新內容不會主動刷新過來。那應該怎麼去觸發配置信息的動態刷新呢?
它提供了一個刷新機制,可是須要咱們主動觸發。那就是 @RefreshScope 註解並結合 actuator ,注意要引入 spring-boot-starter-actuator 包。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
@RefreshScope
註解management: endpoints: web: exposure: include: "*"
[ "config.client.version", "env" ]
每次改了配置後,就用 postman 訪問一下 refresh 接口,仍是不夠方便。 github 提供了一種 webhook 的方式,當有代碼變動的時候,會調用咱們設置的地址,來實現咱們想達到的目的。
填上回調的地址
也就是上面提到的 actuator/refresh 這個地址,可是必須保證這個地址是能夠被 github 訪問到的。這樣每當github上修改了配置文件,就自動通知對應的hook地址自動刷新。
總體項目結構以下:
spring-cloud-app
--config-server(統一配置中心)
--eureka-server(服務註冊中心)
--shopping-common(購物公共模塊)
--shopping-product(商品服務模塊)
--shopping-order(訂單服務模塊)
更新系統架構,新建config-server節點,也向eureka-server註冊,相關服務註冊節點根據配置實例名稱,路由到config-server節點,動態的加載配置。
一、異步處理
好比用戶在電商網站下單,下單完成後會給用戶推送短信或郵件,發短信和郵件的過程就能夠異步完成。由於下單付款是核心業務,發郵件和短信並不屬於核心功能,而且可能耗時較長,因此針對這種業務場景能夠選擇先放到消息隊列中,有其餘服務來異步處理。
二、應用解耦:
假設公司有幾個不一樣的系統,各系統在某些業務有聯動關係,好比 A 系統完成了某些操做,須要觸發 B 系統及 C 系統。若是 A 系統完成操做,主動調用 B 系統的接口或 C 系統的接口,能夠完成功能,可是各個系統之間就產生了耦合。用消息中間件就能夠完成解耦,當 A 系統完成操做將數據放進消息隊列,B 和 C 系統去訂閱消息就能夠了。這樣各系統只要約定好消息的格式就行了。
三、流量削峯
好比秒殺活動,一會兒進來好多請求,有的服務可能承受不住瞬時高併發而崩潰,因此針對這種瞬時高併發的場景,在中間加一層消息隊列,把請求先入隊列,而後再把隊列中的請求平滑的推送給服務,或者讓服務去隊列拉取。
四、日誌處理
kafka 最開始就是專門爲了處理日誌產生的。
當碰到上面的幾種狀況的時候,就要考慮用消息隊列了。若是你碰巧使用的是 RabbitMQ 或者 kafka ,並且一樣也是在使用 Spring Cloud ,那能夠考慮下用 Spring Cloud Stream。Spring Cloud Stream 是消息中間件組件,它集成了 kafka 和 rabbitmq ,本文以rabbitmq 爲例。
分析目前shopping-order項目中,建立訂單的代碼以下:
/** * 建立訂單 * */ @Transactional public String Create(OrderInput orderInput) throws Exception { //扣庫存 ResultVo result1=productClient.decreaseStock(orderInput.getOrderItemInputs()); if (result1.getCode() != 0) throw new Exception("調用訂單扣減庫存接口出錯:" + result1.getMsg()); //構建訂單主表 OrderMaster orderMaster = new OrderMaster(); BeanUtils.copyProperties(orderInput, orderMaster); //指定默認值 orderMaster.setOrderId(KeyUtil.genUniqueKey("OM")); orderMaster.setOrderStatus(OrderStatus.NEW); orderMaster.setPayStatus(PayStatus.WAIT); //構建訂單明細 List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList()); ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds)); if (result2.getCode() != 0) throw new Exception("調用訂單查詢接口出錯:" + result2.getMsg()); List<ProductInfoOutput> productInfoOutputs = result2.getData(); //訂單金額總計 BigDecimal total = new BigDecimal(BigInteger.ZERO); for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) { OrderDetail orderDetail = new OrderDetail(); BeanUtils.copyProperties(orderItemInput, orderDetail); Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream() .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst(); if (!productInfoOutputOptional.isPresent()) throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId())); ProductInfoOutput productInfoOutput = productInfoOutputOptional.get(); orderDetail.setDetailId(KeyUtil.genUniqueKey("OD")); orderDetail.setOrderId(orderMaster.getOrderId()); orderDetail.setProductName(productInfoOutput.getProductName()); orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity()))); orderDetail.setProductIcon(productInfoOutput.getProductIcon()); total = total.add(orderDetail.getProductPrice()); orderDetailRepository.save(orderDetail); } orderMaster.setOrderAmount(total); orderMasterRepository.save(orderMaster); return orderMaster.getOrderId(); }
建立訂單的同時,先調用商品接口扣減庫存,若是佔用庫存成功,再生成訂單。這樣的話,生成訂單的操做和佔用商品庫存的操做實際上是耦合在一塊兒的。在實際電商高併發、高流量的狀況下,咱們不多這麼作。因此,咱們要將業務解耦,實現訂單和扣減庫存的異步處理。
大致思路以下:生成訂單==》通知商品調用庫存==》商品佔用庫存==》通知訂單佔用成功==》更新訂單佔用庫存狀態
shopping-order、shopping-product項目中
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
spring: rabbitmq: host: aliyun.host port: 5672 username: guest password: guest
public interface StreamClient { String INPUT = "myMessage"; @Input(StreamClient.INPUT) SubscribableChannel input(); @Output(StreamClient.INPUT) MessageChannel output(); }
@Component @EnableBinding(StreamClient.class) @Slf4j public class StreamReceiver { @StreamListener(value = StreamClient.INPUT) public void process(OrderInput orderInput) { log.info("StreamReceiver: {}", orderInput); } }
@RestController @RequestMapping("api/v1/stream") @Slf4j public class StreamController { private final StreamClient streamClient; @Autowired public StreamController(StreamClient streamClient) { this.streamClient = streamClient; } @GetMapping("/sendMessage") public void sendMessage() { OrderInput orderInput=new OrderInput(); orderInput.setBuyerName("小王"); orderInput.setBuyerPhone("15011111111"); orderInput.setBuyerAddress("姥姥家"); orderInput.setBuyerOpenid("11111"); streamClient.output().send(MessageBuilder.withPayload(orderInput).build()); } }
啓動應用程序,測試發送接口,發現spring-cloud-stream幫咱們自動建立了一個隊列,消息發送到這個隊列,而後被接收端消費。
此時,若是咱們啓動多個shopping-product服務實例,會有個問題,若是發送端發送一條消息,會被2個實例同時消費,在正常的業務中,這種狀況是應該避免的。因此咱們須要對消息進行分組,在application.yml中增長以下配置,保證只有一個服務實例來消費。
spring: rabbitmq: host: aliyun.host port: 5672 username: guest password: guest cloud: stream: bindings: myMessage: group: shopping-order content-type: application/json
shopping-order做爲庫存佔用命令的消息發送者,首先向shopping-product發送消息stock_apply(佔用庫存申請),shopping-product接收此消息進行庫存處理,而後將庫存佔用處理的結果做爲消息stock_result(佔用庫存結果)發送,shopping-order端再收到結果消息對訂單狀態進行更新。
spring: cloud: stream: bindings: stock_apply_output: #佔用庫存申請 destination: stock.apply stock_result_input: #佔用庫存結果 destination: stock.result group: shopping-order
spring: cloud: stream: bindings: stock_apply_input: #佔用庫存申請 destination: stock.apply group: shopping-product stock_result_output: #佔用庫存結果 destination: stock.result
public interface OrderStream { String STOCK_APPLY_OUTPUT = "stock_apply_output"; @Output(OrderStream.STOCK_APPLY_OUTPUT) MessageChannel stockApplyOutput(); String STOCK_RESULT_INPUT = "stock_result_input"; @Input(OrderStream.STOCK_RESULT_INPUT) SubscribableChannel stockResultInput(); }
public interface ProductStream { String STOCK_APPLY_INPUT = "stock_apply_input"; @Input(ProductStream.STOCK_APPLY_INPUT) SubscribableChannel stockApplyInput(); String STOCK_RESULT_OUTPUT = "stock_result_output"; @Output(ProductStream.STOCK_RESULT_OUTPUT) MessageChannel stockResultOutput(); }
/** * 建立訂單 */ @Transactional public String Create(OrderInput orderInput) throws Exception { //構建訂單主表 OrderMaster orderMaster = new OrderMaster(); BeanUtils.copyProperties(orderInput, orderMaster); //指定默認值 orderMaster.setOrderId(KeyUtil.genUniqueKey("OM")); orderMaster.setOrderStatus(OrderStatus.NEW); orderMaster.setPayStatus(PayStatus.WAIT); //構建訂單明細 List<String> productIds = orderInput.getOrderItemInputs().stream().map(OrderItemInput::getProductId).collect(Collectors.toList()); ResultVo<List<ProductInfoOutput>> result2 = productClient.findProductInfosByIds(String.join(",", productIds)); if (result2.getCode() != 0) throw new Exception("調用訂單查詢接口出錯:" + result2.getMsg()); List<ProductInfoOutput> productInfoOutputs = result2.getData(); //訂單金額總計 BigDecimal total = new BigDecimal(BigInteger.ZERO); for (OrderItemInput orderItemInput : orderInput.getOrderItemInputs()) { OrderDetail orderDetail = new OrderDetail(); BeanUtils.copyProperties(orderItemInput, orderDetail); Optional<ProductInfoOutput> productInfoOutputOptional = productInfoOutputs.stream() .filter(s -> s.getProductId().equals(orderItemInput.getProductId())).findFirst(); if (!productInfoOutputOptional.isPresent()) throw new Exception(String.format("商品【%s】不存在", orderItemInput.getProductId())); ProductInfoOutput productInfoOutput = productInfoOutputOptional.get(); orderDetail.setDetailId(KeyUtil.genUniqueKey("OD")); orderDetail.setOrderId(orderMaster.getOrderId()); orderDetail.setProductName(productInfoOutput.getProductName()); orderDetail.setProductPrice(productInfoOutput.getProductPrice().multiply(new BigDecimal(orderDetail.getProductQuantity()))); orderDetail.setProductIcon(productInfoOutput.getProductIcon()); total = total.add(orderDetail.getProductPrice()); orderDetailRepository.save(orderDetail); } orderMaster.setOrderAmount(total); orderMasterRepository.save(orderMaster); //扣庫存 StockApplyInput stockApplyInput = new StockApplyInput(); stockApplyInput.setOrderId(orderMaster.getOrderId()); stockApplyInput.setOrderItemInputs(orderInput.getOrderItemInputs()); orderStream.stockApplyOutput().send(MessageBuilder.withPayload(stockApplyInput).build()); return orderMaster.getOrderId(); }
@Service @Slf4j @EnableBinding(ProductStream.class) public class ProductService { private final ProductInfoRepository productInfoRepository; private final ProductCategoryRepository productCategoryRepository; @Autowired public ProductService(ProductInfoRepository productInfoRepository, ProductCategoryRepository productCategoryRepository) { this.productInfoRepository = productInfoRepository; this.productCategoryRepository = productCategoryRepository; } /** * 扣減庫存 * */ @Transactional @StreamListener(ProductStream.STOCK_APPLY_INPUT) @SendTo(ProductStream.STOCK_RESULT_OUTPUT) public StockResultOutput processStockApply(StockApplyInput stockApplyInput) throws Exception { log.info("佔用庫存消息被消費..."); StockResultOutput stockResultOutput = new StockResultOutput(); stockResultOutput.setOrderId(stockApplyInput.getOrderId()); try { for (OrderItemInput orderItemInput : stockApplyInput.getOrderItemInputs()) { Optional<ProductInfo> productInfoOptional = productInfoRepository.findById(orderItemInput.getProductId()); if (!productInfoOptional.isPresent()) throw new Exception("商品不存在."); ProductInfo productInfo = productInfoOptional.get(); int result = productInfo.getProductStock() - orderItemInput.getProductQuantity(); if (result < 0) throw new Exception("商品庫存不知足."); productInfo.setProductStock(result); productInfoRepository.save(productInfo); } stockResultOutput.setIsSuccess(true); stockResultOutput.setMessage("OK"); return stockResultOutput; } catch (Exception e) { stockResultOutput.setIsSuccess(false); stockResultOutput.setMessage(e.getMessage()); return stockResultOutput; } } }
@StreamListener(OrderStream.STOCK_RESULT_INPUT) public void processStockResult(StockResultOutput stockResultOutput) { log.info("庫存消息返回" + stockResultOutput); Optional<OrderMaster> optionalOrderMaster = orderMasterRepository.findById(stockResultOutput.getOrderId()); if (optionalOrderMaster.isPresent()) { OrderMaster orderMaster = optionalOrderMaster.get(); if (stockResultOutput.getIsSuccess()) { orderMaster.setOrderStatus(OrderStatus.OCCUPY_SUCCESS); } else { orderMaster.setOrderStatus(OrderStatus.OCCUPY_FAILURE); } orderMasterRepository.save(orderMaster); } }
執行調試結果,跟蹤執行結果:生成訂單同時發送庫存申請命令,商品模塊處理庫存申請成功後,返回庫存佔用結果告知訂單模塊,從而實現訂單生成和商品庫存佔用的邏輯的解耦。
在原有的架構基礎上,咱們對商品和訂單服務進行了應用解耦,庫存佔用邏輯異步化,經過消息隊列傳遞消息,並結合spring cloud stream對消息input和output綁定,使得在程序中很方便的進行消息發送和接收處理。
Zuul是Netflix開源的微服務網關,能夠和Eureka、Ribbon、Hystrix等組件配合使用,Spring Cloud對Zuul進行了整合與加強,Zuul默認使用的HTTP客戶端是Apache HTTPClient,也可使用RestClient或okhttp3.OkHttpClient。 Zuul的主要功能是路由轉發和過濾器。zuul默認和Ribbon結合實現了負載均衡的功能
zuul的核心是一系列的filters, 其做用類比Servlet框架的Filter,或者AOP。zuul把請求路由到用戶處理邏輯的過程當中,這些filter參與一些過濾處理,好比Authentication,Load Shedding等
Zuul使用一系列不一樣類型的過濾器,使咱們可以快速靈活地將功能應用於咱們的邊緣服務。這些過濾器可幫助咱們執行如下功能:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-zuul</artifactId> </dependency> </dependencies>
EnableDiscoveryClient
和@EnableZuulProxy
註解。@EnableDiscoveryClient @EnableZuulProxy @SpringBootApplication public class ApiGatewayApplication { public static void main(String[] args) { SpringApplication.run(ApiGatewayApplication.class, args); } }
默認的路由規則是按照服務的名稱來路由服務,固然咱們也能夠自定義。在zuul中,路由匹配的路徑表達式採用ant風格定義
通配符 | 說明 |
---|---|
? | 匹配任意單個字符 |
* | 匹配任意數量的字符 |
** | 匹配任意數量的字符,支持多級目錄 |
zuul: routes: # 簡潔寫法 shopping-product: /product/**
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
management: endpoints: web: exposure: include: "*"
zuul: routes: # 簡潔寫法 shopping-product: /product/** # 排除某些路由 ignored-patterns: - /**/productInfos
這樣咱們再訪問這個接口時,就提示 Not Found 錯誤了
默認狀況下,spring cloud zuul在請求路由時,會過濾掉http請求頭信息中一些敏感信息,防止它們被傳遞到下游的外部服務器。默認的敏感頭信息經過zuul.sensitiveHeaders參數定義,默認包括cookie,set-Cookie,authorization三個屬性。因此,咱們在開發web項目時經常使用的cookie在spring cloud zuul網關中默認時不傳遞的,這就會引起一個常見的問題,若是咱們要將使用了spring security,shiro等安全框架構建的web應用經過spring cloud zuul構建的網關來進行路由時,因爲cookie信息沒法傳遞,咱們的web應用將沒法實現登陸和鑑權。有時候,針對某些路由,咱們須要傳遞這個cookie。
zuul: routes: # 徹底寫法 product-route: path: /product/** serviceId: shopping-product # 將指定路由的敏感頭設置爲空 sensitiveHeaders:
以前路由的配置都是寫在配置文件中,若是路由規則變化之後,須要重啓網關服務。可是實際生產環境,通常都須要動態的加載路由的配置,不能輕易重啓網關服務。
eureka: client: serviceUrl: defaultZone: http://eureka1:8761/eureka/,http://eureka2:8762/eureka/ #指定服務註冊地址 spring: application: name: api-gateway #應用名稱 cloud: config: discovery: enabled: true service-id: config-server
@RefreshScope
註解@Component public class ZuulConfiguration { @ConfigurationProperties("zuul") @RefreshScope public ZuulProperties zuulProperties(){ return new ZuulProperties(); } }
設想如下場景:咱們須要判斷用戶請求的參數是否包含認證信息,若是包含token信息,則能夠訪問,不然禁止訪問。能夠用Zuul Filter很方便的實如今網關端,統一進行認證。
PRE_TYPE
PRE_DECORATION_FILTER_ORDER-1
true
/** * 驗證token 過濾器 */ @Component public class TokenFilter extends ZuulFilter { @Override public String filterType() { return PRE_TYPE; } @Override public int filterOrder() { return 0; } @Override public boolean shouldFilter() { return true; } @Override public Object run() throws ZuulException { RequestContext currentContext = RequestContext.getCurrentContext(); HttpServletRequest request = currentContext.getRequest(); //測試在url參數中獲取token String token = request.getParameter("token"); if(StringUtils.isEmpty(token)){ currentContext.setSendZuulResponse(false); currentContext.setResponseStatusCode(HttpStatus.UNAUTHORIZED.value()); } return null; } }
@Component public class AddResHeaderFilter extends ZuulFilter{ @Override public String filterType() { return POST_TYPE; } @Override public int filterOrder() { return SEND_RESPONSE_FILTER_ORDER - 1; } @Override public boolean shouldFilter() { return true; } @Override public Object run() { RequestContext requestContext = RequestContext.getCurrentContext(); HttpServletResponse response = requestContext.getResponse(); response.setHeader("X-Foo", UUID.randomUUID().toString()); return null; } }
這裏介紹一種限流的設計方案:
對於不少應用場景來講,除了要求可以限制數據的平均傳輸速率外,還要求容許某種程度的突發傳輸。這時候漏桶算法可能就不合適了,令牌桶算法更爲適合。如圖所示,令牌桶算法的原理是系統會以一個恆定的速度往桶裏放入令牌,而若是請求須要被處理,則須要先從桶裏獲取一個令牌,當桶裏沒有令牌可取時,則拒絕服務。
Google公司已經實現了上述的令牌桶的算法,直接使用 RateLimiter 就能夠經過Zuul實現限流的功能:
@Component public class RateLimitFilter extends ZuulFilter { private static final RateLimiter RATE_LIMITER = RateLimiter.create(100); @Override public String filterType() { return PRE_TYPE; } @Override public int filterOrder() { return SERVLET_DETECTION_FILTER_ORDER - 1; } @Override public boolean shouldFilter() { return true; } @Override public Object run() { if (!RATE_LIMITER.tryAcquire()) { throw new RuntimeException("未能獲取到令牌."); } return null; } }
總體項目結構以下:
spring-cloud-app
--api-gateway(服務網關)
--config-server(統一配置中心)
--eureka-server(服務註冊中心)
--shopping-common(購物公共模塊)
--shopping-product(商品服務模塊)
--shopping-order(訂單服務模塊)
目前全部的客戶端請求,首先被髮送到統一網關服務處理,而後由網關進行限流、熔斷、權限驗證、記錄日誌等等,而後根據自定義的路由規則,再分發到不一樣的應用服務中去,應用服務器返回處理結果後,由網關統一返回給客戶端。
在分佈式環境中,許多服務依賴項中的一些必然會失敗。Hystrix是一個庫,經過添加延遲容忍和容錯邏輯,幫助你控制這些分佈式服務之間的交互。Hystrix經過隔離服務之間的訪問點、中止級聯失敗和提供回退選項來實現這一點,全部這些均可以提升系統的總體彈性。
在實際工做中,尤爲是分佈式、微服務愈來愈廣泛的今天,一個服務常常須要調用其餘的服務,即RPC調用,而調用最多的方式仍是經過http請求進行調用,這裏面就有一個問題了,若是調用過程當中,由於網絡等緣由,形成某個服務調用超時,若是沒有熔斷機制,此處的調用鏈路將會一直阻塞在這裏,在高併發的環境下,若是許多個請求都卡在這裏的話,服務器不得不爲此分配更多的線程來處理源源不斷涌入的請求。
更恐怖的是,若是這是一個多級調用,即此處的服務的調用結果還被其餘服務調用了,這就造成了所謂的雪崩效應,後果將不堪設想。所以,須要某種機制,在必定的異常接口調用出現的時候,可以自動發現這種異常,並快速進行服務降級。
/** * Hystrix 測試 */ @RestController @RequestMapping("api/hystrix") public class HystrixController { @GetMapping("/getProductEnv") public String getProductEnv() { RestTemplate restTemplate = new RestTemplate(); return restTemplate.postForObject("http://localhost:11100/api/env", null, String.class); } }
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix</artifactId> </dependency>
@EnableCircuitBreaker
註解,或者將@SpringBootApplication
、@EnableDiscoveryClient
、@EnableCircuitBreaker
三個合併成一個@SpringCloudApplication
註解。@EnableFeignClients(basePackages = "tech.lancelot.shoppingorder.client") //@SpringBootApplication //@EnableDiscoveryClient //@EnableCircuitBreaker @SpringCloudApplication public class ShoppingOrderApplication { public static void main(String[] args) { SpringApplication.run(ShoppingOrderApplication.class, args); } }
@HystrixCommand
註解,並指定調用方法失敗時的錯誤處理回調。也能夠爲整個類增長@DefaultProperties
註解,定義一個默認的返回方法/** * Hystrix 測試 */ @RestController @RequestMapping("api/hystrix") public class HystrixController { @HystrixCommand(fallbackMethod = "defaultFallback") @GetMapping("/getProductEnv") public String getProductEnv() { RestTemplate restTemplate = new RestTemplate(); return restTemplate.postForObject("http://localhost:11100/api/env", null, String.class); } // 默認服務不可達的返回信息 private String defaultFallback() { return "太擁擠了, 請稍後再試~~"; } }
若是咱們沒有配置默認的超時時間,Hystrix 將取 default_executionTimeoutInMilliseconds(1秒)做爲默認超時時間,也能夠自定義超時時間。
@HystrixCommand(commandProperties = { @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "3000")})
這樣的話,shopping-order調用遠程服務,超過3s以後,馬上返回錯誤處理,不會再阻塞。
hystrix: command: default: # 方法默認屬性 execution: isolation: thread: timeoutInMilliseconds: 1000 getProductEnv: # 該名稱方法屬性 execution: isolation: thread: timeoutInMilliseconds: 3000
若是某個目標服務調用慢或者有大量超時,此時,熔斷該服務的調用,對於後續調用請求,不在繼續調用目標服務,直接返回,快速釋放資源。若是目標服務狀況好轉則恢復調用。
熔斷器有三個狀態 CLOSED
、OPEN
、HALF_OPEN
熔斷器默認關閉狀態,當觸發熔斷(至少有 circuitBreaker.requestVolumeThreshold 個請求,錯誤率達到 circuitBreaker.errorThresholdPercentage)後狀態變動爲 OPEN
,在等待到指定的時間(circuitBreaker.sleepWindowInMilliseconds),Hystrix會放請求檢測服務是否開啓,這期間熔斷器會變爲HALF_OPEN
半開啓狀態,熔斷探測服務可用則繼續變動爲 CLOSED
關閉熔斷器。
@HystrixCommand(commandProperties = { @HystrixProperty(name = "circuitBreaker.enabled", value = "true"), //設置熔斷 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "10"),//請求數達到後才計算 @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "10000"), //休眠時間窗 @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "60"), //錯誤率 })
Spring Coud 還給 Hytrix 提供了一個可視化的組件:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId> </dependency>
@EnableHystrixDashboard
註解@EnableFeignClients(basePackages = "tech.lancelot.shoppingorder.client") //@SpringBootApplication //@EnableDiscoveryClient //@EnableCircuitBreaker @SpringCloudApplication @EnableHystrixDashboard public class ShoppingOrderApplication { public static void main(String[] args) { SpringApplication.run(ShoppingOrderApplication.class, args); } }
經過以上容錯方法的實現,就能夠構建更加穩定、可靠的分佈式系統:
微服務架構是一個分佈式架構,它按業務劃分服務單元,一個分佈式系統每每有不少個服務單元。因爲服務單元數量衆多,業務的複雜性,若是出現了錯誤和異常,很難去定位。主要體如今,一個請求可能須要調用不少個服務,而內部服務的調用複雜性,決定了問題難以定位。因此微服務架構中,必須實現分佈式鏈路追蹤,去跟進一個請求到底有哪些服務參與,參與的順序又是怎樣的,從而達到每一個請求的步驟清晰可見,出了問題,很快定位。
OpenTracing 是一個輕量級的標準化層,它位於應用程序/類庫和追蹤或日誌分析程序之間。
+-------------+ +---------+ +----------+ +------------+ | Application | | Library | | OSS | | RPC/IPC | | Code | | Code | | Services | | Frameworks | +-------------+ +---------+ +----------+ +------------+ | | | | | | | | v v v v +------------------------------------------------------+ | OpenTracing | +------------------------------------------------------+ | | | | | | | | v v v v +-----------+ +-------------+ +-------------+ +-----------+ | Tracing | | Logging | | Metrics | | Tracing | | System A | | Framework B | | Framework C | | System D | +-----------+ +-------------+ +-------------+ +-----------+
OpenTracing 的優點
OpenTracing 數據模型
OpenTracing 中的 Trace(調用鏈)經過歸屬於此調用鏈的 Span 來隱性的定義。
特別說明,一條 Trace(調用鏈)能夠被認爲是一個由多個 Span 組成的有向無環圖(DAG圖),Span 與 Span 的關係被命名爲 References。
例如:下面的示例 Trace 就是由8個 Span 組成:
單個 Trace 中,span 間的因果關係 [Span A] ←←←(the root span) | +------+------+ | | [Span B] [Span C] ←←←(Span C 是 Span A 的孩子節點, ChildOf) | | [Span D] +---+-------+ | | [Span E] [Span F] >>> [Span G] >>> [Span H] ↑ ↑ ↑ (Span G 在 Span F 後被調用, FollowsFrom)
有些時候,使用下面這種,基於時間軸的時序圖能夠更好的展示 Trace(調用鏈):
單個 Trace 中,span 間的時間關係 ––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–––––––|–> time [Span A···················································] [Span B··············································] [Span D··········································] [Span C········································] [Span E·······] [Span F··] [Span G··] [Span H··]
每一個 Span 包含如下的狀態:(譯者注:因爲這些狀態會反映在 OpenTracing API 中,因此會保留部分英文說明)
鍵值對中,鍵必須爲 string,值能夠是任意類型。
可是須要注意,不是全部的支持 OpenTracing 的 Tracer,都須要支持全部的值類型。
每個 SpanContext 包含如下狀態:
更多關於 OpenTracing 數據模型的知識,請參考 OpenTracing語義標準。
OpenTracing 實現
這篇文檔列出了全部 OpenTracing 實現。在這些實現中,比較流行的爲 Jaeger 和 Zipkin。
事件類型
cs ( Client Send ) :客戶端發起請求的時間
cr ( Client Received ) :客戶端收處處理完請求的時間。
ss ( Server Send ) :服務端處理完邏輯的時間。
sr ( Server Received ) :服務端收到調用端請求的時間。
客戶端調用時間=cr-cs
服務端處理時間=sr-ss
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-sleuth</artifactId> </dependency>
目前,鏈路追蹤組件有Google的Dapper,Twitter 的 Zipkin,以及阿里的Eagleeye (鷹眼)等,它們都是很是優秀的鏈路追蹤開源組件。本文主要講述如何在Spring Cloud Sleuth中集成Zipkin。
Zipkin Server主要包括四個模塊:
Collector 接收或收集各應用傳輸的數據
Storage 存儲接受或收集過來的數據,當前支持Memory,MySQL,Cassandra,ElasticSearch等,默認存儲在內存中。
API(Query) 負責查詢Storage中存儲的數據,提供簡單的JSON API獲取數據,主要提供給web UI使用
Web 提供簡單的web界面
首先,安裝 zipkin,爲了方便直接用 docker 進行安裝,具體詳見容器化部署章節,這裏再也不詳述。
引入sleuth-zipkin相關依賴,由於 starter-zipkin 已經包含 starter-sleuth 的依賴,因此能夠把原先的 sleuth依賴去掉。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-zipkin</artifactId> </dependency>
spring: zipkin: base-url: http://zipkin:9411/
spring: sleuth: sampler: rate: 100 zipkin: base-url: http://zipkin:9411/
在服務調用的過程當中,經過Sleuth將鏈路信息(通過抽樣後的信息)統一上報給Zipkin,經過Zipkin就能夠集中查看和管理微服務架構中的調用鏈路信息,便於開發人員與運維人員跟蹤和調試問題。
[root@localhost ~]# yum install docker [root@localhost ~]# systemctrl enable docker #設置docker開機啓動 [root@localhost ~]# systemctrl start docker #啓動docker
{ "registry-mirrors": ["http://hub-mirror.c.163.com"], "registry-mirrors": ["https://njrds9qc.mirror.aliyuncs.com"] }
[root@localhost ~]# systemctl daemon-reload [root@localhost ~]# systemctl restart docker
[root@localhost ~]# docker -v Docker version 1.13.1, build 7f2769b/1.13.1
[root@localhost ~]# pip -V
[root@localhost ~]# yum -y install epel-release [root@localhost ~]# yum -y install python-pip [root@localhost ~]# pip install --upgrade pip
[root@localhost ~]# pip install docker-compose
[root@localhost ~]# pip install more-itertools==5.0.0
[root@localhost ~]# docker-compose -v docker-compose version 1.25.0, build b42d419
application.yml:
spring: application: name: eureka-server #應用名稱 profiles: active: peer1
application-peer1.yml:
eureka: client: service-url: defaultZone: http://peer2:8762/eureka/ #指定服務註冊地址 server: port: 8761 #應用服務端口
application-peer2.yml:
eureka: client: service-url: defaultZone: http://peer1:8761/eureka/ #指定服務註冊地址 server: port: 8762 #應用服務端口
FROM hub.c.163.com/library/java:8-alpine ADD target/*.jar app.jar EXPOSE 8761 EXPOSE 8762 ENTRYPOINT ["java","-jar","/app.jar"]
mvn clean package -Dmaven.test.skip=true -U docker build -t spring-cloud-app/eureka-server:v1 .
[root@localhost ~]# docker pull mysql:5.7
mysql: image: docker.io/mysql:5.7 hostname: mysql networks: - eureka-net ports: - "3306:3306" environment: MYSQL_ROOT_PASSWORD: "123456" volumes: - "./mysql/conf:/etc/mysql" - "./mysql/logs:/var/log/mysql" - "./mysql/data:/var/lib/mysql"
-management
表示有管理界面的,能夠瀏覽器訪問。5672是訪問端口,15672是管理端口。
[root@localhost ~]# docker run -d --hostname rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.2-management
訪問端口管理界面,輸入默認用戶名/密碼 :guest/guest
[root@localhost ~]# docker run -d -p 9411:9411 openzipkin/zipkin
docker-compose.yml:
version: "2" services: eureka1: image: spring-cloud-app/eureka-server:v1 hostname: eureka1 networks: - eureka-net ports: - "8761:8761" environment: - spring.profiles.active=peer1 eureka2: image: spring-cloud-app/eureka-server:v1 hostname: eureka2 networks: - eureka-net ports: - "8762:8762" environment: - spring.profiles.active=peer2 config-server: image: spring-cloud-app/config-server:v1 hostname: config-server networks: - eureka-net ports: - "8888:8888" mysql: image: docker.io/mysql:5.7 hostname: mysql networks: - eureka-net ports: - "3306:3306" environment: MYSQL_ROOT_PASSWORD: "123456" volumes: - "./mysql/conf:/etc/mysql" - "./mysql/logs:/var/log/mysql" - "./mysql/data:/var/lib/mysql" rabbitmq: image: docker.io/rabbitmq:3.8.2-management hostname: rabbitmq networks: - eureka-net ports: - "5672:5672" - "15672:15672" zipkin: image: docker.io/openzipkin/zipkin:2.19.2 hostname: zipkin networks: - eureka-net ports: - "9411:9411" networks: eureka-net: driver: bridge