package com.net.thread.future;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Demo that copies every regular file of a source directory into a target
 * directory using a fixed thread pool.
 *
 * <p>For each file one "producer" task opens the file and one "consumer" task
 * takes a pending producer result from a shared blocking queue and writes the
 * file's text to the target directory.
 *
 * @version 1.0 (originally written 2017-08-16)
 */
public class CallableDemo3 {

    /**
     * Timestamp format used in the progress messages.
     * {@link SimpleDateFormat} is not thread-safe, so all formatting goes
     * through {@link #now()}, which serializes access to this instance.
     */
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    /**
     * Formats the current time with {@link #sdf}.
     *
     * @return the current time as {@code yyyy-MM-dd HH:mm:ss}
     */
    private static String now() {
        synchronized (sdf) {
            return sdf.format(new Date());
        }
    }

    /**
     * Closes a resource if it was opened, reporting (not propagating) failures
     * so that the remaining resources can still be closed.
     *
     * @param resource the resource to close; may be {@code null}
     */
    private static void closeAndReport(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Submits one read task and one write task per regular file found in the
     * source directory, waits until all of them have finished and prints the
     * start and end time.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        File f = new File("C://Users//LENOVO//Desktop//file");

        // listFiles() returns null when the path is not a readable directory.
        File[] filePaths = f.listFiles();
        if (filePaths == null) {
            System.err.println("Cannot list source directory: " + f.getAbsolutePath());
            return;
        }

        // All files to process; sub-directories cannot be opened as streams, so skip them.
        final List<File> filePathsList = new ArrayList<File>();
        for (File s : filePaths) {
            if (s.isFile()) {
                filePathsList.add(s);
            }
        }

        CountDownLatch latch = new CountDownLatch(filePathsList.size());
        ExecutorService pool = Executors.newFixedThreadPool(10);
        // Sized to hold every pending future so that add() can never fail with
        // "Queue full"; the capacity must be at least 1.
        BlockingQueue<Future<Map<String, FileInputStream>>> queue =
                new ArrayBlockingQueue<Future<Map<String, FileInputStream>>>(
                        Math.max(1, filePathsList.size()));

        System.out.println("-------------文件讀、寫任務開始時間:" + now());

        for (File temp : filePathsList) {
            Future<Map<String, FileInputStream>> future =
                    pool.submit(new MyCallableProducer(latch, temp));
            queue.add(future);
            pool.execute(new MyCallableConsumer(queue));
        }

        // No more tasks will be submitted; already queued tasks still run.
        pool.shutdown();
        try {
            // The latch only signals that the read tasks are finished ...
            latch.await();
            // ... so additionally wait for the write tasks before reporting the end time.
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
        }

        System.out.println("-------------文件讀、寫任務結束時間:" + now());
    }

    /**
     * Read task: opens one file and returns a single-entry map from the file
     * name to the opened stream. The stream is left open for the consumer,
     * which is responsible for closing it.
     */
    static class MyCallableProducer implements Callable<Map<String, FileInputStream>> {

        private final CountDownLatch latch;
        private final File file;

        /**
         * @param latch counted down once when this task ends, whether it succeeds or fails
         * @param file  the file to open
         */
        public MyCallableProducer(CountDownLatch latch, File file) {
            this.latch = latch;
            this.file = file;
        }

        /**
         * Opens the file, runs {@link #doWork()} and returns the opened stream
         * keyed by file name.
         *
         * @return map with exactly one entry: file name to open input stream
         * @throws Exception if the file cannot be opened
         */
        @Override
        public Map<String, FileInputStream> call() throws Exception {
            try {
                System.out.println(Thread.currentThread().getName() + " 線程開始讀取文件 :" + file.getName()
                        + " ,時間爲 " + now());
                Map<String, FileInputStream> fileMap = new HashMap<String, FileInputStream>();
                fileMap.put(file.getName(), new FileInputStream(file));
                doWork();
                System.out.println(Thread.currentThread().getName() + " 線程讀取文件 :" + file.getName()
                        + " 完畢" + " ,時間爲 " + now());
                return fileMap;
            } finally {
                // Always count down, otherwise a failed read would block main() forever.
                latch.countDown();
            }
        }

        /**
         * Placeholder for business logic (e.g. wrapping data into POJOs);
         * currently sleeps for a random whole number of seconds in [0, 9].
         */
        private void doWork() {
            Random rand = new Random();
            int time = rand.nextInt(10) * 1000;
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
                // Preserve the interrupt so the pool can observe the cancellation.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Write task: takes one pending producer result from the queue, waits for
     * it and copies the text of each contained stream to the target directory.
     */
    static class MyCallableConsumer implements Runnable {

        private final BlockingQueue<Future<Map<String, FileInputStream>>> queue;

        /**
         * @param queue2 queue holding the producers' futures
         */
        public MyCallableConsumer(BlockingQueue<Future<Map<String, FileInputStream>>> queue2) {
            this.queue = queue2;
        }

        /**
         * Takes one future from the queue, blocks until its producer is done
         * and writes every file of the result.
         */
        @Override
        public void run() {
            try {
                Future<Map<String, FileInputStream>> future = queue.take();
                Map<String, FileInputStream> map = future.get();
                for (Map.Entry<String, FileInputStream> entry : map.entrySet()) {
                    writeFile(entry.getKey(), entry.getValue());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // The producer failed (e.g. the file could not be opened).
                e.printStackTrace();
            }
        }

        /**
         * Copies the UTF-8 text of {@code fis} line by line into
         * {@code d:\gc3\<fileName>}, preceded by a header naming the writing
         * thread, and closes both streams afterwards.
         *
         * @param fileName name of the target file
         * @param fis      open stream of the source file; closed by this method
         */
        private void writeFile(String fileName, FileInputStream fis) {
            System.out.println(Thread.currentThread().getName() + " 線程開始寫文件 :" + fileName
                    + " ,時間爲 " + now());
            BufferedReader br = null;
            BufferedWriter bw = null;
            try {
                br = new BufferedReader(new InputStreamReader(fis, "utf-8"));

                File dirFile = new File("d:" + File.separator + "gc3" + File.separator + fileName);
                File targetDir = dirFile.getParentFile();
                if (targetDir != null) {
                    // Without the directory the writer below fails with FileNotFoundException.
                    targetDir.mkdirs();
                }
                // Write with the same charset the input is decoded with; FileWriter
                // would silently use the platform default charset instead.
                bw = new BufferedWriter(
                        new OutputStreamWriter(new FileOutputStream(dirFile), "utf-8"));

                bw.write("+++++++++++++" + Thread.currentThread().getName() + " 線程開始寫文件++++++++++++");
                // NOTE(review): lines are terminated with a bare "\r" and the header has no
                // terminator at all; kept as in the original - confirm whether "\r\n" was intended.
                String data;
                while ((data = br.readLine()) != null) {
                    bw.write(data + "\r");
                }
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Close each resource independently; any of them may be null or fail.
                closeAndReport(bw);
                closeAndReport(br);
                // Covers the case where the reader was never created; closing twice is harmless.
                closeAndReport(fis);
            }
        }
    }
}
說明:
一、其實構思很簡單:阻塞隊列是線程安全的,所以多線程之間用阻塞隊列傳遞任務,這樣可以保證每個寫線程拿到的內容各不相同,不會導致重複寫數據;
二、我使用異步線程進行讀寫,而非同步線程,這樣有助於提升整體的讀、寫性能。
三、CountDownLatch 是信號燈,功能類似於 join() 方法,當然也可以使用 CyclicBarrier。