SpringBoot項目:RedisTemplate實現輕量級消息隊列

背景 公司項目有個需求, 前端上傳excel文件, 後端讀取數據、處理數據、返回錯誤數據, 最簡單的方式同步處理, 客戶端上傳文件後一直阻塞等待響應, 但用戶體驗無疑不好, 處理數據可能十分耗時, 沒人願意傻等, 因爲項目暫未使用ActiveMQ等消息隊列中間件, 而redis的lpush和rpop很適合做爲一種輕量級的消息隊列實現, 因此用它完成這次功能開發前端

1、本文涉及知識點

  1. excel文件讀寫--阿里easyexcel sdkredis

  2. 文件上傳、下載--騰訊雲對象存儲數據庫

  3. 遠程服務調用--restTemplatejson

  4. 生產者、消費者--redisTemplate leftPush和rightPop操做後端

  5. 異步處理數據--Executors線程池網絡

  6. 讀取網絡文件流--HttpClientapp

  7. 自定義註解實現用戶身份認證--JWT token認證, 攔截器攔截標註有@LoginRequired註解的請求入口異步

固然, Java實現咯 涉及的知識點比較多, 每個知識點均可以做爲專題進行學習分析, 本文將完整實現呈現出來, 後期拆分與小夥伴分享學習maven

2、項目目錄結構

3a265d9f1a6e463f9fc00012cff19241


項目結構ide

說明: 數據庫DAO層放到另外一個模塊了, 不是本文重點

3、主要maven依賴

easyexcel

3aa31625ad96437d9595bcc18a86bcce


JWT

b122e6a3a9dc4b6fadbd79d91df55019


redis

6507e011ab654fd485d402511380b9cb


騰訊cos

cfcbbf3773f941628ecd6ad6239ab2b5


4、流程

  1. 用戶上傳文件

  2. 將文件存儲到騰訊cos

  3. 將上傳後的文件id及上傳記錄保存到數據庫

  4. redis生產一條導入消息, 即保存文件id到redis

  5. 請求結束, 返回"處理中"狀態

  6. redis消費消息

  7. 讀取cos文件, 異步處理數據

  8. 將錯誤數據以excel形式上傳至cos, 以供用戶下載, 並更新處理狀態爲"處理完成"

  9. 客戶端輪詢查詢處理狀態, 並能夠下載錯誤文件

  10. 結束

5、實現效果

  1. 上傳文件 
    上傳文件

  2. 數據庫導入記錄 
    數據庫導入記錄

  3. 導入的數據 
    導入的數據

  4. 下載錯誤文件 
    下載錯誤文件

  5. 錯誤數據提示 
    錯誤數據提示

  6. 查詢導入記錄 
    查詢導入記錄

6、代碼實現

一、導入excel控制層

    @LoginRequired
    @RequestMapping(value = "doImport", method = RequestMethod.POST)
    public JsonResponse doImport(@RequestParam("file") MultipartFile file, HttpServletRequest request) {
        PLUser user = getUser(request);
        return orderImportService.doImport(file, user.getId());
    }

二、service層

    @Override    public JsonResponse doImport(MultipartFile file, Integer userId) {        if (null == file || file.isEmpty()) {            throw new ServiceException("文件不能爲空");
        }

        String filename = file.getOriginalFilename();        if (!checkFileSuffix(filename)) {            throw new ServiceException("當前僅支持xlsx格式的excel");
        }        // 存儲文件
        String fileId = saveToOss(file);        if (StringUtils.isBlank(fileId)) {            throw new ServiceException("文件上傳失敗, 請稍後重試");
        }        // 保存記錄到數據庫
        saveRecordToDB(userId, fileId, filename);        // 生產一條訂單導入消息
        redisProducer.produce(RedisKey.orderImportKey, fileId);        return JsonResponse.ok("導入成功, 處理中...");
    }    /**
     * 校驗文件格式
     * @param fileName
     * @return
     */
    private static boolean checkFileSuffix(String fileName) {        if (StringUtils.isBlank(fileName) || fileName.lastIndexOf(".") <= 0) {            return false;
        }        int pointIndex = fileName.lastIndexOf(".");
        String suffix = fileName.substring(pointIndex, fileName.length()).toLowerCase();        if (".xlsx".equals(suffix)) {            return true;
        }        return false;
    }   /**
     * 將文件存儲到騰訊OSS
     * @param file
     * @return
     */
    private String saveToOss(MultipartFile file) {
        InputStream ins = null;        try {
            ins = file.getInputStream();
        } catch (IOException e) {
            e.printStackTrace();
        }

        String fileId;        try {
            String originalFilename = file.getOriginalFilename();            File f = new File(originalFilename);
            inputStreamToFile(ins, f);
            FileSystemResource resource = new FileSystemResource(f);

            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }        return fileId;
    }

