如下代碼有點問題,會發生阻塞,還不知道啥問題:java
package com.test.service; import java.io.File; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.CountDownLatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** * <p> * 多線程文件下載,提升大文件的下載速度(暫未使用) * <p> * */ @Component public class MulitThreadDownload { private static Logger logger = LoggerFactory.getLogger(MulitThreadDownload.class); @Value("${onair.download.threadsize:5}") private int threadSize = 5; @Value("${onair.download.timeout:5000}") private int downloadTimeout; static boolean flag = true; //消息 private final CountDownLatch msgDownLatch = new CountDownLatch(1); //工做線程 private final CountDownLatch workDownLatch = new CountDownLatch(threadSize); private DowloadRunnable[] dowloadRunnables = new DowloadRunnable[threadSize]; public static void main(String[] args) { new MulitThreadDownload().downloadFile("", "G:\\123.mp4"); } public boolean downloadFile(String url,String filePath){ logger.debug("下載地址:{},目標文件路徑:{}",url,filePath); try { URL urlPath = new URL(url); HttpURLConnection conn = (HttpURLConnection)urlPath.openConnection(); conn.setConnectTimeout(downloadTimeout); conn.setRequestMethod("GET"); int status = conn.getResponseCode(); if(status == 200){ //200返回全部,206返回部分 //文件長度 int length = conn.getContentLength(); logger.info("獲取文件大小:{}",length); //建立下載文件 指定大小 RandomAccessFile raf = new RandomAccessFile(new File(filePath), "rwd"); raf.setLength(length); raf.close(); //釋放資源 //分塊大小 int blockSize = length / threadSize; //建立工做線程 for (int i = 1; i <= threadSize; i++) { int startIndex = blockSize*(i-1); int endIndex = blockSize * i - 1; if(i == threadSize){ endIndex = length; } logger.info("線程:{}下載文件開始點:{}結束點:{}",i,startIndex,endIndex); dowloadRunnables[i-1] = new DowloadRunnable(url,filePath,msgDownLatch, workDownLatch, i,startIndex,endIndex); Thread thread = new Thread(dowloadRunnables[i-1]); thread.start(); thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { logger.debug("catch到異常",e); flag = false; } }); } //通知工做線程啓動,開始工做 msgDownLatch.countDown(); logger.debug("主線程阻塞,等待工做線程完成任務"); //起一個線程監控下載進度 //moniterLength(length); //阻塞主線程,等待工做線程完成 workDownLatch.await(); logger.debug("工做線程完成任務,主線程繼續"); return flag; } } catch (Throwable e) { logger.error("文件下載失敗:"+e.getMessage(),e); File file = new File(filePath); if(file.exists()){ file.delete(); //下載失敗 刪除臨時文件 } } return false; } //輸出下載進度 private void moniterLength(int length) { new Thread(new Runnable() { @Override public void run() { while(getDownloadLength() < length){ logger.debug("文件大小:{},目前下載大小:{},進度{}",length,getDownloadLength(),getDownloadLength()* 1.0 / (long)length); try { Thread.sleep(10000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }).start(); } //監控下載進度 public int getDownloadLength(){ int length = 0; for (int i = 0; i < dowloadRunnables.length; i++) { length += dowloadRunnables[i].downloadLength; } return length; } } //下載線程 class DowloadRunnable implements Runnable{ private static Logger logger = LoggerFactory.getLogger(DowloadRunnable.class); private CountDownLatch msgDownLatch; private CountDownLatch workDownLatch; private int threadIndex; private int startIndex; private int endIndex; private String url; private String filePath; public int downloadLength; //已下載大小 public DowloadRunnable(String url, String filePath, CountDownLatch msgDownLatch, CountDownLatch workDownLatch, int threadIndex, int startIndex, int endIndex) { this.url = url; this.filePath = filePath; this.msgDownLatch = msgDownLatch; this.workDownLatch = workDownLatch; this.threadIndex = threadIndex; this.startIndex = startIndex; this.endIndex = endIndex; } @Override public void run() { try { //阻塞此線程,等待主線程給啓動消息(msgDownLatch.countDown()); msgDownLatch.await(); //具體工做 logger.info("線程{}任務開始",threadIndex); URL urlPath = new URL(url); HttpURLConnection conn = (HttpURLConnection)urlPath.openConnection(); conn.setConnectTimeout(5000); conn.setRequestProperty("Range", "bytes=" + startIndex + "-" + endIndex); conn.setRequestMethod("GET"); int status = conn.getResponseCode(); logger.debug("線程{}請求返回的responseCode:{}",threadIndex,status); if(status==206){ InputStream in = conn.getInputStream(); RandomAccessFile raf = new RandomAccessFile(filePath, "rwd"); raf.seek(startIndex); byte[] buffer = new byte[2048]; int length = 0; logger.debug("線程{}開始寫數據,開始點{}",threadIndex,startIndex); while((length = in.read(buffer)) != -1){ //logger.debug("線程{}讀取大小:{}",threadIndex,length); raf.write(buffer, 0, length); //downloadLength += length; } raf.close(); in.close(); }else{ logger.error("文件下載失敗,狀態碼:"+status); throw new Exception("文件下載失敗,狀態碼:"+status); } logger.info("線程{}任務完成",threadIndex); //工做完成 workDownLatch.countDown(); } catch (Throwable e) { logger.error(e.getMessage(),e); e.printStackTrace(); } } }
看不出來啥問題,先記下來!spring
單獨提出來下載功能代碼,大文件下載仍是有問題多線程
package com.test.service; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.CyclicBarrier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; //下載線程 class DowloadThread extends Thread{ private static Logger logger = LoggerFactory.getLogger(DowloadThread.class); private int threadIndex; private int startIndex; private int endIndex; private String url; private String filePath; CyclicBarrier barrier; public int downloadLength; //已下載大小 public DowloadThread(String url, String filePath, int threadIndex, int startIndex, int endIndex) { this.url = url; this.filePath = filePath; this.threadIndex = threadIndex; this.startIndex = startIndex; this.endIndex = endIndex; } public DowloadThread(String url, String filePath, int threadIndex, int startIndex, int endIndex, final CyclicBarrier barrier) { this.url = url; this.filePath = filePath; this.threadIndex = threadIndex; this.startIndex = startIndex; this.endIndex = endIndex; this.barrier = barrier; } @Override public void run() { try { logger.info("線程{}任務開始",threadIndex); URL urlPath = new URL(url); HttpURLConnection conn = (HttpURLConnection)urlPath.openConnection(); conn.setConnectTimeout(5000); conn.setRequestProperty("Range", "bytes=" + startIndex + "-" + endIndex); conn.setRequestMethod("GET"); int status = conn.getResponseCode(); logger.debug("線程{}請求返回的responseCode:{}",threadIndex,status); if(status==206 || status == 200){ InputStream in = conn.getInputStream(); RandomAccessFile raf = new RandomAccessFile(filePath, "rwd"); raf.seek(startIndex); byte[] buffer = new byte[2048]; int length = 0; logger.debug("線程{}開始寫數據,開始點{}",threadIndex,startIndex); while((length = in.read(buffer)) != -1){ raf.write(buffer, 0, length); downloadLength += length; } raf.close(); in.close(); }else{ logger.error("文件下載失敗,狀態碼:"+status); throw new Exception("文件下載失敗,狀態碼:"+status); } logger.info("線程{}任務完成",threadIndex); } catch (Throwable e) { logger.error(e.getMessage(),e); e.printStackTrace(); } } }
下面的代碼相對來講好一些:dom
package com.test; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.SequenceInputStream; import java.net.HttpURLConnection; import java.net.URL; import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import com.test.DownloadCallable; /** * 未使用 * */ public class DownloadService { private static Logger logger = LoggerFactory.getLogger(DownloadService.class); @Value("${onair.download.timeout:10000}") private int downloadTimeout = 10000; @Value("${onair.download.threadSize:5}") private int threadSize = 5; @Value("${onair.download.ddxc:false}") private boolean ddxc = true; ExecutorService executorService = Executors.newFixedThreadPool(threadSize); DownloadCallable[] callables = new DownloadCallable[threadSize]; List<Future<String>> futures = new ArrayList<>(); public static void main(String[] args) { } public boolean doWork(String url,String filePath){ logger.debug("源地址:{},目標地址:{}",url,filePath); try { URL path = new URL(url); HttpURLConnection conn = (HttpURLConnection)path.openConnection(); conn.setConnectTimeout(downloadTimeout); conn.setRequestMethod("GET"); int status = conn.getResponseCode(); if(status==200){ int length = conn.getContentLength(); int blockSize = length / threadSize; for (int i = 1; i <= threadSize; i++) { int startIndex = blockSize*(i-1); int endIndex = startIndex + blockSize -1; if(i==threadSize){ endIndex = length; } callables[i-1] = new DownloadCallable(i, startIndex, endIndex, url, downloadTimeout, filePath, ddxc); futures.add(executorService.submit(callables[i-1])); } for (int i = 0; i < threadSize; i++) { while(!futures.get(i).isDone()){ int size = getDownloadSize(blockSize); logger.debug("文件總大小:"+length+"==已下載:"+size+"進度:"+(float)size * 1.0/(float)length); Thread.sleep(30000); } } if(getDownloadSize(blockSize)==length){ System.out.println("下載完成沒報錯"); }else{ System.out.println("下載完成報錯了"); } } executorService.shutdown(); } catch (Throwable e) { logger.error(e.getMessage(),e); } return false; } public boolean addFile(String filePath) throws Throwable{ List<FileInputStream> list = new ArrayList<FileInputStream>(); List<File> listFile = new ArrayList<File>(); for (int i = 1; i <= threadSize; i++) { String tfile = filePath+"_tmp"+i; listFile.add(new File(tfile)); FileInputStream in = new FileInputStream(new File(tfile)); list.add(in); } //使用 Enumeration(列舉) 將文件所有列舉出來 Enumeration<FileInputStream> eum = Collections.enumeration(list); //SequenceInputStream合併流 合併文件 SequenceInputStream sis = new SequenceInputStream(eum); FileOutputStream fos = new FileOutputStream(new File(filePath)); byte[] by = new byte[1024]; int len; while((len=sis.read(by)) != -1){ fos.write(by, 0, len); } fos.flush(); fos.close(); sis.close(); System.out.println("合併完成!"); //刪除文件 for (File file : listFile) { file.delete(); } System.out.println("刪除文件完成!"); return true; } public int getDownloadSize(int blockSize){ int length = 0; for (int i = 1; i <=threadSize; i++) { //System.out.println("線程"+i+"==已下載大小:"+callables[i].downloadSize + "進度:"+ (float)callables[i-1].downloadSize *1.0 /(float)blockSize); //logger.debug("線程"+i+"==需下載大小:"+blockSize+"=已下載"+callables[i-1].downloadSize + "進度:"+ (float)callables[i-1].downloadSize *1.0 /(float)blockSize); length += callables[i-1].downloadSize; } return length; } }
package com.test; import java.io.File; import java.io.InputStream; import java.io.RandomAccessFile; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.Callable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DownloadCallable implements Callable<String>{ private static Logger logger = LoggerFactory.getLogger(DownloadCallable.class); int threadIndex; int startIndex; int endIndex; int timeout; String url; String filePath; boolean ddxc; public DownloadCallable() { } public DownloadCallable(int threadIndex, int startIndex, int endIndex, String url,int timeout, String filePath,boolean ddxc) { this.threadIndex = threadIndex; this.startIndex = startIndex; this.endIndex = endIndex; this.url = url; this.filePath = filePath; this.timeout = timeout; this.ddxc = ddxc; } public DownloadCallable(int threadIndex,String url,int timeout, String filePath) { this.url = url; this.filePath = filePath; this.threadIndex = threadIndex; this.timeout = timeout; } public DownloadCallable(int threadIndex,String url,int timeout, String filePath,boolean ddxc) { this.url = url; this.filePath = filePath; this.threadIndex = threadIndex; this.timeout = timeout; this.ddxc = ddxc; } //記錄已下載的大小 public int downloadSize = 0; @Override public String call() throws Exception { InputStream in = null; RandomAccessFile raf = null; try { URL path = new URL(url); HttpURLConnection conn = (HttpURLConnection)path.openConnection(); conn.setConnectTimeout(timeout); //conn.setReadTimeout(timeout); conn.setRequestMethod("GET"); //conn.setRequestProperty("Keep-Alive", timeout+""); conn.setRequestProperty("User-Agent", "Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.0.3) Gecko/2008092510 Ubuntu/8.04 (hardy) Firefox/3.0.3"); if(threadIndex > 0){ //多線程 conn.setRequestProperty("Range", "bytes=" + startIndex + "-" + endIndex); } int status = conn.getResponseCode(); if(status==200 || status == 206){ //獲取到數據 //斷點續傳取值邏輯 File tmpFile = new File(filePath+"_tmp"+threadIndex); if(tmpFile.exists()){ if(ddxc){ downloadSize = (int) tmpFile.length(); if(downloadSize >= conn.getContentLength()){ return "success"; //下載完成了 } }else{ tmpFile.delete(); } } //end raf = new RandomAccessFile(filePath+"_tmp"+threadIndex,"rw"); raf.seek(downloadSize); logger.debug("線程:"+threadIndex+"==下載開始節點:"+downloadSize+"=需下載大小::"+conn.getContentLength()); in = conn.getInputStream(); byte[] buffer = new byte[1024]; int length = 0; while((length = in.read(buffer))!=-1){ raf.write(buffer, 0, length); downloadSize += length; } }else{ return null; } conn.disconnect(); return "success"; } catch (Throwable e) { logger.error(e.getMessage()+"當前線程:"+threadIndex); return null; }finally { in.close(); raf.close(); } } }
還不完美!ide