Java Semaphore實現線程池任務調度

關於Semaphore舉例html

以一個停車場運做爲例。爲了簡單起見,假設停車場只有三個車位,一開始三個車位都是空的。這時若是同時來了五輛車,看門人容許其中三輛不受阻礙的進入,而後放下車攔,剩下的車則必須在入口等待,此後來的車也都不得不在入口處等待。這時,有一輛車離開停車場,看門人得知後,打開車攔,放入一輛,若是又離開兩輛,則又能夠放入兩輛,如此往復。在這個停車場系統中,車位是公共資源,每輛車比如一個線程,看門人起的就是信號量的做用。java

信號量的特性以下:多線程

信號量是一個非負整數(車位數),全部經過它的線程(車輛)都會將該整數減一(經過它固然是爲了使用資源),當該整數值爲零時,全部試圖經過它的線程都將處於等待狀態。在信號量上咱們定義兩種操做: Wait(等待) 和 Release(釋放)。 當一個線程調用Wait(等待)操做時,它要麼經過而後將信號量減一,要麼一直等下去,直到信號量大於一或超時。Release(釋放)其實是在信號量上執行加操做,對應於車輛離開停車場,該操做之因此叫作「釋放」是由於加操做其實是釋放了由信號量守護的資源。併發

在java中,還能夠設置該信號量是否採用公平模式,若是以公平方式執行,則線程將會按到達的順序(FIFO)執行,若是是非公平,則能夠後請求的有可能排在隊列的頭部。app


JDK中定義以下:
Semaphore(int permits, boolean fair)ide

建立具備給定的許可數和給定的公平設置的Semaphore。工具

Semaphore當前在多線程環境下被擴放使用,操做系統的信號量是個很重要的概念,在進程控制方面都有應用。Java併發庫Semaphore 能夠很輕鬆完成信號量控制,Semaphore能夠控制某個資源可被同時訪問的個數,經過 acquire() 獲取一個許可,若是沒有就等待,而 release() 釋放一個許可。好比在Windows下能夠設置共享文件的最大客戶端訪問個數。ui

 

與Semaphore具備相同做用的調度工具備CountDownLatch、CyclicBarrierspa

 

一.多線程資源分配

1.通常模式

