[億級流量系列]--1.緩存、數據庫雙寫一致性保障方案

緩存、數據庫雙寫一致性保障方案

1.業務背景

  • 實時性要求較高的解決方案java

    緩存數據庫雙寫一致性一般是用於數據實時性要求較高的場景,好比說商品庫存服務。web

    解決思路:redis

    1. 若是不是在讀寫併發高的場景下,通常採用CacheAsidePattern便可解決。即先刪除緩存,再寫數據庫。spring

    2. 讀寫併發高的場景下。數據庫

      在讀寫併發高的場景下,讀取和寫入的操做是併發的。好比說如今數據庫中的庫存爲100,緩存中的庫存也爲100。有一個寫請求過來,要求修改庫存爲99。正常狀況下是先刪除了緩存,而後修改數據庫中的數據爲99。讀請求過來的時候發現緩存中的數據爲0,就會去數據庫中查詢獲得99。而後修改緩存中的數據也爲99。可是若是寫請求的時候還沒來得及將數據庫中數據修改成99,這時度請求就過來,發現緩存中的數據爲空,就去數據庫中讀取數據爲100,而後又從新將緩存中的數據更新爲100,這時寫請求將數據庫中的數據修改完畢,變爲99。這就致使了數據不一致的產生緩存

      解決:安全

      將讀寫請求串行化。將讀寫請求都放到隊列中操做,保證串行執行。而後再每一個隊列上掛一個線程去執行隊列中的請求操做併發

      edc92a34-7258-4c05-88f1-d540674ddd0f.png

  • 實時性要求不高的解決方案(先了解)app

​ 對於實時性要求不高的數據,能夠採用異步更新數據的方式。好比說商品詳情頁,它的數據要求不是實時性很高,可是要求大流量,特別是熱點數據的讀併發較高,這時候就必須有一個緩存數據生產服務。比方說有一個更新商品的服務去更新了數據庫中的詳情頁面數據,不須要實時反應到頁面上。這時候,能夠將這個修改數據的請求放到消息隊列中,緩存數據生產服務監聽着這個消息服務,一旦接收到消息,就須要去更新本身緩存中的數據。異步

Snipaste_2019-06-27_15-57-09.jpg

2.思路整理

若是保證讀請求和寫請求是針對同一個商品?咱們須要作一個HASH路由,保證同一個商品的請求進入的是同一個內存隊列。

每一個隊列都對應一個工做線程,工做線程拿到對應的請求,執行對應的操做。

3.方案落地

  • 1.線程池+內存隊列初始化
  • 2.兩種請求對象的封裝
  • 3.請求異步執行Service封裝
  • 4.請求處理的工程線程封裝
  • 5.兩種Controller接口封裝
  • 6.讀請求去重優化
  • 7.空數據請求的過濾

3.1 線程池+內存隊列初始化

web容器初始化的時候,就須要初始化線程池和內存隊列。咱們能夠自定義一個監聽器,而後註冊這個監聽器。

  • 新建listener

    新建listner.jpg

  • 註冊listener

    註冊listener.jpg

  • 測試

    測試.jpg

