Java庫本身就有多種線(xiàn)程安全的容器和同步工具,其中同步容器包括兩部分:一個(gè)是Vector和Hashtable。另外還有JDK1.2中加入的同步包裝類(lèi),這些類(lèi)都是由Collections.synchronizedXXX工廠(chǎng)方法。同步容器都是線(xiàn)程安全的,但是對于復合操作,缺有些缺點(diǎn):
① 迭代:在查覺(jué)到容器在迭代開(kāi)始以后被修改,會(huì )拋出一個(gè)未檢查異常ConcurrentModificationException,為了避免這個(gè)異常,需要在迭代期間,持有一個(gè)容器鎖。但是鎖的缺點(diǎn)也很明顯,就是對性能的影響。
② 隱藏迭代器:StringBuilder的toString方法會(huì )通過(guò)迭代容器中的每個(gè)元素,另外容器的hashCode和equals方法也會(huì )間接地調用迭代。類(lèi)似地,contailAll、removeAll、retainAll方法,以及容器作為參數的構造函數,都會(huì )對容器進(jìn)行迭代。
③ 缺少即加入等一些復合操作
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
getLast和deleteLast都是復合操作,由先前對原子性的分析可以判斷,這依然存在線(xiàn)程安全問(wèn)題,有可能會(huì )拋出ArrayIndexOutOfBoundsException的異常,錯誤產(chǎn)生的邏輯如下所示:

