文章很長,並且持續更新,建議收藏起來,慢慢讀! Java 高併發 發燒友社羣:瘋狂創客圈(總入口) 奉上如下珍貴的學習資源:html
入大廠 、作架構、大力提高Java 內功 必備的精彩博文 | 2021 秋招漲薪1W + 必備的精彩博文 |
---|---|
1:Redis 分佈式鎖 (圖解-秒懂-史上最全) | 2:Zookeeper 分佈式鎖 (圖解-秒懂-史上最全) |
3: Redis與MySQL雙寫一致性如何保證? (面試必備) | 4: 面試必備:秒殺超賣 解決方案 (史上最全) |
5:面試必備之:Reactor模式 | 6: 10分鐘看懂, Java NIO 底層原理 |
7:TCP/IP(圖解+秒懂+史上最全) | 8:Feign原理 (圖解) |
9:DNS圖解(秒懂 + 史上最全 + 高薪必備) | 10:CDN圖解(秒懂 + 史上最全 + 高薪必備) |
10: 分佈式事務( 圖解 + 史上最全 + 吐血推薦 ) |
Java 面試題 30個專題 , 史上最全 , 面試必刷 | 阿里、京東、美團... 隨意挑、橫着走!!! |
---|---|
1: JVM面試題(史上最強、持續更新、吐血推薦) | 2:Java基礎面試題(史上最全、持續更新、吐血推薦 |
3:架構設計面試題 (史上最全、持續更新、吐血推薦) | 4:設計模式面試題 (史上最全、持續更新、吐血推薦) |
1七、分佈式事務面試題 (史上最全、持續更新、吐血推薦) | 一致性協議 (史上最全) |
2九、多線程面試題(史上最全) | 30、HR面經,過五關斬六將後,當心陰溝翻船! |
9.網絡協議面試題(史上最全、持續更新、吐血推薦) | 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄 】 |
SpringCloud 精彩博文 | |
---|---|
nacos 實戰(史上最全) | sentinel (史上最全+入門教程) |
SpringCloud gateway (史上最全) | 更多專題, 請參見【 瘋狂創客圈 高併發 總目錄 】 |
美團面試題:Redis與MySQL雙寫一致性如何保證?java
這道題其實就是在問緩存和數據庫在雙寫場景下,一致性是如何保證的?mysql
本文將很是全面的,跟你們一塊兒來探討如何回答這個問題。nginx
本文的行文次序,首先介紹集中式緩存的緩存模式和數據一致性,而後介紹 二級緩存的架構和數據一致性,最後介紹 三級緩存的架構和數據一致性git
不吹牛,本文在全網數據一致性的全部博文中,絕對算是史上最全的。github
本文最爲全面的介紹了 redis 與 db 雙寫數據一致性解決方案,web
固然, 會參考了最新的一些文章, 可是解決那些 複製來複制去的bug,面試
另外,本文增長了 L2 、L3 多級緩存的一致性問題redis
本文很是經典,絕對的高分面試必備, 建議邊學習、邊思考,而且必定要實戰算法
- 若是有問題,歡迎來瘋狂創客圈找尼恩和18羅漢門一塊兒交流
- 本文後續也會不斷升級迭代,持續保持史上最全位置。
一致性就是數據保持一致,在分佈式系統中,能夠理解爲多個節點中數據的值是一致的。
緩存能夠提高性能、緩解數據庫壓力,可是使用緩存也會致使數據不一致性的問題。通常咱們是如何使用緩存呢?有三種經典的緩存模式:
Cache-Aside Pattern,即旁路緩存模式,它的提出是爲了儘量地解決緩存與數據庫的數據不一致問題。
Cache-Aside Pattern的讀請求流程以下:
讀的時候,先讀緩存,緩存命中的話,直接返回數據;
緩存沒有命中的話,就去讀數據庫,從數據庫取出數據,放入緩存後,同時返回響應。
Cache-Aside Pattern的寫請求流程以下:
更新的時候,先更新數據庫,而後再刪除緩存。
Read/Write Through模式中,服務端把緩存做爲主要數據存儲。應用程序跟數據庫緩存交互,都是經過抽象緩存層完成的。
Read-Through的簡要讀流程以下
從緩存讀取數據,讀到直接返回
若是讀取不到的話,從數據庫加載,寫入緩存後,再返回響應。
這個簡要流程是否是跟Cache-Aside很像呢?
其實Read-Through就是多了一層Cache-Provider,流程以下:
Read-Through的優勢
Read-Through實際只是在Cache-Aside之上進行了一層封裝,它會讓程序代碼變得更簡潔,同時也減小數據源上的負載。
Write-Through模式下,當發生寫請求時,也是由緩存抽象層完成數據源和緩存數據的更新,流程以下:
Write behind跟Read-Through/Write-Through有類似的地方,都是由Cache Provider來負責緩存和數據庫的讀寫。它兩又有個很大的不一樣:Read/Write Through是同步更新緩存和數據的,Write Behind則是隻更新緩存,不直接更新數據庫,經過批量異步的方式來更新數據庫。
這種方式下,緩存和數據庫的一致性不強,對一致性要求高的系統要謹慎使用。
可是它適合頻繁寫的場景,MySQL的InnoDB Buffer Pool機制就使用到這種模式。
Cache Aside 更新模式實現起來比較簡單,可是須要維護兩個數據存儲:
Read/Write Through 的寫模式須要維護一個數據存儲(緩存),實現起來要複雜一些。
Write Behind Caching 更新模式和Read/Write Through 更新模式相似,區別是Write Behind Caching 更新模式的數據持久化操做是異步的,可是Read/Write Through 更新模式的數據持久化操做是同步的。
Write Behind Caching 的優勢是直接操做內存速度快,屢次操做能夠合併持久化到數據庫。缺點是數據可能會丟失,例如系統斷電等。
有些小夥伴可能會問, Cache-Aside在寫入請求的時候,爲何是刪除緩存而不是更新緩存呢?
咱們在操做緩存的時候,到底應該刪除緩存仍是更新緩存呢?咱們先來看個例子:
操做的次序以下:
線程A先發起一個寫操做,第一步先更新數據庫
線程B再發起一個寫操做,第二步更新了數據庫
如今,因爲網絡等緣由,線程B先更新了緩存, 線程A更新緩存。
這時候,緩存保存的是A的數據(老數據),數據庫保存的是B的數據(新數據),數據不一致了,髒數據出現啦。若是是刪除緩存取代更新緩存則不會出現這個髒數據問題。
更新緩存相對於刪除緩存,還有兩點劣勢:
1 若是你寫入的緩存值,是通過複雜計算才獲得的話。 更新緩存頻率高的話,就浪費性能啦。
2 在寫多讀少的狀況下,數據不少時候還沒被讀取到,又被更新了,這也浪費了性能呢(實際上,寫多的場景,用緩存也不是很划算了)
任何的措施,也不是絕對的好, 只有分場景看是否是適合,更新緩存的措施,也是有用的:
在讀多寫少的場景,價值大。
美團二面:Redis與MySQL雙寫一致性如何保證?
Cache-Aside緩存模式中,有些小夥伴仍是有疑問,在寫入請求的時候,爲何是先操做數據庫呢?爲何不先操做緩存呢?
假設有A、B兩個請求,請求A作更新操做,請求B作查詢讀取操做。
A、B兩個請求的操做流程以下:
醬紫就有問題啦,緩存和數據庫的數據不一致了。
緩存保存的是老數據,數據庫保存的是新數據。所以,Cache-Aside緩存模式,選擇了先操做數據庫而不是先操做緩存。
重要:緩存是經過犧牲強一致性來提升性能的。
這是由CAP理論決定的。緩存系統適用的場景就是非強一致性的場景,它屬於CAP中的AP。
CAP理論,指的是在一個分佈式系統中, Consistency(一致性)、 Availability(可用性)、Partition tolerance(分區容錯性),三者不可得兼。
CAP理論做爲分佈式系統的基礎理論,它描述的是一個分佈式系統在如下三個特性中:
最多知足其中的兩個特性。也就是下圖所描述的。分佈式系統要麼知足CA,要麼CP,要麼AP。沒法同時知足CAP。
I. 什麼是 一致性、可用性和分區容錯性
分區容錯性:指的分佈式系統中的某個節點或者網絡分區出現了故障的時候,整個系統仍然能對外提供知足一致性和可用性的服務。也就是說部分故障不影響總體使用。
事實上咱們在設計分佈式系統是都會考慮到bug,硬件,網絡等各類緣由形成的故障,因此即便部分節點或者網絡出現故障,咱們要求整個系統仍是要繼續使用的
(不繼續使用,至關於只有一個分區,那麼也就沒有後續的一致性和可用性了)
可用性: 一直能夠正常的作讀寫操做。簡單而言就是客戶端一直能夠正常訪問並獲得系統的正常響應。用戶角度來看就是不會出現系統操做失敗或者訪問超時等問題。
一致性:在分佈式系統完成某寫操做後任何讀操做,都應該獲取到該寫操做寫入的那個最新的值。至關於要求分佈式系統中的各節點時時刻刻保持數據的一致性。
因此使用緩存提高性能,就是會有數據更新的延遲。這須要咱們在設計時結合業務仔細思考是否適合用緩存。而後緩存必定要設置過時時間,這個時間過短、或者太長都很差:
可是,經過一些方案優化處理,是能夠保證弱一致性,最終一致性的。
3種方案保證數據庫與緩存的一致性
- 延時雙刪策略
- 刪除緩存重試機制
- 讀取biglog異步刪除緩存
有些小夥伴可能會說,不必定要先操做數據庫呀,採用緩存延時雙刪策略就好啦?
什麼是延時雙刪呢?
1 先刪除緩存
2 再更新數據庫
3 休眠一會(好比1秒),再次刪除緩存。
參考代碼以下:
這個休眠一會,通常多久呢?都是1秒?
這個休眠時間 = 讀業務邏輯數據的耗時 + 幾百毫秒。
爲了確保讀請求結束,寫請求能夠刪除讀請求可能帶來的緩存髒數據。
無論是延時雙刪仍是Cache-Aside的先操做數據庫再刪除緩存,若是第二步的刪除緩存失敗呢?
刪除失敗會致使髒數據哦~
刪除失敗就多刪除幾回呀,保證刪除緩存成功呀~ 因此能夠引入刪除緩存重試機制
寫請求更新數據庫
緩存由於某些緣由,刪除失敗
把刪除失敗的key放到消息隊列
消費消息隊列的消息,獲取要刪除的key
重試刪除緩存操做
重試刪除緩存機制還能夠,就是會形成好多業務代碼入侵。
其實,還能夠經過數據庫的binlog來異步淘汰key。
以mysql爲例 可使用阿里的canal將binlog日誌採集發送到MQ隊列裏面,而後編寫一個簡單的緩存刪除消息者訂閱binlog日誌,根據更新log刪除緩存,而且經過ACK機制確認處理這條更新log,保證數據緩存一致性
PushConsumer爲了保證消息確定消費成功,只有使用方明確表示消費成功,RocketMQ纔會認爲消息消費成功。中途斷電,拋出異常等都不會認爲成功——即都會從新投遞。首先,消費的時候,咱們須要注入一個消費回調,具體sample代碼以下:
consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs); delcache(key);//執行真正刪除 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;//返回消費成功 } });
業務實現消費回調的時候,當且僅當此回調函數返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
,RocketMQ纔會認爲這批消息(默認是1條)是消費完成的。
若是這時候消息消費失敗,例如數據庫異常,餘額不足扣款失敗等一切業務認爲消息須要重試的場景,只要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就會認爲這批消息消費失敗了。
爲了保證消息是確定被至少消費成功一次,RocketMQ會把這批消費失敗的消息重發回Broker(topic不是原topic而是這個消費租的RETRY topic),在延遲的某個時間點(默認是10秒,業務可設置)後,再次投遞到這個ConsumerGroup。而若是一直這樣重複消費都持續失敗到必定次數(默認16次),就會投遞到DLQ死信隊列。應用能夠監控死信隊列來作人工干預。
Pub/Sub功能(means Publish, Subscribe)即發佈及訂閱功能。Pub/Sub是目前普遍使用的通訊模型,它採用事件做爲基本的通訊機制,提供大規模系統所要求的鬆散耦合的交互模式:訂閱者(如客戶端)以事件訂閱的方式表達出它有興趣接收的一個事件或一類事件;發佈者(如服務器)可將訂閱者感興趣的事件隨時通知相關訂閱者。熟悉設計模式的朋友應該瞭解這與23種設計模式中的觀察者模式極爲類似。
Redis經過publish和subscribe命令實現訂閱和發佈的功能。訂閱者能夠經過subscribe向redis server訂閱本身感興趣的消息類型。redis將信息類型稱爲通道(channel)。當發佈者經過publish命令向redis server發送特定類型的信息時,訂閱該消息類型的所有訂閱者都會收到此消息。
可是呢還有個問題, 「若是是主從數據庫呢」?
由於主從DB同步存在延時時間。若是刪除緩存以後,數據同步到備庫以前已經有請求過來時, 「會從備庫中讀到髒數據」,如何解決呢?解決方案以下流程圖:
綜上所述,在分佈式系統中,緩存和數據庫同時存在時,若是有寫操做的時候,「先操做數據庫,再操做緩存」。以下:
1.讀取緩存中是否有相關數據
2.若是緩存中有相關數據value,則返回
3.若是緩存中沒有相關數據,則從數據庫讀取相關數據放入緩存中key->value,再返回
4.若是有更新數據,則先更新數據庫,再刪除緩存
5.爲了保證第四步刪除緩存成功,使用binlog異步刪除
6.若是是主從數據庫,binglog取自於從庫
7.若是是一主多從,每一個從庫都要採集binlog,而後消費端收到最後一臺binlog數據才刪除緩存,或者爲了簡單,收到一次更新log,刪除一次緩存
在不少業務狀況下,咱們都會在系統中加入redis緩存作查詢優化, 使用es 作全文檢索。
若是數據庫數據發生更新,這時候就須要在業務代碼中寫一段同步更新redis的代碼。這種數據同步的代碼跟業務代碼糅合在一塊兒會不太優雅,能不能把這些數據同步的代碼抽出來造成一個獨立的模塊呢,答案是能夠的。
若是你還對SpringBoot
、canal
、RocketMQ
、MySQL
、ElasticSearch
不是很瞭解的話,這裏我爲你們整理個它們的官網網站,以下
這裏主要介紹一下canal,其餘的自行學習。
canal [kə’næl],譯意爲水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日誌解析,提供增量數據訂閱和消費.。
canal是一個假裝成slave訂閱mysql的binlog,實現數據同步的中間件。
說明:
instance模塊:
到這裏咱們對canal
有了一個初步的認識,接下咱們就進入實戰環節。
對於自建 MySQL
, 須要先開啓 Binlog
寫入功能,配置binlog-format
爲ROW
模式,my.cnf 中配置以下
[mysqld] log-bin=mysql-bin # 開啓 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 須要定義,不要和 canal 的 slaveId 重複
**
注意:**針對阿里雲 RDS for MySQL
, 默認打開了 binlog , 而且帳號默認具備 binlog dump 權限 , 不須要任何權限或者 binlog 設置,能夠直接跳過這一步
受權canal
鏈接 MySQL 帳號具備做爲 MySQL slave
的權限, 若是已有帳戶可直接 使用grant 命令受權。
#建立用戶名和密碼都爲canal CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
canal提供web ui 進行Server管理、Instance管理。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
咱們先配置canal.admin以後。經過web ui來配置 cancal server,這樣使用界面操做很是的方便。
vi conf/application.yml server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 127.0.0.1:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: admin
初始化元數據庫
mysql -h127.0.0.1 -uroot -p # 導入初始化SQL > source conf/canal_manager.sql
canal_manager.sql
sh bin/startup.sh
使用用戶名:admin 密碼爲:123456 登陸
登陸成功,會自動跳轉到以下界面。這時候咱們的canal.admin就搭建成功了。
下載 canal.deployer, 訪問 release 頁面 , 選擇須要的包下載, 如以 1.1.4版本爲例
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
解壓完成能夠看到以下結構:
進入conf 目錄。能夠看到以下的配置文件。
咱們先對canal.properties
不作任何修改。
使用canal_local.properties
的配置覆蓋canal.properties
# register ip canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster =
使用以下命令啓動canal server
sh bin/startup.sh local
啓動成功。同時咱們在canal.admin web ui中刷新 server 管理,能夠到canal server 已經啓動成功。
這時候咱們的canal.server 搭建已經成功。
選擇Instance 管理-> 新建Instance
填寫 Instance名稱:cms_article
#mysql serverId canal.instance.mysql.slaveId = 1234 #position info,須要改爲本身的數據庫信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,須要改爲本身的數據庫信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal #改爲本身的數據庫信息(須要監聽的數據庫) canal.instance.defaultDatabaseName = cms-manage canal.instance.connectionCharset = UTF-8 #table regex 須要過濾的表 這裏數據庫的中全部表 canal.instance.filter.regex = .\*\\..\* # MQ 配置 日誌數據會發送到cms_article這個topic上 canal.mq.topic=cms_article # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* #單分區處理消息 canal.mq.partition=0
咱們這裏爲了演示之建立一張表。
配置好以後,我須要點擊保存。此時在Instances 管理中就能夠看到此時的實例信息。
canal 1.1.1版本以後, 默認支持將canal server接收到的binlog數據直接投遞到MQ, 目前默認支持的MQ系統有:
本案例以RocketMQ
爲例
咱們仍然使用web ui 界面操做。點擊 server 管理 - > 點擊配置
修改配置文件
# ... # 可選項: tcp(默認), kafka, RocketMQ canal.serverMode = RocketMQ # ... # kafka/rocketmq 集羣配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 192.168.0.200:9078 canal.mq.retries = 0 # flagMessage模式下能夠調大該值, 但不要超過MQ消息體大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下請將該值改大, 建議50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默認50K, 因爲kafka最大消息體限制請勿超過1M(900K如下) canal.mq.canalBatchSize = 50 # Canal get數據的超時時間, 單位: 毫秒, 空爲不限超時 canal.mq.canalGetTimeout = 100 # 是否爲flat json格式對象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投遞是否使用事務 canal.mq.transaction = false
修改好以後保存。會自動重啓。
此時咱們就能夠在rocketmq的控制檯看到一個cms_article topic已經自動建立了。
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.4</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency> <!-- 根據我的須要依賴 --> <dependency> <groupId>javax.persistence</groupId> <artifactId>persistence-api</artifactId> </dependency>
package com.crazymaker.springcloud.stock.consumer; import com.alibaba.otter.canal.protocol.FlatMessage; import com.crazymaker.springcloud.common.exception.BusinessException; import com.crazymaker.springcloud.common.util.JsonUtil; import com.crazymaker.springcloud.standard.redis.RedisRepository; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import org.springframework.util.ReflectionUtils; import javax.annotation.Resource; import javax.persistence.Id; import java.lang.reflect.Field; import java.lang.reflect.ParameterizedType; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; /** * 抽象CanalMQ通用處理服務 **/ @Slf4j public abstract class AbstractCanalMQ2RedisService<T> implements CanalSynService<T> { @Resource private RedisTemplate<String, Object> redisTemplate; @Resource RedisRepository redisRepository; private Class<T> classCache; /** * 獲取Model名稱 * * @return Model名稱 */ protected abstract String getModelName(); @Override public void process(FlatMessage flatMessage) { if (flatMessage.getIsDdl()) { ddl(flatMessage); return; } Set<T> data = getData(flatMessage); if (SQLType.INSERT.equals(flatMessage.getType())) { insert(data); } if (SQLType.UPDATE.equals(flatMessage.getType())) { update(data); } if (SQLType.DELETE.equals(flatMessage.getType())) { delete(data); } } @Override public void ddl(FlatMessage flatMessage) { //TODO : DDL須要同步,刪庫清空,更新字段處理 } @Override public void insert(Collection<T> list) { insertOrUpdate(list); } @Override public void update(Collection<T> list) { insertOrUpdate(list); } private void insertOrUpdate(Collection<T> list) { redisTemplate.executePipelined((RedisConnection redisConnection) -> { for (T data : list) { String key = getWrapRedisKey(data); RedisSerializer keySerializer = redisTemplate.getKeySerializer(); RedisSerializer valueSerializer = redisTemplate.getValueSerializer(); redisConnection.set(keySerializer.serialize(key), valueSerializer.serialize(data)); } return null; }); } @Override public void delete(Collection<T> list) { Set<String> keys = Sets.newHashSetWithExpectedSize(list.size()); for (T data : list) { keys.add(getWrapRedisKey(data)); } //Set<String> keys = list.stream().map(this::getWrapRedisKey).collect(Collectors.toSet()); redisRepository.delAll(keys); } /** * 封裝redis的key * * @param t 原對象 * @return key */ protected String getWrapRedisKey(T t) { // return new StringBuilder() // .append(ApplicationContextHolder.getApplicationName()) // .append(":") // .append(getModelName()) // .append(":") // .append(getIdValue(t)) // .toString(); throw new IllegalStateException( "基類 方法 'getWrapRedisKey' 還沒有實現!"); } /** * 獲取類泛型 * * @return 泛型Class */ protected Class<T> getTypeArguement() { if (classCache == null) { classCache = (Class) ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]; } return classCache; } /** * 獲取Object標有@Id註解的字段值 * * @param t 對象 * @return id值 */ protected Object getIdValue(T t) { Field fieldOfId = getIdField(); ReflectionUtils.makeAccessible(fieldOfId); return ReflectionUtils.getField(fieldOfId, t); } /** * 獲取Class標有@Id註解的字段名稱 * * @return id字段名稱 */ protected Field getIdField() { Class<T> clz = getTypeArguement(); Field[] fields = clz.getDeclaredFields(); for (Field field : fields) { Id annotation = field.getAnnotation(Id.class); if (annotation != null) { return field; } } log.error("PO類未設置@Id註解"); throw new BusinessException("PO類未設置@Id註解"); } /** * 轉換Canal的FlatMessage中data成泛型對象 * * @param flatMessage Canal發送MQ信息 * @return 泛型對象集合 */ protected Set<T> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<T> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { T t = JsonUtil.mapToPojo(map, getTypeArguement()); targetData.add(t); } return targetData; } }
rocketMQ
是支持廣播消費的,只須要在消費端進行配置便可,默認狀況下使用的是集羣消費,這就意味着若是咱們配置了多個消費者實例,只會有一個實例消費消息。
對於更新Redis來講,一個實例消費消息,完成redis的更新,這就夠了。
package com.crazymaker.springcloud.stock.consumer; import com.alibaba.otter.canal.protocol.FlatMessage; import com.crazymaker.springcloud.seckill.dao.po.SeckillGoodPO; import com.google.common.collect.Sets; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Set; @Slf4j @Service //廣播模式 //@RocketMQMessageListener(topic = "seckillgood", consumerGroup = "UpdateRedis", messageModel = MessageModel.BROADCASTING) //集羣模式 @RocketMQMessageListener(topic = "seckillgood", consumerGroup = "UpdateRedis") @Data public class UpdateRedisGoodConsumer extends AbstractCanalMQ2RedisService<SeckillGoodPO> implements RocketMQListener<FlatMessage> { private String modelName = "seckillgood"; @Override public void onMessage(FlatMessage s) { process(s); } // @Cacheable(cacheNames = {"seckill"}, key = "'seckillgood:' + #goodId") /** * 封裝redis的key * * @param t 原對象 * @return key */ protected String getWrapRedisKey(SeckillGoodPO t) { return new StringBuilder() // .append(ApplicationContextHolder.getApplicationName()) .append("seckill") .append(":") // .append(getModelName()) .append("seckillgood") .append(":") .append(t.getId()) .toString(); } /** * 轉換Canal的FlatMessage中data成泛型對象 * * @param flatMessage Canal發送MQ信息 * @return 泛型對象集合 */ protected Set<SeckillGoodPO> getData(FlatMessage flatMessage) { List<Map<String, String>> sourceData = flatMessage.getData(); Set<SeckillGoodPO> targetData = Sets.newHashSetWithExpectedSize(sourceData.size()); for (Map<String, String> map : sourceData) { SeckillGoodPO po = new SeckillGoodPO(); po.setId(Long.valueOf(map.get("id"))); //省略其餘的屬性 targetData.add(po); } return targetData; } }
根據須要能夠重寫裏面的方法,DDL
處理暫時還沒完成,只是整個Demo,完整的實戰活兒,仍是留給你們本身幹吧。
尼恩的忠實建議:
理論水平的提高,看看視頻、看看書,只有兩個字,就是須要:多看。
實戰水平的提高,只有兩個字,就是須要:多幹。
基於binlog同步的緩存的數據一致性實戰,很是重要,建議你們必定要幹一票。
尼恩的忠實建議:
理論水平的提高,看看視頻、看看書,只有兩個字,就是須要:多看。
實戰水平的提高,只有兩個字,就是須要:多幹。
基於binlog同步的緩存的數據一致性實戰的具體材料、源碼、問題,歡迎來 瘋狂創客圈社羣交流。
高併發Java發燒友社羣 - 瘋狂創客圈 總入口 點擊瞭解詳情:
美團面試題:Redis與MySQL雙寫一致性如何保證?
若是回答完了上面的內容,可以獲得 100分的話,加上下面的回答內容,你就能夠獲得120分,讓面試官有驚奇、驚喜的感受了。
注意:讓面試官有驚奇、驚喜的感受以後,基本面試就很容易經過。
瞭解到了咱們爲何要使用緩存,以及緩存能解決咱們什麼樣的問題。可是使用緩存時也須要注意一些問題:
若是隻是單純的整合Redis緩存,那麼可能出現以下的問題
爲了解決以上可能出現的問題,讓緩存層更穩定,健壯,咱們使用二級緩存架構
1級爲本地緩存,或者進程內的緩存(如 Ehcache) —— 速度快,進程內可用
2級爲集中式緩存(如 Redis)—— 可同時爲多節點提供服務
爲何要引入本地緩存
相對於IO操做 速度快,效率高 相對於Redis Redis是一種優秀的分佈式緩存實現,受限於網卡等緣由,遠水救不了近火
因此:
DB + Redis + LocalCache = 高效存儲,高效訪問
本地緩存通常適合於緩存只讀、量少、高頻率訪問的數據。如秒殺商品數據。
或者每一個部署節點獨立的數據,如長鏈接服務中,每一個部署節點因爲都是維護了不一樣的鏈接,每一個鏈接的數據都是獨立的,而且隨着鏈接的斷開而刪除。若是數據在集羣的不一樣部署節點須要共享和保持一致,則須要使用分佈式緩存來統一存儲,實現應用集羣的全部應用進程都在該統一的分佈式緩存中進行數據存取便可。
本地緩存位於同一個JVM的堆中,相對於分佈式緩存的好處是,故性能更好,減小了跨網絡傳輸,
可是本地緩存因爲佔用 JVM 內存空間 (或者進程的內存空間),故不能進行大數據量的數據存儲。
本地緩存只支持被該應用進程訪問,通常沒法被其餘應用進程訪問,若是對應的數據庫數據,存在數據更新,則須要同步更新不一樣節點的本地緩存副本,來保證數據一致性
本地緩存的更新,複雜度較高而且容易出錯,如基於 Redis 的發佈訂閱機制、或者消息隊列MQ來同步更新各個部署節點。
數據庫 | 本地緩存 | 分佈式緩存 | |
---|---|---|---|
存儲位置 | 存盤,數據不丟失 | 不存盤,以前的數據丟失 | 不存盤,數據丟失 |
持久化 | 能夠 | 不能夠 | 不能夠 |
訪問速度 | 慢 | 最快 | 快 |
可擴展 | 可存在其餘機器的硬盤 | 只能存在本機內存 | 可存在其餘機器的內存 |
使用場景 | 須要實現持久化保存 | 須要快速訪問,但須要考慮內存大小 | 1)須要快速訪問,不須要考慮內存大小 2)須要實現持久化,但會丟失一些數據 3)須要讓緩存集中在一塊兒,訪問任一機器上內存中的數據均可以從緩存中獲得 |
單獨使用本地緩存與集中式緩存,都會有各自的短板。
有這麼一個網站,某個頁面天天的訪問量是 1000萬,每一個頁面從緩存讀取的數據是 50K。緩存數據存放在一個 Redis 服務,機器使用千兆網卡。那麼這個 Redis 一天要承受 500G 的數據流,至關於平均每秒鐘是 5.78M 的數據。而網站通常都會有高峯期和低峯期,兩個時間流量的差別多是百倍以上。咱們假設高峯期每秒要承受的流量比平均值高 50 倍,也就是說高峯期 Redis 服務每秒要傳輸超過 250 兆的數據。請注意這個 250 兆的單位是 byte,而千兆網卡的單位是「bit」 ,你懂了嗎? 這已經遠遠超過 Redis 服務的網卡帶寬。
因此若是你能發現這樣的問題,通常你會這麼作:
若是你採用第2種方法來解決上述的場景中碰到的問題,那麼你最好準備 5 個 Redis 服務來支撐。
在緩存服務這塊成本直接攀升了 5 倍。你有錢固然沒任何問題,可是結構就變得很是複雜了,並且可能你緩存的數據量其實不大,1000 萬高頻次的緩存讀寫 Redis 也能輕鬆應付,但是由於帶寬的問題,你不得不付出 5 倍的成本。
按照80/20原則,若是咱們把20%的熱點數據,放在本地緩存,若是咱們不用每次頁面訪問的時候都去 Redis 讀取數據,那麼 Redis 上的數據流量至少下降 80%的帶寬流量,甚至於一個很小的 Redis 集羣能夠輕鬆應付。
做爲須要超高併發的訪問數據,屬於 20% 的熱點數據
這屬於提早預測靜態熱點數據類型。
具體參參見瘋狂創客圈的 億級 IM中臺實戰
這屬於提早預測靜態熱點數據類型。
還有的是提早不能識別出來的,如電商系統中的熱點商品那就完美了。
經過流計算識別出來的熱點數據,可以動態地實時發現熱點。
這屬於實時預測動態熱點數據類型。因爲數據量大,能夠經過流計算框架 storm 或者 fink 實現,
不夠,此項工做,通常屬於大數據團隊的工做。
第一級緩存使用內存(同時支持 Ehcache 2.x、Ehcache 3.x 、Guava、 Caffeine),第二級緩存使用 Redis(推薦)/Memcached
本地緩存與集中式緩存的結合架構,大體的架構圖,以下:
經過消息隊列,或者其餘廣播模式的發佈訂閱,保持各個一級緩存的數據一致性。
這一點,與Cache-Aside模式不一樣,Cache-Aside只是刪除緩存便可。可是熱點數據,若是刪除,很容易致使緩存擊穿。
對於秒殺這樣的場景,瞬間有十幾萬甚至上百萬的請求要同時讀取商品。若是沒有緩存,每個請求連帶的數據操做都須要應用與數據庫生成connection,而數據庫的最大鏈接數是有限的,一旦超過數據庫會直接宕機。這就是緩存擊穿。
緩存擊穿與 緩存穿透的簡單區別:
緩存擊穿是指數據庫中有數據,可是緩存中沒有,大量的請求打到數據庫;
緩存穿透是指緩存和數據庫中都沒有的數據,而用戶不斷髮起請求,如發起爲id爲「-1」的數據或id爲特別大不存在的數據。這時的用戶極可能是攻擊者,攻擊會致使數據庫壓力過大。
方案1:biglog同步保障數據一致性
方案2:使用程序方式發送更新消息,保障數據一致性
方案1,能夠經過biglog同步,來保障二級緩存的數據一致性,具體的架構以下
rocketMQ
是支持廣播消費的,只須要在消費端進行配置便可,rocketMQ
默認狀況下使用的是集羣消費,這就意味着若是咱們配置了多個消費者實例,只會有一個實例消費消息。
對於更新Redis來講,一個實例消費消息,完成redis的更新,這就夠了。
對於更新Guava或者其餘1級緩存來講,一個實例消費消息,是不夠的,須要每個實例都消息,因此,必須設置 rocketMQ 客戶端的消費模式,爲 廣播模式;
@RocketMQMessageListener(topic = "seckillgood", consumerGroup = "UpdateGuava", messageModel = MessageModel.BROADCASTING)
使用程序方式保障數據一致性的架構,能夠編寫一個通用的2級緩存通用組件,當數據更新的時候,去發送消息,具體的架構以下:
方案2和方案1 的總體區別不大,只不過 方案2 須要本身寫代碼(或者中間組件)發送數據的變化通知。
方案1 的一個優點:能夠和 創建索引等其餘的消費者,共用binlog的消息隊列。
其餘的區別,你們能夠自行探索。
對於高併發的請求,接入層Nginx有着巨大的做用,能反向代理,負載均衡,動靜分離以及和Lua整合,能夠實現請求定向分發等很是有用的功能,同理Nginx層能夠實現緩存的功能
能夠利用接入層Nginx的進程內緩存,緩存極熱數據的高併發訪問,在接入層,當請求過來時,判斷本地緩存中是否存在,若是存在着直接返回請求結果(或者展示靜態資源的數據),這樣的請求不會直接發送到後端服務層
爲了解決以上可能出現的問題,讓緩存層更穩定,健壯,咱們引入三級緩存架構
三級緩存架構 圖: 具體以下圖所示
原文: lua_shared_dict
syntax:lua_shared_dict <name> <size> default: no context: http phase: depends on usage
聲明一個共享內存區域 name,以充當基於 Lua 字典 ngx.shared.<name>
的共享存儲。
lua_shared_dict 指令定義的共享內存老是被當前 Nginx 服務器實例中全部的 Nginx worker 進程所共享。
size 參數接受大小單位,如 k,m:
http { #指定緩存信息 lua_shared_dict seckill_cache 128m; ... }
詳細參見: ngx.shared.DICT
而後在lua腳本中使用:
local shared_memory = ngx.shared.seckill_cache
便可以取到放在共享內存中的數據。對共享內存的操做也是如set ,get 之類。
--優先從緩存獲取,不然訪問上游接口 local seckill_cache = ngx.shared.seckill_cache local goodIdCacheKey = "goodId_" .. goodId local goodCache = seckill_cache:get(goodIdCacheKey) if goodCache == "" or goodCache == nil then ngx.log(ngx.DEBUG,"cache not hited " .. goodId) --回源上游接口,好比Java 後端rest接口 local res = ngx.location.capture("/stock-provider/api/seckill/good/detail/v1", { method = ngx.HTTP_POST, -- args = requestBody , -- 重要:將請求參數,原樣向上遊傳遞 always_forward_body = false, -- 也能夠設置爲false 僅轉發put和post請求方式中的body. }) --返回上游接口的響應體 body goodCache = res.body; --單位爲s seckill_cache:set(goodIdCacheKey, goodCache, 10 * 60 * 60) end ngx.say(goodCache);
ngx.shared.DICT的實現是採用紅黑樹實現,當申請的緩存被佔用完後若是有新數據須要存儲則採用 LRU 算法淘汰掉「多餘」數據。
LRU原理
LRU的設計原理就是,當數據在最近一段時間常常被訪問,那麼它在之後也會常常被訪問。這就意味着,若是常常訪問的數據,咱們須要然其可以快速命中,而不常訪問的數據,咱們在容量超出限制內,要將其淘汰。
L3與L2同樣,都是本地緩存,優勢和缺點以下:
本地緩存只支持被該應用進程訪問,通常沒法被其餘應用進程訪問,若是對應的數據庫數據,存在數據更新,則須要同步更新不一樣節點的本地緩存副本,來保證數據一致性
本地緩存的更新,複雜度較高而且容易出錯,如基於 Redis 的發佈訂閱機制、或者消息隊列MQ來同步更新各個部署節點。
L3級緩存主要用於極熱數據,如秒殺的商品數據(對於秒殺這樣的場景,瞬間有十幾萬甚至上百萬的請求要同時讀取商品。若是沒有命中本地緩存,可能致使緩存擊穿。
緩存擊穿與 緩存穿透的簡單區別:
- 緩存擊穿是指數據庫中有數據,可是緩存中沒有,大量的請求打到數據庫;
- 緩存穿透是指緩存和數據庫中都沒有的數據,而用戶不斷髮起請求,如發起爲id爲「-1」的數據或id爲特別大不存在的數據。這時的用戶極可能是攻擊者,攻擊會致使數據庫壓力過大。
爲了防止緩存擊穿,同時也保持數據一致性,具體的方案爲:
L3級緩存的數據一致性保障以及防止緩存擊穿方案:
1.數據預熱(或者叫預加載)
2.設置熱點數據永遠不過時,經過 ngx.shared.DICT的緩存的LRU機制去淘汰
3.若是緩存主動更新,在快過時以前更新,若有變化,經過訂閱變化的機制,主動本地刷新
4.提供兜底方案,若是本地緩存沒有,則經過後端服務獲取數據,而後緩存起來
L3級緩存的數據一致性實戰,至關重要,建議必定要動手實戰一票。
尼恩的忠實建議:
理論水平的提高,看看視頻、看看書,只有兩個字,就是須要:多看。
實戰水平的提高,只有兩個字,就是須要:多幹。
L3級緩存的數據一致性實戰的具體材料、源碼、問題,歡迎來 瘋狂創客圈社羣交流。
高併發Java發燒友社羣 - 瘋狂創客圈 總入口 點擊瞭解詳情: