多線程

  (源碼地址:https://github.com/EduMoral/edu/blob/master/concurrent/src)java

1,  同步方法和費同步方法是否能夠同時調用git

  能夠!同步方法須要鎖,非同步方法不須要鎖,既然不須要鎖兩個方法獲取不衝突;github

2,對業務寫方法加鎖,對業務寫方法不加鎖,容易產生髒讀(dirtyRead)面試

  解決髒讀的方案 copyOnwrite 犧牲寫性能增長讀性能api

3,一個同步方法調用另外一個同步方法,一個線程已擁有某個對象的鎖,再次申請的時候任然會獲得該對象的鎖,也就是說synchronized得到鎖是是可重入的數組

  子類的同步方法也能夠調用父類的同步方法緩存

4,synchronized 關鍵字拋出異常會釋放鎖併發

5,volatile 關鍵字使一個變量在多個線程之間可見  框架

  volatile不能替代synchonnized,volatile只解決了可見性,synchonnzed既解決了可見性又解決了原子性dom

6,比較簡單的原子性操做問題能夠atomInteger 等相關類

7,鎖定對象若是的對象的屬性發生改變鎖沒有改變,若是對象的引用指向兩一個對象則該鎖發生改變

8,不使用字符串做爲鎖對象,由於有可能用的框架使用的也是該字符串,字符串存在常量池中屬於同一個對象,會發生死鎖問題;

9,Object.wait() 會釋放鎖,notify()不會釋放鎖 使用wait時90%用while,大多數狀況下使用notifyAll

10,countdownLanch 門栓

  countDownLanch lanch = new CountDownLanch(5);

  lanch.await()

  每調用countDown() 數值減1;爲0時開始執行如下代碼

 

11, reentrantlock用於替代synchronized 因爲m1鎖定this,只有m1執行完畢的時候,m2才能執行

      使用reentrantlock能夠完成一樣的功能 須要注意的是,必需要必需要必需要手動釋放鎖(重要的事情說三遍)
      使用syn鎖定的話若是遇到異常,jvm會自動釋放鎖,可是lock必須手動釋放鎖,所以常常在finally中進行鎖的釋放
      
      使用reentrantlock能夠進行「嘗試鎖定」tryLock,這樣沒法鎖定,或者在指定時間內沒法鎖定,線程能夠決定是否繼續等待
      
      使用ReentrantLock還能夠調用lockInterruptibly方法,能夠對線程interrupt方法作出響應,
      在一個線程等待鎖的過程當中,能夠被打斷
      
      ReentrantLock還能夠指定爲公平鎖

12,面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法,
 * 可以支持2個生產者線程以及10個消費者線程的阻塞調用
 *
 * 使用wait和notify/notifyAll來實現
 *
 * 使用Lock和Condition來實現
 * 對比兩種方式,Condition的方式能夠更加精確的指定哪些線程被喚醒
 *
 * @author mashibing
 *
package yxxy.c_021;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyContainer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10; //最多10個元素
    private int count = 0;
    
    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();
    
    public void put(T t) {
        try {
            lock.lock();
            while(lists.size() == MAX) { //想一想爲何用while而不是用if?
                producer.await();
            }
            
            lists.add(t);
            ++count;
            consumer.signalAll(); //通知消費者線程進行消費
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    public T get() {
        T t = null;
        try {
            lock.lock();
            while(lists.size() == 0) {
                consumer.await();
            }
            t = lists.removeFirst();
            count --;
            producer.signalAll(); //通知生產者進行生產
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }
    
    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        //啓動消費者線程
        for(int i=0; i<10; i++) {
            new Thread(()->{
                for(int j=0; j<5; j++) System.out.println(c.get());
            }, "c" + i).start();
        }
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        //啓動生產者線程
        for(int i=0; i<2; i++) {
            new Thread(()->{
                for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
            }, "p" + i).start();
        }
    }
}

13,面試題:寫一個固定容量同步容器,擁有put和get方法,以及getCount方法,
  可以支持2個生產者線程以及10個消費者線程的阻塞調用
 
  使用wait和notify/notifyAll來實現
 
  使用Lock和Condition來實現
  對比兩種方式,Condition的方式能夠更加精確的指定哪些線程被喚醒
 
public class MyContainer2<T> {
    final private LinkedList<T> lists = new LinkedList<>();
    final private int MAX = 10; //最多10個元素
    private int count = 0;
    
    private Lock lock = new ReentrantLock();
    private Condition producer = lock.newCondition();
    private Condition consumer = lock.newCondition();
    
    public void put(T t) {
        try {
            lock.lock();
            while(lists.size() == MAX) { //想一想爲何用while而不是用if?
                producer.await();
            }
            
            lists.add(t);
            ++count;
            consumer.signalAll(); //通知消費者線程進行消費
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    public T get() {
        T t = null;
        try {
            lock.lock();
            while(lists.size() == 0) {
                consumer.await();
            }
            t = lists.removeFirst();
            count --;
            producer.signalAll(); //通知生產者進行生產
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return t;
    }
    
    public static void main(String[] args) {
        MyContainer2<String> c = new MyContainer2<>();
        //啓動消費者線程
        for(int i=0; i<10; i++) {
            new Thread(()->{
                for(int j=0; j<5; j++) System.out.println(c.get());
            }, "c" + i).start();
        }
        
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        //啓動生產者線程
        for(int i=0; i<2; i++) {
            new Thread(()->{
                for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j);
            }, "p" + i).start();
        }
    }
}

 

13,併發容器

hashtable,collections.synchtonizedxxx 併發不過高的場景下使用

 

//static Queue<String> tickets = new ConcurrentLinkedQueue<>(); 

併發量叫高的狀況下使用

currentHashMap 不排序 currentHashSet

ConcurrentSkipListMap<>(); //高併發而且排序ConcurrentSkipListSet

queue 隊列

Queue<String> strs = new ConcurrentLinkedQueue<>();
        
        for(int i=0; i<10; i++) {
            strs.offer("a" + i);  //add
        }
        
        System.out.println(strs);
        
        System.out.println(strs.size());
        
        System.out.println(strs.poll()); 取後刪除
        System.out.println(strs.size());
          
        System.out.println(strs.peek()); 只取不刪除
        System.out.println(strs.size());

 

無界隊列   LinkedBlockingQueue

static BlockingQueue<String> strs = new LinkedBlockingQueue<>();

        static Random r = new Random();

        public static void main(String[] args) {
            new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    try {
                        strs.put("a" + i); //若是滿了,就會等待
                        TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "p1").start();

            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    for (;;) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //若是空了,就會等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, "c" + i).start();

            }
        }
    }

 

有界隊列

static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);

===========================

static BlockingQueue<MyTask> tasks = new DelayQueue<>();

 

線程池

當洗個線程須要返回值時用callable 接口,runnable沒有返回值  

Executor
    ExecutorService submit
    Callable = Runnable
    Executors
    ThreadPool
    Future  拿到將來的返回值,調用future時會阻塞,直到任務執行結束


    fixed cached single scheduled workstealing forkjoin
    ThreadpoolExecutor
    PStreamAPI

 

 

Executors.newFixedThreadPool(5);  固定線程池

===========

ExecutorService service = Executors.newCachedThreadPool(); 可緩存線程池 沒有空閒線程就新建一個線程,有空閒線程就複用。默認時間爲60s,60秒後線程自動死亡

===========

ExecutorService service = Executors.newSingleThreadExecutor(); 單線程池

===========

ScheduledExecutorService service = Executors.newScheduledThreadPool(4); 週期線程 定時任務,好處:線程能夠重複使用
    service.scheduleAtFixedRate(()->{
        try {
            TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
}, 0, 500, TimeUnit.MILLISECONDS);

=============

ExecutorService service = Executors.newWorkStealingPool(); 任務執行完自動找活幹 工做竊取 CPU是幾核默認啓用幾個線程,守護線程,實現方式是ForkJoinPool

=============

分叉合併 遞歸調用線程

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();
   
    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
       
        System.out.println(Arrays.stream(nums).sum()); //stream api
    }
   
    /*
    static class AddTask extends RecursiveAction {
       
        int start, end;
       
        AddTask(int s, int e) {
            start = s;
            end = e;
        }
        @Override
        protected void compute() {
           
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                System.out.println("from:" + start + " to:" + end + " = " + sum);
            } else {
           
                int middle = start + (end-start)/2;
               
                AddTask subTask1 = new AddTask(start, middle);
                AddTask subTask2 = new AddTask(middle, end);
                subTask1.fork();
                subTask2.fork();
            }
           
           
        }
       
    }
    */
   
    static class AddTask extends RecursiveTask<Long> {
       
        private static final long serialVersionUID = 1L;
        int start, end;
       
        AddTask(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {
           
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            }
           
            int middle = start + (end-start)/2;
           
            AddTask subTask1 = new AddTask(start, middle);
            AddTask subTask2 = new AddTask(middle, end);
            subTask1.fork();
            subTask2.fork();
           
            return subTask1.join() + subTask2.join();
        }
       
    }
   
    public static void main(String[] args) throws IOException {
        ForkJoinPool fjp = new ForkJoinPool();
        AddTask task = new AddTask(0, nums.length);
        fjp.execute(task);
        long result = task.join();
        System.out.println(result);
       
        //System.in.read();
       
    }
}

=================

線程池核心類 ThreadPoolExecutor 能夠實現自定義線程池

service.scheduleAtFixedRate(()->{
try {
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());

}, 0, 500, TimeUnit.MILLISECONDS);

ScheduledExecutorService service = Executors.newScheduledThreadPool(4);

 

 

線程池工做原理:

1,查看核心線程是否已滿,若是沒有建立新線程執行任務;

2,若是核心線程達到數量,則把任務丟進隊列,若是隊列隊列沒滿,則等待被調用,若是隊列已滿則執行第三步

3,查看最大線程數是否已達到數量,若是沒有則新建線程執行任務,若是達到最大線程則執行拒絕策略

 

線程池的拒絕策略有

 1,丟掉任務,拋出異常

 2,丟掉任務,不拋出異常

 3,把隊列裏較早的任務移除,嘗試丟進隊列

 4,把任務丟給主線程

阻塞隊列有:

 1,new ArrayBlockingQueue<>(5) 基於數組的先進先出隊列 有界

 2,new LinkedBlockingQueue<>();基於鏈表的先進先出隊列

 3,new Synchronous<>()無緩衝的等待隊列,無界

相關文章
相關標籤/搜索