是一種滿(mǎn)足一種規則的類(lèi)的統稱(chēng)。它有以下特性:
· 它是一個(gè)對象。
· 封裝狀態(tài),而這些狀態(tài)決定著(zhù)線(xiàn)程執行到某一點(diǎn)是通過(guò)還是被迫等待
· 提供操作狀態(tài)的方法。
其實(shí)BlockingQueue就是一種Synchronizer。Java還提供了其他幾種Synchronizer。
CountDownLatch是一種閉鎖,它通過(guò)內部一個(gè)計數器count來(lái)標示狀態(tài)。一個(gè)線(xiàn)程調用await()方法之后,當count>0時(shí),所有調用其await方法的線(xiàn)程都需等待,當通過(guò)其countDown方法將count降為0時(shí)所有等待的線(xiàn)程將會(huì )被喚起。使用實(shí)例如下所示:
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
//開(kāi)始的"門(mén)",計數是1
final CountDownLatch startGate = new CountDownLatch(1);
//結束的"門(mén)",計數是工作的線(xiàn)程數
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
//使得每個(gè)線(xiàn)程都處于等待狀態(tài),確保他們已經(jīng)處于等待狀態(tài)
startGate.await();
try {
task.run();
}finally {
//每個(gè)線(xiàn)程都為結束線(xiàn)程減1
endGate.countDown();
}
} catch (InterruptedException ignored) { }
}
};
t.start();
}
long start = System.nanoTime();
//開(kāi)始運行,每個(gè)線(xiàn)程都已經(jīng)準備好了
startGate.countDown();
//直到每個(gè)線(xiàn)程都與你選哪個(gè)完畢,獲得執行時(shí)間
endGate.await();
long end = System.nanoTime();
return end-start;
}
}
閉鎖主要是主線(xiàn)程等待子線(xiàn)程,等待的是某個(gè)事件,子線(xiàn)程在countDown之后就可以結束,將數量減1,所有子線(xiàn)程結束之后,到0,那么主線(xiàn)程的等待也結束了,繼續往下(和關(guān)卡的對比)。
當然也可以實(shí)現類(lèi)似關(guān)卡的功能,初始化一個(gè)1的內部計數器,調用await方法讓它停下來(lái),
另一個(gè)可以作為閉鎖的類(lèi)是FutureTask,它描述了一個(gè)抽象的可攜帶結果的計算。它是通過(guò)Callable實(shí)現的,它等價(jià)于一個(gè)可攜帶結果的Runnable,并且有3個(gè)狀態(tài):等待、運行和完成(包括正常結束、取消和異常)。一旦FutureTask進(jìn)入完成狀態(tài),它就永遠停留在這個(gè)狀態(tài)上。
Semaphore類(lèi)實(shí)際上就是操作系統中談到的信號量的一種實(shí)現,其原理就不再累述,可見(jiàn)探索并發(fā)編程------操作系統篇
它就像一個(gè)容量池,用于限制可以訪(fǎng)問(wèn)某些資源的線(xiàn)程數目。一般操作系統的進(jìn)程調度中使用了PV原語(yǔ),需要設置一個(gè)信號量信號量表示可用資源的數量,P原語(yǔ)就相當于acquire,V原語(yǔ)就相當于release。
具體使用就是通過(guò)其acquire(獲得一個(gè)許可)和release(釋放許可)方法來(lái)完成,如以下示例:
/**
* 信號量測試
* 這里用于控制對內容池的訪(fǎng)問(wèn),內容池的大小作為Semaphore的構造函數傳遞給它
* 每個(gè)線(xiàn)程獲取數據之前必須獲得許可。這樣就限制了訪(fǎng)問(wèn)線(xiàn)程池的數目
*
* @author Administrator
*
*/
public class SemaphoreTes {
private static final int MAX_AVAILABLE = 5;
protected Object[] items = { "AAA", "BBB", "CCC", "DDD", "EEE" };
protected boolean[] used = new boolean[MAX_AVAILABLE];
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public static void main(String args[]) {
final SemaphoreTes pool = new SemaphoreTes();
Runnable runner = new Runnable() {
@Override
public void run() {
try {
Object o;
o = pool.getItem();
System.out.println(Thread.currentThread().getName()
+ " acquire " + o);
Thread.sleep(1000);
pool.putItem(o);
System.out.println(Thread.currentThread().getName()
+ " release " + o);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10; i++)// 構造 10 個(gè)線(xiàn)程
{
Thread t = new Thread(runner, "t" + i);
t.start();
}
}
// 獲取數據,需要得到許可
public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}
// 放回數據,釋放許可
public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}
protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null;
}
protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}
輸出:
t0 acquire AAA
t1 acquire BBB
t2 acquire CCC
t3 acquire DDD
t4 acquire EEE
t0 release AAA
t1 release BBB
t2 release CCC
t6 acquire AAA
t7 acquire BBB
t5 acquire CCC
t8 acquire DDD
t3 release DDD
t9 acquire EEE
t4 release EEE
t6 release AAA
t7 release BBB
t8 release DDD
t5 release CCC
t9 release EEE
信號量被初始化為容器鎖期望容量的最大值,add操作在向底層容器添加條目之前,需要先獲得一個(gè)許可,并且如果沒(méi)有加入任何東西,則立刻釋放許可。同樣,一個(gè)成功過(guò)的remove操作釋放ige許可,使得更多的元素能夠加入其中。
將信號量初始化為1,使得它在使用時(shí)最多只有一個(gè)可用的許可,從而可用作一個(gè)相互排斥的鎖。這通常也成為二進(jìn)制信號量,因為它只有兩種狀態(tài):一個(gè)可用的許可,0個(gè)可用的許可。按此方式使用時(shí),二進(jìn)制信號量具有某種屬性(與很多Lock實(shí)現不同),即可以由線(xiàn)程釋放“鎖”,控制”鎖“,而不是由所有者(因為信號量沒(méi)有權的概念)。在某些專(zhuān)門(mén)的上下文(如死鎖恢復)中會(huì )很有用。
在實(shí)際應用中,有時(shí)候需要多個(gè)線(xiàn)程同時(shí)工作以完成同一件事情,而且在完成過(guò)程中,往往會(huì )等所欲線(xiàn)程都到達某一個(gè)階段以后再統一執行。
比如有幾個(gè)旅行團需要途徑深圳、廣州、最后到達重慶。旅行團中自駕游的,有徒步的,有乘坐旅游大巴的;這些旅行團同時(shí)出發(fā),并且每到一個(gè)目的地,都要的古代其他旅行團到達此地以后再同時(shí)出發(fā),直到都到達終點(diǎn)站重慶。這種情況下,CyclicBarrier就可以用了。
它是一個(gè)同步輔助類(lèi),它允許一組線(xiàn)程相互等待,直到到達某個(gè)公共屏障點(diǎn)(common barrier point)。因為該barrier在釋放等待線(xiàn)程以后可以重用,所以稱(chēng)為循環(huán)的barrier。CyclicBarrier最重要的屬性就是參與者個(gè)數,另一個(gè)重要方法就是await()。當所有線(xiàn)程都調用了await()后,就表示這些線(xiàn)程都可以繼續執行,否則繼續等待。
關(guān)卡和閉鎖類(lèi)似,也是阻塞一組線(xiàn)程,直到某件事情發(fā)生,而不同在于關(guān)卡是等到符合某種條件的所有線(xiàn)程都達到關(guān)卡點(diǎn)。具體使用上可以用CyclicBarrier來(lái)應用關(guān)卡。
private int threadCount;
private CyclicBarrier barrier;
private int loopCount = 10;
public CyclicBarrierTest(int threadCount) {
this.threadCount = threadCount;
barrier = new CyclicBarrier(threadCount, new Runnable() {
public void run() {
System.out.println("---------------");
}
});
for (int i = 0; i < threadCount; ++i) {
Thread thread = new Thread("test-thread " + i) {
public void run() {
for (int j = 0; j < loopCount; ++j) {
doTest(j);
try {
/**
* 在到達10次以后,大家都會(huì )通過(guò)
* 在這里通過(guò)之前,會(huì )執行barrier的那個(gè)回調方法
* 而且他還可以被循環(huán)使用
*/
barrier.await();
} catch (InterruptedException e) {
return;
} catch (BrokenBarrierException e) {
return;
}
System.out.println("goon"+j);
}
}
};
thread.start();
}
}
private void doTest(int i) { /* do xxx */
System.out.println("test"+i);
}
public static void main(String args[]){
new CyclicBarrierTest(9);
}
在每個(gè)線(xiàn)程結束之后執行,可以統計用。
和閉鎖不同的是,他要求子線(xiàn)程之間相互等待,直到大家都達到后,才能繼續。

