併發工具類(三)控制併發線程的數量 Semphore

前言

  JDK中爲了處理線程之間的同步問題,除了提供鎖機制以外,還提供了幾個很是有用的併發工具類:CountDownLatch、CyclicBarrier、Semphore、Exchanger、Phaser;
  CountDownLatch、CyclicBarrier、Semphore、Phaser 這四個工具類提供一種併發流程的控制手段;而Exchanger工具類則提供了在線程之間交換數據的一種手段。html

簡介

  

Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它經過協調各個線程,以保證合理的使用公共資源。
不少年以來,我都以爲從字面上很難理解Semaphore所表達的含義,只能把它比做是控制流量的紅綠燈,好比XX馬路要限制流量,只容許同時有一百輛車在這條路上行使,其餘的都必須在路口等待,因此前一百輛車會看到綠燈,能夠開進這條馬路,後面的車會看到紅燈,不能駛入XX馬路,可是若是前一百輛中有五輛車已經離開了XX馬路,那麼後面就容許有5輛車駛入馬路,這個例子裏說的車就是線程,駛入馬路就表示線程在執行,離開馬路就表示線程執行完成,看見紅燈就表示線程被阻塞,不能執行。

應用場景

  Semaphore能夠用於作java

流量控制
,特別公用資源有限的應用場景,好比數據庫鏈接。假若有一個需求,要讀取幾萬個文件的數據,由於都是IO密集型任務,咱們能夠啓動幾十個線程併發的讀取,可是若是讀到內存後,還須要存儲到數據庫中,而數據庫的鏈接數只有10個,這時咱們必須控制只有十個線程同時獲取數據庫鏈接保存數據,不然會報錯沒法獲取數據庫鏈接。這個時候,咱們就可使用Semaphore來作流控,代碼以下:

public class SemaphoreTest {

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);

    private static Semaphore s = new Semaphore(10);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }

        threadPool.shutdown();
    }
}
複製代碼

在代碼中,雖然有30個線程在執行,可是隻容許10個併發的執行。Semaphore的構造方法Semaphore(int permits) 接受一個整型的數字,表示可用的許可證數量。Semaphore(10)表示容許10個線程獲取許可證,也就是最大併發數是10。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()獲取一個許可證,使用完以後調用release()歸還許可證。還能夠用tryAcquire()方法嘗試獲取許可證。
spring

Semphore的方法摘要

一、獲取許可

  API中提供了多種的方式獲取鎖:數據庫

  • 能夠獲取一個、多個許可;
  • 提供阻塞、非阻塞、超時的方式獲取許可;
  • 除了可中斷、還提供一個非中斷的方式獲取鎖;

public void acquire() throws InterruptedException
今後信號量獲取一個許可,在提供一個許可前一直將線程阻塞,不然線程被bash

中斷。


public void acquire(int permits) throws InterruptedException
數據結構

獲取多個許可。

今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被併發

中斷。


public void acquireUninterruptibly()
今後信號量中獲取許可,在有可用的許可前將其阻塞。ide

不可中斷。


public void acquireUninterruptibly(int permits)
工具

獲取多個許可。

今後信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。測試

不可中斷。


public boolean tryAcquire()
僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。

非阻塞的方式嘗試獲取許可。


public boolean tryAcquire(int permits)
僅在調用時此信號量中有給定數目的許可時,才今後信號量中獲取這些許可。
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
若是在給定的等待時間內此信號量有可用的全部許可,而且當前線程未被中斷,則今後信號量獲取給定數目的許可。

超時等待獲取許可


public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException
若是在給定的等待時間內,此信號量有可用的許可而且當前線程未被中斷,則今後信號量獲取一個許可。

二、許可的釋放

public void release( ):
釋放一個許可,將其返回給信號量。
public void release(int permits)
釋放給定數目的許可,將其返回到信號量。

三、提供的監控方法

public int availablePermits( )
返回此信號量中當前可用的許可數
public int drainPermits()
獲取並返回當即可用的全部許可
public final int getQueueLength()
返回正在等待獲取的線程的估計數目。該值僅是估計的數字,由於在此方法遍歷內部數據結構的同時,線程的數目可能動態地變化。此方法用於監視系統狀態,不用於同步控制。
public final boolean hasQueuedThreads()
查詢是否有線程正在等待獲取。
public boolean isFair()
若是此信號量的公平設置爲 true,則返回 true。

