分佈式秒殺

通常在具體的業務中,平臺方會發布秒殺席位個數,秒殺的時間段,讓各個商家報名,將本身的產品參與秒殺活動。這裏將同事畫的一張圖放上來,大體是這麼一個流程。關於秒殺原理能夠參考單機秒殺系統樣例 redis

這裏面關於Nginx的限流能夠參考高併發之Nginx的限流 數據庫

咱們這裏在秒殺的前一天將全部的批准參加秒殺的商家商品數據導入到redis的商品隊列中,這裏咱們使用噹噹的elastic-job來實現,關於elastic-job的使用,能夠參考分佈式調度Elastic-Job攻略 服務器

這裏是以商家的服務來當成一件商品來處理的,因此service能夠理解成商品。這裏有一個區位的概念,即秒殺當天商家能夠報名的全部時間段,咱們稱之爲區位,好比0點~2點,2點~4點等等。區位的實體類字段以下。併發

/**
 * 秒殺區位
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Location {
    private Long id; //區位id
    private LocalDate date; //日期
    private TimeSegment timeSegment; //時間段
    private int locationNumber; //區位數量,便可以容納多少商家報名
    private BigDecimal baseCost; //商家參與活動的最低繳納金
    private double sellPercent;  //銷售百分比繳納率
    private boolean releaseLocation; //是否發佈
}
/**
 * 秒殺服務
 */
@RequiredArgsConstructor
@NoArgsConstructor
public class Service {
    @Getter
    @Setter
    @NonNull
    private Long id; //服務id
    @Getter
    @Setter
    @NonNull
    private Long signUpId; //報名id
    @Getter
    @Setter
    @NonNull
    private String serviceCode; //服務編碼
    @Getter
    @Setter
    @NonNull
    private String serviceName; //服務名稱
    @Getter
    @Setter
    @NonNull
    private String serviceLevel; //服務分類
    @Getter
    @Setter
    @NonNull
    private Price price; //價格,包含普通價格和秒殺價格
    @Getter
    @Setter
    @NonNull
    private int totalCount; //參與秒殺的總數
    @Getter
    @Setter
    @NonNull
    private int limitCount; //單個用戶能夠購買該服務的數量
    @Getter
    @Setter
    @NonNull
    private TimeSegment timeSegment; //時間段
    @Getter
    @Setter
    @NonNull
    private CheckStatus checkStatus; //平臺對該秒殺的審覈狀態
    @Getter
    private Lock lock = new ReentrantLock(); //重入鎖
    @Getter
    private Condition condition = lock.newCondition(); //重入鎖條件
}

商家商品(服務)數據導入到redis的商品隊列中app

/**
 * 在天天的0點開始處理,查看後一天是否有秒殺活動
 */
@Slf4j
@Component
@ElasticSimpleJob(cron="0 0 0 * * ?",jobName="loadRedis",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=Load0,1=Load1")
public class LoadDataToRedisJob implements SimpleJob {
    @Autowired
    private RedisService redisService;
    @Autowired
    private DataDao dataDao;

    @Override
    public void execute(ShardingContext shardingContext) {
        //獲取後一天全部的秒殺區位
        List<Location> locationNextDayList = dataDao.findJobToday(LocalDate.now().plusDays(1));
        if (!CollectionUtils.isEmpty(locationNextDayList)) {
            //獲取一天全部的秒殺區位時間段
            List<TimeSegment> segmentList = locationNextDayList.stream().map(location -> location.getTimeSegment())
                    .collect(Collectors.toList());
            switch (shardingContext.getShardingItem()) {
                case 0:
                    //獲取每一個時間段內的全部參與秒殺的服務
                    segmentList.stream().map(timeSegment -> {
                        List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment);
                        serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
                        return serviceInSegment;
                        //扁平化全部服務,統一爲一組List
                    }).flatMap(services -> services.stream()).forEach(service -> {
                        //以服務id以及秒殺時間段組合爲主鍵
                        String key = service.getId() + service.getTimeSegment().toString();
                        //若是redis中存在該主鍵的隊列,則清空隊列
                        if (redisService.exists(key)) {
                            for (int i = 0; i < redisService.llen(key); i++) {
                                redisService.rpop(key);
                            }
                        }
                        //清空後,根據每一個服務的參與總數,將服務按總數量推送到該主鍵隊列中
                        for (int i = 0; i < service.getTotalCount(); i++) {
                            redisService.lpush(key, JSONObject.toJSONString(service));
                        }
                        log.info(service.getId() + service.getTimeSegment().toString());
                        //以服務id+":count"組合成該服務的總數鍵,若是redis中存在該鍵,則刪除
                        String countKey = service.getId() + ":count";
                        if (redisService.exists(countKey)) {
                            redisService.del(countKey);
                        }
                        //從新將總數放入該鍵的redis中存儲
                        redisService.set(countKey, String.valueOf(service.getTotalCount()));
                    });
                    break;
                case 1:
                    break;
                default:
                    break;
            }
        }

    }
}

將服務導入到redis隊列後,咱們須要設立一個秒殺活動開始的標識,讓秒殺下單隻能在秒殺活動進行中開啓,不在秒殺時間內不容許下單。dom

/**
 * 給秒殺時間點設立開啓標識,天天0點開始,每2小時執行一次
 */
@Slf4j
@Component
@ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="openTimeSeg",shardingTotalCount=1,jobParameter="日期",shardingItemParameters="0=Open0")
public class OpenTimeSegmentJob implements SimpleJob {
    @Autowired
    private RedisService redisService;
    @Autowired
    private DataDao dataDao;

    @Override
    public void execute(ShardingContext shardingContext) {
        //獲取當天的全部秒殺區位
        List<Location> locationToDayList = dataDao.findJobToday(LocalDate.now());
        //若是當天有秒殺活動
        if (!CollectionUtils.isEmpty(locationToDayList)) {
            //獲取當前時間點,當前時間點不必定是準點
            LocalDateTime now = LocalDateTime.now();
            int year = now.getYear();
            int month = now.getMonthValue();
            int day = now.getDayOfMonth();
            int hour = now.getHour();
            //將當前時間拼裝成整點
            LocalDateTime beginDate = LocalDateTime.of(year, month, day, hour, 0, 0);
            //以整點時間爲基準,在redis中放入開啓秒殺時間段,119分鐘後消失(每一個時間段段爲1小時59分鐘,2小時的最後一分鐘結束該時間段秒殺)
            redisService.set("TimeStart:" + new TimeSegment(beginDate, beginDate.plusMinutes(119)).toString(),
                    "opened",7140);
            log.info(beginDate.toString() + "至" + beginDate.plusMinutes(119).toString() + "秒殺開始");
        }
    }
}