淘寶面試題:如何充分利用多核CPU,計算很大的List中所有整數的和
java多線(xiàn)程學(xué)習-java.util.concurrent詳解(一) Latch/Barrier
提供了一個(gè)同步點(diǎn),在這個(gè)同步點(diǎn),一對線(xiàn)程可以交換數據。每個(gè)線(xiàn)程通過(guò)exchange()方法的入口提供數據給他的伙伴線(xiàn)程,并接收伙伴線(xiàn)程的數據,并返回。
當運行不對稱(chēng)的活動(dòng)時(shí)很有用,比如當一個(gè)線(xiàn)程填充buffer,另一個(gè)線(xiàn)程buffer中消費數據;這些線(xiàn)程可以用exchanger來(lái)交換數據。當兩個(gè)線(xiàn)程通過(guò)exchanger交互了對象,這個(gè)交換對于連個(gè)線(xiàn)程來(lái)說(shuō)都是安全的。所以在特定的使用場(chǎng)景比較有用(兩個(gè)伙伴線(xiàn)程之間的數據交互)
注意:
1.初始化Exchanger對象時(shí),可以通過(guò)泛型指定杯子能交換的信息類(lèi)型。如“new Exchanger<String>;” 表示只能交換String類(lèi)型的信息。
2. Exchanger的exchanger方法表示當前線(xiàn)程準備交換信息,等待其他線(xiàn)程與它交換信息。當有其他線(xiàn)程調用該Exchanger對象的exchange方法時(shí),立即交換信息。
// 描述一個(gè)裝水的杯子
public static class Cup{
// 標識杯子是否有水
private boolean full = false;
public Cup(boolean full){
this.full = full;
}
// 添水,假設需要5s
public void addWater(){
if (!this.full){
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
this.full = true;
}
}
// 喝水,假設需要10s
public void drinkWater(){
if (this.full){
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
this.full = false;
}
}
}
public static void testExchanger() {
// 初始化一個(gè)Exchanger,并規定可交換的信息類(lèi)型是杯子
final Exchanger<Cup> exchanger = new Exchanger<Cup>();
// 初始化一個(gè)空的杯子和裝滿(mǎn)水的杯子
final Cup initialEmptyCup = new Cup(false);
final Cup initialFullCup = new Cup(true);
//服務(wù)生線(xiàn)程
class Waiter implements Runnable {
public void run() {
Cup currentCup = initialEmptyCup;
try {
int i=0;
while (i < 2){
System.out.println("服務(wù)生開(kāi)始往杯子中添水:"
+ System.currentTimeMillis());
// 往空的杯子里加水
currentCup.addWater();
System.out.println("服務(wù)生添水完畢:"
+ System.currentTimeMillis());
// 杯子滿(mǎn)后和顧客的空杯子交換
System.out.println("服務(wù)生等待與顧客交換杯子:"
+ System.currentTimeMillis());
currentCup = exchanger.exchange(currentCup);
System.out.println("服務(wù)生與顧客交換杯子完畢:"
+ System.currentTimeMillis());
i++;
}
} catch (InterruptedException ex) {
}
}
}
//顧客線(xiàn)程
class Customer implements Runnable {
public void run() {
Cup currentCup = initialFullCup;
try {
int i=0;
while (i < 2){
System.out.println("顧客開(kāi)始喝水:"
+ System.currentTimeMillis());
//把杯子里的水喝掉
currentCup.drinkWater();
System.out.println("顧客喝水完畢:"
+ System.currentTimeMillis());
//將空杯子和服務(wù)生的滿(mǎn)杯子交換
System.out.println("顧客等待與服務(wù)生交換杯子:"
+ System.currentTimeMillis());
currentCup = exchanger.exchange(currentCup);
System.out.println("顧客與服務(wù)生交換杯子完畢:"
+ System.currentTimeMillis());
i++;
}
} catch (InterruptedException ex) {
}
}
}
new Thread(new Waiter()).start();
new Thread(new Customer()).start();
}
public static void main(String[] args) {
ExchangerTest.testExchanger();
}
服務(wù)生開(kāi)始往杯子中添水:1288093204390
顧客開(kāi)始喝水:1288093204406
服務(wù)生添水完畢:1288093209390
服務(wù)生等待與顧客交換杯子:1288093209390
顧客喝水完畢:1288093214406
顧客等待與服務(wù)生交換杯子:1288093214406
服務(wù)生與顧客交換杯子完畢:1288093214406
服務(wù)生開(kāi)始往杯子中添水:1288093214406
顧客與服務(wù)生交換杯子完畢:1288093214406
① Future接口標識異步計算的結果,它提供了檢查計算是否完成的方法isDone(),以等待計算的完成,并取得計算的結果。
② 計算完成以后只能使用get獲得計算結果
③ 有必要,可以在完成前阻塞此方法
④ 取消,則以cancel方法來(lái)執行

FutureTask類(lèi)是Future的一個(gè)實(shí)現,并且實(shí)現了Runnable,所以可以通過(guò)Executor(線(xiàn)程池)來(lái)執行。
如過(guò)在主線(xiàn)程中需要執行比較耗時(shí)的操作,但又不想阻塞主線(xiàn)程時(shí),可以把這些業(yè)務(wù)交給Future對象在后臺完成,當主線(xiàn)程將來(lái)需要時(shí),可以通Future對象獲得后臺作業(yè)的計算結果或者執行狀態(tài)。
個(gè)人的感覺(jué)來(lái)看,Future和Runnable和
聯(lián)系客服