多線程下載文件,ftp文件服務器

1: 多線程下載文件html

 

package com.li.multiplyThread;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.regex.Pattern;

/**
 * 多線程下載文件,經測試4G以上的文件也可正常下載
 */
public class DownLoadLargeFile {
    private static final Logger log = LogManager.getLogger(DownLoadLargeFile.class);
    // 測試標記
    private static boolean isTest = true;

    private CloseableHttpClient httpClient;

    public static void main(String[] args) {
        long starttimes = System.currentTimeMillis();
        DownLoadLargeFile app = new DownLoadLargeFile();

        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // 設置整個鏈接池最大鏈接數 20
        cm.setMaxTotal(20);
        app.httpClient = HttpClients.custom().setConnectionManager(cm).build();
        try {
            app.doDownload("http://mirrors.hust.edu.cn/apache//httpcomponents/httpclient/binary/httpcomponents-client-4.5.3-bin.zip", "d:/doctohtml/");
            // app.doDownload("d:/doctohtml/httpcomponents-client-4.5.3-bin.zip.cfg");
            app.httpClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println((System.currentTimeMillis() - starttimes) + "ms");
        cm.shutdown();
        cm.close();
        cm = null;
    }

    /**
     * 啓動多個線程下載文件
     *
     * @param remoteUrl
     * @param localPath
     * @throws IOException
     */
    public void doDownload(String remoteUrl, String localPath) throws IOException {
        FileCfg fileCfg = new FileCfg(localPath, remoteUrl);
        if (fileCfg.getDnldStatus() == 0) {
            download(fileCfg);
        } else {
            System.out.println("解析錯誤,沒法下載");
        }
    }

    /**
     * 讀取配置文件並按照其內容啓動多個線程下載文件未下載完畢的部分
     *
     * @param cfgFile
     * @throws IOException
     */
    public void doDownload(String cfgFile) throws IOException {
        FileCfg fileCfg = new FileCfg(cfgFile);
        if (fileCfg.getDnldStatus() == 0) {
            download(fileCfg);
        } else {
            System.out.println("解析錯誤,沒法下載");
        }
    }

    /**
     * 根據配置文件下載文件
     *
     * @param fileCfg
     * @throws IOException
     */
    public void download(FileCfg fileCfg) throws IOException {
        if (fileCfg.getDnldStatus() == 0) {
            if (fileCfg.getFilePartList() != null && fileCfg.getFilePartList().size() > 0) {
                CountDownLatch end = new CountDownLatch(fileCfg.getFilePartList().size());
                ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
                boolean hasDownloadPart = false;
                for (FilePart filePart : fileCfg.getFilePartList()) {
                    if (!filePart.isFinish()) {
                        hasDownloadPart = true;
                        // 僅下載未完成的文件片斷
                        DownloadThread downloadThread = new DownloadThread(filePart, end, httpClient);
                        cachedThreadPool.execute(downloadThread);
                    }
                }
                if (hasDownloadPart) {
                    try {
                        end.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 下載完成後清除臨時文件信息
                fileCfg.clearFile();
                log.debug("下載完成!{} ", fileCfg.getDnldTmpFile());
            } else {
                System.out.println("沒有須要下載的內容");
            }
        } else {
            System.out.println("解析錯誤,沒法下載");
        }
    }

    public static void callback(FilePart filePart) {
        if (isTest) {
            System.out.println(">>>子線程執行以後的值是:" + filePart.toString());
        }
        // 保存線程執行結果
        File newFile = new File(filePart.getPartCfgfileName());

        try {
            // byte,char 1字節
            // short 2字節
            // int 4字節
            // long 8字節
            // Boolean 1字節
            RandomAccessFile raf = new RandomAccessFile(newFile, "rws");
            raf.seek(filePart.getThreadId() * 21 + 20);
            raf.writeBoolean(filePart.isFinish());
            raf.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 待下載文件任務信息
    public class FileCfg {

        // 待下載的文件連接
        private String url = null;
        // 待下載的文件連接
        private String fileName = null;
        // 待下載的文件長度
        private long fileSize = 0l;
        // 每一個線程下載的字節數
        private int unitSize = 1024000;
        // 下載狀態
        private short dnldStatus = -1;
        // 下載路徑
        private String dnldPath = "d:/download";

        private List<FilePart> filePartList = null;

        public FileCfg(String cfgFile) {
            try {
                // 讀取配置文件
                DataInputStream is = new DataInputStream(new BufferedInputStream(new FileInputStream(cfgFile)));
                this.url = is.readUTF();
                this.fileName = is.readUTF();
                this.dnldPath = is.readUTF();
                this.fileSize = is.readLong();
                this.unitSize = is.readInt();
                this.dnldStatus = is.readShort();

                // 下載片斷數
                int partSize = is.readInt();
                is.close();
                if (isTest) {
                    System.out.println("FileCfg--->" + toString());
                }

                boolean reDownload = false;
                File downloadFile = new File(getDnldTmpFile());
                if (!downloadFile.exists() || !downloadFile.isFile()) {
                    // 從新下載文件
                    RandomAccessFile raf;
                    try {
                        raf = new RandomAccessFile(getDnldTmpFile(), "rw");
                        raf.setLength(fileSize);
                        raf.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    reDownload = true;
                }

                // 讀取文件下載片斷信息
                filePartList = new ArrayList<>(partSize);
                is = new DataInputStream(new BufferedInputStream(new FileInputStream(cfgFile + ".part")));
                for (int i = 0; i < partSize; i++) {
                    FilePart part = new FilePart(getUrl(), getDnldTmpFile(), getDnldPartFile(), is.readInt(),
                            is.readLong(), is.readLong());
                    boolean finish = is.readBoolean();
                    if (!reDownload) {
                        part.setFinish(finish);
                    }
                    if (isTest) {
                        System.out.println(i + "--->" + part.toString());
                    }
                    filePartList.add(part);
                }
                is.close();
            } catch (IOException e) {

            }
        }

        public FileCfg(String dnldPath, String url) throws IOException {
            this.url = url;
            this.dnldPath = dnldPath;

            HttpURLConnection httpConnection = (HttpURLConnection) new URL(url).openConnection();
            httpConnection.setRequestMethod("HEAD");
            int responseCode = httpConnection.getResponseCode();
            if (responseCode >= 400) {
                System.out.println("Web服務器響應錯誤!");
                return;
            }

            String lengthField = httpConnection.getHeaderField("Content-Length");
            if (lengthField != null && isLong(lengthField)) {
                System.out.println("文件大小:[" + lengthField + "]");
                this.fileSize = Long.parseLong(lengthField);
            }

            String nameField = httpConnection.getHeaderField("Content-Disposition");
            if (nameField != null) {
                String mark = "filename=\"";
                if (isTest) {
                    System.out.println("字符串:[" + nameField + "]");
                }
                int idx = nameField.indexOf(mark);
                this.fileName = nameField.substring(idx + mark.length(), nameField.length() - 1);
                // 若是沒有解析到文件名稱,則從url中獲取
                if (this.fileName == null || this.fileName.length() < 1) {
                    this.fileName = url.substring(url.lastIndexOf("/") + 1, url.length()).replace("%20", " ");
                }
                if (isTest) {
                    System.out.println("文件名稱:[" + fileName + "]");
                }
            }

            if (isTest) {
                // 讀取全部的Head信息
                httpConnection.getContentLength();
                httpConnection.getContentLengthLong();
                httpConnection.getHeaderFields();
                for (Iterator<String> iter = httpConnection.getHeaderFields().keySet().iterator(); iter.hasNext();) {
                    String key = iter.next();
                    System.out.println("[" + key + "][" + httpConnection.getHeaderField(key) + "]");
                }
            }

            calFileInfo();
        }

        public FileCfg(String dnldPath, String url, String fileName, long fileSize) {
            this.url = url;
            this.fileName = fileName;
            this.fileSize = fileSize;
            this.dnldPath = dnldPath;
            calFileInfo();
        }

        /**
         * 判斷指定的字符串是否可轉爲Long型數據
         *
         * @param str
         * @return
         */
        public boolean isLong(String str) {
            if (str == null || str.trim().length() < 1) {
                return false;
            }
            Pattern pattern = Pattern.compile("[0-9]*");
            return pattern.matcher(str).matches();
        }

        public String getUrl() {
            return url;
        }

        public void setUrl(String url) {
            this.url = url;
        }

        public String getFileName() {
            return fileName;
        }

        public void setFileName(String fileName) {
            this.fileName = fileName;
        }

        public long getFileSize() {
            return fileSize;
        }

        public void setFileSize(long fileSize) {
            this.fileSize = fileSize;
        }

        public int getUnitSize() {
            return unitSize;
        }

        public void setUnitSize(int unitSize) {
            this.unitSize = unitSize;
        }

        public short getDnldStatus() {
            return dnldStatus;
        }

        public void setDnldStatus(short dnldStatus) {
            this.dnldStatus = dnldStatus;
        }

        public String getDnldPath() {
            return dnldPath;
        }

        public void setDnldPath(String dnldPath) {
            this.dnldPath = dnldPath;
        }

        public List<FilePart> getFilePartList() {
            return filePartList;
        }

        public void setFilePartList(List<FilePart> filePartList) {
            this.filePartList = filePartList;
        }

        public String getDnldCfgFile() {
            return dnldPath + "/" + fileName + ".cfg";
        }

        public String getDnldPartFile() {
            return dnldPath + "/" + fileName + ".cfg.part";
        }

        public String getDnldTmpFile() {
            return dnldPath + "/" + fileName + ".tmp";
        }

        public String getDnldFile() {
            return dnldPath + "/" + fileName;
        }

        public void calFileInfo() {
            // 計算文件片斷數量
            if (fileSize < 1) {
                // 沒有須要下載的文件
                dnldStatus = -2;
                return;
            }
            long filePartSize = (fileSize - 1) / unitSize + 1;
            if (filePartSize > Integer.MAX_VALUE) {
                // 文件過大,不能下載
                dnldStatus = -10;
                return;
            }

            // 構建文件片斷列表
            filePartList = new ArrayList<>((int) filePartSize);
            for (int i = 0; i < filePartSize; i++) {
                long offset = i * unitSize;
                long length = unitSize;
                // 讀取數量超過文件大小
                if ((offset + length) > this.fileSize) {
                    length = this.fileSize - offset;
                }
                FilePart part = new FilePart(getUrl(), getDnldTmpFile(), getDnldPartFile(), i, offset, length);
                if (isTest) {
                    System.out.println(i + "--->" + part.toString());
                }
                filePartList.add(part);
            }
            dnldStatus = 0;

            // 構建完成,保存信息到文檔
            writeFile();

            // 檢查下載文件是否存在
            File newFile = new File(fileName);
            if (!newFile.exists() || !newFile.isFile()) {
                // 文件不存在,則從新建立,如存在,則保持原狀,用於斷點續傳
                RandomAccessFile raf;
                try {
                    raf = new RandomAccessFile(newFile, "rw");
                    raf.setLength(fileSize);
                    raf.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * 將下載信息保存到文件中,以便斷點續傳
         */
        public void writeFile() {
            // 文件下載信息
            String dnldFile = getDnldCfgFile();
            // 文件片斷信息
            String dnldPartFile = getDnldPartFile();

            try {
                // 保存文件下載信息到文件
                DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dnldFile)));
                os.writeUTF(url);
                os.writeUTF(fileName);
                os.writeUTF(dnldPath);
                os.writeLong(fileSize);
                os.writeInt(unitSize);
                os.writeShort(dnldStatus);
                os.writeInt(this.filePartList.size());
                os.flush();
                os.close();

                // 保存文件片斷信息到文件
                os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dnldPartFile)));
                for (int i = 0, p = filePartList.size(); i < p; i++) {
                    FilePart part = filePartList.get(i);
                    os.writeInt(part.getThreadId());
                    os.writeLong(part.getOffset());
                    os.writeLong(part.getLength());
                    os.writeBoolean(part.isFinish());
                    os.flush();
                }
                os.close();

                // 生成文件,並指定大小(與待下載的文件相同)
                File saveFile = new File(getDnldTmpFile());
                RandomAccessFile raf = new RandomAccessFile(saveFile, "rw");
                raf.setLength(fileSize);
                raf.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public void clearFile() {
            // 文件下載信息
            String dnldFile = getDnldCfgFile();
            // 文件片斷信息
            String dnldPartFile = getDnldPartFile();

            File file = new File(dnldFile);
            // 若是文件路徑所對應的文件存在,而且是一個文件,則直接刪除
            if (file.exists() && file.isFile()) {
                if (file.delete()) {
                    System.out.println("刪除文件" + dnldFile + "成功!");
                } else {
                    System.out.println("刪除文件" + dnldFile + "失敗!");
                }
            } else {
                System.out.println("刪除文件失敗:" + dnldFile + "不存在!");
            }

            file = new File(dnldPartFile);
            // 若是文件路徑所對應的文件存在,而且是一個文件,則直接刪除
            if (file.exists() && file.isFile()) {
                if (file.delete()) {
                    System.out.println("刪除文件" + dnldPartFile + "成功!");
                } else {
                    System.out.println("刪除文件" + dnldPartFile + "失敗!");
                }
            } else {
                System.out.println("刪除文件失敗:" + dnldPartFile + "不存在!");
            }

            // 下載完成後的臨時文件名改成正式名稱
            File oldFile = new File(getDnldTmpFile());
            File newFile = new File(getDnldFile());
            if (oldFile.exists()) {
                // 重命名文件存在
                if (!newFile.exists()) {
                    oldFile.renameTo(newFile);
                } else {
                    // 若在該目錄下已經有一個文件和新文件名相同,則不容許重命名
                    System.out.println(newFile + "已經存在!");
                }
            }
        }

        public String toString() {
            return "isTest[" + isTest + "]url[" + url + "]fileName[" + fileName + "]fileSize[" + fileSize + "]unitSize["
                    + unitSize + "]dnldStatus[" + dnldStatus + "]dnldPath[" + dnldPath + "]filePartList["
                    + ((filePartList != null) ? filePartList.size() : 0) + "]";
        }
    }

    /**
     * 文件片斷信息
     */
    public class FilePart {

        // 待下載的文件
        private String url = null;
        // 本地文件名
        private String fileName = null;
        // 本地文件名
        private String partCfgfileName = null;
        // 當前第幾個線程
        private int threadId = 0;
        // 偏移量
        private long offset = 0;
        // 分配給本線程的下載字節數
        private long length = 0;
        // 監聽本線程下載是否完成
        private boolean finish = false;

        public FilePart(String url, String fileName, String partCfgfileName, int threadId, long offset, long length) {
            this.url = url;
            this.fileName = fileName;
            this.partCfgfileName = partCfgfileName;
            this.threadId = threadId;
            this.offset = offset;
            this.length = length;
        }

        public String getUrl() {
            return url;
        }

        public void setUrl(String url) {
            this.url = url;
        }

        public String getFileName() {
            return fileName;
        }

        public void setFileName(String fileName) {
            this.fileName = fileName;
        }

        public String getPartCfgfileName() {
            return partCfgfileName;
        }

        public void setPartCfgfileName(String partCfgfileName) {
            this.partCfgfileName = partCfgfileName;
        }

        public int getThreadId() {
            return threadId;
        }

        public void setThreadId(int threadId) {
            this.threadId = threadId;
        }

        public long getOffset() {
            return offset;
        }

        public void setOffset(long offset) {
            this.offset = offset;
        }

        public long getLength() {
            return length;
        }

        public void setLength(long length) {
            this.length = length;
        }

        public boolean isFinish() {
            return finish;
        }

        public void setFinish(boolean finish) {
            this.finish = finish;
        }

        public String toString() {
            return "threadId[" + threadId + "]offset[" + offset + "]length[" + length + "]finish[" + finish + "]";
        }

    }

    /**
     * 文件下載線程
     */
    public class DownloadThread extends Thread {

        // // 待下載的文件
        // private String url = null;
        // // 本地文件名
        // private String fileName = null;
        // 待下載文件片斷
        private FilePart filePart = null;
        // 通知服務器文件的取值範圍
        private String rangeStr = "";
        // 同步工具類,容許一個或多個線程一直等待,直到其餘線程的操做執行完後再執行
        private CountDownLatch end;
        // http客戶端
        private CloseableHttpClient httpClient;
        // 上下文
        private HttpContext context;

        /**
         * @param part
         * @param end
         * @param hc
         */
        public DownloadThread(FilePart part, CountDownLatch end, CloseableHttpClient hc) {
            this.filePart = part;
            this.end = end;
            this.httpClient = hc;
            this.context = new BasicHttpContext();
            this.rangeStr = "bytes=" + filePart.getOffset() + "-" + (filePart.getOffset() + filePart.getLength());
            if (isTest) {
                System.out.println("rangeStr[" + rangeStr + "]");
                System.out.println("偏移量=" + filePart.getOffset() + ";字節數=" + filePart.getLength());
            }
        }

        public void run() {
            try {
                HttpGet httpGet = new HttpGet(filePart.getUrl());
                httpGet.addHeader("Range", rangeStr);
                CloseableHttpResponse response = httpClient.execute(httpGet, context);
                BufferedInputStream bis = new BufferedInputStream(response.getEntity().getContent());

                byte[] buff = new byte[1024];
                int bytesRead;
                File newFile = new File(filePart.getFileName());
                // Rws模式就是同步模式,每write修改一個byte,立馬寫到磁盤。固然性能差點兒,適合小的文件,debug模式,或者須要安全性高的時候。
                // Rwd模式跟個rws基礎的同樣,不過只對「文件的內容」同步更新到磁盤,而不對metadata同步更新。
                // 默認情形下(rw模式下),是使用buffer的,只有cache滿的或者使用RandomAccessFile.close()關閉流的時候兒才真正的寫到文件。
                // 這個會有兩個問題:
                // 1.調試麻煩的--->使用write方法修改byte的時候兒,只修改到個內存內,還沒到個文件,不能使用notepad++工具當即看見修改效果。
                // 2.當系統halt的時候兒,不能寫到文件,安全性稍微差點兒。
                RandomAccessFile raf = new RandomAccessFile(newFile, "rws");

                long offset = filePart.getOffset();
                while ((bytesRead = bis.read(buff, 0, buff.length)) != -1) {
                    raf.seek(offset);
                    raf.write(buff, 0, bytesRead);
                    offset += bytesRead;
                }
                raf.close();
                bis.close();
                // 下載線程執行完畢
                filePart.setFinish(true);
                // 調用回調函數告訴主進程該線程已執行完畢
                DownLoadLargeFile.callback(filePart);
            } catch (ClientProtocolException e) {
                log.error("文件下載異常信息:{}", ExceptionUtils.getStackTrace(e));
            } catch (IOException e) {
                log.error("文件下載異常信息:{}", ExceptionUtils.getStackTrace(e));
            } finally {
                end.countDown();
                if (isTest) {
                    log.info("剩餘線程[" + end.getCount() + "]繼續執行!");
                }
            }
        }
    }

}
相關文章
相關標籤/搜索