本文主要講解項目實戰中秒殺如何解決下面問題:前端
1)實現秒殺異步下單,掌握如何保證生產者&消費者消息不丟失mysql
2)實現防止惡意刷單ios
3)實現防止相同商品重複秒殺nginx
4)實現秒殺下單接口隱藏程序員
5)實現下單接口限流web
用戶在下單的時候,須要基於JWT令牌信息進行登錄人信息認證,肯定當前訂單是屬於誰的。面試
針對秒殺的特殊業務場景,僅僅依靠對象緩存或者頁面靜態化等技術去解決服務端壓力仍是遠遠不夠。redis
對於數據庫壓力仍是很大,因此須要異步下單,異步是最好的解決辦法,但會帶來一些額外的程序上的算法
複雜性。spring
public static void main(String[] args){ SpringApplication.run(SeckillApplication,class,args); } @Bean public TokenDecode tokenDecode(){ return new TokenDecode(); }
/** * 設置 redisTemplate 的序列化設置 * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { // 1.建立 redisTemplate 模版 RedisTemplate<Object, Object> template = new RedisTemplate<>(); // 2.關聯 redisConnectionFactory template.setConnectionFactory(redisConnectionFactory); // 3.建立 序列化類 GenericToStringSerializer genericToStringSerializer = new GenericToStringSerializer(Object.class); // 4.序列化類,對象映射設置 // 5.設置 value 的轉化格式和 key 的轉化格式 template.setValueSerializer(genericToStringSerializer); template.setKeySerializer(new StringRedisSerializer()); template.afterPropertiesSet(); return template; }
@RestController @CrossOrigin @RequestMapping("/seckillorder") public class SecKillOrderController { @Autowired private TokenDecode tokenDecode; @Autowired private SecKillOrderService secKillOrderService; /** * 秒殺下單 * @param time 當前時間段 * @param id 秒殺商品id * @return */ @RequestMapping("/add") //獲取當前登錄人 String username = tokenDecode.getUserInfo().get("username"); boolean result = secKillOrderService.add(id,time,username); if (result){ return new Result(true, StatusCode.OK,"下單成功"); }else{ return new Result(false,StatusCode.ERROR,"下單失敗"); } } }
public interface SecKillOrderService { /** * 秒殺下單 * @param id 商品id * @param time 時間段 * @param username 登錄人姓名 * @return */ boolean add(Long id, String time, String username); }
當預加載秒殺商品的時候,提早加載每個商品的庫存信息,後續減庫存操做也會先預扣減緩存中的庫存再異步扣減mysql數據。
預扣減庫存會基於redis原子性操做實現
for (SeckillGoods seckillGoods : seckillGoodsList) { redisTemplate.boundHashOps(SECKILL_GOODS_KEY + redisExtName).put(seckillGoods.getId(),seckillGoods); //預加載庫存信息 redisTemplate.OpsForValue(SECKILL_GOODS_STOCK_COUNT_KEY+seckillGoods.getId(),se ckillGoods.getStockCount()); }
業務邏輯:
獲取秒殺商品數據與庫存量數據,若是沒有庫存則拋出異常執行redis預扣減庫存,並獲取扣減以後的庫存值若是扣減完的庫存值<=0, 則刪除redis中對應的商品信息與庫存信息基於mq異步方式完成與mysql數據同步(最終一致性)
注意:庫存數據從redis中取出,轉換成String
@Service public class SecKillOrderServiceImpl implements SecKillOrderService { @Autowired private RedisTemplate redisTemplate; @Autowired private IdWorker idWorker; @Autowired private CustomMessageSender customMessageSender; /** * 秒殺下單 * @param id 商品id * @param time 時間段 * @param username 登錄人姓名 * @return */ @Override public boolean add(Long id, String time, String username) { //獲取商品數據 SeckillGoods goods = (SeckillGoods) redisTemplate.boundHashOps("SeckillGoods_" + time).get(id); String redisStock = (String) redisTemplate.boundValueOps("StockCount_" + goods.getId()).get(); if(StringUtils.isEmpty(redisStock)){ return false; } int value=Integer.parseInt(redisStock); //若是沒有庫存,則直接拋出異常 if(goods==null || value<=0){ return false; } //redis預扣庫存 Long stockCount = redisTemplate.boundValueOps("StockCount_" + id).decrement(); if (stockCount<=0){ //庫存沒了 //刪除商品信息 redisTemplate.boundHashOps("SeckillGoods_" + time).delete(id); //刪除對應的庫存信息 redisTemplate.delete("StockCount_" + goods.getId()); } //有庫存 //若是有庫存,則建立秒殺商品訂單 SeckillOrder seckillOrder = new SeckillOrder(); seckillOrder.setId(idWorker.nextId()); seckillOrder.setUserId(username); seckillOrder.setSellerId(goods.getSellerId()); seckillOrder.setCreateTime(new Date()); seckillOrder.setStatus("0"); //發送消息 return true; } }
按照現有rabbitMQ的相關知識,生產者會發送消息到達消息服務器。可是在實際生產環境下,消息生產者發送的消息頗有可能當到達了消息服務器以後,因爲消息服務器的問題致使消息丟失,如宕機。由於消息服務器默認會將消息存儲在內存中。一旦消息服務器宕機,則消息會產生丟失。所以要保證生產者的消息不丟失,要開始持久化策略。
rabbitMQ持久化: 交換機持久化 隊列持久化 消息持久化
可是若是僅僅只是開啓這兩部分的持久化,也頗有可能形成消息丟失。由於消息服務器頗有可能在持久化的過程當中出現宕機。所以須要經過數據保護機制來保證消息必定會成功進行持久化,不然將一直進行消息發送。
事務機制 事務機制採用類數據庫的事務機制進行數據保護,當消息到達消息服務器,首先會開啓一個事務,接着進 行數據磁盤持久化,只有持久化成功纔會進行事務提交,向消息生產者返回成功通知,消息生產者一旦接收成 功通知則不會再發送此條消息。當出現異常,則返回失敗通知.消息生產者一旦接收失敗通知,則繼續發送該 條消息。 事務機制雖然可以保證數據安全,可是此機制採用的是同步機制,會產生系統間消息阻塞,影響整個系統 的消息吞吐量。從而致使整個系統的性能降低,所以不建議使用。 confirm機制 confirm模式須要基於channel進行設置, 一旦某條消息被投遞到隊列以後,消息隊列就會發送一個確 認信息給生產者,若是隊列與消息是可持久化的, 那麼確認消息會等到消息成功寫入到磁盤以後發出. confirm的性能高,主要得益於它是異步的.生產者在將第一條消息發出以後等待確認消息的同時也能夠 繼續發送後續的消息.當確認消息到達以後,就能夠經過回調方法處理這條確認消息. 若是MQ服務宕機了,則會 返回nack消息. 生產者一樣在回調方法中進行後續處理。
rabbitmq: host: 192.168.200.128 publisher-confirms: true #開啓confirm機制
@Configuration public class RabbitMQConfig { //秒殺商品訂單消息 public static final String SECKILL_ORDER_KEY="seckill_order"; @Bean public Queue queue(){ //開啓隊列持久化 return new Queue(SECKILL_ORDER_KEY,true); } }
@Component public class CustomMessageSender implements RabbitTemplate.ConfirmCallback { static final Logger log = LoggerFactory.getLogger(CustomMessageSender.class); private static final String MESSAGE_CONFIRM="message_confirm"; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; public CustomMessageSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ //返回成功通知 //刪除redis中的相關數據 redisTemplate.delete(correlationData.getId()); redisTemplate.delete(MESSAGE_CONFIRM_+correlationData.getId()); }else{ //返回失敗通知 Map<String,String> map = (Map<String,String>)redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_+correlationData.getId()); String exchange = map.get("exchange"); String routingKey = map.get("routingKey"); String sendMessage = map.get("sendMessage"); //從新發送 rabbitTemplate.convertAndSend(exchange,routingKey, JSON.toJSONString(sendMessage)); } } //自定義發送方法 public void sendMessage(String exchange,String routingKey,String message){ //設置消息惟一標識並存入緩存 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); redisTemplate.opsForValue().set(correlationData.getId(),message); Map<String, String> map = new HashMap<>(); map.put("exchange", exchange); map.put("routingKey", routingKey); map.put("sendMessage", message); redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_+correlationData.getId(),map) ; //攜帶惟一標識發送消息 rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData); } }
更改下單業務層實現
@Autowired private CustomMessageSender customMessageSender;
<dependencies> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_common_db</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_service_order_api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_service_seckill_api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>com.changgou</groupId> <artifactId>changgou_service_goods_api</artifactId> <version>1.0-SNAPSHOT</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> </dependency> </dependencies>
server: port: 9022 spring: jackson: time-zone: GMT+8 application: name: sec-consume datasource: driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://192.168.200.128:3306/changgou_seckill? useUnicode=true&characterEncoding=utf- 8&useSSL=false&allowMultiQueries=true&serverTimezone=GMT%2b8 username: root password: root main: allow-bean-definition-overriding: true #當遇到一樣名字的時候,是否容許覆蓋註冊 redis: host: 192.168.200.128 rabbitmq: host: 192.168.200.128 eureka: client: service-url: defaultZone: http://127.0.0.1:6868/eureka instance: prefer-ip-address: true feign: hystrix: enabled: true client: config: default: #配置全局的feign的調用超時時間 若是 有指定的服務配置 默認的配置不會生效 connectTimeout: 60000 # 指定的是 消費者 鏈接服務提供者的鏈接超時時間 是否能鏈接 單位是毫秒 readTimeout: 20000 # 指定的是調用服務提供者的 服務 的超時時間() 單位是毫秒 #hystrix 配置 hystrix: command: default: execution: timeout: #若是enabled設置爲false,則請求超時交給ribbon控制 enabled: true isolation: strategy: SEMAPHORE thread: # 熔斷器超時時間,默認:1000/毫秒 timeoutInMilliseconds: 20000
@SpringBootApplication @EnableDiscoveryClient @MapperScan(basePackages = {"com.changgou.consume.dao"}) public class OrderConsumerApplication { public static void main(String[] args) { SpringApplication.run(OrderConsumerApplication.class,args); } }
按照現有RabbitMQ知識,能夠得知當消息消費者成功接收到消息後,會進行消費並自動通知消息服務器將該條消息刪除。此種方式的實現使用的是消費者自動應答機制。可是此種方式很是的不安全。在生產環境下,當消息消費者接收到消息,頗有可能在處理消息的過程當中出現意外狀況從而致使消息丟失,由於若是使用自動應答機制是很是不安全。咱們須要確保消費者當把消息成功處理完成以後,消息服務器纔會將該條消息刪除。此時要實現這種效果的話,就須要將自動應答轉換爲手動應答,只有在消息消費者將消息處理完,纔會通知消息服務器將該條消息刪除。
rabbitmq: host: 192.168.200.128 listener: simple: acknowledge-mode: manual #手動
@Component public class ConsumeListener { @Autowired private SecKillOrderService secKillOrderService; @RabbitListener(queues = RabbitMQConfig.SECKILL_ORDER_KEY) public void receiveSecKillOrderMessage(Channel channel, Message message){ //轉換消息 SeckillOrder seckillOrder = JSON.parseObject(message.getBody(), SeckillOrder.class); //同步mysql訂單 int rows = secKillOrderService.createOrder(seckillOrder); if (rows>0){ //返回成功通知 try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } }else{ //返回失敗通知 try { //第一個boolean true全部消費者都會拒絕這個消息,false表明只有當前消費者拒 絕 //第二個boolean true當前消息會進入到死信隊列,false從新回到原有隊列中,默 認回到頭部 channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } catch (IOException e) { e.printStackTrace(); } } } }
3)定義業務層接口與實現類
public interface ConsumeService { int handleCreateOrder(SeckillOrder order); }
@Service public class SecKillOrderServiceImpl implements SecKillOrderService { @Autowired private SeckillGoodsMapper seckillGoodsMapper; @Autowired private SeckillOrderMapper seckillOrderMapper; /** * 添加訂單 * @param seckillOrder * @return */ @Override @Transactional public int createOrder(SeckillOrder seckillOrder) { int result =seckillGoodsMapper.updateStockCount(seckillOrder.getSeckillId()); if (result<=0){ return result; } result =seckillOrderMapper.insertSelective(seckillOrder); if (result<=0){ return result; }return 1;
數據庫字段unsigned介紹 unsigned-----無符號,修飾int 、char ALTER TABLE tb_seckill_goods MODIFY COLUMN stock_count int(11) UNSIGNED DEFAULT NULL COMMENT '剩餘庫存數';
在秒殺這種高併發的場景下,每秒都有可能產生幾萬甚至十幾萬條消息,若是沒有對消息處理量進行任何限制的話,頗有可能由於過多的消息堆積從而致使消費者宕機的狀況。所以官網建議對每個消息消費者都設置處理消息總數(消息抓取總數)。
消息抓取總數的值,設置過大或者太小都很差,太小的話,會致使整個系統消息吞吐能力降低,形成性能浪費。過大的話,則頗有可能致使消息過多,致使整個系統OOM。所以官網建議每個消費者將該值設置在100-300之間。
1)更新消費者。
//設置預抓取總數 channel.basicQos(300);
@FeignClient(name="seckill") public interface SecKillOrderFeign { /** * 秒殺下單 * @param time 當前時間段 * @param id 秒殺商品id * @return */ @RequestMapping("/seckillorder/add") public Result add(@RequestParam("time") String time, @RequestParam("id") Long id); }
@Controller @CrossOrigin @RequestMapping("/wseckillorder") public class SecKillOrderController { @Autowired private SecKillOrderFeign secKillOrderFeign; /** * 秒殺下單 * @param time 當前時間段 * @param id 秒殺商品id * @return */ @RequestMapping("/add") @ResponseBody public Result add(String time,Long id){ Result result = secKillOrderFeign.add(time, id); return result; } }
在生產場景下,頗有可能會存在某些用戶惡意刷單的狀況出現。這樣的操做對於系統而言,會致使業務出錯、髒數據、後端訪問壓力大等問題的出現。
通常要解決這個問題的話,須要前端進行控制,同時後端也須要進行控制。後端實現能夠經過Redisincrde 原子性遞增來進行解決。
//防止重複提交 private String preventRepeatCommit(String username,Long id) { String redisKey = "seckill_user_" + username+"_id_"+id; long count = redisTemplate.opsForValue().increment(redisKey, 1); if (count == 1){ //設置有效期五分鐘 redisTemplate.expire(redisKey, 5, TimeUnit.MINUTES); return "success"; } if (count>1){ return "fail"; } return "fail"; }
public interface SeckillOrderMapper extends Mapper<SeckillOrder> { /** * 查詢秒殺訂單信息 * @param username * @param id * @return */ @Select("select * from tb_seckill_order where user_id=#{username} and seckill_id=#{id}") SeckillOrder getSecKillOrderByUserNameAndGoodsId(String username, Long id); }
當前雖然能夠確保用戶只有在登陸的狀況下才能夠進行秒殺下單,可是沒法方法有一些惡意的用戶在登陸了以後,猜想秒殺下單的接口地址進行惡意刷單。因此須要對秒殺接口地址進行隱藏。
在用戶每一次點擊搶購的時候,都首先去生成一個隨機數並存入redis,接着用戶攜帶着這個隨機數去訪問秒殺下單,下單接口首先會從redis中獲取該隨機數進行匹配,若是匹配成功,則進行後續下單操做,若是匹配不成功,則認定爲非法訪問。
public class RandomUtil { public static String getRandomString() { int length = 15; String base = "abcdefghijklmnopqrstuvwxyz0123456789"; Random random = new Random(); StringBuffer sb = new StringBuffer(); for (int i = 0; i < length; i++) { int number = random.nextInt(base.length()); sb.append(base.charAt(number)); } return sb.toString(); } public static void main(String[] args) { String randomString = RandomUtil.getRandomString(); }
/** * 接口加密 * 生成隨機數存入redis,10秒有效期 */ @GetMapping("/getToken") @ResponseBody public String getToken(){ String randomString = RandomUtil.getRandomString(); String cookieValue = this.readCookie(); redisTemplate.boundValueOps("randomcode_"+cookieValue).set(randomString,10, TimeUnit.SECONDS); return randomString; } //讀取cookie private String readCookie(){ HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); String cookieValue = CookieUtil.readCookie(request, "uid").get("uid"); return cookieValue; }
修改js下單方法
//秒殺下單 add:function(id){ app.msg ='正在下單'; //獲取隨機數 axios.get("/api/wseckillorder/getToken").then(function (response) { var random=response.data; axios.get("/api/wseckillorder/add? time="+moment(app.dateMenus[0]).format("YYYYMMDDHH")+"&id="+id+"&random="+random ).then(function (response) { if (response.data.flag){ app.msg='搶單成功,即將進入支付!'; }else{app.msg='搶單失敗'; } }) }) }
4.4 秒殺渲染服務更改
修改秒殺渲染服務下單接口
/** * 秒殺下單 * @param time 當前時間段 * @param id 秒殺商品id * @return */ @RequestMapping("/add") @ResponseBody public Result add(String time,Long id,String random){ //校驗密文有效 String randomcode = (String) redisTemplate.boundValueOps("randomcode").get(); if (StringUtils.isEmpty(randomcode) || !random.equals(randomcode)){ return new Result(false, StatusCode.ERROR,"無效訪問"); } Result result = secKillOrderFeign.add(time, id); return result; }
由於秒殺的特殊業務場景,生產場景下,還有可能要對秒殺下單接口進行訪問流量控制,防止過多的請求進入到後端服務器。對於限流的實現方式,咱們以前已經接觸過經過nginx限流,網關限流。可是他們都是對一個大的服務進行訪問限流,若是如今只是要對某一個服務中的接口方法進行限流呢?這裏推薦使用google提供的guava工具包中的RateLimiter進行實現,其內部是基於令牌桶算法進行限流計算
<dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.0-jre</version> </dependency>
@Documented @Target({ElementType.METHOD, ElementType.FIELD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface AccessLimit {}
@Component @Scope @Aspect public class AccessLimitAop { @Autowired private HttpServletResponse httpServletResponse; private RateLimiter rateLimiter = RateLimiter.create(20.0); @Pointcut("@annotation(com.changgou.webSecKill.aspect.AccessLimit)") public void limit(){} @Around("limit()") public Object around(ProceedingJoinPoint proceedingJoinPoint){ boolean flag = rateLimiter.tryAcquire(); Object obj = null; try{ if (flag){ obj=proceedingJoinPoint.proceed(); }else{ String errorMessage = JSON.toJSONString(new Result(false,StatusCode.ERROR,"fail")); outMessage(httpServletResponse,errorMessage); } }catch (Throwable throwable) { throwable.printStackTrace(); }return obj; } private void outMessage(HttpServletResponse response, String errorMessage) { ServletOutputStream outputStream = null; try { response.setContentType("application/json;charset=UTF-8"); outputStream = response.getOutputStream(); outputStream.write(errorMessage.getBytes("UTF-8")); } catch (IOException e) { e.printStackTrace(); }finally { try {outputStream.close(); } catch (IOException e) { e.printStackTrace(); } }
歡迎觀看並寫出本身的看法!共同探討!
最後,最近不少小夥伴找我要Linux學習路線圖,因而我根據本身的經驗,利用業餘時間熬夜肝了一個月,整理了一份電子書。不管你是面試仍是自我提高,相信都會對你有幫助!
免費送給你們,只求你們金指給我點個贊!
也但願有小夥伴能加入我,把這份電子書作得更完美!
推薦閱讀: