Elasticsearch——併發衝突以及解決方案

1. 詳解併發衝突

在電商場景下,工做流程爲:數據庫

  1. 讀取商品信息,包括庫存數量
  2. 用戶下單購買
  3. 更新商品信息,將庫存數減一

若是是多線程操做,就可能有多個線程併發的去執行上述的3步驟流程,假如此時有兩我的都來讀取商品數據,兩個線程併發的服務於兩我的,同時在進行商品庫存數據的修改。正確的狀況:線程A將庫存-1,設置爲99件,線程B接着讀取99件,再-1,變爲98件。若是A,B線程都讀取的爲100件,A處理完以後修改成99件,B處理完以後再次修改成99件,此時結果就出錯了。多線程

2. 解決方案

2.1 悲觀鎖

在讀取商品數據時,同時對這一行數據加鎖,當此線程處理完數據以後,再解鎖,另外一個線程開始處理。併發

悲觀鎖併發控制方案,就是在各類狀況下,都上鎖。上鎖以後,就只有一個線程能夠操做這一條數據,不一樣的場景之下,上的鎖不一樣,行級鎖,表級鎖,讀鎖,寫鎖。異步

2.2 樂觀鎖

樂觀鎖不加鎖,每一個線程均可以任意操做。es的每條文檔中有一個version字段,新建文檔後爲1,修改一次累加,線程A,B同時讀取到數據,version=1,A處理完以後庫存爲99,在寫入es的時候會跟es中的版本號比較,都是1,則寫入成功,version=2,B處理完以後也爲99,存入es時與es中的數據的版本號version=2相比,明顯不一樣,此時不會用99去更新,而是從新讀取最新的數據,再減一,變爲98,執行上述操做,寫入。ide

2.3 Elasticsearch的樂觀鎖

Elasticsearch的後臺都是多線程異步的,多個請求之間是亂序的,可能後修改的先到,先修改的後到。ui

Elasticsearch的多線程異步併發修改是基於本身的_version版本號進行樂觀鎖併發控制的。spa

在後修改的先到時,修改完畢後,當先修改的後到時,會比較一下_version版本號,若是不相等就直接扔掉,不須要了。這樣結果會就會保存爲一個正確狀態。線程

代碼示例:code

PUT /test_index/test_type/3
{
  "test_field": "test test"
}
結果:
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "3",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
修改
PUT /test_index/test_type/3
{
  "test_field": "test1 test1"
}
結果
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "3",
  "_version": 2,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
刪除
DELETE /test_index/test_type/3
結果:
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "3",
  "_version": 3,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
從新建立
PUT /test_index/test_type/3
{
  "test_field": "test1 test1"
}
結果
{
  "_index": "test_index",
  "_type": "test_type",
  "_id": "4",
  "_version": 1,
  "result": "created",
  "_shards": {
    "total": 2,
    "successful": 1,
    "failed": 0
  },
  "created": true
}
複製代碼

刪除操做也會對這條數據的版本號加1cdn

在刪除一個document以後,能夠從一個側面證實,它不是當即物理刪除掉的,由於它的一些版本號等信息仍是保留着的。先刪除一條document,再從新建立這條document,其實會在delete version基礎之上,再把version號加1

