多線程爬蟲Java調用wget下載文件,獨立線程讀取輸出緩衝區

寫了個抓取 App Store 的爬蟲,需要抓取大量的 app。原本是用 HttpClient,但效果不理想,於是改為直接調用 wget 下載。可是,由於子進程的標準輸出、錯誤輸出緩衝區沒有被及時讀取,寫滿之後會導致進程卡住;另外 wget 本身也會莫名其妙地卡住。

因此我採取了以下措施:

1、用獨立的線程讀取子進程的標準輸出和錯誤輸出;

2、自己實現 doWaitFor 方法來代替 API 提供的 waitFor() 方法,避免因子進程卡死而無限等待;

3、設置超時:超時後殺死 wget 子進程;如果沒有正確返回,就重試一次,並把超時時間加倍。

有了以上措施,wget 不會再把程序卡死;就算卡住了,也會因爲超時被殺掉並重試一次,所以絕大部分的 app 都能被抓取下來。

import com.google.common.io.Files;
import com.xxx.appstore.service.crawler.CalcMD5Service;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;


/**
 * Helpers for the app-store crawler. Downloads are delegated to an external {@code wget}
 * process instead of an in-JVM HTTP client.
 */
public class CrawlerUtils {

    /** Root directory for downloaded APKs; each file goes into a per-category sub-directory. */
    public static final String APK_DOWNLOAD_PATH = "/data/appstore/category/";
    private static final Logger LOGGER = LoggerFactory.getLogger(CrawlerUtils.class);

    /** Total number of wget runs per download (the first try plus one retry). */
    private static final int MAX_ATTEMPTS = 2;
    /** Timeout of the first attempt in seconds; attempt {@code n} (1-based) waits {@code n} times this. */
    private static final int BASE_TIMEOUT_SECONDS = 30;
    /** Interval at which the child process is polled for termination. */
    private static final long POLL_INTERVAL_MILLIS = 100L;
    /** Upper bound on how long to wait for a destroyed child to actually exit. */
    private static final long DESTROY_GRACE_MILLIS = 5000L;