正是由于同步容器類(lèi)有以上問(wèn)題,導致這些類(lèi)成了雞肋,于是Java 5推出了并發(fā)容器類(lèi),Map對應的有ConcurrentHashMap,List對應的有CopyOnWriteArrayList。與同步容器類(lèi)相比,它有以下特性:
· 更加細化的鎖機制。同步容器直接把容器對象做為鎖,這樣就把所有操作串行化,其實(shí)這是沒(méi)必要的,過(guò)于悲觀(guān),而并發(fā)容器采用更細粒度的鎖機制,名叫分離鎖,保證一些不會(huì )發(fā)生并發(fā)問(wèn)題的操作進(jìn)行并行執行
· 附加了一些原子性的復合操作。比如putIfAbsent方法
· 迭代器的弱一致性,而非“及時(shí)失敗”。它在迭代過(guò)程中不再拋出Concurrentmodificationexception異常,而是弱一致性。
· 在并發(fā)高的情況下,有可能size和isEmpty方法不準確,但真正在并發(fā)環(huán)境下這些方法也沒(méi)什么作用。
· 另外,它還有一些附加的原子操作,缺少即加入、相等便移除、相等便替換。
putIfAbsent(K key, V value),缺少即加入(如果該鍵已經(jīng)存在,則不加入)
如果指定鍵已經(jīng)不再與某個(gè)值相關(guān)聯(lián),則將它與給定值關(guān)聯(lián)。
類(lèi)似于下面的操作
If(!map.containsKey(key)){
return map.put(key,value);
}else{
return map.get(key);
}
remove(Object key, Object value),相等便移除
只有目前將鍵的條目映射到給定值時(shí),才移除該鍵的條目。
類(lèi)似于下面的:
if(map.containsKey(key) && map.get(key).equals(value)){
Map.remove();
return true;
}else{
return false;
}
replace(K key, V oldValue, V newValue),相等便替換。
只有目前將鍵的條目映射到某一值時(shí),才替換該鍵的條目。
上面提到的三個(gè),都是原子的。在一些緩存應用中可以考慮代替HashMap/Hashtable。
· CopyOnWriteArrayList采用寫(xiě)入時(shí)復制的方式避開(kāi)并發(fā)問(wèn)題。這其實(shí)是通過(guò)冗余和不可變性來(lái)解決并發(fā)問(wèn)題,在性能上會(huì )有比較大的代價(jià),但如果寫(xiě)入的操作遠遠小于迭代和讀操作,那么性能就差別不大了。
應用它們的場(chǎng)合通常在數組相對較小,并且遍歷操作的數量大大超過(guò)可變操作的數量時(shí),這種場(chǎng)合應用它們非常好。它們所有可變的操作都是先取得后臺數組的副本,對副本進(jìn)行更改,然后替換副本,這樣可以保證永遠不會(huì )拋出ConcurrentModificationException移除。
Java中的隊列接口就是Queue,它有會(huì )拋出異常的add、remove方法,在隊尾插入元素以及對頭移除元素,還有不會(huì )拋出異常的,對應的offer、poll方法。
List實(shí)現了deque接口以及List接口,可以將它看做是這兩種的任何一種。
Queue queue=new LinkedList();
queue.offer("testone");
queue.offer("testtwo");
queue.offer("testthree");
queue.offer("testfour");
System.out.println(queue.poll()); //testone
一個(gè)基于優(yōu)先級堆(簡(jiǎn)單的使用鏈表的話(huà),可能插入的效率會(huì )比較低O(N))的無(wú)界優(yōu)先級隊列。優(yōu)先級隊列的元素按照其自然順序進(jìn)行排序,或者根據構造隊列時(shí)提供的 Comparator 進(jìn)行排序,具體取決于所使用的構造方法。優(yōu)先級隊列不允許使用 null 元素。依靠自然順序的優(yōu)先級隊列還不允許插入不可比較的對象。
queue=new PriorityQueue();
queue.offer("testone");
queue.offer("testtwo");
queue.offer("testthree");
queue.offer("testfour");
System.out.println(queue.poll()); //testfour
基于鏈節點(diǎn)的,線(xiàn)程安全的隊列。并發(fā)訪(fǎng)問(wèn)不需要同步。在隊列的尾部添加元素,并在頭部刪除他們。所以只要不需要知道隊列的大小,并發(fā)隊列就是比較好的選擇。
生產(chǎn)者和消費者模式,生產(chǎn)者不需要知道消費者的身份或者數量,甚至根本沒(méi)有消費者,他們只負責把數據放入隊列。類(lèi)似地,消費者也不需要知道生產(chǎn)者是誰(shuí),以及是誰(shuí)給他們安排的工作。
而Java知道大家清楚這個(gè)模式的并發(fā)復雜性,于是乎提供了阻塞隊列(BlockingQueue)來(lái)滿(mǎn)足這個(gè)模式的需求。阻塞隊列說(shuō)起來(lái)很簡(jiǎn)單,就是當隊滿(mǎn)的時(shí)候寫(xiě)線(xiàn)程會(huì )等待,直到隊列不滿(mǎn)的時(shí)候;當隊空的時(shí)候讀線(xiàn)程會(huì )等待,直到隊不空的時(shí)候。實(shí)現這種模式的方法很多,其區別也就在于誰(shuí)的消耗更低和等待的策略更優(yōu)。以LinkedBlockingQueue的具體實(shí)現為例,它的put源碼如下:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
try {
while (count.get() == capacity)
notFull.await();
} catch (InterruptedException ie) {
notFull.signal();
// propagate to a non-interrupted thread
throw ie;
}
insert(e);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
撇開(kāi)其鎖的具體實(shí)現,其流程就是我們在操作系統課上學(xué)習到的標準生產(chǎn)者模式,看來(lái)那些枯燥的理論還是有用武之地的。其中,最核心的還是Java的鎖實(shí)現,有興趣的朋友可以再進(jìn)一步深究一下。
阻塞隊列Blocking queue,提供了可阻塞的put和take方法,他們與可定時(shí)的offer和poll方法是等價(jià)。Put方法簡(jiǎn)化了處理,如果是有界隊列,那么當隊列滿(mǎn)的時(shí)候,生成者就會(huì )阻塞,從而改消費者更多的追趕速度。