三、redis生產者

33a5aa8c33304408987ef03ca3889098


四、redis消費者

@Servicepublic class RedisConsumer {    @Autowired
    public RedisTemplate redisTemplate;    @Value("${txOssFileUrl}")
    private String txOssFileUrl;    @Value("${txOssUploadUrl}")
    private String txOssUploadUrl;    @PostConstruct
    public void init() {
        processOrderImport();
    }    /**
     * 處理訂單導入
     */
    private void processOrderImport() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(() -> {            while (true) {
                Object object = redisTemplate.opsForList().rightPop(RedisKey.orderImportKey, 1, TimeUnit.SECONDS);                if (null == object) {                    continue;
                }
                String msg = JSON.toJSONString(object);
                executorService.execute(new OrderImportTask(msg, txOssFileUrl, txOssUploadUrl));
            }
        });
    }

}

五、處理任務線程類

public class OrderImportTask implements Runnable {
    public OrderImportTask(String msg, String txOssFileUrl, String txOssUploadUrl) {        this.msg = msg;        this.txOssFileUrl = txOssFileUrl;        this.txOssUploadUrl = txOssUploadUrl;
    }
}    /**     * 注入bean     */
    private void autowireBean() {        this.restTemplate = BeanContext.getApplicationContext().getBean(RestTemplate.class);        this.transactionTemplate = BeanContext.getApplicationContext().getBean(TransactionTemplate.class);        this.orderImportService = BeanContext.getApplicationContext().getBean(OrderImportService.class);
    }    @Override
    public void run() {        // 注入bean
        autowireBean();

        JSONObject jsonObject = JSON.parseObject(msg);        String fileId = jsonObject.getString("fileId");

        MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
        param.add("id", fileId);

        ResponseResult responseResult = restTemplate.postForObject(txOssFileUrl, param, ResponseResult.class);        String fileUrl = (String) responseResult.getData();        if (StringUtils.isBlank(fileUrl)) {            return;
        }

        InputStream inputStream = HttpClientUtil.readFileFromURL(fileUrl);        List<Object> list = ExcelUtil.read(inputStream);
        process(list, fileId);
    }    /**     * 將文件上傳至oss     * @param file     * @return     */
    private String saveToOss(File file) {        String fileId;        try {
            FileSystemResource resource = new FileSystemResource(file);
            MultiValueMap<String, Object> param = new LinkedMultiValueMap<>();
            param.add("file", resource);

            ResponseResult responseResult = restTemplate.postForObject(txOssUploadUrl, param, ResponseResult.class);
            fileId = (String) responseResult.getData();
        } catch (Exception e) {
            fileId = null;
        }        return fileId;
    }

說明: 處理數據的業務邏輯代碼就不用貼了

