每問(wèn)題每線(xiàn)程:在于它沒(méi)有對已創(chuàng )建線(xiàn)程的數量進(jìn)行任何限制,除非對客戶(hù)端能夠拋出的請求速率進(jìn)行限制。
無(wú)限制創(chuàng )建線(xiàn)程的缺點(diǎn):
1.線(xiàn)程生命周期的開(kāi)銷(xiāo):線(xiàn)程的創(chuàng )建和關(guān)閉并不是“免費的”。
2.資源消耗量:活動(dòng)線(xiàn)程會(huì )消耗系統資源,尤其是內存。
3.穩定性。
在java中,任務(wù)執行的首要抽象不是Thread,而是Executor。
public interface Executor {
/**
* Executor只是一個(gè)簡(jiǎn)單的接口,但是他卻為一個(gè)靈活而且強大的框架創(chuàng )造了基礎。
*/
void execute(Runnable command);
}
這個(gè)框架可以用于異步任務(wù)執行,而且支持很多不同類(lèi)型的任務(wù)執行策略。它還為任務(wù)提交和任務(wù)執行之間的解耦提供了標準的方法。另外,還提供了對生命周期的支持以及鉤子函數,可以添加諸如統計收集、應用程序管理機制和監視器等擴展。
1.newCachedThreadPool
創(chuàng )建可緩存的線(xiàn)程池,多的話(huà)回收,少的話(huà)增加,沒(méi)有限制
N2.ewFixedThreadPool(int nThreads)
創(chuàng )建定長(cháng)的線(xiàn)程池,每提交一個(gè)任務(wù)就創(chuàng )建一個(gè)線(xiàn)程,直到最大。如果某個(gè)以外終止,會(huì )補充一個(gè)新的
3.newScheduledThreadPool(int corePoolSize)
定長(cháng)線(xiàn)程池,而且支持定時(shí)的以及周期性的任務(wù)執行。相當于timer。
4.newSingleThreadExecutor()
單線(xiàn)程化的executor,只創(chuàng )建唯一的工作線(xiàn)程來(lái)之心吧任務(wù)。如果它意外結束,會(huì )有另一個(gè)取代它。它會(huì )保證任務(wù)隊列所規定的順序執行。
5.newSingleThreadScheduledExecutor()
創(chuàng )建一個(gè)單線(xiàn)程執行程序,它可安排在給定延遲后運行命令或者定期地執行。
private static final Executor
exec=Executors.newFixedThreadPool(100);
ServerSocket socket=new ServerSocket(80);
while(true){
final Socket connection=socket.accept();
Runnable task=new Runnable(){
public void run(){
handleRequest(connection);
}
};
exec.execute(exec);
}
線(xiàn)程如果無(wú)法正常關(guān)閉,則會(huì )阻止JVM的結束。線(xiàn)程池中的任務(wù),可能已經(jīng)完成,可能正在運行,其他還有在隊列中等待執行。關(guān)閉的時(shí)候可能平緩的關(guān)閉,到唐突的關(guān)閉(拔掉電源)。為了解決生命周期的問(wèn)題,ExecutorService擴展了Executor,并且添加了一些用于生命周期管理的方法。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
。。。。。
}
可以關(guān)閉 ExecutorService,這將導致其拒絕新任務(wù)。提供兩個(gè)方法來(lái)關(guān)閉 ExecutorService。shutdown() 方法在終止前允許執行以前提交的任務(wù),而 shutdownNow() 方法阻止等待任務(wù)啟動(dòng)并試圖停止當前正在執行的任務(wù)。在終止時(shí),執行程序沒(méi)有任務(wù)在執行,也沒(méi)有任務(wù)在等待執行,并且無(wú)法提交新任務(wù)。應該關(guān)閉未使用的 ExecutorService 以允許回收其資源。
exec.shutdown();
Timer的問(wèn)題:
1.只能創(chuàng )建唯一的線(xiàn)程來(lái)執行所有timer任務(wù)。
2.如果一個(gè)timerTask很耗時(shí),會(huì )導致其他TimerTask的時(shí)效準確性出問(wèn)題
3.如果TimerTask拋出未檢查的異常,Timer將會(huì )產(chǎn)生無(wú)法預料的行為。Timer不會(huì )重新恢復。另外一個(gè)Timer中的Task出現異常以后,后面再給這個(gè)Timer的任務(wù)也將會(huì )無(wú)法執行。
如果要自己構建調度服務(wù),那還可以考慮使用DelayQueue,它里面的每個(gè)對象低耦合一個(gè)延遲時(shí)間有關(guān)聯(lián),只有過(guò)期以后,DelayQueue才能讓你執行take操作獲取元素。那么當它里面的對象是FutureTask的時(shí)候,就可以構成一個(gè)簡(jiǎn)單的調度隊列。
Executor框架讓定制一個(gè)執行策略變得簡(jiǎn)單,不過(guò)想要使用它,你的任務(wù)還必須實(shí)現Runnable接口。在許多服務(wù)器請求中,都存在一個(gè)情況,那就是:?jiǎn)我坏目蛻?hù)請求。它能執行一些簡(jiǎn)單的任務(wù),但是他不能返回一個(gè)值或者拋出受檢查的異常。
很多任務(wù)都會(huì )引起計算延遲,包括執行數據庫查詢(xún)、從網(wǎng)絡(luò )上獲取資源、進(jìn)行復雜的計算。這些任務(wù)Callback抽象更好。它也可以被Executor框架執行

Executors包含很多靜態(tài)方法,可以吧Runnable和PrivilegedAction封裝為Callable。

Runnable和Callable描述的是抽象的計算性任務(wù),這些任務(wù)通常是有限的,他們有開(kāi)始,而且最終會(huì )結束。
一個(gè)Executor執行的任務(wù)有4個(gè)周期,創(chuàng )建、提交、開(kāi)始、完成。由于任務(wù)的執行會(huì )花很長(cháng)時(shí)間,我們也希望可以取消任務(wù)。
Future描述了任務(wù)的生命周期,并提供了相關(guān)的方法來(lái)獲得任務(wù)的結果、取消任務(wù)以及檢驗任務(wù)是否完成還是取消。對應的isDone、isCancelled() 方法,它不能后退,一旦完成,就永遠停在完成狀態(tài)上。用get(等待)獲得結果。
/**
* @param args
* @throws InterruptedException
* @throws ExecutionException
*/
public static void main(String[] args) throws
InterruptedException {
int threadCounts = 19;// 使用的線(xiàn)程數
long sum = 0;
ExecutorService exec = Executors.newFixedThreadPool(threadCounts);
List<Callable<Long>> callList = new ArrayList<Callable<Long>>();
// 生成很大的List
List<Integer> list = new ArrayList<Integer>();
for (int i = 0; i <= 1000000; i++) {
list.add(i);
}
int len = list.size() / threadCounts;// 平均分割List
// List中的數量沒(méi)有線(xiàn)程數多(很少存在)
if (len == 0) {
threadCounts = list.size();// 采用一個(gè)線(xiàn)程處理List中的一個(gè)元素
len = list.size() / threadCounts;// 重新平均分割List
}
for (int i = 0; i < threadCounts; i++) {
final List<Integer> subList;
if (i == threadCounts - 1) {
subList = list.subList(i * len, list.size());
} else {
subList = list.subList(i * len,
len * (i + 1) > list.size() ? list.size() : len
* (i + 1));
}
// 采用匿名內部類(lèi)實(shí)現
callList.add(new Callable<Long>() {
public Long call() throws Exception {
long subSum = 0L;
for (Integer i : subList) {
subSum += i;
}
System.out.println("分配給線(xiàn)程:"
+ Thread.currentThread().getName()
+ "那一部分List的整數和為:\tSubSum:" + subSum);
return subSum;
}
});
}
List<Future<Long>> futureList = exec.invokeAll(callList);
for (Future<Long> future : futureList) {
sum += future.get();
}
exec.shutdown();
System.out.println(sum);
}
Runnable和Callable類(lèi)都可以通過(guò)Service的submit方法提交,并且返回一個(gè)Future,它表示這個(gè)任務(wù),可以獲得該任務(wù)的執行結果或者取消它。

另外,可以給為Runnable/Callable顯示的實(shí)例化一個(gè)FutureTask:
Callable pAccount = new PrivateAccount();
FutureTask futureTask = new FutureTask(pAccount);

向Executor提交一個(gè)批處理任務(wù),并且希望獲得結果,那么你將會(huì )使用Future,然后不斷的調用isDone來(lái)檢驗是否完成,這樣太麻煩,還有更好的方法,那就是完成服務(wù),CompletionService。poll方法不會(huì )等待,返回null。take方法會(huì )等待。
它整合了Executor和BlockingQueue的功能,你可以將Callable任務(wù)交給他執行,然后使用類(lèi)似于隊列中的take何poll方法,在結果完整時(shí)可用時(shí)獲得這個(gè)結果。ExecutorCompletionService是它的實(shí)現類(lèi)。
它的實(shí)現也比較簡(jiǎn)單,在構造函數中創(chuàng )建一個(gè)BlockingQueue,用它保存結果:
private final BlockingQueue<Future<V>> completionQueue;
提交的任務(wù)被包裝成QueueFuture:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
覆寫(xiě)done方法,將結果置入BlockingQueue。
它與上面獲得一堆FutureTask,然后遍歷的去get等返回還不一樣。它只能一個(gè)個(gè)獲取,代表有一個(gè)拿一個(gè)。FutureTask的get可能后面的FutureTask都已經(jīng)好了,可是有一個(gè)還沒(méi)好,那就卡在中間了。
聯(lián)系客服