學習一下 SpringCloud (三)-- 服務調用、負載均衡 Ribbon、OpenFeign

(1) 相關博文地址:html

學習一下 SpringCloud (一)-- 從單體架構到微服務架構、代碼拆分(maven 聚合): https://www.cnblogs.com/l-y-h/p/14105682.html
學習一下 SpringCloud (二)-- 服務註冊中心 Eureka、Zookeeper、Consul、Nacos :https://www.cnblogs.com/l-y-h/p/14193443.html

(2)代碼地址:java

https://github.com/lyh-man/SpringCloudDemo

 

1、引入 服務調用、負載均衡

一、問題 與 解決

【問題:】
    在上一篇中,介紹了 Eureka、Zookeeper、Consul 做爲註冊中心,並使用 RestTemplate 進行服務調用。 詳見:https://www.cnblogs.com/l-y-h/p/14193443.html
    那麼是如何進行負載均衡的呢?
    
【解決:】
    在 @Bean 聲明 RestTemplate 時,添加一個 @LoadBalanced,並使用 註冊中心中 的服務名 做爲 RestTemplate 的 URL 地址(ip、端口號)。
    就這麼簡單的兩步,便可實現了負載均衡。
    
    那麼這裏面又涉及到什麼技術知識呢?能不能更換負載均衡策略?能不能自定義負載均衡策略?
經常使用技術:
    Ribbon(維護狀態,替代產品爲 Loadbalancer)
    OpenFeign(推薦使用)
    
【說明:】
    此處以 Eureka 僞集羣版建立的幾個模塊做爲演示。代碼地址:https://github.com/lyh-man/SpringCloudDemo
    服務註冊中心:eureka_server_700一、eureka_server_700二、eureka_server_7003
    服務提供者:eureka_client_producer_800二、eureka_client_producer_800三、eureka_client_producer_8004
    服務消費者:eureka_client_consumer_9002 
注:
    主要仍是在 服務消費者 上配置負載均衡策略(能夠 Debug 模式啓動看看執行流程),其餘模塊直接啓動便可。

 

 

 

2、服務調用、負載均衡 -- Ribbon

一、什麼是 Ribbon?

【Ribbon:】
    Ribbon 是 Netflix 公司實現的一套基於 HTTP、TCP 的客戶端負載均衡的工具。
    SpringCloud 已將其集成到 spring-cloud-netflix 中,實現 SpringCloud 的服務調用、負載均衡。
    Ribbon 提供了多種方式進行負載均衡(默認輪詢),也能夠自定義負載均衡方法。
    
注:
    Ribbon 雖然已進入維護模式,可是一時半會還不容易被徹底淘汰,仍是能夠學習一下基本使用的。
    Ribbon 替代產品是 Loadbalancer。
    
【相關網址:】
    https://github.com/Netflix/ribbon
    http://jvm123.com/doc/springcloud/index.html#spring-cloud-ribbon

 

 

 

二、Ribbon 與 Nginx 負載均衡區別

【負載均衡(Load Balance):】
    負載均衡指的是 將工做任務 按照某種規則 平均分攤到 多個操做單元上執行。
注:
    Web 項目的負載均衡,能夠理解爲:將用戶請求 平均分攤到 多個服務器上處理,從而提升系統的併發度、可用性。

【負載均衡分類:】
按照軟硬件劃分:
    硬件負載均衡: 通常造價昂貴,但數據傳輸更加穩定。好比: F5 負載均衡。
    軟件負載均衡: 通常採用某個代理組件,並使用 某種 負載均衡 算法實現(一種消息隊列分發機制)。好比:Nginx、Ribbon。

按照負載均衡位置劃分:
    集中式負載均衡:提供一個 獨立的 負載均衡系統(能夠是軟件,好比:Nginx,能夠是硬件,好比:F5)。
        經過此係統,將服務消費者的 請求 經過某種負載均衡策略 轉發給 服務提供者。
        
    客戶端負載均衡(進程式負載均衡):將負載均衡邏輯整合到 服務消費者中,服務消費者 定時同步獲取到 服務提供者信息,並保存在本地。
        每次均從本地緩存中取得 服務提供者信息,並根據 某種負載均衡策略 將請求發給 服務提供者。
注:
    使用集中式負載均衡時,服務消費者 不知道 任何一個服務提供者的信息,只知道獨立負載均衡設備的信息。
    使用客戶端負載均衡時,服務消費者 知道 全部服務提供者的信息。

【Nginx 負載均衡:】
    Nginx 實現的是 集中式負載均衡,Nginx 接收 客戶端全部請求,並將請求轉發到不一樣的服務器進行處理。
    
【Ribbon 負載均衡:】
    Ribbon 實現的是 客戶端負載均衡,從註冊中心得到服務信息並緩存在本地,在本地進行 負載均衡。

 

三、更換 Ribbon 負載均衡規則(兩種方式)

(1)引入依賴git

【依賴:】
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
</dependency>

  通常使用 Ribbon 時須要引入上述依賴,可是對於 eureka 來講,其 eureka-client 依賴中已經集成了 ribbon 依賴,因此無需再次引入。github

 

 

 