FIFO的隊列,與LinkedList(由鏈節點(diǎn)支持,無(wú)界)和ArrayList(由數組支持,有界)相似(Linked有更好的插入和移除性能,Array有更好的查找性能,考慮到阻塞隊列的特性,移除頭部,加入尾部,兩個(gè)都區別不大),但是卻擁有比同步List更好的并發(fā)性能。
另外,LinkedList永遠不會(huì )等待,因為他是無(wú)界的。
BlockingQueue<String> queue=new ArrayBlockingQueue<String>(5);
Producer p=new Producer(queue);
Consumer c1=new Consumer(queue);
Consumer c2=new Consumer(queue);
new Thread(p).start();
new Thread(c1).start();
new Thread(c2).start();
/**
* 生產(chǎn)者
* @author Administrator
*
*/
class Producer implements Runnable {
private final BlockingQueue queue;
Producer(BlockingQueue q) { queue = q; }
public void run() {
try {
for(int i=0;i<100;i++){
queue.put(produce());
}
} catch (InterruptedException ex) {}
}
String produce() {
String temp=""+(char)('A'+(int)(Math.random()*26));
System.out.println("produce"+temp);
return temp;
}
}
/**
* 消費者
* @author Administrator
*
*/
class Consumer implements Runnable {
private final BlockingQueue queue;
Consumer(BlockingQueue q) { queue = q; }
public void run() {
try {
for(int i=0;i<100;i++){
consume(queue.take());
}
} catch (InterruptedException ex) {}
}
void consume(Object x) {
System.out.println("cousume"+x.toString());
}
}
輸出:
produceK
cousumeK
produceV
cousumeV
produceQ
cousumeQ
produceI
produceD
produceI
produceG
produceA
produceE
cousumeD
一個(gè)按優(yōu)先級堆支持的無(wú)界優(yōu)先級隊列隊列,如果不希望按照FIFO的順序進(jìn)行處理,它非常有用。它可以比較元素本身的自然順序,也可以使用一個(gè)Comparator排序。
一個(gè)優(yōu)先級堆支持的,基于時(shí)間的調度隊列。加入到隊列中的元素必須實(shí)現新的Delayed接口(只有一個(gè)方法,Long getDelay(java.util.concurrent.TimeUnit unit)),添加可以理立即返回,但是在延遲時(shí)間過(guò)去之前,不能從隊列中取出元素,如果多個(gè)元素的延遲時(shí)間已到,那么最早失效鏈接/失效時(shí)間最長(cháng)的元素將第一個(gè)取出。
static class NanoDelay implements Delayed{
long tigger;
NanoDelay(long i){
tigger=System.nanoTime()+i;
}
public boolean equals(Object other){
return ((NanoDelay)other).tigger==tigger;
}
/**
* 返回此對象相關(guān)的剩余延遲時(shí)間,零或負值指示延遲時(shí)間已經(jīng)用盡
*/
public long getDelay(TimeUnit unit) {
long n=tigger-System.nanoTime();
return unit.convert(n, TimeUnit.NANOSECONDS);
}
public long getTriggerTime(){
return tigger;
}
/**
* 相互比較,看誰(shuí)的實(shí)效時(shí)間最長(cháng),誰(shuí)先出去
*/
public int compareTo(Delayed o) {
long i=tigger;
long j=((NanoDelay)o).tigger;
if(i<j){
return -1;
}
if(i>j)
return 1;
return 0;
}
}
public static void main(String[] args) throws InterruptedException{
Random random=new Random();
DelayQueue<NanoDelay> queue=new DelayQueue<NanoDelay>();
for(int i=0;i<5;i++){
queue.add(new NanoDelay(random.nextInt(1000)));
}
long last=0;
for(int i=0;i<5;i++){
NanoDelay delay=(NanoDelay)(queue.take());
long tt=delay.getTriggerTime();
System.out.println("Trigger time:"+tt);
if(i!=0){
System.out.println("Data: "+(tt-last));
}
last=tt;
}
}
不是一個(gè)真正的隊列,因為它不會(huì )為隊列元素維護任何存儲空間,不過(guò)它維護一個(gè)排隊的線(xiàn)程清單,這些線(xiàn)程等待把元素加入(enqueue)隊列或者移出(dequeue)隊列。也就是說(shuō),它非常直接的移交工作,減少了生產(chǎn)者和消費者之間移動(dòng)數據的延遲時(shí)間,另外,也可以更快的知道反饋信息,當移交被接受時(shí),它就知道消費者已經(jīng)得到了任務(wù)。
因為SynChronousQueue沒(méi)有存儲的能力,所以除非另一個(gè)線(xiàn)程已經(jīng)做好準備,否則put和take會(huì )一直阻止。它只有在消費者比較充足的時(shí)候比較合適。
JAVA6中新增了兩個(gè)容器Deque和BlockingDeque,他們分別擴展了Queue和BlockingQueue。Deque它是一個(gè)雙端隊列,允許高效的在頭和尾分別進(jìn)行插入和刪除,它的實(shí)現分別是ArrayDeque和LinkedBlockingQueue。
雙端隊列使得他們能夠工作在一種稱(chēng)為“竊取工作”的模式上面。
1..同步的(synchronized)+HashMap,如果不存在,則計算,然后加入,該方法需要同步。
HashMap cache=new HashMap();
public synchronized V compute(A arg){
V result=cace.get(arg);
Ii(result==null){
result=c.compute(arg);
Cache.put(result);
}
Return result;
}
2.用ConcurrentHashMap代替HashMap+同步.,這樣的在get和set的時(shí)候也基本能保證原子性。但是會(huì )帶來(lái)重復計算的問(wèn)題.
Map<A,V> cache=new ConcurrentHashMap<A,V>();
public V compute(A arg){
V result=cace.get(arg);
Ii(result==null){
result=c.compute(arg);
Cache.put(result);
}
Return result;
}
3.采用FutureTask代替直接存儲值,這樣可以在一開(kāi)始創(chuàng )建的時(shí)候就將Task加入
Map<A,FutureTask<V>> cache=new ConcurrentHashMap<A,FutureTask<V>>();
public V compute(A arg){
FutureTask <T> f=cace.get(arg);
//檢查再運行的缺陷
Ii(f==null){
Callable<V> evel=new Callable(){
Public V call() throws ..{
return c.compute(arg);
}
};
FutureTask <T> ft=new FutureTask<T>(evel);
f=ft;
cache.put(arg,ft;
ft.run();
}
Try{
//阻塞,直到完成
return f.get();
}cach(){
}
}
4.上面還有檢查再運行的缺陷,在高并發(fā)的情況下啊,雙方都沒(méi)發(fā)現FutureTask,并且都放入Map(后一個(gè)被前一個(gè)替代),都開(kāi)始了計算。
這里的解決方案在于,當他們都要放入Map的時(shí)候,如果可以有原子方法,那么已經(jīng)有了以后,后一個(gè)FutureTask就加入,并且啟動(dòng)。
public V compute(A arg){
FutureTask <T> f=cace.get(arg);
//檢查再運行的缺陷
Ii(f==null){
Callable<V> evel=new Callable(){
Public V call() throws ..{
return c.compute(arg);
}
};
FutureTask <T> ft=new FutureTask<T>(evel);
f=cache.putIfAbsent(args,ft); //如果已經(jīng)存在,返回存在的值,否則返回null
if(f==null){f=ft;ft.run();} //以前不存在,說(shuō)明應該開(kāi)始這個(gè)計算
else{ft=null;} //取消該計算
}
Try{
//阻塞,直到完成
return f.get();
}cach(){
}
}
5.上面的程序上來(lái)看已經(jīng)完美了,不過(guò)可能帶來(lái)緩存污染的可能性。如果一個(gè)計算被取消或者失敗,那么這個(gè)鍵以后的值永遠都是失敗了;一種解決方案是,發(fā)現取消或者失敗的task,就移除它,如果有Exception,也移除。
6.另外,如果考慮緩存過(guò)期的問(wèn)題,可以為每個(gè)結果關(guān)聯(lián)一個(gè)過(guò)去時(shí)間,并周期性的掃描,清除過(guò)期的緩存。(過(guò)期時(shí)間可以用Delayed接口實(shí)現,參考DelayQueue,給他一個(gè)大于當前時(shí)間XXX的時(shí)間,,并且不斷減去當前時(shí)間,直到返回負數,說(shuō)明延遲時(shí)間已到了。)
7.
聯(lián)系客服