    /**
     * Downloads a file with {@code wget}.
     *
     * <p>The target path is {@code APK_DOWNLOAD_PATH + <first 10 chars of CalcMD5Service.encoder(category)>
     * + "/" + CalcMD5Service.encoder(displayName + random[0,1000)) + ".apk"}. Up to
     * {@link #MAX_ATTEMPTS} wget runs are made, each later one with a proportionally longer
     * timeout. If every attempt fails, any partially written file is deleted.
     *
     * @param displayName  app name; combined with a random suffix to derive the file name
     * @param category     category; used to derive the sub-directory
     * @param download_url URL to download
     * @return the downloaded file's path on success; {@code null} if an argument is blank, the
     *         parent directory cannot be created, or every download attempt fails
     */
    public static String downloadFileByWget(String displayName, String category, String download_url) {
        if (StringUtils.isBlank(displayName) || StringUtils.isBlank(category) || StringUtils.isBlank(download_url)) {
            LOGGER.info("downloadFileByWget ERROR, displayName:{}, category:{}, download_url:{}", new Object[]{displayName, category, download_url});
            return null;
        }
        String fileName = CalcMD5Service.encoder(displayName + RandomUtils.nextInt(1000));
        String seed = CalcMD5Service.encoder(category);
        String midPath = StringUtils.left(seed, 10);
        String filePath = APK_DOWNLOAD_PATH + midPath + "/" + fileName + ".apk";
        File file = new File(filePath);
        try {
            Files.createParentDirs(file);
        } catch (IOException e) {
            LOGGER.warn("IOException", e);
            return null;
        }
        for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
            // An interrupted caller wants us to stop; don't start another wget.
            if (Thread.currentThread().isInterrupted()) {
                break;
            }
            if (runWget(download_url, filePath, BASE_TIMEOUT_SECONDS * attempt) == 0) {
                return filePath;
            }
            LOGGER.warn("Wget download failed...");
        }
        // Every attempt failed: don't leave a truncated file behind for later consumers.
        if (file.exists() && !file.delete()) {
            LOGGER.warn("Failed to delete incomplete download {}", filePath);
        }
        return null;
    }

    /**
     * Runs a single wget download of {@code downloadUrl} into {@code filePath}.
     *
     * @return wget's exit code, or -1 if it could not be started, timed out or was interrupted
     */
    private static int runWget(String downloadUrl, String filePath, int timeoutSeconds) {
        // "--" ends option parsing, so a crawled URL beginning with '-' cannot be taken as a wget option.
        ProcessBuilder pb = new ProcessBuilder("wget", "-t", "2", "-T", "10", "-O", filePath, "--", downloadUrl);
        LOGGER.info("wget shell: {}", pb.command());
        Process ps;
        try {
            ps = pb.start();
        } catch (IOException e) {
            LOGGER.error("IOException", e);
            return -1;
        }
        return doWaitFor(ps, timeoutSeconds);
    }

    /**
     * Waits for {@code ps} to exit, bounded by a wall-clock timeout. Meanwhile its stdout and
     * stderr are drained on background threads, so the child never blocks on a full pipe.
     *
     * @param ps      child process; {@code null} yields -1
     * @param timeout maximum time to wait, in seconds; on expiry the process is destroyed
     * @return the child's exit code (0 means success), or -1 if {@code ps} was null, the wait
     *         timed out, or the calling thread was interrupted (its interrupt flag is restored)
     */
    private static int doWaitFor(Process ps, int timeout) {
        if (ps == null) {
            return -1;
        }
        try {
            // wget is given no input here; release the stdin pipe immediately.
            ps.getOutputStream().close();
        } catch (IOException ignored) {
            // Closing an unused pipe is best-effort only.
        }
        // The collected lines are not inspected; the readers exist to keep the pipes drained.
        List<String> stdoutList = new ArrayList<>();
        List<String> erroroutList = new ArrayList<>();
        new ThreadUtil(ps.getInputStream(), stdoutList).start();
        new ThreadUtil(ps.getErrorStream(), erroroutList).start();

        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeout);
        try {
            while (true) {
                Integer exitCode = exitValueIfFinished(ps);
                if (exitCode != null) {
                    return exitCode;
                }
                if (System.nanoTime() - deadline >= 0) {
                    LOGGER.info("Process wget timeout {}s, destroyed!", timeout);
                    destroyAndReap(ps);
                    return -1;
                }
                TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
            }
        } catch (InterruptedException e) {
            ps.destroy();
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    /**
     * Destroys {@code ps} and waits up to {@link #DESTROY_GRACE_MILLIS} for it to exit.
     * This prevents a killed wget from still writing the output file while a retry rewrites
     * it or the caller deletes it.
     */
    private static void destroyAndReap(Process ps) throws InterruptedException {
        ps.destroy();
        long graceDeadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(DESTROY_GRACE_MILLIS);
        while (exitValueIfFinished(ps) == null) {
            if (System.nanoTime() - graceDeadline >= 0) {
                LOGGER.warn("Process wget did not exit within {}ms after destroy", DESTROY_GRACE_MILLIS);
                return;
            }
            TimeUnit.MILLISECONDS.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /**
     * Returns the child's exit code, or {@code null} while it is still running.
     * Uses exitValue() rather than Process.isAlive() so the code also works on Java 7.
     */
    private static Integer exitValueIfFinished(Process ps) {
        try {
            return ps.exitValue();
        } catch (IllegalThreadStateException stillRunning) {
            return null;
        }
    }
}

 

import org.apache.commons.io.Charsets;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;


/**
 * Drains an {@link InputStream} line by line and appends each line (without its terminator) to
 * a caller-supplied list. Intended for consuming a child process's stdout/stderr on a background
 * thread, so the child cannot block on a full pipe.
 *
 * <p>Each {@code add} happens while holding the list's monitor. A caller that wraps the list
 * with {@link java.util.Collections#synchronizedList} (which locks on itself) can therefore read
 * it safely while the reader is still running.
 */
public class ThreadUtil implements Runnable {
    /** Charset used to decode the stream. */
    private final Charset charset = StandardCharsets.UTF_8;
    private final List<String> list;
    private final InputStream inputStream;

    /**
     * @param inputStream stream to drain; it is closed once reading finishes or fails
     * @param list        sink receiving every line read
     */
    public ThreadUtil(InputStream inputStream, List<String> list) {
        this.inputStream = inputStream;
        this.list = list;
    }

    /** Starts reading on a new daemon thread, so a reader stuck on a pipe never keeps the JVM alive. */
    public void start() {
        Thread thread = new Thread(this, "ThreadUtil-stream-reader");
        thread.setDaemon(true);
        thread.start();
    }

    /** Reads the stream to EOF, appending each line to the list, then closes the stream. */
    @Override
    public void run() {
        // Closing the reader also closes the wrapped input stream.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, charset))) {
            String line;
            while ((line = reader.readLine()) != null) {
                synchronized (list) {
                    list.add(line);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
相關文章
相關標籤/搜索