實時性要求較高的解決方案java
緩存數據庫雙寫一致性一般是用於數據實時性要求較高的場景,好比說商品庫存服務。web
解決思路:redis
若是不是在讀寫併發高的場景下,通常採用CacheAsidePattern便可解決。即先刪除緩存,再寫數據庫。spring
讀寫併發高的場景下。數據庫
在讀寫併發高的場景下,讀取和寫入的操做是併發的。好比說如今數據庫中的庫存爲100,緩存中的庫存也爲100。有一個寫請求過來,要求修改庫存爲99。正常狀況下是先刪除了緩存,而後修改數據庫中的數據爲99。讀請求過來的時候發現緩存中的數據爲0,就會去數據庫中查詢獲得99。而後修改緩存中的數據也爲99。可是若是寫請求的時候還沒來得及將數據庫中數據修改成99,這時度請求就過來,發現緩存中的數據爲空,就去數據庫中讀取數據爲100,而後又從新將緩存中的數據更新爲100,這時寫請求將數據庫中的數據修改完畢,變爲99。這就致使了數據不一致的產生緩存
解決:安全
將讀寫請求串行化。將讀寫請求都放到隊列中操做,保證串行執行。而後再每一個隊列上掛一個線程去執行隊列中的請求操做併發
實時性要求不高的解決方案(先了解)app
對於實時性要求不高的數據,能夠採用異步更新數據的方式。好比說商品詳情頁,它的數據要求不是實時性很高,可是要求大流量,特別是熱點數據的讀併發較高,這時候就必須有一個緩存數據生產服務。比方說有一個更新商品的服務去更新了數據庫中的詳情頁面數據,不須要實時反應到頁面上。這時候,能夠將這個修改數據的請求放到消息隊列中,緩存數據生產服務監聽着這個消息服務,一旦接收到消息,就須要去更新本身緩存中的數據。異步
若是保證讀請求和寫請求是針對同一個商品?咱們須要作一個HASH路由,保證同一個商品的請求進入的是同一個內存隊列。
每一個隊列都對應一個工做線程,工做線程拿到對應的請求,執行對應的操做。
web容器初始化的時候,就須要初始化線程池和內存隊列。咱們能夠自定義一個監聽器,而後註冊這個監聽器。
新建listener
註冊listener
測試
容器初始化的流程已經作好了,如今須要實現具體的怎麼去初始化線程池和內存隊列。
新建線程池和內存隊列的包裝類ThreadPoolAndQueueWrapper
這個包裝類用於放在監聽器中,調用它的init()方法,就能夠執行線程池的初始化和隊列的初始化。線程開始提交請求工做。
package com.roncoo.eshop.inventory.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.request.RequestQueue;
/** * 初始化容器的時候須要初始化線程池和內存隊列 * @author Administrator * */
public class ThreadPoolAndQueueWrapper {
private ExecutorService threadPool = Executors.newFixedThreadPool(10);
public ThreadPoolAndQueueWrapper() {
RequestQueue requestQueue = RequestQueue.getInstance();
//初始化的時候就將內存隊列集合填滿
for (int i = 0; i < 10; i++) {
ArrayBlockingQueue<Request> queue = new ArrayBlockingQueue<Request>(100);
requestQueue.addQueue(queue);
//線程池用於提交 請求處理的工做線程
threadPool.submit(new RequestProcessThread(queue));
}
}
/** * 初始化工做線程池和內存隊列的方法,上來就執行 */
public static void init() {
//保證初始化的時候只能初始化一次線程池和內存隊列
//採用靜態內部類的方式保證線程絕對安全
Singleton.getInstance();
}
private static class Singleton{
private static ThreadPoolAndQueueWrapper instance;
static {
instance = new ThreadPoolAndQueueWrapper();
}
public static ThreadPoolAndQueueWrapper getInstance() {
return instance;
}
}
}
複製代碼
請求隊列的封裝RequestQueue,內部持有請求隊列的集合,提供添加隊列的方法
package com.roncoo.eshop.inventory.request;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
/** * 請求內存隊列封裝 * @author Administrator * */
public class RequestQueue {
/** * 內存隊列,是一個集合。由於涉及併發,因此使用ArrayBlockingQueeue,隊列中存放的是請求(讀請求和寫請求) */
private List<ArrayBlockingQueue<Request>> queues = new ArrayList<ArrayBlockingQueue<Request>>();
public static RequestQueue getInstance() {
return Singleton.getInstance();
}
/** * 添加一個內存隊列 * @param queue */
public void addQueue(ArrayBlockingQueue<Request> queue) {
this.queues.add(queue);
}
/** * 內部靜態類的方式保證絕對的線程安全 * @author Administrator * */
private static class Singleton {
private static RequestQueue instance;
static {
instance = new RequestQueue();
}
public static RequestQueue getInstance() {
return instance;
}
}
}
複製代碼
須要提交到線程池中的工做線程,用於處理Request請求。而且持有本身的內存隊列
package com.roncoo.eshop.inventory.thread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import com.roncoo.eshop.inventory.request.Request;
/** * 執行請求的工做線程 * @author Administrator * */
public class RequestProcessThread implements Callable<Boolean>{
/** * 本身監控的內存隊列 */
private ArrayBlockingQueue<Request> queue;
public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
/** * 具體的工做流程 */
@Override
public Boolean call() throws Exception {
while(true) {
break;
}
return true;
}
}
複製代碼
請求的封裝Request,是一個接口,之後讀請求和寫請求須要實現這個接口,進行本身的操做邏輯
/** * 請求接口,讀請求和寫請求要實現這個接口 * @author Administrator * */
public interface Request {
}
複製代碼
項目結構
新建實體類ProductInventory
public class ProductInventory {
/** * 商品id */
private Integer productId;
/** * 庫存數量 */
private Long inventoryCnt;
public ProductInventory() {
}
public ProductInventory(Integer productId, Long inventoryCnt) {
this.productId = productId;
this.inventoryCnt = inventoryCnt;
}
public Integer getProductId() {
return productId;
}
public void setProductId(Integer productId) {
this.productId = productId;
}
public Long getInventoryCnt() {
return inventoryCnt;
}
public void setInventoryCnt(Long inventoryCnt) {
this.inventoryCnt = inventoryCnt;
}
}
複製代碼
Request接口中添加業務方法
public interface Request {
void process();
}
複製代碼
庫存寫請求 ProductInventroyWriteRequest
package com.roncoo.eshop.inventory.request;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
/** * 庫存寫請求 * 寫請求執行邏輯:Cache Aside Pattern * 1.先刪除緩存 * 2.再更新數據庫 * @author Administrator * */
public class ProductInventoryWriteRequest implements Request{
private ProductInventory productInventory;
private IProductInventoryService productInventoryService;
public ProductInventoryWriteRequest(ProductInventory productInventory, IProductInventoryService productInventoryService) {
this.productInventory = productInventory;
this.productInventoryService = productInventoryService;
}
public void process() {
//1.刪除緩存
productInventoryService.removeCache(productInventory);
//2.更新數據庫
productInventoryService.updateDb(productInventory);
}
}
複製代碼
庫存讀請求 ProductInventroyReadRequest
package com.roncoo.eshop.inventory.request;
import org.springframework.beans.factory.annotation.Autowired;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
/** * 商品庫存讀請求 * 1.查詢數據庫 * 2.設置緩存 * @author Administrator * */
public class ProductInventoryReadRequest implements Request{
/** * 商品Id */
private Integer productId;
@Autowired
private IProductInventoryService productInventoryService;
public ProductInventoryReadRequest(Integer productId, IProductInventoryService productInventoryService) {
this.productId = productId;
this.productInventoryService = productInventoryService;
}
@Override
public void process() {
//1.從數據庫中查詢最新商品庫存
ProductInventory productInventory = productInventoryService.findProductInventoryByProductId(productId);
//2.將商品庫存設置到redis緩存中
productInventoryService.setProductInventoryToCache(productInventory);
}
}
複製代碼
項目結構:
這一步的操做主要是將過來的請求根據商品id路由到對應的內存隊列中。接受的參數是請求.
我的理解叫service這個名稱不太好,換個名稱不如叫作接口路由代理
service接口 RequestAsyncServiceImpl
package com.roncoo.eshop.inventory.service.impl;
import java.util.concurrent.ArrayBlockingQueue;
import org.springframework.stereotype.Service;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.request.RequestQueue;
import com.roncoo.eshop.inventory.service.RequestAsyncService;
/** * 處理請求的異步service * 1.將請求路由到不一樣的內存隊列 * 2.將請求放入到內存隊列中 * @author Administrator * */
@Service("requestAsyncService")
public class RequestAsyncServiceImpl implements RequestAsyncService{
@Override
public void process(Request request) {
try {
//作請求的路由,根據每一個請求的商品id,路由到對應的內存隊列中
ArrayBlockingQueue<Request> queue = getRoutingQueue(request.getProductId());
queue.put(request);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private ArrayBlockingQueue<Request> getRoutingQueue(Integer productId){
RequestQueue requestQueue = RequestQueue.getInstance();
//先獲取productId的hash值
String key = String.valueOf(productId);
int h;
int hash = (key == null) ?0 : (h = key.hashCode()) ^ (h >>> 16);
// 對hash取模,將hash值路由到指定的內存隊列
// 好比內存隊列大小8
// 用內存隊列的數量對hash值取模以後, 結果必定是在0-7之間
// 任何一個商品id都會被固定路由到一樣的一個內存隊列中去
int index = (requestQueue.queueSize() - 1) & hash;
return requestQueue.getQueue(index);
}
}
複製代碼
public class RequestProcessThread implements Callable<Boolean>{
/** * 本身監控的內存隊列 */
private ArrayBlockingQueue<Request> queue;
public RequestProcessThread(ArrayBlockingQueue<Request> queue) {
this.queue = queue;
}
/** * 具體的工做流程 */
@Override
public Boolean call() throws Exception {
while(true) {
//從本身監控的內存隊列中拿出請求
Request request = queue.take();
//執行操做
request.process();
break;
}
return true;
}
}
複製代碼
工程結構:
主要是讀請求,要考慮在200ms以內不斷循環,從緩存中獲取數據。若是200ms內沒有,再去數據庫查詢
package com.roncoo.eshop.inventory.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import com.roncoo.eshop.inventory.Response.Response;
import com.roncoo.eshop.inventory.model.ProductInventory;
import com.roncoo.eshop.inventory.request.ProductInventoryWriteRequest;
import com.roncoo.eshop.inventory.request.Request;
import com.roncoo.eshop.inventory.service.IProductInventoryService;
import com.roncoo.eshop.inventory.service.RequestAsyncService;
/** * 商品庫存controller * @author Administrator * */
@Controller
public class ProductInventoryController {
@Autowired
private RequestAsyncService requestAsyncService;
@Autowired
private IProductInventoryService productInventoryService;
@RequestMapping("/updateProductInventory")
@ResponseBody
public Response updateProductInventory(ProductInventory productInventory) {
Response response = null;
try {
Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
requestAsyncService.process(request);
response = new Response(Response.SUCCESS);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return response;
}
@RequestMapping("/getProductInventory")
@ResponseBody
public ProductInventory getProductInventory(Integer productId) {
ProductInventory productInventory = null;
try {
Request request = new ProductInventoryWriteRequest(productInventory, productInventoryService);
requestAsyncService.process(request);
//把讀請求交給service異步處理之後,須要等待一會
//等待前面庫存更新的操做,同時緩存舒心的操做
//若是等待的時間超過了200ms,那麼就本身去數據庫中查詢
long startTime = System.currentTimeMillis();
long endTime = 0L;
long waitTime = 0L;
while(true) {
if(waitTime > 200) {
break;
}
//嘗試從緩存中獲取數據
productInventory = productInventoryService.getProductInventoryCache(productId);
//若是有數據,就返回數據
if(productInventory != null) {
return productInventory;
}
else {
Thread.sleep(20);
endTime = System.currentTimeMillis();
waitTime = endTime - startTime;
}
}
//若是在規定的時間內(通常是200ms)沒有,那麼就嘗試本身本身去數據庫獲取
productInventory = productInventoryService.findProductInventoryByProductId(productId);
if(productInventory != null) {
return productInventory;
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return new ProductInventory(productId, -1L);
}
}
複製代碼