容器初始化的流程已經作好了,如今須要實現具體的怎麼去初始化線程池和內存隊列。

  • 新建線程池和內存隊列的包裝類ThreadPoolAndQueueWrapper

    這個包裝類用於放在監聽器中,調用它的init()方法,就能夠執行線程池的初始化和隊列的初始化。線程開始提交請求工做。

    package com.roncoo.eshop.inventory.thread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import com.roncoo.eshop.inventory.request.Request;
    import com.roncoo.eshop.inventory.request.RequestQueue;
    
    /** * 初始化容器的時候須要初始化線程池和內存隊列 * @author Administrator * */
    public class ThreadPoolAndQueueWrapper {
    
    	private ExecutorService threadPool = Executors.newFixedThreadPool(10);
    	
    	public ThreadPoolAndQueueWrapper() {
    		RequestQueue requestQueue = RequestQueue.getInstance();
    		//初始化的時候就將內存隊列集合填滿
    		for (int i = 0; i < 10; i++) {
    			ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100);
    			requestQueue.addQueue(queue);
    			//線程池用於提交 請求處理的工做線程
    			threadPool.submit(new RequestProcessThread(queue));
    		}
    		
    	}
    	
    	/** * 初始化工做線程池和內存隊列的方法,上來就執行 */
    	public static void init() {
    		//保證初始化的時候只能初始化一次線程池和內存隊列
    		//採用靜態內部類的方式保證線程絕對安全
    		Singleton.getInstance();
    	}
    	
    	private static class Singleton{
    		
    		private static ThreadPoolAndQueueWrapper instance;
    		
    		static {
    			instance = new ThreadPoolAndQueueWrapper();
    		}
    		
    		public static ThreadPoolAndQueueWrapper getInstance() {
    			return instance;
    		}
    	}
    }
    複製代碼
  • 請求隊列的封裝RequestQueue,內部持有請求隊列的集合,提供添加隊列的方法

    package com.roncoo.eshop.inventory.request;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    
    /** * 請求內存隊列封裝 * @author Administrator * */
    public class RequestQueue {
    
    	/** * 內存隊列,是一個集合。由於涉及併發,因此使用ArrayBlockingQueeue,隊列中存放的是請求(讀請求和寫請求) */
    	private List<ArrayBlockingQueue<Request>> queues = new ArrayList<ArrayBlockingQueue<Request>>();
    	
    	public static RequestQueue getInstance() {
    		return Singleton.getInstance();
    	}
    	
    	/** * 添加一個內存隊列 * @param queue */
    	public void addQueue(ArrayBlockingQueue<Request> queue) {
    		this.queues.add(queue);
    	}
    	
    	/** * 內部靜態類的方式保證絕對的線程安全 * @author Administrator * */
    	private static class Singleton {
    		
    		private static RequestQueue instance;
    		
    		static {
    			instance = new RequestQueue();
    		}
    		
    		public static RequestQueue getInstance() {
    			return instance;
    		}
    	}
    }
    
    複製代碼
  • 須要提交到線程池中的工做線程,用於處理Request請求。而且持有本身的內存隊列

    package com.roncoo.eshop.inventory.thread;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.Callable;
    
    import com.roncoo.eshop.inventory.request.Request;
    
    /** * 執行請求的工做線程 * @author Administrator * */
    public class RequestProcessThread implements Callable<Boolean>{
    
    	/** * 本身監控的內存隊列 */
    	private ArrayBlockingQueue<Request> queue;
    	
    	public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
    		this.queue = queue;
    	}
    	
    	/** * 具體的工做流程 */
    	@Override
    	public Boolean call() throws Exception {
    		while(true) {
    			break;
    		}
    		return true;
    	}
    
    }
    
    複製代碼
  • 請求的封裝Request,是一個接口,之後讀請求和寫請求須要實現這個接口,進行本身的操做邏輯

    /** * 請求接口,讀請求和寫請求要實現這個接口 * @author Administrator * */
    public interface Request {
    
    }
    複製代碼

項目結構

項目結構.jpg

3.2 兩種請求對象的封裝

  • 新建實體類ProductInventory

    public class ProductInventory {
    
    	/** * 商品id */
    	private Integer productId;
    	/** * 庫存數量 */
    	private Long inventoryCnt;
    	
    	public ProductInventory() {
    		
    	}
    	
    	public ProductInventory(Integer productId, Long inventoryCnt) {
    		this.productId = productId;
    		this.inventoryCnt = inventoryCnt;
    	}
    	
    	public Integer getProductId() {
    		return productId;
    	}
    	public void setProductId(Integer productId) {
    		this.productId = productId;
    	}
    	public Long getInventoryCnt() {
    		return inventoryCnt;
    	}
    	public void setInventoryCnt(Long inventoryCnt) {
    		this.inventoryCnt = inventoryCnt;
    	}
    	
    }
    複製代碼
  • Request接口中添加業務方法

    public interface Request {
    
    	void process();
    }
    複製代碼
  • 庫存寫請求 ProductInventroyWriteRequest

    package com.roncoo.eshop.inventory.request;
    
    import com.roncoo.eshop.inventory.model.ProductInventory;
    import com.roncoo.eshop.inventory.service.IProductInventoryService;
    
    /** * 庫存寫請求 * 寫請求執行邏輯:Cache Aside Pattern * 1.先刪除緩存 * 2.再更新數據庫 * @author Administrator * */
    public class ProductInventoryWriteRequest implements Request{
    	
    	private ProductInventory productInventory;
    	
    	private IProductInventoryService productInventoryService;
    	
    	public ProductInventoryWriteRequest(ProductInventory productInventory, IProductInventoryService productInventoryService) {
    		this.productInventory = productInventory;
    		this.productInventoryService = productInventoryService;
    	}
    	
    	public void process() {
    		//1.刪除緩存
    		productInventoryService.removeCache(productInventory);
    		//2.更新數據庫
    		productInventoryService.updateDb(productInventory);
    	}
    
    }
    
    複製代碼
  • 庫存讀請求 ProductInventroyReadRequest

    package com.roncoo.eshop.inventory.request;
    
    import org.springframework.beans.factory.annotation.Autowired;
    
    import com.roncoo.eshop.inventory.model.ProductInventory;
    import com.roncoo.eshop.inventory.service.IProductInventoryService;
    
    /** * 商品庫存讀請求 * 1.查詢數據庫 * 2.設置緩存 * @author Administrator * */
    public class ProductInventoryReadRequest implements Request{
    
    	/** * 商品Id */
    	private Integer productId;
    	
    	@Autowired
    	private IProductInventoryService productInventoryService;
    	
    	public ProductInventoryReadRequest(Integer productId, IProductInventoryService productInventoryService) {
    		this.productId = productId;
    		this.productInventoryService = productInventoryService;
    	}
    	
    	@Override
    	public void process() {
    		//1.從數據庫中查詢最新商品庫存
    		ProductInventory productInventory = productInventoryService.findProductInventoryByProductId(productId);
    		//2.將商品庫存設置到redis緩存中
    		productInventoryService.setProductInventoryToCache(productInventory);
    	}
    }
    複製代碼