(2)Ribbon 提供的幾種負載均衡算法
  Ribbon 提供了 IRule 接口,經過其能夠設置並更換負載均衡規則。
  IRule 實質就是 根據某種負載均衡規則,從服務列表中選取一個須要訪問的服務。
  通常默認使用 ZoneAvoidanceRule + RoundRobinRule。web

【IRule 子類以下:】
RoundRobinRule   
    輪詢,按照服務列表順序 循環選擇服務。
    
RandomRule      
    隨機,隨機的從服務列表中選取服務。
    
RetryRule        
    重試,先按照輪詢策略獲取服務,若獲取失敗,則在指定時間進行重試,從新獲取可用服務。
    
WeightedResponseTimeRule   
    加權響應時間,響應時間越低(即響應時間快),權重越高,越容易被選擇。剛開始啓動時,使用輪詢策略。
    
BestAvailableRule          
    高可用,先過濾掉不可用服務(屢次訪問故障而處於斷路器跳閘的服務),選擇一個併發量最小的服務。

AvailabilityFilteringRule
    可用篩選,先過濾掉不可用服務 以及 併發量超過閾值的服務,對剩餘服務按輪詢策略訪問。

ZoneAvoidanceRule
    區域迴避,默認規則,綜合判斷服務所在區域的性能 以及 服務的可用性,過濾結果後採用輪詢的方式選擇結果。
    
【IRule:】
package com.netflix.loadbalancer;

public interface IRule {
    Server choose(Object var1);

    void setLoadBalancer(ILoadBalancer var1);

    ILoadBalancer getLoadBalancer();
}

 

 

 

以 Dubug 模式 啓動 eureka_client_consumer_9002,並在 IRule 接口 實現類的 choose() 方法上打上斷點,發送請求時,將會進入斷點,此時能夠看到執行的 負載均衡規則。算法

 

 

 

不停的刷新頁面,能夠看到請求以輪詢的方式被 服務提供者 處理。spring

 

 

 

(3)替換負載均衡規則(方式一:新建配置類)編程

【步驟一:】
    新建一個配置類(該類不能被 @ComponentScan 掃描到,即不能與 啓動類 在同一個包下),並定義規則。
好比:
    package com.lyh.springcloud.customize;
    
    import com.netflix.loadbalancer.IRule;
    import com.netflix.loadbalancer.RandomRule;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class CustomizeLoadBalanceRule {
    
        @Bean
        public IRule customizeRule() {
            return new RandomRule();
        }
    }

【步驟二:】
    在啓動類上添加 @RibbonClient 註解,並指定服務名 以及 規則。
好比:
    @RibbonClient(name = "EUREKA-CLIENT-PRODUCER", configuration = CustomizeLoadBalanceRule.class)

 

 

 

 

 

 

 

 

 

不停的刷新頁面,能夠看到請求以隨機的方式被 服務提供者 處理,而非輪詢。數組

 

 

 

(4)替換負載均衡規則(方式二:修改配置文件)
  在服務消費者 配置文件中 根據 服務提供者 服務名,
  經過 ribbon.NFLoadBalancerRuleClassName 指定負載均衡策略。
注:
  在後面的 OpenFeign 的使用中進行演示。緩存

【舉例:】
EUREKA-CLIENT-PRODUCER: # 服務提供者的服務名
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

 

四、輪詢原理(RoundRobinRule)

(1)相關源碼
  主要就是 choose()、incrementAndGetModulo() 這兩個方法。
  choose() 用於選擇 server 服務。
  incrementAndGetModulo() 用來決定 選擇哪一個服務,返回服務下標。

public Server choose(ILoadBalancer lb, Object key) {
    if (lb == null) {
        log.warn("no load balancer");
        return null;
    }

    Server server = null;
    int count = 0;
    while (server == null && count++ < 10) {
        List<Server> reachableServers = lb.getReachableServers();
        List<Server> allServers = lb.getAllServers();
        int upCount = reachableServers.size();
        int serverCount = allServers.size();

        if ((upCount == 0) || (serverCount == 0)) {
            log.warn("No up servers available from load balancer: " + lb);
            return null;
        }

        int nextServerIndex = incrementAndGetModulo(serverCount);
        server = allServers.get(nextServerIndex);

        if (server == null) {
            /* Transient. */
            Thread.yield();
            continue;
        }

        if (server.isAlive() && (server.isReadyToServe())) {
            return (server);
        }

        // Next.
        server = null;
    }

    if (count >= 10) {
        log.warn("No available alive servers after 10 tries from load balancer: "
                + lb);
    }
    return server;
}

private AtomicInteger nextServerCyclicCounter;
private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextServerCyclicCounter.get();
        int next = (current + 1) % modulo;
        if (nextServerCyclicCounter.compareAndSet(current, next))
            return next;
    }
}

 

(2)代碼分析