到了秒殺時間,咱們就能夠開始下單了,先定義一個秒殺單的接口異步

public interface SecOrder {
    /**
     * 秒殺下單
     * @param secOrder
     * @return
     */
    public String makeOrder(SecOrder secOrder);

    /**
     * 是否存在該訂單編號的秒殺單
     * @param orderNo
     * @return
     */
    public boolean exitsOrder(String orderNo);

    /**
     * 修改支付狀態
     * @param orderNo
     */
    public void changePayStatus(String orderNo);
}

SecOrder的實現類的各屬性以下。分佈式

/**
 * 服務秒殺單
 */
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
@ServiceSecOrderVersion(value = 1)
public class ServiceSecOrder implements SecOrder {
    @Getter
    @Setter
    private Long id; //訂單id
    @Getter
    @Setter
    private String orderNo; //訂單編號
    @Getter
    @Setter
    private Service service; //訂單服務內容
    @Getter
    @Setter
    private TimeSegment timeSegment; //秒殺時間段
    @Getter
    @Setter
    private int number; //訂單數量
    @Getter
    @Setter
    private BigDecimal amount; //訂單金額
    @Getter
    @Setter
    private AppUser user; //下單人
    @Getter
    @Setter
    private int orderStatus; //訂單狀態
    @Getter
    @Setter
    private LocalDateTime createDate; //建立日期

而後開始下秒殺訂單ide

@Override
public String makeOrder(SecOrder secOrder) {
    RedisService redisService = SpringBootUtil.getBean(RedisService.class);
    IdService idService = SpringBootUtil.getBean(IdService.class);
    MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class);
    //若是當前時間在秒殺時間段以外,不容許秒殺下單
    if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime())
    || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) {
        throw new RuntimeException("不在秒殺時間段內");
    }
    //因爲測試時間的問題,此處須要屏蔽,等到實際部署時須要恢復
    LocalDateTime now = LocalDateTime.now();
    int year = now.getYear();
    int month = now.getMonthValue();
    int day = now.getDayOfMonth();
    int hour = now.getHour();
    LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0);
    //從redis中檢查是否有開啓秒殺時間段
    if (!redisService.exists("TimeStart:" + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) {
        throw new RuntimeException("當前時間段無秒殺");
    }
    ((ServiceSecOrder)secOrder).setId(idService.genId());
    if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) {
        throw new RuntimeException("秒殺數量超出限購");
    }
    AppUser loginAppUser = AppUserUtil.getLoginAppUser();
    AppUser user = new AppUser();
    user.setId(loginAppUser.getId());
    ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService));
    user.setUsername(loginAppUser.getUsername());
    ((ServiceSecOrder)secOrder).setUser(user);
    //設置訂單狀態0表示未支付狀態
    ((ServiceSecOrder)secOrder).setOrderStatus(0);
    ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now());
    //設置用戶秒殺隊列鍵名(每一種服務都有獨立的用戶秒殺隊列)
    //隊列名由User:+服務id+時間段組成
    String key = "User:" + ((ServiceSecOrder)secOrder).getService().getId()
            + ((ServiceSecOrder)secOrder).getTimeSegment().toString();
    String serviceKey = ((ServiceSecOrder)secOrder).getService().getId()
            + ((ServiceSecOrder)secOrder).getService().getTimeSegment().toString();
    //若是服務隊列還有數據,則推送用戶進隊列,不然直接返回秒殺失敗
    if (redisService.llen(serviceKey) > 0) {
        //創建每一個服務對每一個用戶對限制數量對鍵,格式爲——"服務id:用戶id"
        String limitUserKey = ((ServiceSecOrder) secOrder).getService().getId() + ":"
                + ((ServiceSecOrder) secOrder).getUser().getId();
        //若是該鍵存在,獲取該鍵的值(這裏須要考慮分佈式的併發問題的可能)
        //可是有冪等,因此此處不會出現一個用戶同時秒殺一個商品(服務)的多個併發線程存在
        if (redisService.exists(limitUserKey)) {
            String limitCount = redisService.get(limitUserKey);
            //若是該鍵的值達到服務商品的限制數,返回秒殺失敗
            if (Integer.valueOf(limitCount) == ((ServiceSecOrder) secOrder).getService().getLimitCount()) {
                return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                        + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗";
            }
        }else {  //若是不存在,設置該鍵的值爲0
            redisService.set(limitUserKey,"0");
        }
        //將秒殺用戶id推送到該隊列中
        redisService.lpush(key, ((ServiceSecOrder) secOrder).getUser().getId() + "");

咱們將用戶id推送到redis隊列後就要開始匹配秒殺結果了,由於商品隊列早已經在前一天就推送進去了。高併發

/**
 * 秒殺結果匹配任務,天天0點開始,每2小時執行一次
 */
@Slf4j
@Component
@RabbitListener(queues = {SecendKillMq.SECENDKILL_QUEUE + "_1",
                        SecendKillMq.SECENDKILL_QUEUE + "_2",
                        SecendKillMq.SECENDKILL_QUEUE + "_3",
                        SecendKillMq.SECENDKILL_QUEUE + "_4",
                        SecendKillMq.SECENDKILL_QUEUE + "_5",
                        SecendKillMq.SECENDKILL_QUEUE + "_6",
                        SecendKillMq.SECENDKILL_QUEUE + "_7",
                        SecendKillMq.SECENDKILL_QUEUE + "_8",
                        SecendKillMq.SECENDKILL_QUEUE + "_9",
                        SecendKillMq.SECENDKILL_QUEUE + "_10"})
@ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="secResult",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=SecKill0,1=SecKill1")
public class SecendKillResultJob implements SimpleJob {
    @Autowired
    private RedisService redisService;
    @Autowired
    private DataDao dataDao;
    //爲了跟MQ搭配,喚醒中斷用
    private List<Service> serviceList;

    @Override
    public void execute(ShardingContext shardingContext) {
        //獲取當天全部的秒殺區位
        List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now());
        //若是當天有秒殺活動
        if (!CollectionUtils.isEmpty(locationTodayList)) {
            //獲取一天全部的秒殺區位時間段
            List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment())
                    .collect(Collectors.toList());
            switch (shardingContext.getShardingItem()) {
                case 0:
                    //從全部秒殺區位時間段過濾當前秒殺時間段
                    this.serviceList = segmentList.stream().filter(timeSegment -> LocalDateTime.now().isAfter(timeSegment.getBeginTime()) &&
                            LocalDateTime.now().isBefore(timeSegment.getEndTime()))
                            //將時間段轉化成時間段內的秒殺服務
                            .map(timeSegment -> {
                                List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment);
                                serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
                                return serviceInSegment;
                                //扁平化全部的秒殺服務,將全部當前時間段內的服務放入serviceList屬性中
                                //就是拿出當前時間段內全部參與秒殺的服務
                            }).flatMap(services -> services.stream()).collect(Collectors.toList());
                    //並行化處理全部的秒殺服務
                    int lism = 0;
                    if (serviceList.size() > Runtime.getRuntime().availableProcessors() * 2) {
                        lism = serviceList.size();
                    }else {
                        lism = Runtime.getRuntime().availableProcessors() * 2;
                    }
                    ForkJoinPool forkJoinPool = new ForkJoinPool(lism);
                    try {
                        forkJoinPool.submit(() ->
                        this.serviceList.parallelStream().forEach(service -> {
                            while (true) {
                                try {
                                    service.getLock().lock();
                                    String userKey = "User:" + service.getId() + service.getTimeSegment().toString();
                                    String serviceKey = service.getId() + service.getTimeSegment().toString();
                                    String countKey = service.getId() + ":count";
                                    //若是下秒殺時間內沒有用戶下單該服務,則中斷該服務的並行線程
                                    //若是有用戶下單則喚醒該並行線程
                                    while (redisService.llen(userKey).equals(0L)) {
                                        try {
                                            log.info("用戶隊列無數據,開始中斷");
                                            service.getCondition().await();
                                            LocalDateTime now = LocalDateTime.now();
                                            if (now.isAfter(service.getTimeSegment().getEndTime())) {
                                                break;
                                            }
                                        } catch (InterruptedException e) {
                                            e.printStackTrace();
                                        }
                                    }

到這裏,若是沒有用戶下單,則會進行線程中斷,不會去執行while (true)的無限循環。SecendKillResultJob同時又是RabbitMQ的一個消費者,同時監聽了10個消息隊列,監聽後進行以下處理

@RabbitHandler
public void receice(byte[] data, Channel channel, Message message) throws IOException {
    try {
        //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉;不然消息服務器覺得這條消息沒處理掉 後續還會在發
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        //收到服務id
        Long serviceId = unSerialize(data);
        log.info(serviceId + "");
        //若是當前秒殺服務列表不爲空
        if (!CollectionUtils.isEmpty(this.serviceList)) {
            //從服務列表中過濾出id爲MQ收取的服務ID的服務
            this.serviceList.stream().filter(service -> service.getId().equals(serviceId))
                    .forEach(service -> {
                        log.info("存在" + service.getId());
                        try {
                            service.getLock().lock();
                            //對該服務所在線程進行喚醒
                            service.getCondition().signalAll();
                        } finally {
                            service.getLock().unlock();
                        }
                    });
        }
    } catch (IOException e) {
        e.printStackTrace();
        //丟棄這條消息
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        log.info("receiver fail");
    }
}

/**
 * 反序列化
 * @param data
 * @return
 */
private Long unSerialize(byte[] data) {
    Input input = null;
    try {
        Kryo kryo = new Kryo();
        input = new Input(new ByteArrayInputStream(data));
        return kryo.readObject(input,Long.class);
    }
    finally {
        input.close();
    }
}

這樣咱們再回到ServiceSecOrder的makeOrder下單方法中,將用戶下單的服務id異步發送到MQ中,去喚醒秒殺結果匹配任務繼續執行。

/**
 * 服務秒殺單
 */
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
@ServiceSecOrderVersion(value = 1)
public class ServiceSecOrder implements SecOrder {
    @Getter
    @Setter
    private Long id; //訂單id
    @Getter
    @Setter
    private String orderNo; //訂單編號
    @Getter
    @Setter
    private Service service; //訂單服務內容
    @Getter
    @Setter
    private TimeSegment timeSegment; //秒殺時間段
    @Getter
    @Setter
    private int number; //訂單數量
    @Getter
    @Setter
    private BigDecimal amount; //訂單金額
    @Getter
    @Setter
    private AppUser user; //下單人
    @Getter
    @Setter
    private int orderStatus; //訂單狀態
    @Getter
    @Setter
    private LocalDateTime createDate; //建立日期

    @Override
    public String makeOrder(SecOrder secOrder) {
        RedisService redisService = SpringBootUtil.getBean(RedisService.class);
        IdService idService = SpringBootUtil.getBean(IdService.class);
        MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class);
        //若是當前時間在秒殺時間段以外,不容許秒殺下單
        if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime())
        || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) {
            throw new RuntimeException("不在秒殺時間段內");
        }
        //因爲測試時間的問題,此處須要屏蔽,等到實際部署時須要恢復
        LocalDateTime now = LocalDateTime.now();
        int year = now.getYear();
        int month = now.getMonthValue();
        int day = now.getDayOfMonth();
        int hour = now.getHour();
        LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0);
        //從redis中檢查是否有開啓秒殺時間段
        if (!redisService.exists("TimeStart:" + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) {
            throw new RuntimeException("當前時間段無秒殺");
        }
        ((ServiceSecOrder)secOrder).setId(idService.genId());
        if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) {
            throw new RuntimeException("秒殺數量超出限購");
        }
        AppUser loginAppUser = AppUserUtil.getLoginAppUser();
        AppUser user = new AppUser();
        user.setId(loginAppUser.getId());
        ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService));
        user.setUsername(loginAppUser.getUsername());
        ((ServiceSecOrder)secOrder).setUser(user);
        //設置訂單狀態0表示未支付狀態
        ((ServiceSecOrder)secOrder).setOrderStatus(0);
        ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now());
        //設置用戶秒殺隊列鍵名(每一種服務都有獨立的用戶秒殺隊列)
        //隊列名由User:+服務id+時間段組成
        String key = "User:" + ((ServiceSecOrder)secOrder).getService().getId()
                + ((ServiceSecOrder)secOrder).getTimeSegment().toString();
        String serviceKey = ((ServiceSecOrder)secOrder).getService().getId()
                + ((ServiceSecOrder)secOrder).getService().getTimeSegment().toString();
        //若是服務隊列還有數據,則推送用戶進隊列,不然直接返回秒殺失敗
        if (redisService.llen(serviceKey) > 0) {
            //創建每一個服務對每一個用戶對限制數量對鍵,格式爲——"服務id:用戶id"
            String limitUserKey = ((ServiceSecOrder) secOrder).getService().getId() + ":"
                    + ((ServiceSecOrder) secOrder).getUser().getId();
            //若是該鍵存在,獲取該鍵的值(這裏須要考慮分佈式的併發問題的可能)
            //可是有冪等,因此此處不會出現一個用戶同時秒殺一個商品(服務)的多個併發線程存在
            if (redisService.exists(limitUserKey)) {
                String limitCount = redisService.get(limitUserKey);
                //若是該鍵的值達到服務商品的限制數,返回秒殺失敗
                if (Integer.valueOf(limitCount) == ((ServiceSecOrder) secOrder).getService().getLimitCount()) {
                    return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                            + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗";
                }
            }else {  //若是不存在,設置該鍵的值爲0
                redisService.set(limitUserKey,"0");
            }
            //將秒殺用戶id推送到該隊列中
            redisService.lpush(key, ((ServiceSecOrder) secOrder).getUser().getId() + "");
            //喚醒秒殺結果匹配任務繼續執行。
            CompletableFuture.runAsync(() -> {
                messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL,
                        SecendKillMq.ROUTING_KEY_SECENDKILL,
                        ((ServiceSecOrder) secOrder).getService().getId());
            });

這裏咱們須要先看一段redis-lua代碼,表示若是用戶隊列,服務隊列均有數據,則將其彈出,放入一個hash中做爲匹配的結果,同時扣減服務隊列總數。其意義能夠參考用戶金額的終極解決方案--Redis Lua

@Override
public Object secondKill(String userKey, String serviceKey,String userResult, String countKey,String serviceId) {
    String script = "if redis.call('llen',KEYS[1]) > 0 and redis.call('llen',KEYS[2]) > 0 " +
            "and tonumber(redis.call('get',KEYS[4])) > 0 then " +
            "local userid = redis.call('rpop',KEYS[1])" +
            "redis.call('hset',KEYS[3],userid,redis.call('rpop',KEYS[2])) " +
            "redis.call('decr',KEYS[4]) " +
            "return redis.call('incr',KEYS[5]..':'..userid) else return 0 end";
    return execute(jedis -> jedis.eval(script,5,userKey,serviceKey,userResult,countKey,serviceId));
}

秒殺服務線程喚醒後,繼續執行

/**
 * 秒殺結果匹配任務,天天0點開始,每2小時執行一次
 */
@Slf4j
@Component
@RabbitListener(queues = {SecendKillMq.SECENDKILL_QUEUE + "_1",
                        SecendKillMq.SECENDKILL_QUEUE + "_2",
                        SecendKillMq.SECENDKILL_QUEUE + "_3",
                        SecendKillMq.SECENDKILL_QUEUE + "_4",
                        SecendKillMq.SECENDKILL_QUEUE + "_5",
                        SecendKillMq.SECENDKILL_QUEUE + "_6",
                        SecendKillMq.SECENDKILL_QUEUE + "_7",
                        SecendKillMq.SECENDKILL_QUEUE + "_8",
                        SecendKillMq.SECENDKILL_QUEUE + "_9",
                        SecendKillMq.SECENDKILL_QUEUE + "_10"})
@ElasticSimpleJob(cron="0 0 0/2 * * ?",jobName="secResult",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=SecKill0,1=SecKill1")
public class SecendKillResultJob implements SimpleJob {
    @Autowired
    private RedisService redisService;
    @Autowired
    private DataDao dataDao;
    //爲了跟MQ搭配,喚醒中斷用
    private List<Service> serviceList;

    @Override
    public void execute(ShardingContext shardingContext) {
        //獲取當天全部的秒殺區位
        List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now());
        //若是當天有秒殺活動
        if (!CollectionUtils.isEmpty(locationTodayList)) {
            //獲取一天全部的秒殺區位時間段
            List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment())
                    .collect(Collectors.toList());
            switch (shardingContext.getShardingItem()) {
                case 0:
                    //從全部秒殺區位時間段過濾當前秒殺時間段
                    this.serviceList = segmentList.stream().filter(timeSegment -> LocalDateTime.now().isAfter(timeSegment.getBeginTime()) &&
                            LocalDateTime.now().isBefore(timeSegment.getEndTime()))
                            //將時間段轉化成時間段內的秒殺服務
                            .map(timeSegment -> {
                                List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment);
                                serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
                                return serviceInSegment;
                                //扁平化全部的秒殺服務,將全部當前時間段內的服務放入serviceList屬性中
                                //就是拿出當前時間段內全部參與秒殺的服務
                            }).flatMap(services -> services.stream()).collect(Collectors.toList());
                    //並行化處理全部的秒殺服務
                    int lism = 0;
                    if (serviceList.size() > Runtime.getRuntime().availableProcessors() * 2) {
                        lism = serviceList.size();
                    }else {
                        lism = Runtime.getRuntime().availableProcessors() * 2;
                    }
                    ForkJoinPool forkJoinPool = new ForkJoinPool(lism);
                    try {
                        forkJoinPool.submit(() ->
                        this.serviceList.parallelStream().forEach(service -> {
                            while (true) {
                                try {
                                    service.getLock().lock();
                                    String userKey = "User:" + service.getId() + service.getTimeSegment().toString();
                                    String serviceKey = service.getId() + service.getTimeSegment().toString();
                                    String countKey = service.getId() + ":count";
                                    //若是下秒殺時間內沒有用戶下單該服務,則中斷該服務的並行線程
                                    //若是有用戶下單則喚醒該並行線程
                                    while (redisService.llen(userKey).equals(0L)) {
                                        try {
                                            log.info("用戶隊列無數據,開始中斷");
                                            service.getCondition().await();
                                            LocalDateTime now = LocalDateTime.now();
                                            if (now.isAfter(service.getTimeSegment().getEndTime())) {
                                                break;
                                            }
                                        } catch (InterruptedException e) {
                                            e.printStackTrace();
                                        }
                                    }
                                    log.info("中斷被喚醒,繼續運行");
                                    //若是用戶隊列和服務隊列均有數據
                                    while (redisService.llen(userKey) > 0 && redisService.llen(serviceKey) > 0) {
                                        //匹配出秒殺結果,並扣減服務數量,增長用戶秒殺過該服務的數量
                                        redisService.secondKill(userKey, serviceKey, "UserResult" + service.getId(), countKey,String.valueOf(service.getId()));
                                    }
                                    //若是服務隊列爲空,表示被秒殺完了,從用戶隊列彈出用戶,告知秒殺失敗
                                    while (redisService.llen(serviceKey).equals(0L)) {
                                        redisService.hset("UserResult" + service.getId(),redisService.rpop(userKey),"秒殺失敗");
                                        if (redisService.llen(userKey).equals(0L)) {
                                            break;
                                        }
                                    }
                                    //當前時間已經超出了秒殺時間段,結束while(true)無限循環
                                    LocalDateTime now = LocalDateTime.now();
                                    if (now.isAfter(service.getTimeSegment().getEndTime())) {
                                        break;
                                    }
                                } finally {
                                    service.getLock().unlock();
                                }
                            }
                        })).get();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    break;
                case 1:
                    break;
                default:
                    break;
            }
        }
    }

當有用戶秒殺到服務時,或者服務被秒殺完,用戶的下單須要知道本身是否秒殺成功或者秒殺失敗。

/**
 * 服務秒殺單
 */
@Slf4j
@AllArgsConstructor
@NoArgsConstructor
@ServiceSecOrderVersion(value = 1)
public class ServiceSecOrder implements SecOrder {
    @Getter
    @Setter
    private Long id; //訂單id
    @Getter
    @Setter
    private String orderNo; //訂單編號
    @Getter
    @Setter
    private Service service; //訂單服務內容
    @Getter
    @Setter
    private TimeSegment timeSegment; //秒殺時間段
    @Getter
    @Setter
    private int number; //訂單數量
    @Getter
    @Setter
    private BigDecimal amount; //訂單金額
    @Getter
    @Setter
    private AppUser user; //下單人
    @Getter
    @Setter
    private int orderStatus; //訂單狀態
    @Getter
    @Setter
    private LocalDateTime createDate; //建立日期

    @Override
    public String makeOrder(SecOrder secOrder) {
        RedisService redisService = SpringBootUtil.getBean(RedisService.class);
        IdService idService = SpringBootUtil.getBean(IdService.class);
        MessageSender messageSender = SpringBootUtil.getBean(MessageSender.class);
        //若是當前時間在秒殺時間段以外,不容許秒殺下單
        if (LocalDateTime.now().isBefore(((ServiceSecOrder)secOrder).getTimeSegment().getBeginTime())
        || LocalDateTime.now().isAfter(((ServiceSecOrder)secOrder).getTimeSegment().getEndTime())) {
            throw new RuntimeException("不在秒殺時間段內");
        }
        //因爲測試時間的問題,此處須要屏蔽,等到實際部署時須要恢復
        LocalDateTime now = LocalDateTime.now();
        int year = now.getYear();
        int month = now.getMonthValue();
        int day = now.getDayOfMonth();
        int hour = now.getHour();
        LocalDateTime beginDate = LocalDateTime.of(year,month,day,hour,0,0);
        //從redis中檢查是否有開啓秒殺時間段
        if (!redisService.exists("TimeStart:" + new TimeSegment(beginDate,beginDate.plusMinutes(119)).toString())) {
            throw new RuntimeException("當前時間段無秒殺");
        }
        ((ServiceSecOrder)secOrder).setId(idService.genId());
        if (((ServiceSecOrder)secOrder).getNumber() > ((ServiceSecOrder)secOrder).getService().getLimitCount()) {
            throw new RuntimeException("秒殺數量超出限購");
        }
        AppUser loginAppUser = AppUserUtil.getLoginAppUser();
        AppUser user = new AppUser();
        user.setId(loginAppUser.getId());
        ((ServiceSecOrder)secOrder).setOrderNo(getCode(idService));
        user.setUsername(loginAppUser.getUsername());
        ((ServiceSecOrder)secOrder).setUser(user);
        //設置訂單狀態0表示未支付狀態
        ((ServiceSecOrder)secOrder).setOrderStatus(0);
        ((ServiceSecOrder)secOrder).setCreateDate(LocalDateTime.now());
        //設置用戶秒殺隊列鍵名(每一種服務都有獨立的用戶秒殺隊列)
        //隊列名由User:+服務id+時間段組成
        String key = "User:" + ((ServiceSecOrder)secOrder).getService().getId()
                + ((ServiceSecOrder)secOrder).getTimeSegment().toString();
        String serviceKey = ((ServiceSecOrder)secOrder).getService().getId()
                + ((ServiceSecOrder)secOrder).getService().getTimeSegment().toString();
        //若是服務隊列還有數據,則推送用戶進隊列,不然直接返回秒殺失敗
        if (redisService.llen(serviceKey) > 0) {
            //創建每一個服務對每一個用戶對限制數量對鍵,格式爲——"服務id:用戶id"
            String limitUserKey = ((ServiceSecOrder) secOrder).getService().getId() + ":"
                    + ((ServiceSecOrder) secOrder).getUser().getId();
            //若是該鍵存在,獲取該鍵的值(這裏須要考慮分佈式的併發問題的可能)
            //可是有冪等,因此此處不會出現一個用戶同時秒殺一個商品(服務)的多個併發線程存在
            if (redisService.exists(limitUserKey)) {
                String limitCount = redisService.get(limitUserKey);
                //若是該鍵的值達到服務商品的限制數,返回秒殺失敗
                if (Integer.valueOf(limitCount) == ((ServiceSecOrder) secOrder).getService().getLimitCount()) {
                    return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                            + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗";
                }
            }else {  //若是不存在,設置該鍵的值爲0
                redisService.set(limitUserKey,"0");
            }
            //將秒殺用戶id推送到該隊列中
            redisService.lpush(key, ((ServiceSecOrder) secOrder).getUser().getId() + "");
            //喚醒秒殺結果匹配任務繼續執行。
            CompletableFuture.runAsync(() -> {
                messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL,
                        SecendKillMq.ROUTING_KEY_SECENDKILL,
                        ((ServiceSecOrder) secOrder).getService().getId());
            });
            LocalDateTime start = LocalDateTime.now();
            //從redis的匹配結果獲取當前用戶的秒殺結果
            Future<String> future = CompletableFuture.supplyAsync(() -> {
                while (true) {
                    if (redisService.hexists("UserResult" + ((ServiceSecOrder) secOrder).getService().getId(),
                            ((ServiceSecOrder) secOrder).getUser().getId() + "")) {
                        return redisService.hget("UserResult" + ((ServiceSecOrder) secOrder).getService().getId(),
                                ((ServiceSecOrder) secOrder).getUser().getId() + "");
                    }
                    if (LocalDateTime.now().isAfter(start.plusSeconds(3))) {
                        return "秒殺失敗";
                    }
                }
            });
            try {
                if (future.get(3000, TimeUnit.MILLISECONDS).equals("秒殺失敗")) {
                    log.info(((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                            + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗");
                    return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                            + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗";
                } else {
                    log.info(((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                            + ((ServiceSecOrder) secOrder).getService().getServiceName() + "成功");
                    SecOrderDao secOrderDao = SpringBootUtil.getBean(SecOrderDao.class);
                    //秒殺成功的將秒殺單存入數據庫
                    secOrderDao.saveServiceSecOrder((ServiceSecOrder) secOrder);
                    //等待支付結果,5分鐘後根據支付與否進行處理
                    //若是未支付,則將被秒殺到的服務從新入服務隊列,並增長服務總數
                    ServicePayBackSender servicePayBackSender = SpringBootUtil.getBean(ServicePayBackSender.class);
                    CompletableFuture.runAsync(() -> {
                        servicePayBackSender.send(SecendKillMq.MQ_EXCHANGE_DEAD,SecendKillMq.ROURING_KEY_DEAD,(ServiceSecOrder) secOrder);
                    });
                    return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                            + ((ServiceSecOrder) secOrder).getService().getServiceName() + "成功";
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.info(((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                        + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗");
                return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                        + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗";
            }
        } else {
            return ((ServiceSecOrder) secOrder).getUser().getUsername() + "秒殺服務"
                    + ((ServiceSecOrder) secOrder).getService().getServiceName() + "失敗";
        }
    }

這裏使用了RabbitMQ的延遲隊列,配置以下

/**
 * rabbitmq配置
 *
 */
@Configuration
public class RabbitmqConfig {

   @Bean
   public List<Queue> secendKillQueues() {
      List<Queue> queues = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Queue queue = new Queue(SecendKillMq.SECENDKILL_QUEUE + "_" + i);
         queues.add(queue);
      }
      return queues;
   }

   @Bean
   public TopicExchange secendKillExchange() {
      return new TopicExchange(SecendKillMq.MQ_EXCHANGE_SECENDKILL);
   }
// @Bean
// public FanoutExchange secondKillExchange() {
//    return new FanoutExchange(SecendKillMq.MQ_EXCHANGE_SECENDKILL);
// }

   @Bean
   public List<Binding> bingingSecondKill(){
      List<Binding> bindings = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Binding binding = BindingBuilder.bind(secendKillQueues().get(i - 1)).to(secendKillExchange())
               .with(SecendKillMq.ROUTING_KEY_SECENDKILL + "_" + i);
         bindings.add(binding);
      }
      return bindings;
   }

   @Bean
   public Queue deadQueue() {
      Map<String,Object> arguments = new HashMap<>();
      arguments.put("x-dead-letter-exchange",SecendKillMq.MQ_EXCHANGE_DEAD);
      arguments.put("x-dead-letter-routing-key",SecendKillMq.ROUTING_KEY_PAYBACK);
      return new Queue(SecendKillMq.DEAD_QUEUE,true,false,false,arguments);
   }

   @Bean
   public DirectExchange deadExchange() {
      return new DirectExchange(SecendKillMq.MQ_EXCHANGE_DEAD);
   }

   @Bean
   public Binding bindingDeadExchange() {
      return BindingBuilder.bind(deadQueue()).to(deadExchange())
            .with(SecendKillMq.ROURING_KEY_DEAD);
   }

   @Bean
   public Queue payBackQueue() {
      return new Queue(SecendKillMq.PAYBACK_QUEUE,true,false,false);
   }

   @Bean
   public Binding bindingPayBack() {
      return BindingBuilder.bind(payBackQueue()).to(deadExchange())
            .with(SecendKillMq.ROUTING_KEY_PAYBACK);
   }

}

消息生產者ServicePayBackSender以下

@Slf4j
@Component
public class ServicePayBackSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        MessagePostProcessor processor = message -> {
            message.getMessageProperties().setExpiration(300000 + "");
            return message;
        };
        this.rabbitTemplate.convertAndSend(exchange,routingKey,serialize(content),processor);
    }

    /**
     * 確認後回調:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失敗後return回調:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 對消息對象進行二進制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

消費者以下

@Slf4j
@Component
@RabbitListener(queues = SecendKillMq.PAYBACK_QUEUE)
public class ServicePayBackDeal {
    @Autowired
    private SecOrderDao secOrderDao;
    @Autowired
    private RedisService redisService;

    @RabbitHandler
    public void receice(byte[] data, Channel channel, Message message) throws IOException {
        try {
            //告訴服務器收到這條消息 已經被我消費了 能夠在隊列刪掉;不然消息服務器覺得這條消息沒處理掉 後續還會在發
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            ServiceSecOrder order = unSerialize(data);
            if (secOrderDao.countServiceSecOrderHasPay(order) == 0) {
                String key = order.getService().getId() + order.getService().getTimeSegment().toString();
                String countKey = order.getService().getId() + ":count";
                String countLimit = order.getService().getId() + ":" + order.getUser().getId();
                redisService.unPayedBack(key, JSONObject.toJSONString(order.getService()),countKey,countLimit);
            }
        } catch (IOException e) {
            e.printStackTrace();
            //丟棄這條消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            log.info("receiver fail");
        }
    }

    /**
     * 反序列化
     * @param data
     * @return
     */
    private ServiceSecOrder unSerialize(byte[] data) {
        Input input = null;
        try {
            Kryo kryo = new Kryo();
            input = new Input(new ByteArrayInputStream(data));
            return kryo.readObject(input,ServiceSecOrder.class);
        }
        finally {
            input.close();
        }
    }
}

unPayedBack也是一段redis-lua,代碼以下

@Override
public Object unPayedBack(String serviceKey,String serviceValue, String countKey,String limitCountKey) {
    String script = "redis.call('lpush',KEYS[1],ARGV[1]) " +
            "redis.call('incr',KEYS[2]) " +
            "return redis.call('decr',KEYS[3])";
    return execute(jedis -> jedis.eval(script,3,serviceKey,countKey,limitCountKey,serviceValue));
}

在秒殺結束後喚醒全部的秒殺中斷,退出while(true)的無限循環

/**
 * 在每一個秒殺段最後一分鐘喚醒全部秒殺中斷,天天的1點59分開始,每2小時執行一次
 */
@Component
@ElasticSimpleJob(cron="0 59 1/2 * * ?",jobName="signal",shardingTotalCount=2,jobParameter="日期",shardingItemParameters="0=Signal0,1=Signal1")
public class SignalJob implements SimpleJob {
    @Autowired
    private DataDao dataDao;
    @Autowired
    private MessageSender messageSender;

    @Override
    public void execute(ShardingContext shardingContext) {
        //獲取當天全部的秒殺區位
        List<Location> locationTodayList = dataDao.findJobToday(LocalDate.now());
        if (!CollectionUtils.isEmpty(locationTodayList)) {
            //獲取一天全部的秒殺區位時間段
            List<TimeSegment> segmentList = locationTodayList.stream().map(location -> location.getTimeSegment())
                    .collect(Collectors.toList());
            switch (shardingContext.getShardingItem()) {
                case 0:
                    segmentList.stream().filter(timeSegment -> LocalDateTime.now().plusMinutes(-2L).isAfter(timeSegment.getBeginTime()) &&
                            LocalDateTime.now().plusMinutes(-2L).isBefore(timeSegment.getEndTime()))
                            //將時間段轉化成時間段內的秒殺服務
                            .map(timeSegment -> {
                                List<Service> serviceInSegment = dataDao.findServiceInSegment(timeSegment);
                                serviceInSegment.stream().forEach(service -> service.setTimeSegment(timeSegment));
                                return serviceInSegment;
                                //扁平化全部的秒殺服務
                            }).flatMap(services -> services.stream()).forEach(service -> {
                                CompletableFuture.runAsync(() -> {
                                    messageSender.send(SecendKillMq.MQ_EXCHANGE_SECENDKILL,
                                    SecendKillMq.ROUTING_KEY_SECENDKILL,
                                    service.getId());
                                });
                            });
                    break;
                case 1:
                    break;
                default:
                    break;
            }
        }
    }
}

如今咱們來創建下單的Controller

先創建一個秒殺單工廠接口

public interface SecOrderFactory {
    /**
     * 建立秒殺訂單
     * @return
     */
    public SecOrder createSecOrder();

    /**
     * 獲取秒殺訂單
     * @return
     */
    public SecOrder getSecOrder();
}

Service版本標籤

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ServiceSecOrderVersion {
    int value();
}

Service工廠實現類

@Component
public class ServiceSecOrderFactory implements SecOrderFactory {
    private Set<Class<?>> classes = ClassUtil.getClassSet("com.cloud.secondkill.domain");
    private SecOrder createdSecOrder;

    @PostConstruct
    private void init() {
        this.createdSecOrder = createSecOrder();
    }

    @Override
    public SecOrder createSecOrder() {
        Object instance = null;
        try {
            //過濾有@ServiceSecOrderVersion標籤的類
            instance = classes.stream().filter(clazz -> clazz.isAnnotationPresent(ServiceSecOrderVersion.class))
                    //過濾實現了SecOrder接口的類
                    .filter(clazz -> SecOrder.class.isAssignableFrom(clazz))
                    //找出版本號大的類,並實例化爲對象
                    .max(Comparator.comparingInt(clazz -> clazz.getAnnotation(ServiceSecOrderVersion.class).value()))
                    .get().newInstance();
        } catch (InstantiationException e) {
            e.printStackTrace();
        } catch (IllegalAccessException e) {
            e.printStackTrace();
        }
        return (SecOrder) instance;
    }

    @Override
    public SecOrder getSecOrder() {
        return createdSecOrder;
    }
}

秒殺單的Bean

@Component
public class SecOrderBean {
    @Getter
    private Map<String,Class> secOrderFactoryMap = new HashMap<>();
    @Getter
    private Map<String,Class> secOrderMap = new HashMap<>();

    @PostConstruct
    private void init() {
        Set<Class<?>> classes = ClassUtil.getClassSet("com.cloud.secondkill.domain");
        classes.stream().filter(clazz -> SecOrderFactory.class.isAssignableFrom(clazz))
                .forEach(clazz -> secOrderFactoryMap.put(clazz.getSimpleName(),clazz));
        classes.stream().filter(clazz -> SecOrder.class.isAssignableFrom(clazz))
                .forEach(clazz -> secOrderMap.put(clazz.getSimpleName(),clazz));
    }
}

下單Controller

@Slf4j
@RestController
public class SecOrderController {
    private ThreadLocal<SecOrderFactory>  secOrderFactory = new ThreadLocal<>();
    private ThreadLocal<SecOrder> secOrderService = new ThreadLocal<>();
    @Autowired
    private SecOrderBean secOrderBean;

    @SuppressWarnings("unchecked")
    @Transactional
    @PostMapping("/makesecorder")
    @LxRateLimit(perSecond = 500,timeOut = 500) //此處爲接口限流,數字能夠根據實際清空改寫
    public Result<String> makeSecOrder(@RequestBody String secOrderStr, @RequestParam("type") String type) throws Exception {
        log.info(secOrderStr);
        try {
            SecOrder secOrder = setSecOrderFactory(secOrderStr, type);
            String secResult = this.secOrderService.get().makeOrder(secOrder);
            return Result.success(secResult);
        } finally {
            secOrderFactory.remove();
            secOrderService.remove();
        }
    }

    private SecOrder setSecOrderFactory(String secOrderStr,String type) {
        Class classType = secOrderBean.getSecOrderMap().get(type);
        Object secOrder = JSONObject.parseObject(secOrderStr, classType);
        setSecOrderFactory(type);
        return (SecOrder) secOrder;
    }

    private void setSecOrderFactory(String type) {
        Class classFactoryType = secOrderBean.getSecOrderFactoryMap().get(type + "Factory");
        this.secOrderFactory.set((SecOrderFactory) SpringBootUtil.getBean(classFactoryType));
        this.secOrderService.set(this.secOrderFactory.get().getSecOrder());
    }
}

以上不明白的能夠參考用工廠方法模式來下不一樣訂單 以及使用簡單工廠加接口加適配器模式來遵照開閉原則

@LxRateLimit爲接口限流

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LxRateLimit {
    /**
     *
     * @return
     */
    String value() default "";

    /**
     * 每秒向桶中放入令牌的數量   默認最大即不作限流
     * @return
     */
    double perSecond() default Double.MAX_VALUE;

    /**
     * 獲取令牌的等待時間  默認0
     * @return
     */
    int timeOut() default 0;

    /**
     * 超時時間單位
     * @return
     */
    TimeUnit timeOutUnit() default TimeUnit.MILLISECONDS;
}

AOP攔截

@Slf4j
@Aspect
@Component
public class LxRateLimitAspect {
    private RateLimiter rateLimiter = RateLimiter.create(Double.MAX_VALUE);

    /**
     * 帶有指定註解切入
     */
    @ResponseBody
    @Around(value = "@annotation(com.cloud.secondkill.annotion.LxRateLimit)")
    public Object aroundNotice(ProceedingJoinPoint pjp) throws Throwable {
        log.info("攔截到了{}方法...", pjp.getSignature().getName());
        Signature signature = pjp.getSignature();
        MethodSignature methodSignature = (MethodSignature)signature;
        //獲取目標方法
        Method targetMethod = methodSignature.getMethod();
        if (targetMethod.isAnnotationPresent(LxRateLimit.class)) {
            //獲取目標方法的@LxRateLimit註解
            LxRateLimit lxRateLimit = targetMethod.getAnnotation(LxRateLimit.class);
            rateLimiter.setRate(lxRateLimit.perSecond());
            if (!rateLimiter.tryAcquire(lxRateLimit.timeOut(), lxRateLimit.timeOutUnit()))
                return "服務器繁忙,請稍後再試!";
        }
        return pjp.proceed();
    }
}

具體原理能夠參考Guava RateLimiter限流源碼解析和實例應用

如今要加入秒殺的冪等,來防止工具秒殺

token Controller

@RestController
public class TokenController {
    @Autowired
    private RedisService redisService;

    @GetMapping("/gettoken")
    public Map getToken(@RequestParam("url") String url) {
        Map<String,String> tokenMap = new HashMap<>();
        String tokenValue = UUID.randomUUID().toString();
        AppUser user = AppUserUtil.getLoginAppUser();
        String key = url + user.getId();
        tokenMap.put(key,tokenValue);
        redisService.set(key,tokenValue);
        return tokenMap;
    }
}

配置冪等Spring MVC攔截器

@Component
@Slf4j
public class TokenInterceptor implements HandlerInterceptor {
    @Autowired
    private RedisService redisService;

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        AppUser user = AppUserUtil.getLoginAppUser();
        String tokenName = request.getRequestURI() + user.getId();
        String tokenValue = request.getParameter("token_value");
        if (tokenValue != null && !tokenValue.equals("")) {
            log.info("tokenName:{},tokenValue:{}",tokenName,tokenValue);
            return handleToken(request,response,handler,user);
        }
        return false;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable ModelAndView modelAndView) throws Exception {
        if (redisService.exists(request.getParameter("token_value"))) {
            RedisTool.releaseDistributedLock(redisService, request.getParameter("token_value"), request.getParameter("token_value"));
        }
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, @Nullable Exception ex) throws Exception {

    }

    /**
     * 分佈式鎖處理
     * @param request
     * @param response
     * @param handler
     * @return
     * @throws Exception
     */
    private boolean handleToken(HttpServletRequest request, HttpServletResponse response, Object handler,AppUser user) throws Exception {
        //當大量高併發下全部帶token參數的請求進來時,進行分佈式鎖定,容許某一臺服務器的一個線程進入,鎖定時間3分鐘
        if (RedisTool.tryGetDistributedLock(redisService,request.getParameter("token_value"),request.getParameter("token_value"),180)) {
            if (redisService.exists(request.getRequestURI() + user.getId())) {
                //當請求的url與token與redis中的存儲相同時
                if (redisService.get(request.getRequestURI() + user.getId()).equals(request.getParameter("token_value"))) {
                    //放行的該線程刪除redis中存儲的token
                    redisService.del(request.getRequestURI() + user.getId());
                    log.info("放行");
                    RedisTool.releaseDistributedLock(redisService,request.getParameter("token_value"),request.getParameter("token_value"));
                    //放行
                    return true;
                }
            }
            log.info("攔截");
            //當請求的url與token與redis中的存儲不相同時,解除鎖定
            RedisTool.releaseDistributedLock(redisService,request.getParameter("token_value"),request.getParameter("token_value"));
            //進行攔截
            return false;
        }
        return false;
    }
}
@SpringBootConfiguration
public class TokenInterceptorConfig extends WebMvcConfigurerAdapter {
    @Autowired
    private TokenInterceptor tokenInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(tokenInterceptor).addPathPatterns("/makesecorder");
}
}
相關文章
相關標籤/搜索