項目結構:

項目結構2.jpg

3.3 請求異步執行的service封裝

這一步的操做主要是將過來的請求根據商品id路由到對應的內存隊列中。接受的參數是請求.

我的理解叫service這個名稱不太好,換個名稱不如叫作接口路由代理

  • service接口 RequestAsyncServiceImpl

    package com.roncoo.eshop.inventory.service.impl;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    import org.springframework.stereotype.Service;
    
    import com.roncoo.eshop.inventory.request.Request;
    import com.roncoo.eshop.inventory.request.RequestQueue;
    import com.roncoo.eshop.inventory.service.RequestAsyncService;
    
    /** * 處理請求的異步service * 1.將請求路由到不一樣的內存隊列 * 2.將請求放入到內存隊列中 * @author Administrator * */
    @Service("requestAsyncService")
    public class RequestAsyncServiceImpl implements RequestAsyncService{
    
    	@Override
    	public void process(Request request) {
    		try {
    			//作請求的路由,根據每一個請求的商品id,路由到對應的內存隊列中
    			ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
    			queue.put(request);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    	private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId){
    		RequestQueue requestQueue = RequestQueue.getInstance();
    		//先獲取productId的hash值
    		String key = String.valueOf(productId);
    		int h;
    		int hash = (key == null) ?0 : (h = key.hashCode()) ^ (h >>> 16);
    		// 對hash取模,將hash值路由到指定的內存隊列
    		// 好比內存隊列大小8
    		// 用內存隊列的數量對hash值取模以後, 結果必定是在0-7之間
    		// 任何一個商品id都會被固定路由到一樣的一個內存隊列中去
    		int index = (requestQueue.queueSize() - 1) & hash;
    		return requestQueue.getQueue(index);
    	}
    }
    
    
    複製代碼

3.4 處理具體請求的工做線程的代碼修改

public class RequestProcessThread implements Callable<Boolean>{

	/** * 本身監控的內存隊列 */
	private ArrayBlockingQueue<Request> queue;
	
	public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
		this.queue = queue;
	}
	
	/** * 具體的工做流程 */
	@Override
	public Boolean call() throws Exception {
		while(true) {
			//從本身監控的內存隊列中拿出請求
			Request request = queue.take();
			//執行操做
			request.process();
			break;
		}
		return true;
	}

}

複製代碼

工程結構:

工程結構3.jpg

3.5 contorller層的封裝

主要是讀請求,要考慮在200ms以內不斷循環,從緩存中獲取數據。若是200ms內沒有,再去數據庫查詢

package com.roncoo.eshop.inventory.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import com.roncoo.eshop.inventory.Response.Response;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.request.ProductInventoryWriteRequest;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
import com.roncoo.eshop.inventory.service.RequestAsyncService;

/** * 商品庫存controller * @author Administrator * */
@Controller
public class ProductInventoryController {

	@Autowired
	private RequestAsyncService requestAsyncService;
	
	@Autowired
	private IProductInventoryService productInventoryService;

	@RequestMapping("/updateProductInventory")
	@ResponseBody
	public Response updateProductInventory(ProductInventory productInventory) {
		Response response = null;
		
		try {
			Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
			requestAsyncService.process(request);
			response = new Response(Response.SUCCESS);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		return response;
	}
	
	@RequestMapping("/getProductInventory")
	@ResponseBody
	public ProductInventory getProductInventory(Integer productId) {
		ProductInventory productInventory = null;
		
		try {
			Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
			requestAsyncService.process(request);
			
			//把讀請求交給service異步處理之後,須要等待一會
			//等待前面庫存更新的操做,同時緩存舒心的操做
			//若是等待的時間超過了200ms,那麼就本身去數據庫中查詢
			long startTime = System.currentTimeMillis();
			long endTime = 0L;
			long waitTime = 0L;
			
			while(true) {
				if(waitTime > 200) {
					break;
				}
				
				//嘗試從緩存中獲取數據
				productInventory = productInventoryService.getProductInventoryCache(productId);
				//若是有數據,就返回數據
				if(productInventory != null) {
					return productInventory;
				}
				
				else {
					Thread.sleep(20);
					endTime = System.currentTimeMillis();
					waitTime = endTime - startTime;
				}
			}
			
			//若是在規定的時間內(通常是200ms)沒有,那麼就嘗試本身本身去數據庫獲取
			productInventory = productInventoryService.findProductInventoryByProductId(productId);
			if(productInventory != null) {
				return productInventory;
			}
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		return new ProductInventory(productId, -1L);
	}
}


複製代碼
相關文章
相關標籤/搜索