2.4 es的樂觀鎖併發控制示例

  • 先新建一條數據

    PUT /test_index/test_type/4
    {
      "test_field": "test"
    }
    複製代碼
  • 模擬兩個客戶端,都獲取到了同一條數據

    GET /test_index/test_type/4
    返回
    {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "4",
        "_version": 1,
        "found": true,
        "_source": {
          "test_field": "test"
        }
     }
    複製代碼
  • 其中一個客戶端,先更新了一下這個數據, 同時帶上數據的版本號,確保說,es中的數據的版本號,跟客戶端中的數據的版本號是相同的,才能修改

    PUT test_index/test_type/4?version=1
    {
      "test_field": "client1 changed"
    }
    返回結果
    {
        "_index": "test_index",
        "_type": "test_type",
        "_id": "4",
        "_version": 2,
        "result": "updated",
        "_shards": {
          "total": 2,
          "successful": 1,
          "failed": 0
        },
        "created": false
    }
    複製代碼
  • 另一個客戶端,嘗試基於version=1的數據去進行修改,一樣帶上version版本號,進行樂觀鎖的併發控制

    PUT test_index/test_type/4?version=1
    {
      "test_field": "client2 changed"
    }
    會出錯,返回
    {
        "error": {
          "root_cause": [
            {
              "type": "version_conflict_engine_exception",
              "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
              "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
              "shard": "2",
              "index": "test_index"
            }
          ],
          "type": "version_conflict_engine_exception",
          "reason": "[test_type][4]: version conflict, current version [2] is different than the one provided [1]",
          "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
          "shard": "2",
          "index": "test_index"
        },
        "status": 409
    }
    複製代碼

    樂觀鎖就成功阻止併發問題

  • 在樂觀鎖成功阻止併發問題以後,嘗試正確的完成更新

    從新進行GET請求,獲得 version

    GET /test_index/test_type/4
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 2,
      "found": true,
      "_source": {
        "test_field": "client1 changed"
      }
    }
    複製代碼

    基於最新的數據和版本號,去進行修改,修改後,帶上最新的版本號,可能這個步驟會須要反覆執行好幾回,才能成功,特別是在多線程併發更新同一條數據很頻繁的狀況下

    PUT /test_index/test_type/4?version=2
    {
      "test_field": "client2 changed"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "4",
      "_version": 3,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
    複製代碼

2.5 基於external version進行樂觀鎖併發控制

es提供了一個feature,就是說,你能夠不用它提供的內部_version版本號來進行併發控制,能夠基於你本身維護的一個版本號來進行併發控制。

?version=1&version_type=external
複製代碼

version_type=external,惟一的區別在於,_version,只有當你提供的version與es中的_version如出一轍的時候,才能夠進行修改,只要不同,就報錯;當version_type=external的時候,只有當你提供的version比es中的_version大的時候,才能完成修改

es,_version=1,?version=1,才能更新成功
es,_version=1,?version>1&version_type=external,才能成功,好比說?version=2&version_type=external

代碼示例:

  • 先建立一條數據

    PUT test_index/test_type/5
    {
      "test_field": "external test"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 1,
      "result": "created",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": true
    }
    複製代碼
  • 模擬兩個客戶端同時查詢到這條數據

    GET /test_index/test_type/5
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 1,
      "found": true,
      "_source": {
        "test_field": "external test"
      }
    }
    複製代碼
  • 第一個客戶端先進行修改,此時客戶端程序是在本身的數據庫中獲取到了這條數據的最新版本號,好比說是2

    PUT /test_index/test_type/5?version=2&version_type=external
    {
      "test_field": "external client1 changed"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 2,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }
    複製代碼
  • 模擬第二個客戶端,同時拿到了本身數據庫中維護的那個版本號,也是2,同時基於version=2發起了修改

    PUT /test_index/test_type/5?version=2&version_type=external
    {
      "test_field": "external client2 changed"
    }
    會出錯,返回
    {
      "error": {
        "root_cause": [
          {
            "type": "version_conflict_engine_exception",
            "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
            "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
            "shard": "1",
            "index": "test_index"
          }
        ],
        "type": "version_conflict_engine_exception",
        "reason": "[test_type][5]: version conflict, current version [2] is higher or equal to the one provided [2]",
        "index_uuid": "rsiZYqiwSCC2XdR8N2bJow",
        "shard": "1",
        "index": "test_index"
      },
      "status": 409
    }
    複製代碼
  • 在併發控制成功後,從新基於最新的版本號發起更新

    GET /test_index/test_type/5
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 2,
      "found": true,
      "_source": {
        "test_field": "external client1 changed"
      }
    }
    PUT /test_index/test_type/5?version=3&version_type=external
    {
      "test_field": "external client2 changed"
    }
    返回
    {
      "_index": "test_index",
      "_type": "test_type",
      "_id": "5",
      "_version": 3,
      "result": "updated",
      "_shards": {
        "total": 2,
        "successful": 1,
        "failed": 0
      },
      "created": false
    }複製代碼
相關文章
相關標籤/搜索