用Java實現一個通用併發對象池

這篇文章裏咱們主要討論下如何在Java裏實現一個對象池。最近幾年,Java虛擬機的性能在各方面都獲得了極大的提高,所以對大多數對象而言,已經沒有必要經過對象池來提升性能了。根本的緣由是,建立一個新的對象的開銷已經不像過去那樣昂貴了。php

然而,仍是有些對象,它們的建立開銷是很是大的,好比線程,數據庫鏈接等這些非輕量級的對象。在任何一個應用程序裏面,咱們確定會用到不止一個這樣的對象。若是有一種很方便的建立管理這些對象的池,使得這些對象可以動態的重用,而客戶端代碼也不用關心它們的生命週期,仍是會很給力的。html

在真正開始寫代碼前,咱們先來梳理下一個對象池須要完成哪些功能。java

  • 若是有可用的對象,對象池應當能返回給客戶端。react

  • 客戶端把對象放回池裏後,能夠對這些對象進行重用。sql

  • 對象池可以建立新的對象來知足客戶端不斷增加的需求。數據庫

  • 須要有一個正確關閉池的機制來確保關閉後不會發生內存泄露。緩存

不用說了,上面幾點就是咱們要暴露給客戶端的鏈接池的接口的基本功能。併發

咱們的聲明的接口以下:異步

package com.test.pool;/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param <T> the type of object to pool.
 */public interface Pool<T>{ /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns < code >null< /code >.
  *
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that
  * an object < code >o< /code > is valid if
  * < code > Validator.isValid(o) == true < /code >.
  *
  * @return T one of the pooled objects.
  */
 T get(); /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */

 void release(T t); /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources are done
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */

 void shutdown();
}

爲了可以支持任意對象,上面這個接口故意設計得很簡單通用。它提供了從池裏獲取/返回對象的方法,還有一個關閉池的機制,以便釋放對象。async

如今咱們來實現一下這個接口。開始動手以前,值得一提的是,一個理想的release方法應該先嚐試檢查下這個客戶端返回的對象是否還能重複使用。若是是的話再把它扔回池裏,若是不是,就捨棄掉這個對象。咱們但願這個Pool接口的全部實現都能遵循這個規則。在開始具體的實現類前,咱們先建立一個抽象類,以便限制後續的實現能遵循這點。咱們實現的抽象類就叫作AbstractPool,它的定義以下:

package com.test.pool;/**
 * Represents an abstract pool, that defines the procedure
 * of returning an object to the pool.
 *
 * @author Swaranga
 *
 * @param <T> the type of pooled objects.
 */abstract class AbstractPool <T> implements Pool <T>{ /**
  * Returns the object to the pool.
  * The method first validates the object if it is
  * re-usable and then puts returns it to the pool.
  *
  * If the object validation fails,
  * some implementations
  * will try to create a new one
  * and put it into the pool; however
  * this behaviour is subject to change
  * from implementation to implementation
  *
  */
 @Override
 public final void release(T t)
 {  if(isValid(t))
  {
   returnToPool(t);
  }  else
  {
   handleInvalidReturn(t);
  }
 } protected abstract void handleInvalidReturn(T t); protected abstract void returnToPool(T t); protected abstract boolean isValid(T t);
}

在上面這個類裏,咱們讓對象池必須得先驗證對象後才能把它放回到池裏。具體的實現能夠自由選擇如何實現這三種方法,以便定製本身的行爲。它們根據本身的邏輯來決定如何判斷一個對象有效,無效的話應該怎麼處理(handleInvalidReturn方法),怎麼把一個有效的對象放回到池裏(returnToPool方法)。

有了上面這幾個類,咱們就能夠着手開始具體的實現了。不過還有個問題,因爲上面這些類是設計成能支持通用的對象池的,所以具體的實現不知道該如何驗證對象的有效性(由於對象都是泛型的)。所以咱們還須要些別的東西來幫助咱們完成這個。

咱們須要一個通用的方法來完成對象的校驗,而具體的實現沒必要關心對象是何種類型。所以咱們引入了一個新的接口,Validator,它定義了驗證對象的方法。這個接口的定義以下:

package com.test.pool; /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T > {  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */

  public void invalidate(T t);
 }

上面這個接口定義了一個檢驗對象的方法,以及一個把對象置爲無效的方法。當準備廢棄一個對象並清理內存的時候,invalidate方法就派上用場了。值得注意的是這個接口自己沒有任何意義,只有當它在對象池裏使用的時候纔有意義,因此咱們把這個接口定義到Pool接口裏面。這和Java集合庫裏的Map和Map.Entry是同樣的。因此咱們的Pool接口就成了這樣:

