1、對強一致要求比較高的,應採用實時同步方案,即查詢緩存查詢不到再從DB查詢,保存到緩存;更新緩存時,先更新數據庫,再將緩存的設置過時(建議不要去更新緩存內容,直接設置緩存過時)。html
2、對於併發程度較高的,可採用異步隊列的方式同步,可採用kafka等消息中間件處理消息生產和消費。java
3、使用阿里的同步工具canal,canal實現方式是模擬mysql slave和master的同步機制,監控DB bitlog的日誌更新來觸發緩存的更新,此種方法能夠解放程序員雙手,減小工做量,但在使用時有些侷限性。node
4、採用UDF自定義函數的方式,面對mysql的API進行編程,利用觸發器進行緩存同步,但UDF主要是c/c++語言實現,學習成本高。mysql
@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
@CachePut(key = "caches[0].name + T(String).valueOf(#user.userId)")
@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" )
@Caching(evict = {@CacheEvict(key = "caches[0].name + T(String).valueOf(#userId)" ),
@CacheEvict(key = "caches[0].name + #result.name" )})
@Cacheable:查詢時使用,注意Long類型需轉換爲Sting類型,不然會拋異常
@CachePut:更新時使用,使用此註解,必定會從DB上查詢數據
@CacheEvict:刪除時使用;
@Caching:組合用法 具體註解的使用可參考官網
注意:註解方式雖然能使咱們的代碼簡潔,可是註解方式有侷限性:對key的獲取,以及嵌套使用時註解無效,以下所示
public class User { private Long userId; private String name; private Integer age; private String sex; private String addr;
//get set ..... }
service接口c++
public interface UserService { User getUser(Long userId); User updateUser(User user); User getUserByName(String name); int insertUser(User user); User delete (Long userId); }
//實現類
//假設有需求是由name查詢user的,通常咱們是先由name->id,再由id->user,這樣會減小redis緩存的冗餘信息
@Service(value = "userSerivceImpl")
@CacheConfig(cacheNames = "user")
public class UserServiceImpl implements UserService {
private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class);
@Autowired
UserMapper userMapper;
@Cacheable(key = "caches[0].name + T(String).valueOf(#userId)",unless = "#result eq null")
public User getUser(Long userId) {
User user = userMapper.selectByPrimaryKey(userId);
return user;
}
@Cacheable(key = "caches[0].name + #name")
public String getIdByName(String name){
Long userId = userMapper.getIdByName(name);
return String.valueOf(userId);
}
//使用getUserByName方式調用getIdByName 和getUser方法來實現查詢,可是若是用此方式在controller中直接調用
//getUserByName方法,緩存效果是不起做用的,必須是直接調用getIdByName和getUser方法才能起做用
public User getUserByName(String name) {
//經過name 查詢到主鍵 再由主鍵查詢實體
return getUser(Long.valueOf(getIdByName(name)));
}
1.先定義一個RedisCacheConfig類用於生成RedisTemplate和對CacheManager的管理程序員
@Configuration public class RedisCacheConfig extends CachingConfigurerSupport { /*定義緩存數據 key 生成策略的bean *包名+類名+方法名+全部參數 */ @Bean public KeyGenerator keyGenerator() { return new KeyGenerator() { @Override public Object generate(Object target, Method method, Object... params) { StringBuilder sb = new StringBuilder(); sb.append(target.getClass().getName()); sb.append(method.getName()); for (Object obj : params) { sb.append(obj.toString()); } return sb.toString(); } }; } //@Bean public CacheManager cacheManager( @SuppressWarnings("rawtypes") RedisTemplate redisTemplate) { //RedisCacheManager cacheManager = new RedisCacheManager(redisTemplate); //cacheManager.setDefaultExpiration(60);//設置緩存保留時間(seconds) return cacheManager; } //1.項目啓動時此方法先被註冊成bean被spring管理 @Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate template = new StringRedisTemplate(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); template.setValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值 Jackson2JsonRedisSerializer serializer = new Jackson2JsonRedisSerializer(Object.class); System.out.println("==============obj:"+Object.class.getName()); ObjectMapper mapper = new ObjectMapper(); mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); serializer.setObjectMapper(mapper); template.setValueSerializer(serializer); //使用StringRedisSerializer來序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; } }
2.定義一個redisUtil類用於存取緩存值redis
@Component public class RedisCacheUtil { @Autowired private StringRedisTemplate stringRedisTemplate; @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 存儲字符串 * @param key string類型的key * @param value String類型的value */ public void set(String key, String value) { stringRedisTemplate.opsForValue().set(key, value); } /** * 存儲對象 * @param key String類型的key * @param value Object類型的value */ public void set(String key, Object value) { redisTemplate.opsForValue().set(key, value); } /** * 存儲對象 * @param key String類型的key * @param value Object類型的value */ public void set(String key, Object value,Long timeOut) { redisTemplate.opsForValue().set(key, value,timeOut, TimeUnit.SECONDS); } /** * 根據key獲取字符串數據 * @param key * @return */ public String getValue(String key) { return stringRedisTemplate.opsForValue().get(key); } // public Object getValue(String key) { // return redisTemplate.opsForValue().get(key); // } /** * 根據key獲取對象 * @param key * @return */ public Object getValueOfObject(String key) { return redisTemplate.opsForValue().get(key); } /** * 根據key刪除緩存信息 * @param key */ public void delete(String key) { redisTemplate.delete(key); } /** * 查詢key是否存在 * @param key * @return */ @SuppressWarnings("unchecked") public boolean exists(String key) { return redisTemplate.hasKey(key); } }
3.實現類spring
/** * Created by yexin on 2017/9/8. * * 在Impl基礎上+ 防止緩存雪崩和緩存穿透功能 */ @Service(value = "userServiceImpl4") public class UserServiceImpl4 implements UserService { @Autowired UserMapper userMapper; @Autowired RedisCacheUtil redisCacheUtil; @Value("${timeOut}") private long timeOut; @Override public User getUser(Long userId) { String key = "user" + userId; User user = (User) redisCacheUtil.getValueOfObject(key); String keySign = key + "_sign"; String valueSign = redisCacheUtil.getValue(keySign); if(user == null){//防止第一次查詢時返回時空結果 //防止緩存穿透 if(redisCacheUtil.exists(key)){ return null; } user = userMapper.selectByPrimaryKey(userId); redisCacheUtil.set(key,user); redisCacheUtil.set(keySign,"1",timeOut *(new Random().nextInt(10) + 1)); // redisCacheUtil.set(keySign,"1",0L); //過時時間不能設置爲0,必須比0大的數 return user; } if(valueSign != null){ return user; }else { //設置標記的實效時間 Long tt = timeOut * (new Random().nextInt(10) + 1); System.out.println("tt:"+tt); redisCacheUtil.set(keySign,"1",tt); //異步處理緩存更新 應對與高併發的狀況,會產生髒讀的狀況 ThreadPoolUtil.getExecutorService().execute(new Runnable(){ public void run() { // System.out.println("-----執行異步操做-----"); User user1 = userMapper.selectByPrimaryKey(userId); redisCacheUtil.set(key,user1); } }); // new Thread(){ // public void run() { //應對與高併發的狀況,會產生髒讀的狀況 // System.out.println("-----執行異步操做-----"); // User user1 = userMapper.selectByPrimaryKey(userId); // redisCacheUtil.set(key,user1); // } // }.start(); } return user; } }
異步實現經過kafka做爲消息隊列實現,異步只針對更新操做,查詢無需異步,實現類以下sql
1.pom文件需依賴數據庫
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
2.生產着代碼
@EnableBinding(Source.class) public class SendService { @Autowired private Source source; public void sendMessage(String msg) { try{ source.output().send(MessageBuilder.withPayload(msg).build()); } catch (Exception e) { e.printStackTrace(); } } //接受的是一個實體類,具體配置在application.yml public void sendMessage(TransMsg msg) { try { //MessageBuilder.withPayload(msg).setHeader(KafkaHeaders.TOPIC,"111111").build(); source.output().send(MessageBuilder.withPayload(msg).build()); } catch (Exception e) { e.printStackTrace(); } } }
3.消費者代碼
@EnableBinding(Sink.class) public class MsgSink { @Resource(name = "userSerivceImpl3") UserService userService; @StreamListener(Sink.INPUT) public void process(TransMsg<?> msg) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, ClassNotFoundException { System.out.println("sink......"+msg); System.out.println("opt db strat ----"); userService.updateUser((User) msg.getParams()); System.out.println("執行db結束------"); } }
4.application.yml配置
spring: application: name: demo-provider redis: database: 0 host: 192.168.252.128 #host: localhost port: 6379 password: pool: max-active: 50 max-wait: -1 max-idle: 50 timeout: 0 #kafka cloud: stream: kafka: binder: brokers: 192.168.252.128:9092 zk-nodes: 192.168.252.128:2181 minPartitionCount: 1 autoCreateTopics: true autoAddPartitions: true bindings: input: destination: topic-02 # content-type: application/json content-type: application/x-java-object #此種類型配置在消費端接受到的爲一個實體類 group: t1 consumer: concurrency: 1 partitioned: false output: destination: topic-02 content-type: application/x-java-object producer: partitionCount: 1 instance-count: 1 instance-index: 0
5.實現類
@Service(value = "userServiceImpl2") public class UserServiceImpl2 implements UserService{ @Autowired UserMapper userMapper; @Autowired RedisCacheUtil redisCacheUtil; private static Logger log = LoggerFactory.getLogger(UserServiceImpl.class); @Autowired SendService sendService; public User updateUser(User user) { System.out.println(" impl2 active "); String key = "user"+ user.getUserId(); System.out.println("key:"+key); //是否存在key if(!redisCacheUtil.exists(key)){ return userMapper.updateByPrimaryKeySelective(user) == 1 ? user : null; } /* 更新key對應的value 更新隊列 */ User user1 = (User)redisCacheUtil.getValueOfObject(key); try { redisCacheUtil.set(key,user); TransMsg<User> msg = new TransMsg<User>(key,user,this.getClass().getName(),"updateUser",user); sendService.sendMessage(msg); }catch (Exception e){ redisCacheUtil.set(key,user1); } return user; } }
注意:kafka與zookeeper的配置在此不介紹
先要安裝canal,配置canal的example文件等,配置暫不介紹
package org.example.canal; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; import org.example.canal.util.RedisUtil; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) { // 建立連接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "", ""); int batchSize = 1000; try { connector.connect(); connector.subscribe(".*\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據 long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾數據 } } finally { connector.disconnect(); } } private static void printEntry( List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { System.out.println("tablename:"+entry.getHeaderOrBuilder().getTableName()); rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { redisDelete(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { redisInsert(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); redisUpdate(rowData.getAfterColumnsList()); } } } } private static void printColumn( List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } private static void redisInsert( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisUpdate( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString()); } } private static void redisDelete( List<Column> columns){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } if(columns.size()>0){ RedisUtil.delKey("user:"+ columns.get(0).getValue()); } } }
package org.example.canal.util; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPoolConfig; public class RedisUtil { // Redis服務器IP private static String ADDR = "192.168.252.128"; // Redis的端口號 private static int PORT = 6379; // 訪問密碼 //private static String AUTH = "admin"; // 可用鏈接實例的最大數目,默認值爲8; // 若是賦值爲-1,則表示不限制;若是pool已經分配了maxActive個jedis實例,則此時pool的狀態爲exhausted(耗盡)。 private static int MAX_ACTIVE = 1024; // 控制一個pool最多有多少個狀態爲idle(空閒的)的jedis實例,默認值也是8。 private static int MAX_IDLE = 200; // 等待可用鏈接的最大時間,單位毫秒,默認值爲-1,表示永不超時。若是超過等待時間,則直接拋出JedisConnectionException; private static int MAX_WAIT = 10000; // 過時時間 protected static int expireTime = 60 * 60 *24; // 鏈接池 protected static JedisPool pool; static { JedisPoolConfig config = new JedisPoolConfig(); //最大鏈接數 config.setMaxTotal(MAX_ACTIVE); //最多空閒實例 config.setMaxIdle(MAX_IDLE); //超時時間 config.setMaxWaitMillis(MAX_WAIT); // config.setTestOnBorrow(false); pool = new JedisPool(config, ADDR, PORT, 1000); } /** * 獲取jedis實例 */ protected static synchronized Jedis getJedis() { Jedis jedis = null; try { jedis = pool.getResource(); } catch (Exception e) { e.printStackTrace(); if (jedis != null) { pool.returnBrokenResource(jedis); } } return jedis; } /** * 釋放jedis資源 * @param jedis * @param isBroken */ protected static void closeResource(Jedis jedis, boolean isBroken) { try { if (isBroken) { pool.returnBrokenResource(jedis); } else { pool.returnResource(jedis); } } catch (Exception e) { } } /** * 是否存在key * @param key */ public static boolean existKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); return jedis.exists(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return false; } /** * 刪除key * @param key */ public static void delKey(String key) { Jedis jedis = null; boolean isBroken = false; try { jedis = getJedis(); jedis.select(0); jedis.del(key); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } /** * 取得key的值 * @param key */ public static String stringGet(String key) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.get(key); jedis.expire(key, expireTime); } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加string數據 * @param key * @param value */ public static String stringSet(String key, String value) { Jedis jedis = null; boolean isBroken = false; String lastVal = null; try { jedis = getJedis(); jedis.select(0); lastVal = jedis.set(key, value); jedis.expire(key, expireTime); } catch (Exception e) { e.printStackTrace(); isBroken = true; } finally { closeResource(jedis, isBroken); } return lastVal; } /** * 添加hash數據 * @param key * @param field * @param value */ public static void hashSet(String key, String field, String value) { boolean isBroken = false; Jedis jedis = null; try { jedis = getJedis(); if (jedis != null) { jedis.select(0); jedis.hset(key, field, value); jedis.expire(key, expireTime); } } catch (Exception e) { isBroken = true; } finally { closeResource(jedis, isBroken); } } }
穿透:頻繁查詢一個不存在的數據,因爲緩存不命中,每次都要查詢持久層。從而失去緩存的意義。
解決辦法: 持久層查詢不到就緩存空結果,查詢時先判斷緩存中是否exists(key) ,若是有直接返回空,沒有則查詢後返回,
注意insert時需清除查詢的key,不然即使DB中有值也查詢不到(固然也能夠設置空緩存的過時時間)
雪崩:緩存大量失效的時候,引起大量查詢數據庫。
解決辦法:①用鎖/分佈式鎖或者隊列串行訪問
②緩存失效時間均勻分佈
熱點key:某個key訪問很是頻繁,當key失效的時候有打量線程來構建緩存,致使負載增長,系統崩潰。
解決辦法:
①使用鎖,單機用synchronized,lock等,分佈式用分佈式鎖。
②緩存過時時間不設置,而是設置在key對應的value裏。若是檢測到存的時間超過過時時間則異步更新緩存。
③在value設置一個比過時時間t0小的過時時間值t1,當t1過時的時候,延長t1並作更新緩存操做。
4設置標籤緩存,標籤緩存設置過時時間,標籤緩存過時後,需異步地更新實際緩存 具體參照userServiceImpl4的處理方式
1、查詢redis緩存時,通常查詢若是以非id方式查詢,建議先由條件查詢到id,再由id查詢pojo
2、異步kafka在消費端接受信息後,該怎麼識別處理那張表,調用哪一個方法,此問題暫時還沒解決
3、比較簡單的redis緩存,推薦使用canal
參考文檔
http://blog.csdn.net/fly_time2012/article/details/50751316
http://blog.csdn.net/kkgbn/article/details/60576477
http://www.cnblogs.com/fidelQuan/p/4543387.html