咱們知道線程安全的問題就是出在多個線程同時修改共享變量,不可變對象的策略徹底規避了對對象的修改,因此在多線程中使用必定是線程安全的。java
不可變對象須要知足的條件:數組
下面來複習一下final關鍵字的做用安全
修飾類:多線程
修飾方法:併發
修飾對象:app
除了final修飾的方法來使對象不可變,還能夠用Collections類
中的unmodifiable
爲前綴的方法,包括Collection、List、Set、Map等,只需把對應集合的對象傳入這個方法這個集合就不容許修改了。ide
一樣地,在Guava
中也有相似的方法immutableXXX
能夠達到相同的效果。高併發
下面來驗證一下測試
@Slf4j
public class ImmutableExample1 {
private static Map<Integer, Integer> map = Maps.newHashMap();
static {
map.put(1, 2);
map.put(3, 4);
map.put(5, 6);
map = Collections.unmodifiableMap(map);
}
public static void main(String[] args) {
map.put(1, 3);
log.info("{}", map.get(1));
}
}
複製代碼
運行結果優化
能夠看到程序報了一個不支持操做的異常,說明當map通過Collections.unmodifiableMap
方法後就不支持更新操做了。
下面咱們進入Collections.unmodifiableMap
看一下它的實現
/** * Returns an unmodifiable view of the specified map. This method * allows modules to provide users with "read-only" access to internal * maps. Query operations on the returned map "read through" * to the specified map, and attempts to modify the returned * map, whether direct or via its collection views, result in an * <tt>UnsupportedOperationException</tt>.<p> * * The returned map will be serializable if the specified map * is serializable. * * @param <K> the class of the map keys * @param <V> the class of the map values * @param m the map for which an unmodifiable view is to be returned. * @return an unmodifiable view of the specified map. */
public static <K,V> Map<K,V> unmodifiableMap(Map<? extends K, ? extends V> m) {
return new UnmodifiableMap<>(m);
}
複製代碼
能夠看到這個方法返回了一個新的不能被修改的map,咱們來看一下這個map的實現。
/** * @serial include */
private static class UnmodifiableMap<K,V> implements Map<K,V>, Serializable {
private static final long serialVersionUID = -1034234728574286014L;
private final Map<? extends K, ? extends V> m;
UnmodifiableMap(Map<? extends K, ? extends V> m) {
if (m==null)
throw new NullPointerException();
this.m = m;
}
public int size() {return m.size();}
public boolean isEmpty() {return m.isEmpty();}
public boolean containsKey(Object key) {return m.containsKey(key);}
public boolean containsValue(Object val) {return m.containsValue(val);}
public V get(Object key) {return m.get(key);}
public V put(K key, V value) {
throw new UnsupportedOperationException();
}
public V remove(Object key) {
throw new UnsupportedOperationException();
}
public void putAll(Map<? extends K, ? extends V> m) {
throw new UnsupportedOperationException();
}
public void clear() {
throw new UnsupportedOperationException();
}
private transient Set<K> keySet;
private transient Set<Map.Entry<K,V>> entrySet;
private transient Collection<V> values;
public Set<K> keySet() {
if (keySet==null)
keySet = unmodifiableSet(m.keySet());
return keySet;
}
public Set<Map.Entry<K,V>> entrySet() {
if (entrySet==null)
entrySet = new UnmodifiableEntrySet<>(m.entrySet());
return entrySet;
}
public Collection<V> values() {
if (values==null)
values = unmodifiableCollection(m.values());
return values;
}
public boolean equals(Object o) {return o == this || m.equals(o);}
public int hashCode() {return m.hashCode();}
public String toString() {return m.toString();}
// Override default methods in Map
@Override
@SuppressWarnings("unchecked")
public V getOrDefault(Object k, V defaultValue) {
// Safe cast as we don't change the value
return ((Map<K, V>)m).getOrDefault(k, defaultValue);
}
@Override
public void forEach(BiConsumer<? super K, ? super V> action) {
m.forEach(action);
}
@Override
public void replaceAll(BiFunction<? super K, ? super V, ? extends V> function) {
throw new UnsupportedOperationException();
}
@Override
public V putIfAbsent(K key, V value) {
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object key, Object value) {
throw new UnsupportedOperationException();
}
@Override
public boolean replace(K key, V oldValue, V newValue) {
throw new UnsupportedOperationException();
}
@Override
public V replace(K key, V value) {
throw new UnsupportedOperationException();
}
@Override
public V computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) {
throw new UnsupportedOperationException();
}
@Override
public V computeIfPresent(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
throw new UnsupportedOperationException();
}
@Override
public V compute(K key, BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
throw new UnsupportedOperationException();
}
@Override
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) {
throw new UnsupportedOperationException();
}
複製代碼
從上面的實現中能夠看到UnmodifiableMap
對於不少操做都是直接拋出不支持操做的異常。
Guava
中的immutable
方法也是相似原理。
線程封閉就是把對象封裝到一個線程裏,只有一個線程能夠看到這個對象,這樣就算這個對象不是線程安全也不會有線程安全問題。
實現線程封閉主要有三種方式
下面主要來看ThreadLocal線程封閉方法。
ThreadLocal是爲每個線程都提供了一個線程內的局部變量,每一個線程只能訪問到屬於它的副本。
咱們來看一下ThreadLocal的源碼中的get和set方法
/** * Returns the value in the current thread's copy of this * thread-local variable. If the variable has no value for the * current thread, it is first initialized to the value returned * by an invocation of the {@link #initialValue} method. * * @return the current thread's value of this thread-local */
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}
/** * Sets the current thread's copy of this thread-local variable * to the specified value. Most subclasses will have no need to * override this method, relying solely on the {@link #initialValue} * method to set the values of thread-locals. * * @param value the value to be stored in the current thread's copy of * this thread-local. */
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}
複製代碼
從源碼中能夠看出:每個線程擁有一個ThreadLocalMap
,這個map
存儲了該線程擁有的全部局部變量。
set
時先經過Thread.currentThread()
獲取當前線程,進而獲取到當前線程的ThreadLocalMap
,而後以ThreadLocal
本身爲key
,要存儲的對象爲值,存到當前線程的ThreadLocalMap
中。
get
時也是先得到當前線程的ThreadLocalMap
,以ThreadLocal
本身爲key
,取出和該線程的局部變量。
一個線程內能夠設置多個ThreadLocal
,這樣該線程就擁有了多個局部變量。好比當前線程爲t1
,在t1
內建立了兩個ThreadLocal
分別是tl1
和tl2
,那麼t1
的ThreadLocalMap
就有兩個鍵值對。
t1.threadLocals.set(tl1, obj1) // 等價於在t1線程中調用tl1.set(obj1)
t1.threadLocals.set(tl2, obj2) // 等價於在t1線程中調用tl2.set(obj1)
t1.threadLocals.getEntry(tl1) // 等價於在t1線程中調用tl1.get()得到obj1
t1.threadLocals.getEntry(tl2) // 等價於在t1線程中調用tl2.get()得到obj2
複製代碼
因爲不少常見的容器都是線程不安全的,這就要求開發人員在任何訪問到這些容器的地方進行同步處理,致使使用很是不便,所以Java提供了同步容器。
常見的同步容器有如下幾種:
ArrayList -> Vector, Stack
HashMap -> HashTable(key,value不能爲null)
Collections.synchronizedXXX(List, Set, Map)
注意:同步容器不是絕對的線程安全。
@Slf4j
public class VectorExample1 {
/** * 請求總數 */
public static int clientTotal = 5000;
/** * 同時併發執行線程數 */
public static int threadTotal = 200;
private static Vector<Integer> list = new Vector<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i){
list.add(i);
}
}
複製代碼
運行結果
在這裏Vector
是線程安全的。
下面來看一個線程不安全的例子
public class VectorExample2 {
private static Vector<Integer> vector = new Vector<>();
public static void main(String[] args) {
for (int i = 0; i < 10; i++){
vector.add(i);
}
Thread thread1 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++){
vector.remove(i);
}
}
};
Thread thread2 = new Thread() {
public void run() {
for (int i = 0; i < 10; i++){
vector.get(i);
}
}
};
thread1.start();
thread2.start();
}
}
複製代碼
運行結果
能夠看到拋出了數組越界的異常。這是由於thread2
中可能會get
到已經被thread1
移除的下標。
@Slf4j
public class HashTableExample {
/** * 請求總數 */
public static int clientTotal = 5000;
/** * 同時併發執行線程數 */
public static int threadTotal = 200;
private static Map<Integer, Integer> map = new Hashtable<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", map.size());
}
private static void update(int i){
map.put(i, i);
}
}
複製代碼
運行結果
將以前例子中的容器類改爲
private static List<Integer> list = Collections.synchronizedList(Lists.newArrayList());
複製代碼
運行結果始終是5000
,線程安全。
將容器換成Set
,Map
也是同樣。
public class VectorExample3 {
private static void test1(Vector<Integer> v1) {
for (Integer i : v1) {
if (i.equals(3)){
v1.remove(i);
}
}
}
private static void test2(Vector<Integer> v1) {
Iterator<Integer> iterator = v1.iterator();
while (iterator.hasNext()) {
Integer i = iterator.next();
if (i.equals(3)){
v1.remove(i);
}
}
}
private static void test3(Vector<Integer> v1) {
for (int i = 0; i < v1.size(); i++){
if (v1.get(i).equals(3)) {
v1.remove(i);
}
}
}
public static void main(String[] args) {
Vector<Integer> vector = new Vector<>();
vector.add(1);
vector.add(2);
vector.add(3);
test1(vector);
}
}
複製代碼
這裏定義了3種對Vector
遍歷後刪除指定值的方法,依次對每一個方法進行測試。
測試結果:
test1
和test2
都拋出java.util.ConcurrentModificationException
異常
test3
運行正常
下面來看一下異常產生的緣由
從第一個報錯處點進去能夠看到
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
複製代碼
咱們在對一個集合進行遍歷操做的同時對它進行了增刪的操做,致使了modCount != expectedModCount
從而拋出異常。
所以當咱們用for-each
或迭代器
遍歷集合時不要對集合進行更新操做。若是須要對集合進行增刪操做,推薦的作法是在遍歷過程當中標記好要增刪的位置,遍歷結束後再進行相關的操做。
核心思想:
相比於ArrayList
,CopyOnWriteArrayList
是線程安全的。
當有新元素添加到CopyOnWriteArrayList
時,它先從原有的數組中拷貝一份出來,而後在新數組上作寫操做,寫完後再將原有的數組指向到新的數組。CopyOnWriteArrayLis
t的整個add
操做都是在鎖的保護下進行的。
缺點:
CopyOnWriteArrayList
只能保證最終的一致性,不能知足實時性的要求。CopyOnWriteArrayList
的讀操做都是在原數組上讀的,不須要加鎖。
下面來coding測試一下
public class CopyOnWriteArrayListExample {
/** * 請求總數 */
public static int clientTotal = 5000;
/** * 同時併發執行線程數 */
public static int threadTotal = 200;
private static List<Integer> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
final Semaphore semaphore = new Semaphore(threadTotal);
final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);
for (int i = 0; i < clientTotal; i++){
final int count = i;
executorService.execute(() -> {
try {
semaphore.acquire();
update(count);
semaphore.release();
} catch (Exception e){
log.error("exception", e);
}
countDownLatch.countDown();
});
}
countDownLatch.await();
executorService.shutdown();
log.info("size:{}", list.size());
}
private static void update(int i){
list.add(i);
}
}
複製代碼
運行結果始終是5000,線程安全。
下面咱們進入CopyOnWriteArrayList
的add
方法看一下
/** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return {@code true} (as specified by {@link Collection#add}) */
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
複製代碼
能夠看到整個方法是加了鎖的,添加新元素時是把整個數組複製到一個新的數組中。
HashSet
對應的線程安全類。
底層實現是基於CopyOnWriteArrayList
,所以它符合CopyOnWriteArrayList
的特色和適用場景。
迭代器不支持可變的remove
操做。
TreeSet
對應的線程安全類。
基於Map
集合,在多線程環境下add
、remove
等操做都是線程安全的,可是批量操做如addAll
、removeAll
等並不能保證以原子方式執行。緣由是它們的底層調用的仍是add
、remove
等方法,須要手動作同步操做。
不能存儲null
值。
HashMap
的線程安全類。
不能存儲null
值。
對讀操做作了大量優化,後面會詳細介紹。
TreeMap
的線程安全類。
內部使用SkipList
來實現。
key
有序,相比於ConcurrentHashMap
支持更高併發,存取數與線程沒有關係,也就是說在相同條件下併發線程越多ConcurrentSkipListMap
優點越大。
Written by Autu
2019.7.19