【choose():】
    初始進入 choose() 方法,server 爲 null 表示服務不存在,count 爲 0 表示屬於嘗試第一次獲取服務。
    進入 while 循環後,退出條件爲 server 不爲 null(即找到服務) 或者 count 大於等於 10 (即嘗試了 10 次仍未找到服務)。
    而 獲取 server 的核心在於獲取 服務的下標,即 int nextServerIndex = incrementAndGetModulo(serverCount);

【incrementAndGetModulo()】
    核心就是 自旋 CAS 並取模。
    modulo 表示服務器總數,current 表示當前服務下標,next 表示下一個服務下標。
    compareAndSet() 即 CAS 實現,若是 內存中的值 與 current 相同,那麼將內存中值改成 next,並返回 true,不然返回 false。
    即 compareAndSet 失敗後,會不停的執行循環 以獲取 最新的 current。
注:
    自旋、CAS 後面會講到。CAS 保證原子性。
    其實就是一個公式: 第幾回請求 % 服務器總數量 = 實際調用服務器下標位置

 

五、手寫一個輪詢算法

(1)說明

【說明:】
    建立一個與  eureka_client_consumer_9002 模塊相似的模塊 eureka_client_consumer_9005。
    修改配置文件,並去除 @LoadBalanced 註解(避免引發誤解)。
    本身實現一個輪詢算法(與 RoundRobinRule 相似)。
注:
    此處在 controller 中定義一個接口,用於測試 輪詢的功能(僅供參考,能夠繼承 AbstractLoadBalancerRule,自行構造一個負載均衡類)。
    去除 @LoadBalanced 註解後,訪問調用 RestTemplate 請求的接口時會報錯(用於區分)。

 

(2)相關代碼
  模塊建立此處省略(須要修改 pom.xml,配置文件)。
  詳情請見上篇博客:https://www.cnblogs.com/l-y-h/p/14193443.html#_label2_3

  面向接口編程,此處新建一個 LoadBalacner 接口,用於定義抽象方法(返回服務信息)。
  並定義一個 LoadBalacner 接口的實現類 LoadBalancerImpl。
  在 controller 中編寫接口(服務發現),測試一下。

【LoadBalacner】
package com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance;

import org.springframework.cloud.client.ServiceInstance;

import java.util.List;

public interface LoadBalacner {
    /**
     * 從服務實例列表中獲取出 服務實例
     */
    ServiceInstance getInstances(List<ServiceInstance> serviceInstances);
}

【LoadBalancerImpl】
package com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.impl;

import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Component
public class LoadBalancerImpl implements LoadBalacner {
    private AtomicInteger atomicInteger = new AtomicInteger();

    @Override
    public ServiceInstance getInstances(List<ServiceInstance> serviceInstances) {
        if (serviceInstances == null || serviceInstances.size() == 0) {
            return null;
        }
        return serviceInstances.get(incrementAndGetModulo(serviceInstances.size()));
    }

    public int incrementAndGetModulo(int count) {
        int current = 0;
        int next = 0;
        do {
            current = atomicInteger.get();
            next = (current >= Integer.MAX_VALUE ? 0 : current + 1) % count;
        } while (!atomicInteger.compareAndSet(current, next));
        return next;
    }
}

【ConsumerController】
package com.lyh.springcloud.eureka_client_consumer_9005.controller;

import com.lyh.springcloud.common.tools.Result;
import com.lyh.springcloud.eureka_client_consumer_9005.consumizeLoadBalance.LoadBalacner;
import com.lyh.springcloud.eureka_client_consumer_9005.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;

@RestController
@RequestMapping("/consumer/user")
public class ConsumerController {

    // 注意,此處 url 寫死的,僅用於演示,實際項目中不能這麼幹。
//    public static final String PRODUCER_URL = "http://localhost:8001/producer/";
    // 經過服務名 找到  Eureka 註冊中心真實訪問的 地址
    public static final String PRODUCER_URL = "http://EUREKA-CLIENT-PRODUCER";

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private LoadBalacner loadBalancer;

    @GetMapping("/loadBalance")
    public Result testLoadBalance() {
        ServiceInstance serviceInstance = loadBalancer.getInstances(discoveryClient.getInstances("EUREKA-CLIENT-PRODUCER"));
        if (serviceInstance == null) {
            return Result.error().message("服務不存在");
        }
        return Result.ok().data("HostAndPort", serviceInstance.getHost() + ":" + serviceInstance.getPort());
    }

    @GetMapping("/get/{id}")
    public Result getUser(@PathVariable Integer id) {
        return restTemplate.getForObject(PRODUCER_URL + "/producer/user/get/" + id, Result.class);
    }

    @PostMapping("/create")
    public Result createUser(@RequestBody User user) {
        return restTemplate.postForObject(PRODUCER_URL + "/producer/user/create", user, Result.class);
    }
}

 

 

 

 

 

 

 

 

 

3、補充知識

一、CAS

(1)什麼是 CAS?

【CAS:】
    CAS 是 Compare And Swap 的縮寫,即 比較交換。
    是一種無鎖算法,在不加鎖的狀況下實現多線程之間變量同步,從而保證數據的原子性。
    屬於硬件層面對併發操做的支持(CPU 原語)。
注:
    原子性:一個操做或多個操做要麼所有執行且執行過程當中不會被其餘因素打斷,要麼所有不執行。
    原語:指的是若干條指令組成的程序段,實現特定的功能,執行過程當中不能被中斷(也即原子性)。
    
【基本流程:】
    CAS 操做包含三個操做數 —— 內存值(V)、預期原值(A)和新值(B)。
    若是內存裏面的值 V 和 A 的值是同樣的,那麼就將內存裏面的值更新成 B,
    若 V 與 A 不一致,則不操做(某些狀況下,能夠經過自旋操做,不斷嘗試修改數據直至成功修改)。
即
    V = V == A ? B : V;
或者
    for(;;) {
        V = getV();
        if (V == A) {
            V = B;
            break;
        }
    }

【缺點:(詳見下面的 Atomic 類底層原理)】
    會出現 ABA 問題(兩次讀取數據時值相同,但不肯定值是否被修改過)。
    使用自旋(死循環)CAS 時會佔用系統資源、影響執行效率。
    每次只能對一個共享變量進行原子操做。

 

(2)原子性

【說明:】
    初始 i = 0,現有 10 個線程,分別執行 i++ 10000次,若不對 i++ 作任何限制,那麼最終執行結果通常都是小於 100000 的。
    由於 A、B 執行 i++ 操做時,彼此會相互干擾,也即不能保證原子性。

【如何保證原子性:】
    能夠給 i++ 操做加上 synchronized 進行同步控制,從而保證操做按照順序執行、互不干擾。
    也可使用 Atomic 相關類進行操做(核心是 自旋 CAS 操做 volatile 變量)。
注:
    synchronized 在 JDK1.6 以前,屬於重量級鎖,屬於悲觀鎖的一種(在操做鎖變量前就給對象加鎖,而無論對象是否發生資源競爭),性能較差。
    在 JDK1.6 以後,對 synchronized 進行了優化,引入了 偏向鎖、輕量級鎖、採用 CAS 思想,提高了效率。
    
【CAS 與 synchronized 比較:】
CAS:
    CAS 屬於無鎖算法,能夠支持多個線程併發修改,併發度高。
    CAS 每次只支持一個共享變量進行原子操做。
    CAS 會出現 ABA 問題。

synchronizedsynchronized 一次只能容許一個線程修改,併發度低。
    synchronized 能夠對多個共享變量進行原子操做。
    
【舉例:】
package com.lyh.tree;

import java.util.concurrent.atomic.AtomicInteger;

public class Test {
    private int count = 0;

    private int count2 = 0;

    private AtomicInteger count3 = new AtomicInteger(0);

    /**
     * 普通方法
     */
    public void increment() {
        count++;
    }

    /**
     * 使用 synchronized 修飾的方法
     */
    public synchronized void increment2() {
        count2++;
    }

    /**
     * 使用 atomic 類的方法
     */
    public void increment3() {
        count3.getAndIncrement();
    }

    public static void main(String[] args) {
        // 實例化一個對象
        Test test = new Test();

        // 建立 10 個線程
        for (int i = 0; i < 10; i++) {
            // 每一個線程內部均 執行 10000 次 三種 i++ 操做
            new Thread(() -> {
                for (int j = 0; j < 10000; j++) {
                    // 普通方法 i++
                    test.increment();
                    // 添加 synchronized 關鍵字後的 i++
                    test.increment2();
                    // 使用 atomic 類的 i++
                    test.increment3();
                }
            }, "thread-" + "i").start();
        }

        // 等待 1 秒,確保上面線程能夠執行完畢
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 輸出三種 i++ 的最終結果
        System.out.println("普通方法 i++ 操做後最終值 i = " + test.count);
        System.out.println("添加 synchronized 關鍵字後的 i++ 操做後最終值 i = " + test.count2);
        System.out.println("使用 atomic 類的 i++ 操做後最終值 i = " + test.count3);
    }
}

 

 

 

二、Atomic 類底層原理

(1)Atomic 經常使用類有哪些?

【Atomic:】
    Atomic 類存放於 java.util.concurrent.atomic 包下,用於提供對變量的原子操做(保證變量操做的原子性)。
    
【經常使用類:】
    操做基本類型的 Atomic 類(提供了對 booleanintlong 類型的原子操做):
        AtomicBoolean
        AtomicInteger
        AtomicLong
    
    操做引用類型的 Atomic 類(提供了引用類型的原子操做):
        AtomicReference
        AtomicStampedReference
        AtomicMarkableReference
注:
    AtomicStampedReference、AtomicMarkableReference 以版本號、標記的方式解決 ABA 問題。
        
    操做數組的 Atomic 類(提供了數組的原子操做):
        AtomicIntegerArray
        AtomicLongArray
        AtomicReferenceArray

 

 

 

(2)底層原理

【底層原理:】
    一句話歸納:Atomic 類基於 Unsafe 類 以及 (自旋) CAS 操做 volatile 變量實現的。
核心點:
    Unsafe 類             提供 native 方法,用於操做內存
    valueOffset 變量      指的是變量在內存中的地址。
    volatile 變量         指的是共享變量(保證操做可見性)。
    CAS                   CPU 原語(保證操做原子性)。

【Unsafe:】
    Unsafe 存放於 JDK 源碼 rt.jar 的 sun.misc 包下。
    內部提供了一系列 native 方法,用於與操做系統交互(能夠操做特定內存中的數據)。
注:
    Unsafe 屬於 CAS 核心類,Java 沒法直接訪問底層操做系統,須要經過 native 方法進行操做,而 Unsafe 就是爲此存在的。
    
【volatile 變量:】
    使用 volatile 修改變量,保證數據在多線程之間的可見性(一個線程修改數據後,其他線程均能知道修改後的數據)。

【valueOffset 變量:】
    valueOffset 變量 表示共享變量在內存中的偏移地址,Unsafe 根據此地址獲取 共享變量在內存中的值。

 

 

 

(3)以 AtomicInteger 爲例。
  compareAndSet() 直接調用 CAS 進行比較。
  getAndIncrement() 使用 自旋 CAS 進行比較。

【AtomicInteger 類:】
/**
* 直接調用 Unsafe 的 CAS 操做。
* 根據 this 以及 valueOffset 獲取到內存中的值 V,expect 爲指望值 A,若是 V == A,則將 V 值改成 update,不然不操做。
*
* this 表示當前對象
* valueOffset 表示共享變量在內存的偏移地址
* expect 表示指望值
* update 表示更新值
*/
public final boolean compareAndSet(int expect, int update) {
    return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

/**
* 以原子方式遞增當前 value。
* 調用 Unsafe 的 getAndAddInt() 進行操做,即 自旋 CAS 操做。
*
* this 表示當前對象
* valueOffset 表示共享變量在內存的偏移地址
* 1 表示每次增長值爲 1
*/
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

【Unsafe 類:】
/**
* CAS 原語操做。
* 各變量含義參考上面 AtomicInteger compareAndSet() 的註釋。 
*/
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

/**
* 自旋 CAS 操做。
* var1 表示當前對象。
* var2 表示共享變量在內存的偏移地址
* var4 表示共享變量每次遞增的值
*/
public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        // 先根據偏移地址,獲取到 內存中存儲的值
        var5 = this.getIntVolatile(var1, var2);
        // 而後死循環(自旋)CAS 判斷。
        // 根據偏移地址再去取一次內存中的值 與 已經取得的值進行比較,相同則加上須要增長的值。
        // 不一樣,則 CAS 失敗,也即 while 條件爲 true,再進行下一次比較。
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

 

(4)缺點:

【CAS 缺點:】
    會出現 ABA 問題(兩次讀取數據時值相同,但不肯定值是否被修改過)。
    自旋(死循環)CAS 會佔用系統資源、影響執行效率。
    每次只能對一個共享變量進行原子操做。
    
【對於自旋 CAS:】
    在上面 Unsafe 類的 getAndAddInt() 方法中,能夠看到一個 do-while 循環。
    存在這麼一種狀況:while 條件中 CAS 一直失敗,也即循環一直持續(死循環),
    此時將會佔用 CPU 資源不斷執行循環,影響執行效率。
    
【對於 ABA 問題:】
    CAS 算法是從內存中取出某時刻的值並與當前指望值進行比較,從內存中取出的值就可能存在問題。
    存在這麼一種狀況:線程 A、線程 B 同時操做內存值 V,線程 A 因爲某種緣由停頓了一下,而線程 B 先將值 V 改成 K,而後又將 K 改成 V,
    此時 A 再次獲取的值還是 V,而後 A 進行 CAS 比較成功。
    雖然 A 兩次獲取的值是同一個值,可是這個值中間是發生過變化的,也即此時 A 執行 CAS 不該該成功,這就是 ABA 問題。

    爲了解決ABA問題,能夠在對象中額外再增長一個標記來標識對象是否有過變動,當且僅當 標記 與 預期標記位 也相同時,CAS 才能夠執行成功。
好比:
    AtomicStampedReference 或者 AtomicMarkableReference 類。

 

(5)ABA 再現
  使用 AtomicReference<Integer> 再現 ABA 問題。

【問題再現:】
package com.lyh.tree;

import java.util.concurrent.atomic.AtomicReference;

public class Test {
    public static void main(String[] args) {
        // 使用引用類型原子類(Integer 在緩存中 -128 ~ 127 表示的是同一個值,超出此範圍,則會自動建立一個新的 Integer,即地址不一樣)
        AtomicReference<Integer> atomicInteger = new AtomicReference<>(100);
        System.out.println("原值爲: " + atomicInteger.get());

        // 建立線程 A,執行兩次值修改操做
        new Thread(() -> {
            // 第一次從 100 改成 127
            atomicInteger.compareAndSet(100, 127);
            System.out.println("第一次修改後,值爲: " + atomicInteger.get());

            // 第二次從 127 改成 100
            atomicInteger.compareAndSet(127, 100);
            System.out.println("第二次修改後,值爲: " + atomicInteger.get());
        }, "thread-A").start();

        // 建立線程 B,等待 1 秒,使線程 A 執行完畢,而後再執行值修改操做
        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 第三次從 100 改成 128
            atomicInteger.compareAndSet(100, 128);
            System.out.println("第三次修改後,值爲: " + atomicInteger.get());

            // 第四次,因爲 128 超過 Integer 緩存範圍,會自動建立一個新的 Integer,此時指望值 與 內存中 值地址不一樣,也即 CAS 失敗
            atomicInteger.compareAndSet(128, -128);
            System.out.println("最終值爲: " + atomicInteger.get());
        }, "thread-B").start();
    }
}

