并發(fā)庫中的BlockingQueue是一個(gè)比較好玩的類(lèi),顧名思義,就是阻塞隊列。該類(lèi)主要提供了兩個(gè)方法put()和take(),前者將一個(gè)對象放到隊列中,如果隊列已經(jīng)滿(mǎn)了,就等待直到有空閑節點(diǎn);后者從head取一個(gè)對象,如果沒(méi)有對象,就等待直到有可取的對象。
下面的例子比較簡(jiǎn)單,一個(gè)讀線(xiàn)程,用于將要處理的文件對象添加到阻塞隊列中,另外四個(gè)寫(xiě)線(xiàn)程用于取出文件對象,為了模擬寫(xiě)操作耗時(shí)長(cháng)的特點(diǎn),特讓線(xiàn)程睡眠一段隨機長(cháng)度的時(shí)間。另外,該Demo也使用到了線(xiàn)程池和原子整型(AtomicInteger),AtomicInteger可以在并發(fā)情況下達到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞隊列的put和take操作會(huì )阻塞,為了使線(xiàn)程退出,特在隊列中添加了一個(gè)“標識”,算法中也叫“哨兵”,當發(fā)現這個(gè)哨兵后,寫(xiě)線(xiàn)程就退出。
當然線(xiàn)程池也要顯式退出了。
package concurrent;
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class TestBlockingQueue {
static long randomTime() {
return (long) (Math.random() * 1000);
}
public static void main(String[] args) {
// 能容納100個(gè)文件
final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);
// 線(xiàn)程池
final ExecutorService exec = Executors.newFixedThreadPool(5);
final File root = new File("F:\\JavaLib");
// 完成標志
final File exitFile = new File("");
// 讀個(gè)數
final AtomicInteger rc = new AtomicInteger();
// 寫(xiě)個(gè)數
final AtomicInteger wc = new AtomicInteger();
// 讀線(xiàn)程
Runnable read = new Runnable() {
public void run() {
scanFile(root);
scanFile(exitFile);
}
public void scanFile(File file) {
if (file.isDirectory()) {
File[] files = file.listFiles(new FileFilter() {
public boolean accept(File pathname) {
return pathname.isDirectory()
|| pathname.getPath().endsWith(".java");
}
});
for (File one : files)
scanFile(one);
} else {
try {
int index = rc.incrementAndGet();
System.out.println("Read0: " + index + " "
+ file.getPath());
queue.put(file);
} catch (InterruptedException e) {
}
}
}
};
exec.submit(read);
// 四個(gè)寫(xiě)線(xiàn)程
for (int index = 0; index < 4; index++) {
// write thread
final int NO = index;
Runnable write = new Runnable() {
String threadName = "Write" + NO;
public void run() {
while (true) {
try {
Thread.sleep(randomTime());
int index = wc.incrementAndGet();
File file = queue.take();
// 隊列已經(jīng)無(wú)對象
if (file == exitFile) {
// 再次添加"標志",以讓其他線(xiàn)程正常退出
queue.put(exitFile);
break;
}
System.out.println(threadName + ": " + index + " "
+ file.getPath());
} catch (InterruptedException e) {
}
}
}
};
exec.submit(write);
}
exec.shutdown();
}
}聯(lián)系客服