package com.test.pool;/**
 * Represents a cached pool of objects.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to pool.
 */public interface Pool< T >{ /**
  * Returns an instance from the pool.
  * The call may be a blocking one or a non-blocking one
  * and that is determined by the internal implementation.
  *
  * If the call is a blocking call,
  * the call returns immediately with a valid object
  * if available, else the thread is made to wait
  * until an object becomes available.
  * In case of a blocking call,
  * it is advised that clients react
  * to {@link InterruptedException} which might be thrown
  * when the thread waits for an object to become available.
  *
  * If the call is a non-blocking one,
  * the call returns immediately irrespective of
  * whether an object is available or not.
  * If any object is available the call returns it
  * else the call returns < code >null< /code >.
  *
  * The validity of the objects are determined using the
  * {@link Validator} interface, such that
  * an object < code >o< /code > is valid if
  * < code > Validator.isValid(o) == true < /code >.
  *
  * @return T one of the pooled objects.
  */
 T get(); /**
  * Releases the object and puts it back to the pool.
  *
  * The mechanism of putting the object back to the pool is
  * generally asynchronous,
  * however future implementations might differ.
  *
  * @param t the object to return to the pool
  */

 void release(T t); /**
  * Shuts down the pool. In essence this call will not
  * accept any more requests
  * and will release all resources.
  * Releasing resources are done
  * via the < code >invalidate()< /code >
  * method of the {@link Validator} interface.
  */

 void shutdown(); /**
  * Represents the functionality to
  * validate an object of the pool
  * and to subsequently perform cleanup activities.
  *
  * @author Swaranga
  *
  * @param < T > the type of objects to validate and cleanup.
  */
 public static interface Validator < T > {  /**
   * Checks whether the object is valid.
   *
   * @param t the object to check.
   *
   * @return <code>true</code>
   * if the object is valid else <code>false</code>.
   */
  public boolean isValid(T t);  /**
   * Performs any cleanup activities
   * before discarding the object.
   * For example before discarding
   * database connection objects,
   * the pool will want to close the connections.
   * This is done via the
   * <code>invalidate()</code> method.
   *
   * @param t the object to cleanup
   */

  public void invalidate(T t);
 }
}

準備工做已經差很少了,在最後開始前咱們還須要一個終極武器,這纔是這個對象池的殺手鐗。就是「可以建立新的對象」。咱們的對象池是泛型的,所以它們得知道如何去生成新的對象來填充這個池子。這個功能不能依賴於對象池自己,必需要有一個通用的方式來建立新的對象。經過一個ObjectFactory的接口就能完成這個,它只有一個「如何建立新的對象」的方法。咱們的ObjectFactory接口以下:

package com.test.pool;/**
 * Represents the mechanism to create
 * new objects to be used in an object pool.
 *
 * @author Swaranga
 *
 * @param < T > the type of object to create.
 */public interface ObjectFactory < T >{ /**
  * Returns a new instance of an object of type T.
  *
  * @return T an new instance of the object of type T
  */
 public abstract T createNew();
}

咱們的工具類都已經搞定了,如今能夠開始真正實現咱們的Pool接口了。由於咱們但願這個池能在併發程序裏面使用,因此咱們會建立一個阻塞的對象池,當沒有對象可用的時候,讓客戶端先阻塞住。咱們的阻塞機制是讓客戶端一直阻塞直到有對象可用爲止。這樣的話致使咱們還須要再增長一個只阻塞必定時間的方法,若是在超時時間到來前有對象可用則返回,若是超時了就返回null而不是一直等待下去。這樣的實現有點相似Java併發庫裏的LinkedBlockingQueue,所以真正實現前咱們再暴露一個接口,BlockingPool,相似於Java併發庫裏的BlockingQueue接口。

這裏是BlockingQueue的聲明:

package com.test.pool;import java.util.concurrent.TimeUnit;/**
 * Represents a pool of objects that makes the
 * requesting threads wait if no object is available.
 *
 * @author Swaranga
 *
 * @param < T > the type of objects to pool.
 */public interface BlockingPool < T > extends Pool < T >{ /**
  * Returns an instance of type T from the pool.
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * indefinitely until an object is available.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * sets the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  * @return T an instance of the Object
  * of type T from the pool.
  */
 T get(); /**
  * Returns an instance of type T from the pool,
  * waiting up to the
  * specified wait time if necessary
  * for an object to become available..
  *
  * The call is a blocking call,
  * and client threads are made to wait
  * for time until an object is available
  * or until the timeout occurs.
  * The call implements a fairness algorithm
  * that ensures that a FCFS service is implemented.
  *
  * Clients are advised to react to InterruptedException.
  * If the thread is interrupted while waiting
  * for an object to become available,
  * the current implementations
  * set the interrupted state of the thread
  * to <code>true</code> and returns null.
  * However this is subject to change
  * from implementation to implementation.
  *
  *
  * @param time amount of time to wait before giving up,
  *   in units of <tt>unit</tt>
  * @param unit a <tt>TimeUnit</tt> determining
  *   how to interpret the
  *        <tt>timeout</tt> parameter
  *
  * @return T an instance of the Object
  * of type T from the pool.
  *
  * @throws InterruptedException
  * if interrupted while waiting
  */

 T get(long time, TimeUnit unit) throws InterruptedException;
}

