Semaphore信號量實現簡易版數據庫鏈接池

Semaphore 用於控制同時訪問某個特定資源的線程數量,主要用在流量控制java

下面咱們使用 Semaphore 來實現一個簡易版數據庫鏈接池數據庫

實現思路:less

  1. DBPoolSemaphore 中使用 LinkedList 充當鏈接池容器
  2. DBPoolSemaphore 中 實現一個取數據庫鏈接和釋放數據庫鏈接的方法:takeConnect 和 returnConnect
  3. takeConnect:在mills時間內還拿不到數據庫鏈接,返回一個null.returnConnect:釋放鏈接
  4. DBPoolSemaphore 中定義兩個信號量:Semaphore useful,useless.useful表示可用的數據庫鏈接,useless表示已用的數據庫鏈接,takeConnect和returnConnect都要操做這兩個信號量
  5. 測試類啓動50個線程去取數據庫鏈接並進行數據庫操做,最後釋放鏈接.每一個線程取20次
public class DBPoolSemaphore {
	
	private final static int POOL_SIZE = 10;
	/** * useful表示可用的數據庫鏈接,useless表示已用的數據庫鏈接 */
	private final Semaphore useful,useless;
	
	public DBPoolSemaphore() {
		this.useful = new Semaphore(POOL_SIZE);
		this.useless = new Semaphore(0);
	}

	/** * 存放數據庫鏈接的容器 */
	private static final LinkedList<Connection> pool = new LinkedList<>();
	//初始化池
	static {
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.addLast(SqlConnectImpl.fetchConnection());
        }
	}

	/** * 歸還鏈接 * @param connection * @throws InterruptedException */
	public void returnConnect(Connection connection) throws InterruptedException {
		if(connection!=null) {
			System.out.println("當前有" + useful.getQueueLength() + "個線程等待數據庫鏈接!!"
					+ "可用鏈接數:" + useful.availablePermits());
			//returnConnect 必須爲拿到鏈接的線程調用,acquire方法是阻塞方法
			useless.acquire();
			synchronized (pool) {
				pool.addLast(connection);
			}
			useful.release();
		}
	}

	/** * 從池子拿鏈接 * @return * @throws InterruptedException */
	public Connection takeConnect() throws InterruptedException {
		//一秒超時
		boolean acquire = useful.tryAcquire(1000, TimeUnit.MILLISECONDS);
		Connection conn = null;
		if (acquire) {
			synchronized (pool) {
				conn = pool.removeFirst();
			}
			//已用的加1
			useless.release();
		}
		return conn;
	}
	
}
複製代碼
public class SqlConnectImpl implements Connection{
	
    public static final Connection fetchConnection(){
        return new SqlConnectImpl();
    }

	@Override
	public void commit() throws SQLException {
		SleepTools.ms(70);
	}
	@Override
	public Statement createStatement() throws SQLException {
		SleepTools.ms(1);
		return null;
	}
}
複製代碼
public class SemaphoreTest {

	private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
	

	private static class BusiThread extends Thread{
		@Override
		public void run() {
			//讓每一個線程持有鏈接的時間不同
			Random r = new Random();
			long start = System.currentTimeMillis();
			Connection connect = null;
			try {
				connect = dbPool.takeConnect();
				System.out.println("Thread_" + Thread.currentThread().getId()
						+ "_獲取數據庫鏈接共耗時【" + (System.currentTimeMillis() - start) + "】ms.");
				//模擬業務操做,線程持有鏈接查詢數據
				SleepTools.ms(100+r.nextInt(100));

			} catch (InterruptedException ignored) {
			}finally {
				System.out.println("查詢數據完成,歸還鏈接!");
				if (connect != null) {
					try {
						dbPool.returnConnect(connect);
					} catch (InterruptedException ignored) {
					}
				}
			}
		}
	}
	
	public static void main(String[] args) {
        for (int i = 0; i < 50; i++) {
            Thread thread = new BusiThread();
            thread.start();
        }
	}
	
}
複製代碼
相關文章
相關標籤/搜索