【結果:】
原值爲: 100
第一次修改後,值爲: 127
第二次修改後,值爲: 100
第三次修改後,值爲: 128
最終值爲: 128

 

 

 

(6)ABA 解決
  使用 AtomicStampedReference<Integer> 解決 ABA 問題。
  新增了標記位,用於判斷當前值是否發生過變化。

package com.lyh.tree;

import java.util.concurrent.atomic.AtomicStampedReference;

public class Test2 {
    public static void main(String[] args) {
        AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(100, 1);
        System.out.println("原值爲: " + atomicInteger.getReference() + " , 標記爲: " + atomicInteger.getStamp());

        int stamp = atomicInteger.getStamp();

        new Thread(() -> {
            atomicInteger.compareAndSet(100, 127, stamp, stamp + 1);
            System.out.println("第一次修改後,值爲: " + atomicInteger.getReference() + " , 標記爲: " + atomicInteger.getStamp());

            atomicInteger.compareAndSet(127, 100, stamp + 1, stamp + 2);
            System.out.println("第二次修改後,值爲: " + atomicInteger.getReference() + " , 標記爲: " + atomicInteger.getStamp());
        }, "thread-A").start();

        new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            atomicInteger.compareAndSet(100, 128, stamp, stamp + 1);
            System.out.println("最終值爲: " + atomicInteger.getReference() + " , 標記爲: " + atomicInteger.getStamp());
        }, "thread-B").start();
    }
}

 

 

 

三、自旋鎖(SpinLock)

(1)什麼是自旋鎖?

【自旋鎖:】
    指的是當一個線程嘗試去獲取鎖時,
    若此時鎖已經被其餘線程獲取,那麼當前線程將採用 循環 的方式不斷的去嘗試獲取鎖。
    當循環執行的某次操做成功獲取鎖時,將退出循環。
    
【優勢:】
    減小了線程上下文切換帶來的消耗。
    
【缺點:】
    循環會佔用 CPU 資源,若長時間獲取不到鎖,那麼至關於一個死循環在執行。
    
【舉例:】
前面介紹的 Unsafe 類中 getAndAddInt() 即爲 自旋鎖實現,自旋 CAS 進行值的遞增。

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

 

(2)手寫一個自旋鎖
  經過 CAS 操做做爲 循環(自旋)條件。

【SpinLockDemo】
package com.lyh.tree;

import java.util.concurrent.atomic.AtomicReference;

/**
 * 演示自旋鎖(相似於 Unsafe 類中的 getAndAddInt() 方法)。
 *
 * 經過 CAS 操做做爲自旋鎖條件,A 線程先獲取鎖,並佔用鎖時間爲 3 秒,
 * B 線程獲取鎖,發現鎖被 A 佔用,B 則執行 循環 等待。
 * A 線程釋放鎖後,B 在某次循環中 CAS 成功,即 B 獲取到鎖,而後釋放。
 */
public class SpinLockDemo {
    private AtomicReference<Thread> atomicReference = new AtomicReference<>();

    /**
     * 獲取鎖
     */
    public void lock() {
        // 獲取當前線程
        Thread thread = Thread.currentThread();
        System.out.println("當前執行 lock 的線程爲: " + thread.getName());

        System.out.println("線程 " + thread.getName() + " 嘗試獲取鎖");

        Long start = System.currentTimeMillis();
        // 自旋 CAS,當值爲 null 時,CAS 成功,此時鎖被獲取。
        // 當值不爲 null 時,CAS 失敗,不斷執行循環直至值爲 null。
        while(!atomicReference.compareAndSet(null, thread)) {

        }
        Long end = System.currentTimeMillis();
        System.out.println("線程 " + thread.getName() + " 成功獲取到鎖, 嘗試獲取時間爲: " + ((end - start) / 1000) + "秒");
    }

    /**
     * 釋放鎖
     */
    public void unLock() {
        // 獲取當前線程
        Thread thread = Thread.currentThread();
        System.out.println("當前執行 unLock 的線程爲: " + thread.getName());
        // 釋放鎖
        atomicReference.compareAndSet(thread, null);
    }

