然而,創(chuàng )建和銷(xiāo)毀線(xiàn)程本身就有一定的開(kāi)銷(xiāo),如果頻繁創(chuàng )建和銷(xiāo)毀線(xiàn)程,CPU和內存開(kāi)銷(xiāo)就不可忽略,垃圾收集器還必須負擔更多的工作。因此,線(xiàn)程池就是為了避免頻繁創(chuàng )建和銷(xiāo)毀線(xiàn)程。
每當服務(wù)器接受了一個(gè)新的請求后,服務(wù)器就從線(xiàn)程池中挑選一個(gè)等待的線(xiàn)程并執行請求處理。處理完畢后,線(xiàn)程并不結束,而是轉為阻塞狀態(tài)再次被放入線(xiàn)程池中。這樣就避免了頻繁創(chuàng )建和銷(xiāo)毀線(xiàn)程。
Worker Pattern實(shí)現了類(lèi)似線(xiàn)程池的功能。首先定義Task接口:
package com.crackj2ee.thread;
public interface Task {
void execute();
}
線(xiàn)程將負責執行execute()方法。注意到任務(wù)是由子類(lèi)通過(guò)實(shí)現execute()方法實(shí)現的,線(xiàn)程本身并不知道自己執行的任務(wù)。它只負責運行一個(gè)耗時(shí)的execute()方法。
具體任務(wù)由子類(lèi)實(shí)現,我們定義了一個(gè)CalculateTask和一個(gè)TimerTask:
// CalculateTask.java
package com.crackj2ee.thread;
public class CalculateTask implements Task {
private static int count = 0;
private int num = count;
public CalculateTask() {
count++;
}
public void execute() {
System.out.println("[CalculateTask " + num + "] start...");
try {
Thread.sleep(3000);
}
catch(InterruptedException ie) {}
System.out.println("[CalculateTask " + num + "] done.");
}
}
// TimerTask.java
package com.crackj2ee.thread;
public class TimerTask implements Task {
private static int count = 0;
private int num = count;
public TimerTask() {
count++;
}
public void execute() {
System.out.println("[TimerTask " + num + "] start...");
try {
Thread.sleep(2000);
}
catch(InterruptedException ie) {}
System.out.println("[TimerTask " + num + "] done.");
}
}
以上任務(wù)均簡(jiǎn)單的sleep若干秒。
TaskQueue實(shí)現了一個(gè)隊列,客戶(hù)端可以將請求放入隊列,服務(wù)器線(xiàn)程可以從隊列中取出任務(wù):
package com.crackj2ee.thread;
import java.util.*;
public class TaskQueue {
private List queue = new LinkedList();
public synchronized Task getTask() {
while(queue.size()==0) {
try {
this.wait();
}
catch(InterruptedException ie) {
return null;
}
}
return (Task)queue.remove(0);
}
public synchronized void putTask(Task task) {
queue.add(task);
this.notifyAll();
}
}
終于到了真正的WorkerThread,這是真正執行任務(wù)的服務(wù)器線(xiàn)程:
package com.crackj2ee.thread;
public class WorkerThread extends Thread {
private static int count = 0;
private boolean busy = false;
private boolean stop = false;
private TaskQueue queue;
public WorkerThread(ThreadGroup group, TaskQueue queue) {
super(group, "worker-" + count);
count++;
this.queue = queue;
}
public void shutdown() {
stop = true;
this.interrupt();
try {
this.join();
}
catch(InterruptedException ie) {}
}
public boolean isIdle() {
return !busy;
}
public void run() {
System.out.println(getName() + " start.");
while(!stop) {
Task task = queue.getTask();
if(task!=null) {
busy = true;
task.execute();
busy = false;
}
}
System.out.println(getName() + " end.");
}
}
前面已經(jīng)講過(guò),queue.getTask()是一個(gè)阻塞方法,服務(wù)器線(xiàn)程可能在此wait()一段時(shí)間。此外,WorkerThread還有一個(gè)shutdown方法,用于安全結束線(xiàn)程。
最后是ThreadPool,負責管理所有的服務(wù)器線(xiàn)程,還可以動(dòng)態(tài)增加和減少線(xiàn)程數:
package com.crackj2ee.thread;
import java.util.*;
public class ThreadPool extends ThreadGroup {
private List threads = new LinkedList();
private TaskQueue queue;
public ThreadPool(TaskQueue queue) {
super("Thread-Pool");
this.queue = queue;
}
public synchronized void addWorkerThread() {
Thread t = new WorkerThread(this, queue);
threads.add(t);
t.start();
}
public synchronized void removeWorkerThread() {
if(threads.size()>0) {
WorkerThread t = (WorkerThread)threads.remove(0);
t.shutdown();
}
}
public synchronized void currentStatus() {
System.out.println("-----------------------------------------------");
System.out.println("Thread count = " + threads.size());
Iterator it = threads.iterator();
while(it.hasNext()) {
WorkerThread t = (WorkerThread)it.next();
System.out.println(t.getName() + ": " + (t.isIdle() ? "idle" : "busy"));
}
System.out.println("-----------------------------------------------");
}
}
currentStatus()方法是為了方便調試,打印出所有線(xiàn)程的當前狀態(tài)。
最后,Main負責完成main()方法:
package com.crackj2ee.thread;
public class Main {
public static void main(String[] args) {
TaskQueue queue = new TaskQueue();
ThreadPool pool = new ThreadPool(queue);
for(int i=0; i<10; i++) {
queue.putTask(new CalculateTask());
queue.putTask(new TimerTask());
}
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(8000);
pool.currentStatus();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
pool.addWorkerThread();
doSleep(5000);
pool.currentStatus();
}
private static void doSleep(long ms) {
try {
Thread.sleep(ms);
}
catch(InterruptedException ie) {}
}
}
main()一開(kāi)始放入了20個(gè)Task,然后動(dòng)態(tài)添加了一些服務(wù)線(xiàn)程,并定期打印線(xiàn)程狀態(tài),運行結果如下:
worker-0 start.
[CalculateTask 0] start...
worker-1 start.
[TimerTask 0] start...
[TimerTask 0] done.
[CalculateTask 1] start...
[CalculateTask 0] done.
[TimerTask 1] start...
[CalculateTask 1] done.
[CalculateTask 2] start...
[TimerTask 1] done.
[TimerTask 2] start...
[TimerTask 2] done.
[CalculateTask 3] start...
-----------------------------------------------
Thread count = 2
worker-0: busy
worker-1: busy
-----------------------------------------------
[CalculateTask 2] done.
[TimerTask 3] start...
worker-2 start.
[CalculateTask 4] start...
worker-3 start.
[TimerTask 4] start...
worker-4 start.
[CalculateTask 5] start...
worker-5 start.
[TimerTask 5] start...
worker-6 start.
[CalculateTask 6] start...
[CalculateTask 3] done.
[TimerTask 6] start...
[TimerTask 3] done.
[CalculateTask 7] start...
[TimerTask 4] done.
[TimerTask 7] start...
[TimerTask 5] done.
[CalculateTask 8] start...
[CalculateTask 4] done.
[TimerTask 8] start...
[CalculateTask 5] done.
[CalculateTask 9] start...
[CalculateTask 6] done.
[TimerTask 9] start...
[TimerTask 6] done.
[TimerTask 7] done.
-----------------------------------------------
Thread count = 7
worker-0: idle
worker-1: busy
worker-2: busy
worker-3: idle
worker-4: busy
worker-5: busy
worker-6: busy
-----------------------------------------------
[CalculateTask 7] done.
[CalculateTask 8] done.
[TimerTask 8] done.
[TimerTask 9] done.
[CalculateTask 9] done.
仔細觀(guān)察:一開(kāi)始只有兩個(gè)服務(wù)器線(xiàn)程,因此線(xiàn)程狀態(tài)都是忙,后來(lái)線(xiàn)程數增多,7個(gè)線(xiàn)程中的兩個(gè)狀態(tài)變成idle,說(shuō)明處于wait()狀態(tài)。
思考:本例的線(xiàn)程調度算法其實(shí)根本沒(méi)有,因為這個(gè)應用是圍繞TaskQueue設計的,不是以Thread Pool為中心設計的。因此,Task調度取決于TaskQueue的getTask()方法,你可以改進(jìn)這個(gè)方法,例如使用優(yōu)先隊列,使優(yōu)先級高的任務(wù)先被執行。
如果所有的服務(wù)器線(xiàn)程都處于busy狀態(tài),則說(shuō)明任務(wù)繁忙,TaskQueue的隊列越來(lái)越長(cháng),最終會(huì )導致服務(wù)器內存耗盡。因此,可以限制 TaskQueue的等待任務(wù)數,超過(guò)最大長(cháng)度就拒絕處理。許多Web服務(wù)器在用戶(hù)請求繁忙時(shí)就會(huì )拒絕用戶(hù):HTTP 503 SERVICE UNAVAILABLE
聯(lián)系客服