protected 方法:

protected Collection
返回一個 collection,包含可能等待獲取的線程。由於在構造此結果的同時實際的線程 set 可能動態地變化,因此返回的 collection 僅是盡力的估計值。所返回 collection 中的元素沒有特定的順序。
protected void reducePermits(int reduction)
根據指定的縮減量減少可用許可的數目。此方法在使用信號量來跟蹤那些變爲不可用資源的子類中頗有用

@ Example 獲取、釋放多個許可

try {
    Semaphore semaphore = new Semaphore(5);
    //獲取一個許可
    semaphore.acquire();
    //一次性獲取4個許可
    semaphore.acquire(4);
    System.out.println("Semaphore 剩下的許可數量:"+semaphore.availablePermits());
    //一次性釋放5個許可
    semaphore.release(5);
    System.out.println("Semaphore 剩下的許可數量:"+semaphore.availablePermits());
    //再釋放5個許可
    semaphore.release();
    semaphore.release();
    semaphore.release(3);
    System.out.println("Semaphore 剩下的許可數量:"+semaphore.availablePermits());
   
    } catch (InterruptedException e) {
    e.printStackTrace();
複製代碼

運行結果:

Semaphore 剩下的許可數量:0
Semaphore 剩下的許可數量:5
Semaphore 剩下的許可數量:10
複製代碼

 從上面的運行結果能夠看出,

構造方法的 new Semaphore(5)中參數5並非最終的許可數量,能夠經過release()方法增長許可數量。

本人測試用例

package com.wxx.demo;

import com.wxx.demo.util.IdUtiles;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;

@RunWith(SpringRunner.class)
//@SpringBootTest(classes = LeisureWebApplication.class)
public class TaskTest {

    @Test
    public void taskTest(){

        Runnable task = new Runnable() {

            int count = 0;

            @Override
            public void run() {

                count ++;
                try{
                    String id = IdUtiles.creatId();
                    System.out.println(id);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                System.out.println(count);
                System.out.println("Thread : " + Thread.currentThread().getId());

                try {
                    Thread.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }

            }
        };

        double executeTime = this.executeTime(100, task);
        System.out.println("執行時間: " + executeTime);
    }

    private double executeTime(int taskCount,Runnable task){

        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(taskCount);

        for (int i = 0; i < taskCount ; i++) {
            Thread thread = new Thread() {

                public void run(){
                    try {
                        start.await();

                        try {
                            task.run();
                        }finally {
                            end.countDown();
                        }

                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                }
            };

            thread.start();
        }

        long startTime = System.nanoTime();

        //開啓開關
        start.countDown();

        long endTime = System.nanoTime();

        return endTime - startTime;
    }
}
複製代碼

package com.wxx.demo.util;

import java.text.SimpleDateFormat;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @Author : leisure
 * @Date : 2019/1/17
 */
public class IdUtiles {

    private static String lead = "leisure";
    private static int Guid = 100;
    private static Semaphore semaphore = new Semaphore(5,false);
    /**
     * 建立以字符串打頭結尾自增的惟一id
     * @return
     */
    public static synchronized String creatId() throws InterruptedException{
        //測試控制方法內的併發線程數 測試放開synchronized
        //semaphore.acquire();
        semaphore.tryAcquire(1,1000, TimeUnit.MILLISECONDS);
        int i = semaphore.availablePermits();
        System.out.println("當前可用許可" + i);


        int i1 = semaphore.drainPermits();
        System.out.println("當前當即可用的許可" + i1);

        boolean b = semaphore.hasQueuedThreads();
        System.out.println("當前是否有線程等待" + b);

        boolean fair = semaphore.isFair();
        System.out.println("當前信號是否公平" + fair);
        long l = System.currentTimeMillis();

        int queueLength = semaphore.getQueueLength();
        System.out.println("等待線程數" + queueLength);
        Thread.sleep(100);
        Guid += 1;

        String format = new SimpleDateFormat("yyyy").format(l);

        if (Guid > 999){
            Guid = 100;
        }

        String id = lead + format + l + Guid;

        semaphore.release();
        return id;
    }
}
複製代碼

注:文章源地址:https://www.cnblogs.com/jinggod/p/8494246.html

相關文章
相關標籤/搜索