    public static void main(String[] args) {
        SpinLockDemo spinLockDemo = new SpinLockDemo();

        // 建立線程 A
        new Thread(() -> {
            // A 獲取鎖
            spinLockDemo.lock();
            // A 佔用鎖 3 秒
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // A 釋放鎖
            spinLockDemo.unLock();
        }, "thread-A").start();

        // 等待 1 秒,確保 A 線程先執行
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 建立線程 B
        new Thread(() -> {
            // B 獲取鎖
            spinLockDemo.lock();
            // B 釋放鎖
            spinLockDemo.unLock();
        }, "thread-B").start();
    }
}

【輸出結果:】
當前執行 lock 的線程爲: thread-A
線程 thread-A 嘗試獲取鎖
線程 thread-A 成功獲取到鎖, 嘗試獲取時間爲: 0秒
當前執行 lock 的線程爲: thread-B
線程 thread-B 嘗試獲取鎖
當前執行 unLock 的線程爲: thread-A
線程 thread-B 成功獲取到鎖, 嘗試獲取時間爲: 2秒
當前執行 unLock 的線程爲: thread-B

 

 

 

4、服務調用、負載均衡 -- Feign、OpenFeign

一、什麼是 Feign、 OpenFeign ?

【Feign:】
    Feign 是一個聲明式 web service 客戶端,使用 Feign 使得編寫 Java HTTP 客戶端更容易。
    SpringCloud 組件中 Feign 做爲一個輕量級 Restful 風格的 HTTP 服務客戶端,內置了 Ribbon,提供客戶端的負載均衡。
     使用 Feign 註解定義接口,調用該接口便可訪問 服務。
           
【OpenFeign:】
    SpringCloud 組件中 Open Feign 是在 Feign 基礎上支持了  SpringMVC 註解。
    經過 @FeignClient 註解能夠解析 SpringMVC 中 @RequestMapping 等註解下的接口,並經過動態代理的方式產生實現類,在實現類中作負載均衡、服務調用。 

【相關地址:】
    https://cloud.spring.io/spring-cloud-openfeign/reference/html/
    https://github.com/OpenFeign/feign

 

 

 

二、Feign 集成了 Ribbon

前面使用 Ribbon + RestTemplate 實現 負載均衡 以及 服務調用時,
流程以下:
    Step1:在配置類中經過 @Bean 聲明一下 RestTemplate 類,再添加上 @LoadBalanced 註解。
    Step2:在 controller 中注入 RestTemplate 。
    Step3:使用 RestTemplate 根據服務名發送 HTTP 請求。
    
而不一樣的模塊若要調用服務(一個接口可能會被多個模塊調用),則每次都要進行 Step一、Step二、Step3。
實際開發中,若不進行處理,對於編碼、維護都是一件麻煩的事情。

Feign 就是在 Ribbon 基礎上作了進一步封裝,只須要建立一個接口,並使用註解的方式進行配置,
便可完成 接口 與 服務提供方接口的綁定。經過自定義的接口便可完成 服務調用。
注:
    相似於在 Dao 層接口上添加 @Mapper。
    Feign 在接口上添加 @FeignClient,並配置服務名。

 

三、使用 OpenFeign

(1)說明

【說明:】
    建立一個空模塊 eureka_client_consumer_9006。
    用於測試 OpenFeign 的使用。
注:
    建立流程此處省略,詳細可參考上一篇博客:https://www.cnblogs.com/l-y-h/p/14193443.html#_label2_3

 

(2)建立項目 eureka_client_consumer_9006
  建立 eureka_client_consumer_9006 模塊,修改 父工程以及當前工程 pom.xml 文件。
  修改配置類。

【依賴:】
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

【application.yml】
server:
  port: 9006
spring:
  application:
    name: eureka-client-consumer

eureka:
  instance:
    appname: eureka-client-consumer # 優先級比 spring.application.name 高
    instance-id: eureka-client-consumer-instance3  # 設置當前實例 ID
    hostname: eureka.client.consumer.9006 # 設置主機名
  client:
    register-with-eureka: true # 默認爲 true,註冊到 註冊中心
    fetch-registry: true # 默認爲 true,從註冊中心 獲取 註冊信息
    service-url:
      # 指向 註冊中心 地址,註冊到 集羣全部的 註冊中心。
      defaultZone: http://eureka.server.7001.com:7001/eureka,http://eureka.server.7002.com:7002/eureka,http://eureka.server.7003.com:7003/eureka

 

 

 

 

 

 

(3)編寫接口,綁定服務。
  經過 @Component 註解,將該接口交給 Spring 管理(用於 @Autowired 注入)。
  經過 @FeignClient 註解配置 服務名,並編寫方法,用於綁定須要訪問的接口。

【ProducerFeignService】
package com.lyh.springcloud.eureka_client_consumer_9006.service;

import com.lyh.springcloud.common.tools.Result;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

@FeignClient(value = "EUREKA-CLIENT-PRODUCER")
@Component
public interface ProducerFeignService {
    @GetMapping("/producer/user/get/{id}")
    Result getUser(@PathVariable Integer id);
}

 

 

 