六、上傳文件到cos

    @RequestMapping("/txOssUpload")
    @ResponseBody    public ResponseResult txOssUpload(@RequestParam("file") MultipartFile file) throws UnsupportedEncodingException {        if (null == file || file.isEmpty()) {            return ResponseResult.fail("文件不能爲空");
        }        String originalFilename = file.getOriginalFilename();
        originalFilename = MimeUtility.decodeText(originalFilename);// 解決中文亂碼問題
        String contentType = getContentType(originalFilename);        String key;

        InputStream ins = null;
        File f = null;        try {
            ins = file.getInputStream();
            f = new File(originalFilename);
            inputStreamToFile(ins, f);            key = iFileStorageClient.txOssUpload(new FileInputStream(f), originalFilename, contentType);
        } catch (Exception e) {            return ResponseResult.fail(e.getMessage());
        } finally {            if (null != ins) {                try {
                    ins.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }            if (f.exists()) {// 刪除臨時文件
                f.delete();
            }
        }        return ResponseResult.ok(key);
    }    public static void inputStreamToFile(InputStream ins,File file) {        try {
            OutputStream os = new FileOutputStream(file);            int bytesRead = 0;            byte[] buffer = new byte[8192];            while ((bytesRead = ins.read(buffer, 0, 8192)) != -1) {
                os.write(buffer, 0, bytesRead);
            }
            os.close();
            ins.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }    public String txOssUpload(FileInputStream inputStream, String key, String contentType) {        key = Uuid.getUuid() + "-" + key;
        OSSUtil.txOssUpload(inputStream, key, contentType);        try {            if (null != inputStream) {
                inputStream.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }        return key;
    }    public static void txOssUpload(FileInputStream inputStream, String key, String contentType) {
        ObjectMetadata objectMetadata = new ObjectMetadata();        try{            int length = inputStream.available();
            objectMetadata.setContentLength(length);
        }catch (Exception e){
            logger.info(e.getMessage());
        }
        objectMetadata.setContentType(contentType);
        cosclient.putObject(txbucketName, key, inputStream, objectMetadata);
    }

七、下載文件

    /**
     * 騰訊雲文件下載
     * @param response
     * @param id
     * @return
     */
    @RequestMapping("/txOssDownload")    public Object txOssDownload(HttpServletResponse response, String id) {
        COSObjectInputStream cosObjectInputStream = iFileStorageClient.txOssDownload(id, response);        String contentType = getContentType(id);
        FileUtil.txOssDownload(response, contentType, cosObjectInputStream, id);
        return null;
    }    public static void txOssDownload(HttpServletResponse response, String contentType, InputStream fileStream, String fileName) {
        FileOutputStream fos = null;        response.reset();
        OutputStream os = null;
        try {            response.setContentType(contentType + "; charset=utf-8");            if(!contentType.equals(PlConstans.FileContentType.image)){
                try {                    response.setHeader("Content-Disposition", "attachment; filename=" + new String(fileName.getBytes("UTF-8"), "ISO8859-1"));
                } catch (UnsupportedEncodingException e) {                    response.setHeader("Content-Disposition", "attachment; filename=" + fileName);
                    logger.error("encoding file name failed", e);
                }
            }

            os = response.getOutputStream();

            byte[] b = new byte[1024 * 1024];            int len;            while ((len = fileStream.read(b)) > 0) {
                os.write(b, 0, len);
                os.flush();
                try {                    if(fos != null) {
                        fos.write(b, 0, len);
                        fos.flush();
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage());
                }
            }
        } catch (IOException e) {
            IOUtils.closeQuietly(fos);
            fos = null;
        } finally {
            IOUtils.closeQuietly(os);
            IOUtils.closeQuietly(fileStream);            if(fos != null) {
                IOUtils.closeQuietly(fos);
            }
        }
    }

八、讀取網絡文件流

    /**
     * 讀取網絡文件流
     * @param url
     * @return
     */
    public static InputStream readFileFromURL(String url) {        if (StringUtils.isBlank(url)) {            return null;
        }

        HttpClient httpClient = new DefaultHttpClient();
        HttpGet methodGet = new HttpGet(url);        try {
            HttpResponse response = httpClient.execute(methodGet);            if (response.getStatusLine().getStatusCode() == 200) {
                HttpEntity entity = response.getEntity();                return entity.getContent();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }        return null;
    }

九、ExcelUtil

    /**
     * 讀excel
     * @param inputStream 文件輸入流
     * @return list集合
     */
    public static List<Object> read(InputStream inputStream) {        return EasyExcelFactory.read(inputStream, new Sheet(1, 1));
    }    /**
     * 寫excel
     * @param data list數據
     * @param clazz
     * @param saveFilePath 文件保存路徑
     * @throws IOException
     */
    public static void write(List<? extends BaseRowModel> data, Class<? extends BaseRowModel> clazz, String saveFilePath) throws IOException {
        File tempFile = new File(saveFilePath);
        OutputStream out = new FileOutputStream(tempFile);
        ExcelWriter writer = EasyExcelFactory.getWriter(out);
        Sheet sheet = new Sheet(1, 3, clazz, "Sheet1", null);
        writer.write(data, sheet);
        writer.finish();
        out.close();
    }

說明: 至此, 整個流程算是完整了, 下面將其餘知識點代碼也貼出來參考

7、其餘

一、@LoginRequired註解

/**
 * 在須要登陸驗證的Controller的方法上使用此註解
 */@Target({ElementType.METHOD})@Retention(RetentionPolicy.RUNTIME)
public @interface LoginRequired {
}

二、MyControllerAdvice

@ControllerAdvicepublic class MyControllerAdvice {    @ResponseBody
    @ExceptionHandler(TokenValidationException.class)
    public JsonResponse tokenValidationExceptionHandler() {        return JsonResponse.loginInvalid();
    }    @ResponseBody
    @ExceptionHandler(ServiceException.class)
    public JsonResponse serviceExceptionHandler(ServiceException se) {        return JsonResponse.fail(se.getMsg());
    }    @ResponseBody
    @ExceptionHandler(Exception.class)
    public JsonResponse exceptionHandler(Exception e) {
        e.printStackTrace();        return JsonResponse.fail(e.getMessage());
    }

}

三、AuthenticationInterceptor

public class AuthenticationInterceptor implements HandlerInterceptor {    private static final String CURRENT_USER = "user";    @Autowired
    private UserService userService;    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {        // 若是不是映射到方法直接經過
        if (!(handler instanceof HandlerMethod)) {            return true;
        }
        HandlerMethod handlerMethod = (HandlerMethod) handler;
        Method method = handlerMethod.getMethod();        // 判斷接口是否有@LoginRequired註解, 有則須要登陸
        LoginRequired methodAnnotation = method.getAnnotation(LoginRequired.class);        if (methodAnnotation != null) {            // 驗證token
            Integer userId = JwtUtil.verifyToken(request);
            PLUser plUser = userService.selectByPrimaryKey(userId);            if (null == plUser) {                throw new RuntimeException("用戶不存在,請從新登陸");
            }
            request.setAttribute(CURRENT_USER, plUser);            return true;
        }        return true;
    }    @Override
    public void postHandle(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, ModelAndView modelAndView) throws Exception {
    }    @Override
    public void afterCompletion(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, Object o, Exception e) throws Exception {
    }
}

四、JwtUtil

    public static final long EXPIRATION_TIME = 2592_000_000L; // 有效期30天
    public static final String SECRET = "pl_token_secret";    public static final String HEADER = "token";    public static final String USER_ID = "userId";    /**
     * 根據userId生成token
     * @param userId
     * @return
     */
    public static String generateToken(String userId) {        HashMap<String, Object> map = new HashMap<>();        map.put(USER_ID, userId);        String jwt = Jwts.builder()
                .setClaims(map)
                .setExpiration(new Date(System.currentTimeMillis() + EXPIRATION_TIME))
                .signWith(SignatureAlgorithm.HS512, SECRET)
                .compact();        return jwt;
    }    /**
     * 驗證token
     * @param request
     * @return 驗證經過返回userId
     */
    public static Integer verifyToken(HttpServletRequest request) {        String token = request.getHeader(HEADER);        if (token != null) {            try {
                Map<String, Object> body = Jwts.parser()
                        .setSigningKey(SECRET)
                        .parseClaimsJws(token)
                        .getBody();                for (Map.Entry entry : body.entrySet()) {                    Object key = entry.getKey();                    Object value = entry.getValue();                    if (key.toString().equals(USER_ID)) {                        return Integer.valueOf(value.toString());// userId
                    }
                }                return null;
            } catch (Exception e) {
                logger.error(e.getMessage());                throw new TokenValidationException("unauthorized");
            }
        } else {            throw new TokenValidationException("missing token");
        }
    }
相關文章
相關標籤/搜索