在開發中遇到總站發送命令請求分站將某資源經過FTP上傳過來,也就是總站提取分站的資源問題。而且總站實時能夠獲取已經提取了文件的大小的比例。java
思路:1.首先分站要將文件大小告知總站json
2.總站收到文件大小後,根據指定路徑去判斷指定路徑文件夾(分站的文件存儲的位置)下的文件大小,而後和當前文件大小/總大小,就獲取了已經上傳的所佔比。app
總站發送請求就再也不贅述了,直接說分站接收到請求後的處理,爲了防止惡意請求,咱們這對每次請求都加了「鹽」,那麼分站接收時須要判斷是否帶「鹽」了,若是沒有,則不予處理,返回錯誤。dom
分站接收請求contorlleride
@RequestMapping("/fileSize.htm") @ResponseBody public String getFileSize(HttpServletRequest request,HttpServletResponse response,String json) { log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "獲取資源文件大小(字節)------start"); /*獲取參數--start*/ String checkCode = "see-P2C";//校驗碼 JSONObject jsonObject = new JSONObject(json); String tranno =jsonObject.getString("tranno"); //交易流水號 String isCheckcode = jsonObject.getString("checkCode"); //校驗碼 checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode; String mD5CheckCode = MD5Util.encrypt(checkCode); Long ids =jsonObject.getLong("sid"); //資源ID Map<String, Object>map = new HashMap<String, Object>(); map.put("ids", ids); map.put("tranno", tranno); map.put("isCheckcode", isCheckcode); map.put("mD5CheckCode", mD5CheckCode); log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "獲取資源文件大小(字節)------end"); return this.transRecordService.fileSizeHandle(map); }
TransRecordServiceImpl.java中咱們處理這些請求,首先是總站第一遍請求時,分站會判斷有沒有這個文件,若是沒有返回錯誤,若是有資源分站會告訴總站個人文件有多大、我會把文件放到你的FTP的哪一個文件夾下(路徑)。ui
/** * @desc 獲取文件總大小 * @author zp */ @Override public String fileSizeHandle(Map<String, Object> map) { // TODO Auto-generated method stub log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(獲取視頻總大小)----start"); String tranno = map.get("tranno").toString(); Long ids = (Long)map.get("ids"); String mD5CheckCode = map.get("mD5CheckCode").toString(); String isCheckcode = map.get("isCheckcode").toString(); /*交易記錄表插入信息--start*/ TranCodeModel tranCodeModel = new TranCodeModel(); tranCodeModel.setTrandesc("總站提取分站視頻資源"); tranCodeModel.setUrl("parentMsg/fileSizeHandle.htm"); tranCodeModel.setProjectphase("2"); tranCodeModel.setRspcode("00"); tranCodeModel.setRspinfo("交易成功"); tranCodeModel.setTranconent("提取資源(resourceid="+ids+")"); tranCodeModel.setTranno(tranno); tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss")); /*交易記錄表插入信息--end*/ if(!mD5CheckCode.equals(isCheckcode)){ // 身份驗證失敗 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("身份驗證失敗"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"identifyError\"}"; } if(this.vertifyTranNo(tranno)>0){ // 流水號重複 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("交易流水號重複"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"trannoRepeat\"}"; } Map<String, Object> resourceMap = this.resourceService.findById(ids); if (Validator.isNull(resourceMap)||resourceMap==null) { // 資源被刪除 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("資源被刪除"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"delresource\"}"; } if ("-1".equals(resourceMap.get("isextract").toString())) { // 提取異常 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("提取異常"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"errvideo\"}"; } List<ResourceFileModel>rFiles = this.rFileService.findGroupByResourceId(ids); if (rFiles.size()==0) { // 視頻資源不存在 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("nofiles"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"novideo\"}"; } Long filesize = (long)0; for (ResourceFileModel resourceFileModel : rFiles) { filesize += Long.parseLong(resourceFileModel.getSize()); } String uuid = UUID.randomUUID().toString().replace("-", ""); log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(獲取視頻總大小)----end"); return "{\"rspcode\":\"00\",\"rspinfo\":\"success\",\"filesize\":\""+filesize+"\",\"uuid\":\""+uuid+"\"}"; }
總站接收到分站第一次成功返回信息以後,而後通知分站,你能夠開始上傳了,總站也會將文件總大小進行保存,而後開始根據分站提供的文件路徑進行實時查詢文件大小,而後進行比對。this
分站接收到總站回信後,開始上傳文件(總站的回信也是加鹽的哦)。url
分站接收總站的第二次請求contorllerspa
/** * @desc 提取資源 * @author zp * @throws IOException * @date 2018-3-16 */ @RequestMapping("/pickup.htm") @ResponseBody public String pickup(HttpServletRequest request,HttpServletResponse response,String json) throws IOException{ log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取資源------start"); /*獲取參數--start*/ String checkCode = "see-P2C";//校驗碼 JSONObject jsonObject = new JSONObject(json); String tranno =jsonObject.getString("tranno"); //交易流水號 String isCheckcode = jsonObject.getString("checkCode"); //校驗碼 checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode; String mD5CheckCode = MD5Util.encrypt(checkCode); Long ids =jsonObject.getLong("sid"); //資源ID String ftpuser = jsonObject.getString("ftpuser");// ftp賬號 String ftppwd = jsonObject.getString("ftppwd");// ftp密碼 String ftpport = jsonObject.getString("ftpport");// ftp端口 String ftpIP = jsonObject.getString("ftpIP");// ftpIP String uuid = jsonObject.getString("uuid");// uuid Map<String, Object>map = new HashMap<String, Object>(); map.put("ids", ids); map.put("tranno", tranno); map.put("isCheckcode", isCheckcode); map.put("mD5CheckCode", mD5CheckCode); map.put("ftpuser", ftpuser); map.put("ftppwd", ftppwd); map.put("ftpIP", ftpIP); map.put("ftpport", ftpport); map.put("uuid", uuid); return this.transRecordService.pickHandle(map); }
TransRecordServiceImpl.java中,總站會提供本身的FTP的相關信息。而後開始上傳。裏邊也會處理鹽是否正確,是否有該文件等操做。 線程
/** * @desc 處理總站提取請求 * @author zp * @throws IOException * @date 2018-3-19 */ @Override public String pickHandle(Map<String, Object>map) throws IOException { String tranno = map.get("tranno").toString(); Long ids = (Long)map.get("ids"); String mD5CheckCode = map.get("mD5CheckCode").toString(); String isCheckcode = map.get("isCheckcode").toString(); String ftpuser = map.get("ftpuser").toString(); String ftppwd = map.get("ftppwd").toString(); String ftpIP = map.get("ftpIP").toString(); String ftpport = map.get("ftpport").toString(); /*交易記錄表插入信息--start*/ TranCodeModel tranCodeModel = new TranCodeModel(); tranCodeModel.setTrandesc("總站提取分站視頻資源"); tranCodeModel.setUrl("parentMsg/pickup.htm"); tranCodeModel.setProjectphase("2"); tranCodeModel.setRspcode("00"); tranCodeModel.setRspinfo("交易成功"); tranCodeModel.setTranconent("提取資源(resourceid="+ids+")"); tranCodeModel.setTranno(tranno); tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss")); /*交易記錄表插入信息--end*/ if(!mD5CheckCode.equals(isCheckcode)){ // 身份驗證失敗 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("身份驗證失敗"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"identifyError\"}"; } if(this.vertifyTranNo(tranno)>0){ // 流水號重複 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("交易流水號重複"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"trannoRepeat\"}"; } // 開始上傳資源 // 1.判斷該任務是否錄製完成 Map<String, Object> reMap = this.resourceService.findById(ids); if (Validator.isNull(reMap)) { // 不存在 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("資源已被刪除"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"delResouces\"}"; } if (!"3".equals(reMap.get("isextract").toString())&&!"2".equals(reMap.get("isextract").toString())) { // 資源未被提取或已被刪除 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("資源還未被提取或已被刪除"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"nopick\"}"; } // 2.經過資源ID查找全部的資源信息 List<ResourceFileModel>rFiles = this.rFileService.findGroupByResourceId(ids); if (rFiles.size()==0) { // 視頻資源不存在 tranCodeModel.setRspcode("01"); tranCodeModel.setRspinfo("nofiles"); this.insertdata(tranCodeModel); return "{\"rspcode\":\"01\",\"rspinfo\":\"novideo\"}"; } // 3.開始上傳 String filePath = ""; String arr = ""; String picktemp = PropertyUtil.getProperty("picktemp"); // 臨時路徑 String uuid = map.get("uuid").toString(); picktemp = picktemp+uuid+"\\"; picktemp = picktemp.replaceAll("\\\\", "/"); for (ResourceFileModel resourceFileModel : rFiles) { filePath = resourceFileModel.getPath(); filePath = filePath.replaceAll("\\\\", "/"); String path = filePath.replaceAll("\\\\", "/"); String temp = filePath.substring(0,filePath.lastIndexOf("resources/")+10); String temp2 = filePath.substring(filePath.lastIndexOf("resources/")+10,filePath.length()); temp2 = temp2.substring(0,temp2.indexOf("/")+1); filePath = temp + temp2; path = path .substring(filePath.length(),path.length()); path = picktemp + path; path = path.replace("/picktemp/", "/resources/"); resourceFileModel.setPath(path); String url = path.substring(path.indexOf("ResourcesManager/"),path.length()); resourceFileModel.setUrl(url); arr += JSONObject.fromObject(resourceFileModel)+","; } if (arr.length()>0) { arr = arr.substring(0, arr.length()-1); } // 將文件拷貝到指定的臨時目錄 CopyDirectory copyfile = new CopyDirectory(); copyfile.copyDirectiory(filePath, picktemp); // filePath 源文件 picktemp 目標地址 log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取資源------end"); try { String[] ip = ftpIP.split(":"); ftpIP = ip[0]; this.starFtp(picktemp, ftpIP, ftpuser, ftppwd, ftpport); return "{\"rspcode\":\"00\",\"rspinfo\":\"success\",\"resource\":["+arr+"]}"; } catch (Exception e) { // TODO: handle exception e.printStackTrace(); return "{\"rspcode\":\"01\",\"rspinfo\":\"ftperror\"}"; } } /** * @desc 開啓線程進行FTP上傳 * @author zp * @date 2018-3-23 */ public void starFtp(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport) { log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "準備開啓FTP上傳線程------start"); //建立一個可重用固定線程數的線程池 ExecutorService pool = Executors.newFixedThreadPool(5); //建立實現了Runnable接口對象,Thread對象固然也實現了Runnable接口 CreateTreadPool t1 = new CreateTreadPool(picktemp, ftpIP, ftpuser, ftppwd, ftpport); //將線程放入池中進行執行 pool.execute(t1); System.out.println(Thread.currentThread().getName()+"進入線程"); //關閉線程池 pool.shutdown(); log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "準備開啓FTP上傳線程------end"); }
開啓線程對FTP上傳進行操做,由於考慮到本次操做總站可能不關心上傳過程,因此成功與否就不實時通知總站了~~~~~
public class CreateTreadPool implements Runnable { private static Logger log = Logger.getLogger(CreateTreadPool.class); // public static ReentrantLock lock=new ReentrantLock(); //定義信號量,只能5個線程同時訪問 final Semaphore semaphore = new Semaphore(5); public static int c=0; public String picktemp; //路徑 public String ftpIP; // ftpip public String ftpuser ; // ftp賬號 public String ftppwd ; // ftp密碼 public String ftpport; // ftp端口 // 經過構造方法 獲取FTP相關信息。 public CreateTreadPool(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport){ this.picktemp = picktemp; this.ftpIP = ftpIP; this.ftpuser = ftpuser; this.ftppwd = ftppwd; this.ftpport = ftpport; } @Override public void run() { // TODO Auto-generated method stub System.out.println(Thread.currentThread().getName()+"---------上傳準備中---------"); try { //獲取許可 semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"得到鎖"); System.out.println(Thread.currentThread().getName()+"====>"+c); boolean isSuccess = this.FTPUpload(picktemp, ftpIP, ftpuser, ftppwd, ftpport); if (isSuccess) { System.out.println(Thread.currentThread().getName()+"---------上傳成功---------"); }else { System.out.println(Thread.currentThread().getName()+"---------上傳失敗---------"); } c++; } catch (Exception e) { System.out.println(Thread.currentThread().getName()+"---------上傳失敗---------"); e.printStackTrace(); }finally{ System.out.println(Thread.currentThread().getName()+"------------釋放鎖"); semaphore.release(); } } /** * @desc FTP上傳 * @author zp * @date 2018-3-23 */ public boolean FTPUpload(String filePath, String serverIp,String ftpuser,String ftppwd,String ftpport) { log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(給總站FTP上傳視頻)----start"); FtpUtil ftpTool = new FtpUtil(ftpuser, ftppwd, serverIp, ftpport); // 建立連接 boolean linkStatus = ftpTool.connectServer(ftpuser, ftppwd, serverIp, ftpport); File file = new File(filePath); // 實例化 UploadListener listener = new UploadListener(ftpTool.client); // 建立監聽 ftpTool.ftpUploadFolder(file, listener); // 上傳 ftpTool.disconnect(); // 關閉 RemoveFilesUtil removeFilesUtil = new RemoveFilesUtil(); removeFilesUtil.DeleteFolder(filePath); // 移除本地臨時上傳的文件 log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(給總站FTP上傳視頻)----end"); return linkStatus; }