(4)編寫 controller 

【ConsumerController】
package com.lyh.springcloud.eureka_client_consumer_9006.controller;

import com.lyh.springcloud.common.tools.Result;
import com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/consumer/user")
public class ConsumerController {
    @Autowired
    private ProducerFeignService producerFeignService;

    @GetMapping("/get/{id}")
    public Result getUser(@PathVariable Integer id) {
        return producerFeignService.getUser(id);
    }
}

 

 

 

(5)啓動服務,並測試。
  按順序依次啓動,Eureka Server 以及 Producer。
  在 eureka_client_consumer_9006 啓動類上添加 @EnableFeignClients 註解,並啓動。
  效果與使用 ribbon 時相同(默認 ZoneAvoidanceRule 負載均衡規則)。

 

 

 

 

 

 

(6)啓動失敗時的錯誤

【錯誤:】
Description:

Field producerFeignService in com.lyh.springcloud.eureka_client_consumer_9006.controller.ConsumerController required a bean of type 'com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService' that could not be found.

The injection point has the following annotations:
    - @org.springframework.beans.factory.annotation.Autowired(required=true)


Action:

Consider defining a bean of type 'com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService' in your configuration.

【解決:】
    在啓動類上添加 @EnableFeignClients 註解。

 

 

 

(7)自定義負載均衡策略
  Feign 集成了 Ribbon,因此 Ribbon 自定義負載均衡策略也適用於 Feign。
  此處使用配置文件的方式進行自定義負載均衡。
  在服務調用方的配置文件中,根據 服務提供者的 服務名,進行 ribbon 負載均衡策略更換。

【自定義負載均衡策略:】
EUREKA-CLIENT-PRODUCER:
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

【application.yml】
server:
  port: 9006
spring:
  application:
    name: eureka-client-consumer

eureka:
  instance:
    appname: eureka-client-consumer # 優先級比 spring.application.name 高
    instance-id: eureka-client-consumer-instance3  # 設置當前實例 ID
    hostname: eureka.client.consumer.9006 # 設置主機名
  client:
    register-with-eureka: true # 默認爲 true,註冊到 註冊中心
    fetch-registry: true # 默認爲 true,從註冊中心 獲取 註冊信息
    service-url:
      # 指向 註冊中心 地址,註冊到 集羣全部的 註冊中心。
      defaultZone: http://eureka.server.7001.com:7001/eureka,http://eureka.server.7002.com:7002/eureka,http://eureka.server.7003.com:7003/eureka

EUREKA-CLIENT-PRODUCER:
  ribbon:
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule

 

 

 

 

 

 

四、Feign 超時控制 以及 日誌打印

(1)超時控制

【說明:】
    OpenFeign 默認請求等待時間爲 1 秒鐘,即 若服務器超過 1 秒仍未返回處理結果,那麼 OpenFeign 將認爲調用出錯。
    有時服務器業務處理時間須要超過 1 秒,此時直接報錯是不合適的。
    爲了不這樣的問題,能夠根據實際狀況 適當設置 Feign 客戶端的超時控制。
    
【演示:】
    在 服務提供者 eureka_client_producer_8004 提供一個接口,並在內部暫停 3 秒,用於模擬業務處理時間。
    在 服務消費者 eureka_client_consumer_9006 中調用並訪問接口,用於測試 是否會出錯。
    在配置文件中 配置超時控制後,再次查看是否出錯。
    
【application.yml】
# 設置 OpenFeign 超時時間(OpenFeign 默認支持 Ribbon)
ribbon:
  # 指的是創建鏈接所用的超時時間
  ConnectTimeout: 3000
  # 指的是創建鏈接後從服務器獲取資源的超時時間(即請求處理的超時時間)
  ReadTimeout: 4000

 

 

 

 

 

 

 

 

 

 

 

 

(2)日誌打印

【說明:】
    Feign 提供了日誌打印功能,經過配置調整日誌級別,能夠方便了解 Feign 中請求調用的細節。
日誌級別:
    NONE:默認,不顯示任何日誌。
    BASIC: 僅記錄請求方法、URL、響應狀態以及執行時間。
    HEADERS:包含 BASIC、請求頭信息、響應頭信息。
    FULL:包含 HEADERS、請求數據、響應數據。

Step1:
  配置 日誌級別(Logger.Level)。
注:
  import feign.Logger。不要導錯包了。

package com.lyh.springcloud.eureka_client_consumer_9006.config;

import feign.Logger;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FeignConfig {

    /**
     * 配置 Feign 日誌級別
     */
    @Bean
    Logger.Level feignLoggerLever() {
        return Logger.Level.FULL;
    }
}

 

 

 

Step2:
  配置須要打印日誌的接口。

【application.yml】
logging:
  level:
    com.lyh.springcloud.eureka_client_consumer_9006.service.ProducerFeignService: debug

 

 

Step3:
  啓動服務,並調用服務,能夠在控制檯看到日誌信息。

相關文章
相關標籤/搜索