package com.tianwt.app;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreThreadPool {
	
	public static int numbers = 20;
    public static void main(String[] args) {
    	// 容許2個線程同時訪問  
        final Semaphore semaphore = new Semaphore(2);  
        ExecutorService executorService = Executors.newCachedThreadPool();  
       
        for (int i = 0; i < 20; i++) {  
            final int index = i+1;   
            executorService.execute(new Runnable() {  
                public void run() {  
                    try {  
                        semaphore.acquire();  
                        // 這裏多是業務代碼  
                        numbers = numbers-1;
                        System.out.println("("+index+")."+"線程:" + Thread.currentThread().getName() + ",numbers=" + numbers);  
                        TimeUnit.SECONDS.sleep(1);  
                        semaphore.release();  
                        System.out.println("容許TASK個數:" + semaphore.availablePermits());    
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
        executorService.shutdown();  
    }
   
}

執行結果以下:操作系統

(1).線程:pool-1-thread-1,numbers=18
(2).線程:pool-1-thread-2,numbers=18
(4).線程:pool-1-thread-4,numbers=16
(3).線程:pool-1-thread-3,numbers=16
容許TASK個數:0
容許TASK個數:0
容許TASK個數:2
容許TASK個數:0
(5).線程:pool-1-thread-5,numbers=14
(6).線程:pool-1-thread-6,numbers=14
(7).線程:pool-1-thread-7,numbers=13
容許TASK個數:0
(10).線程:pool-1-thread-10,numbers=12
容許TASK個數:0
容許TASK個數:1
容許TASK個數:1
(11).線程:pool-1-thread-11,numbers=10
(9).線程:pool-1-thread-9,numbers=10
容許TASK個數:0
(8).線程:pool-1-thread-8,numbers=8
容許TASK個數:1
(12).線程:pool-1-thread-12,numbers=9
容許TASK個數:2
容許TASK個數:2
(14).線程:pool-1-thread-14,numbers=6
(13).線程:pool-1-thread-13,numbers=6
容許TASK個數:2
容許TASK個數:0
(16).線程:pool-1-thread-16,numbers=4
(15).線程:pool-1-thread-15,numbers=4
容許TASK個數:2
容許TASK個數:2
(18).線程:pool-1-thread-18,numbers=2
(17).線程:pool-1-thread-17,numbers=2
(19).線程:pool-1-thread-19,numbers=1
(20).線程:pool-1-thread-20,numbers=0
容許TASK個數:0
容許TASK個數:0
容許TASK個數:1
容許TASK個數:2

從執行結果來看,CacheThreadPool連續建立了20個線程,一樣也印證了Semaphore資源分配功能。遺憾的是的Semaphore使用公平模式初始化時,並未徹底進行FIFO模式排隊。另一點,從numbers咱們知道,Semaphore只是用來實現資源分配的,並不會資源同步

final Semaphore semaphore = new Semaphore(2,true);  

運行結果以下:

2.同步模式

(2).線程:pool-1-thread-2,numbers=18
(1).線程:pool-1-thread-1,numbers=18
容許TASK個數:0
(3).線程:pool-1-thread-3,numbers=16
容許TASK個數:0
(4).線程:pool-1-thread-4,numbers=17
容許TASK個數:2
容許TASK個數:0
(5).線程:pool-1-thread-5,numbers=14
(6).線程:pool-1-thread-6,numbers=14
容許TASK個數:0
(9).線程:pool-1-thread-9,numbers=12
(7).線程:pool-1-thread-7,numbers=13
容許TASK個數:0
(11).線程:pool-1-thread-11,numbers=10
容許TASK個數:0
容許TASK個數:0
(10).線程:pool-1-thread-10,numbers=10
(8).線程:pool-1-thread-8,numbers=8
容許TASK個數:2
容許TASK個數:0
(12).線程:pool-1-thread-12,numbers=8
容許TASK個數:2
(14).線程:pool-1-thread-14,numbers=6
容許TASK個數:2
(13).線程:pool-1-thread-13,numbers=6
容許TASK個數:0
容許TASK個數:0
(16).線程:pool-1-thread-16,numbers=4
(15).線程:pool-1-thread-15,numbers=5
容許TASK個數:0
(17).線程:pool-1-thread-17,numbers=3
(18).線程:pool-1-thread-18,numbers=2
容許TASK個數:0
容許TASK個數:1
容許TASK個數:2
(20).線程:pool-1-thread-20,numbers=0
(19).線程:pool-1-thread-19,numbers=1
容許TASK個數:2
容許TASK個數:2

咱們看到,pool-1-thread-8和pool-1-thread-7相距很遠。

2.同步模式

如何解決3個不足呢?

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreThreadPool {
	
	public static int numbers = 20;
    public static void main(String[] args) {
    	// 容許2個線程同時訪問  
        final Semaphore semaphore = new Semaphore(2,true);  
        final ExecutorService executorService = Executors.newFixedThreadPool(4);
       
        for (int i = 0; i < 20; i++) {  
            final int index = i+1;   
            executorService.execute(new Runnable() {  
                public void run() {  
                    try {  
                        semaphore.acquire();  
                        // 這裏多是業務代碼  
                        synchronized (executorService) {
                        numbers = numbers-1;
                        System.out.println("("+index+")."+"線程:" + Thread.currentThread().getName() + ",numbers=" + numbers);  
                        TimeUnit.SECONDS.sleep(1);  
                        }
                        semaphore.release();  
                        System.out.println("容許TASK個數:" + semaphore.availablePermits());    
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                }  
            });  
        }  
        executorService.shutdown();  
    }
   
}

這樣就能夠解決線程池過多,FIFO問題以及同步問題。

二.線程等待

public static boolean isLocked = true;
    public static void main(String[] args) throws InterruptedException {
    	
    	final Semaphore semaphore = new Semaphore(0);
    	Thread t = new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					TimeUnit.SECONDS.sleep(3);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}finally{
					System.out.println(Thread.currentThread().getName()+":start");
					/**
					 *  (●'◡'●)
					 * 注意,若是是重要的變量值修改,必須在semaphore調用release以前,不然有可能形成未知問題
					 *  (●'◡'●)
					 */
					isLocked = true; 
					semaphore.release(1);
					System.out.println(Thread.currentThread().getName()+":end");
				}
			}
		});
    	
    	t.start();
    	semaphore.acquire(); //由於初始信號量爲0,所以這裏阻塞等待
    	System.out.println("主線程執行!");
    	semaphore.release();
    
    }

以前在iOS開發中遇到變量值修改不一樣不得問題,在這裏提醒開發者,若是是重要的變量值修改,必須在semaphore調用release以前,不然有可能形成未知問題

iOS dispatch信號量semaphore

相關文章
相關標籤/搜索