BoundedBlockingPool的實現以下:

package com.test.pool;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;import java.util.concurrent.TimeUnit;public final class BoundedBlockingPool
        extends <AbstractPool>        implements <BlockingPool>{    private int size;    private BlockingQueue  objects;    private Validator  validator;    private ObjectFactory  objectFactory;    private ExecutorService executor =
            Executors.newCachedThreadPool();    private volatile boolean shutdownCalled;    public BoundedBlockingPool(            int size,
            Validator  validator,
            ObjectFactory  objectFactory)
    {        super();        this.objectFactory = objectFactory;        this.size = size;        this.validator = validator;
        objects = new LinkedBlockingQueue (size);
        initializeObjects();
        shutdownCalled = false;
    }    public T get(long timeOut, TimeUnit unit)
    {        if(!shutdownCalled)
        {
            T t = null;            try
            {
                t = objects.poll(timeOut, unit);                return t;
            }            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }            return t;
        }        throw new IllegalStateException(                'Object pool is already shutdown');
    }    public T get()
    {        if(!shutdownCalled)
        {
            T t = null;            try
            {
                t = objects.take();
            }            catch(InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }            return t;
        }        throw new IllegalStateException(                'Object pool is already shutdown');
    }    public void shutdown()
    {
        shutdownCalled = true;
        executor.shutdownNow();
        clearResources();
    }    private void clearResources()
    {        for(T t : objects)
        {
            validator.invalidate(t);
        }
    }    @Override
    protected void returnToPool(T t)
    {        if(validator.isValid(t))
        {
            executor.submit(new ObjectReturner(objects, t));
        }
    }    @Override
    protected void handleInvalidReturn(T t)
    {
    }    @Override
    protected boolean isValid(T t)
    {        return validator.isValid(t);
    }    private void initializeObjects()
    {        for(int i = 0; i < size; i++)
        {
            objects.add(objectFactory.createNew());
        }
    }    private class ObjectReturner
            implements <Callable>    {        private BlockingQueue  queue;        private E e;        public ObjectReturner(BlockingQueue  queue, E e)
        {            this.queue = queue;            this.e = e;
        }        public Void call()
        {            while(true)
            {                try
                {
                    queue.put(e);                    break;
                }                catch(InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                }
            }            return null;
        }

    }

}

上面是一個很是基本的對象池,它內部是基於一個LinkedBlockingQueue來實現的。這裏惟一比較有意思的方法就是returnToPool。由於內部的存儲是一個LinkedBlockingQueue實現的,若是咱們直接把返回的對象扔進去的話,若是隊列已滿可能會阻塞住客戶端。不過咱們不但願客戶端由於把對象放回池裏這麼個普通的方法就阻塞住了。因此咱們把最終將對象插入到隊列裏的任務做爲一個異步的的任務提交給一個Executor來執行,以便讓客戶端線程能當即返回。

如今咱們將在本身的代碼中使用上面這個對象池,用它來緩存數據庫鏈接。咱們須要一個校驗器來驗證數據庫鏈接是否有效。

下面是這個JDBCConnectionValidator:

package com.test;import java.sql.Connection;import java.sql.SQLException;import com.test.pool.Pool.Validator;public final class JDBCConnectionValidator implements Validator < Connection >{    public boolean isValid(Connection con)
    {        if(con == null)
        {            return false;
        }        try
        {            return !con.isClosed();
        }        catch(SQLException se)
        {            return false;
        }

    }    public void invalidate(Connection con)
    {        try
        {
            con.close();
        }        catch(SQLException se)
        {
        }
    }

}

還有一個JDBCObjectFactory,它將用來生成新的數據庫鏈接對象:

package com.test;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import com.test.pool.ObjectFactory;public class JDBCConnectionFactory implements ObjectFactory < Connection >{   private String connectionURL;   private String userName;   private String password;   public JDBCConnectionFactory(
     String driver,
     String connectionURL,
     String userName,
     String password) {     super();     try
     {
        Class.forName(driver);
     }     catch(ClassNotFoundException ce)
     {        throw new IllegalArgumentException('Unable to find driver in classpath', ce);
     }     this.connectionURL = connectionURL;     this.userName = userName;     this.password = password;
   }   public Connection createNew()
   {      try
      {         return DriverManager.getConnection(
            connectionURL,
            userName,
            password);
      }      catch(SQLException se)
      {         throw new IllegalArgumentException('Unable to create new connection', se);
      }
   }
}

如今咱們用上述的Validator和ObjectFactory來建立一個JDBC的鏈接池:

package com.test;import java.sql.Connection;import com.test.pool.Pool;import com.test.pool.PoolFactory;public class Main{    public static void main(String[] args)
    {
        Pool < Connection > pool =            new BoundedBlockingPool < Connection > (            10,            new JDBCConnectionValidator(),            new JDBCConnectionFactory('', '', '', '')
        );        //do whatever you like
    }
}

爲了犒勞下能讀完整篇文章的讀者,我這再提供另外一個非阻塞的對象池的實現,這個實現和前面的惟一不一樣就是即便對象不可用,它也不會讓客戶端阻塞,而是直接返回null。具體的實如今這:

package com.test.pool;import java.util.LinkedList;import java.util.Queue;import java.util.concurrent.Semaphore;public class BoundedPool < T > extends AbstractPool < T >{    private int size;    private Queue < T > objects;    private Validator < T > validator;    private ObjectFactory < T > objectFactory;    private Semaphore permits;    private volatile boolean shutdownCalled;    public BoundedPool(        int size,
        Validator < T > validator,
        ObjectFactory < T > objectFactory)
        {        super();        this.objectFactory = objectFactory;        this.size = size;        this.validator = validator;
        objects = new LinkedList < T >();
        initializeObjects();
        shutdownCalled = false;
    }    @Override
    public T get()
    {
        T t = null;        if(!shutdownCalled)
        {            if(permits.tryAcquire())
            {
                t = objects.poll();
            }

         }         else
         {             throw new IllegalStateException('Object pool already shutdown');
         }         return t;
     }     @Override
     public void shutdown()
     {
         shutdownCalled = true;
         clearResources();
     }     private void clearResources()
     {         for(T t : objects)
         {
             validator.invalidate(t);
         }
     }     @Override
     protected void returnToPool(T t)
     {         boolean added = objects.add(t);         if(added)
         {
             permits.release();
         }
     }     @Override
     protected void handleInvalidReturn(T t)
     {
     }     @Override
     protected boolean isValid(T t)
     {         return validator.isValid(t);
     }     private void initializeObjects()
     {         for(int i = 0; i < size; i++)
         {
             objects.add(objectFactory.createNew());
         }
     }
}

考慮到咱們如今已經有兩種實現,很是威武了,得讓用戶經過工廠用具體的名稱來建立不一樣的對象池了。工廠來了:

package com.test.pool;

import com.test.pool.Pool.Validator;/**

* Factory and utility methods for

* {@link Pool} and {@link BlockingPool} classes

* defined in this package.
* This class supports the following kinds of methods:
*
*
<ul>
*
<li> Method that creates and returns a default non-blocking
*        implementation of the {@link Pool} interface.
*   </li>
*
*
<li> Method that creates and returns a
*        default implementation of
*        the {@link BlockingPool} interface.
*   </li>
*
</ul>
*
* @author Swaranga
*/public final class PoolFactory{    private PoolFactory()
    {
    }/**
* Creates a and returns a new object pool,
* that is an implementation of the {@link BlockingPool},
* whose size is limited by
* the <tt> size </tt> parameter.
*
* @param size the number of objects in the pool.
* @param factory the factory to create new objects.
* @param validator the validator to
* validate the re-usability of returned objects.
*
* @return a blocking object pool
* bounded by <tt> size </tt>
*/public static < T > Pool < T >
newBoundedBlockingPool(
int size,
ObjectFactory < T > factory,
Validator < T > validator)
{    return new BoundedBlockingPool < T > (
    size,
    validator,
    factory);
}/*
* Creates a and returns a new object pool,
* that is an implementation of the {@link Pool}
* whose size is limited
* by the <tt> size </tt> parameter.
*
* @param size the number of objects in the pool.
* @param factory the factory to create new objects.
* @param validator the validator to validate
* the re-usability of returned objects.
*
* @return an object pool bounded by <tt> size </tt>
*/public static < T > Pool < T > newBoundedNonBlockingPool(
    int size,
    ObjectFactory < T > factory,
    Validator < T > validator)
{    return new BoundedPool < T >(size, validator, factory);
}
}

如今咱們的客戶端就能用一種可讀性更強的方式來建立對象池了:

package com.test;import java.sql.Connection;import com.test.pool.Pool;import com.test.pool.PoolFactory;public class Main{    public static void main(String[] args)
    {
        Pool < Connection > pool =
        PoolFactory.newBoundedBlockingPool(        10,        new JDBCConnectionFactory('', '', '', ''),        new JDBCConnectionValidator());        //do whatever you like
     }
}

好吧,終於寫完了,拖了這麼久了。盡情使用和完善它吧,或者再多加幾種實現。

快樂編碼,快樂分享!

相關文章
相關標籤/搜索