一、描述
Java提供的NIO API來(lái)開(kāi)發(fā)高性能網(wǎng)絡(luò )服務(wù)器,JDK 1.4以前的網(wǎng)絡(luò )通信程序是基于阻塞式API的——即當程序執行輸入、輸出操作后,在這些操作返回之前會(huì )一直阻塞該線(xiàn)程,所以服務(wù)器必須為每個(gè)客戶(hù)端都提供一條獨立線(xiàn)程進(jìn)行處理,當服務(wù)器需要同時(shí)處理大量客戶(hù)端時(shí),這種做法會(huì )導致性能下降。使用NIO API則可以讓服務(wù)器使用一個(gè)或有限幾個(gè)線(xiàn)程來(lái)同時(shí)處理連接到服務(wù)器上的所有客戶(hù)端。
NIO使用面向緩沖(buffer)的模型。這就是說(shuō),NIO主要處理大塊的數據。這就避免了利用流模型處理所引起的問(wèn)題,在有可能的情況下,它甚至可以為了得到最大的吞吐量而使用系統級的工具?;玖?/span>InputStream和OutputStream能夠讀寫(xiě)字節數據;它們的子類(lèi)可以讀寫(xiě)各種各樣的數據。在NIO中,所有的數據都通過(guò)緩沖讀寫(xiě)。從圖1可以看到兩種模型的比較:
圖1.流模型使用Streams和Bytes;NIO模型使用Channels和Buffers
使用緩沖的好處:
A. 它可以大塊的處理數據。你可以讀寫(xiě)大塊數據,緩沖的大小只受你所分配的內存數量的限制。
B. 它可以表示系統級的緩沖。多種系統采用統一的內存配置完成I/O處理,而不需要將數據從系統內存中拷貝到應用程序的內存空間。buffer對象的不同實(shí)現可以直接表示這些系統級的緩沖,這就意味著(zhù)你可以用最少的拷貝次數來(lái)完成對數據的讀寫(xiě)。
二、Select工具
select提供了一種很好的方法來(lái)完成大量的數據源并行處理。它的名字來(lái)源于Unix系統中提供相同功能的C程序系統調用select()。
阻塞式編程特點(diǎn):
通常,I/O屬于阻塞式系統調用。當你對輸入流調用read()方法,直到數據讀入完成之前方法一直被阻塞。如果你讀入本地文件就不需要等待很長(cháng)時(shí)間。但是如果你從文件服務(wù)器或這是socket連接讀取數據的話(huà),那么你就要等很長(cháng)時(shí)間。但你在等待過(guò)程中,你讀取數據的線(xiàn)程將不能做任何事。
當然,在Java中你很容易為多個(gè)流創(chuàng )建多個(gè)線(xiàn)程。但是線(xiàn)程需要消耗大量的資源。在很多實(shí)現中,每個(gè)線(xiàn)程需要占用一塊內存,即使它什么也不做。同時(shí)太多的線(xiàn)程會(huì )對性能造成很大的影響。
Select編程特點(diǎn):
select采用不同的工作方式。通過(guò)selet你把輸入流注冊到一個(gè)Selector對象上。當某個(gè)流發(fā)生I/O活動(dòng)時(shí),selector將會(huì )通知你。以這種方式就可以只用一個(gè)線(xiàn)程讀入多個(gè)數據源。盡管Selector不能幫你讀取數據,但是它可以監聽(tīng)網(wǎng)絡(luò )連接請求和越過(guò)較慢的通道進(jìn)行寫(xiě)數據。
Java的NIO為非阻塞式的Socket通信提供了如下幾個(gè)特殊類(lèi):
Selector:
它是SelectableChannel對象的多路復用器,所有希望采用非阻塞方式進(jìn)行通信的Channel都應該注冊到Selector對象??赏ㄟ^(guò)調用此類(lèi)的靜態(tài)open()方法來(lái)創(chuàng )建Selector實(shí)例,該方法將使用系統默認的Selector來(lái)返回新的Selector。Selector可以同時(shí)監控多個(gè)SelectableChannel的IO狀況,是非阻塞IO的核心。
一個(gè)Selector實(shí)例有3個(gè)SelectionKey的集合:
A. 所有SelectionKey集合:代表了注冊在該Selector上的Channel,這個(gè)集合可以通過(guò)keys()方法返回。
B. 被選擇的SelectionKey集合:代表了所有可通過(guò)select()方法監測到、需要進(jìn)行IO處理的Channel,這個(gè)集合可以通過(guò)selectedKeys()返回。
C. 被取消的SelectionKey集合:代表了所有被取消注冊關(guān)系的Channel,在下一次執行select()方法時(shí),這些Channel對應的SelectionKey會(huì )被徹底刪除,程序通常無(wú)須直接訪(fǎng)問(wèn)該集合。
Select 相關(guān)的方法:
A. int select() :監控所有注冊的Channel,當它們中間有需要處理的IO操作時(shí),該方法返回,并將對應的SelectionKey加入被選擇的SelectionKey集合中,該方法返回這些Channel的數量。
B. int select(long timeout):可以設置超時(shí)時(shí)長(cháng)的select()操作。
C. int selectNow():執行一個(gè)立即返回的select()操作,相對于無(wú)參數的select()方法而言,該方法不會(huì )阻塞線(xiàn)程。
D. Selector wakeup():使一個(gè)還未返回的select()方法立刻返回。
SelectableChannel:
它代表可以支持非阻塞IO操作的Channel對象,可以將其注冊到Selector上,這種注冊的關(guān)系由SelectionKey實(shí)例表示。應用程序可調用SelectableChannel 的register()方法將其注冊到指定Selector上,當該Selector上某些SelectableChannel上有需要處理的IO操作時(shí),程序可以調用Selector實(shí)例的select()方法獲取它們的數量,并可以通過(guò)selectedKeys()方法返回它們對應的SelectKey集合——通過(guò)該集合就可以獲取所有需要處理IO操作的SelectableChannel集。
SelectableChannel對象支持阻塞和非阻塞兩種模式(所有channel默認都是阻塞模式),必須使用非阻塞式模式才可以利用非阻塞IO操作。
SelectableChannel提供了如下兩個(gè)方法來(lái)設置和返回該Channel的模式狀態(tài):
SelectableChannel configureBlocking(boolean block):設置是否采用阻塞模式。
boolean isBlocking():返回該Channel是否是阻塞模式。
使用NIO實(shí)現非阻塞式服務(wù)器的示意圖:
從圖中可以看出,服務(wù)器上所有Channel(包括ServerSocketChannel和SocketChannel)都需要向Selector注冊,而該Selector則負責監視這些Socket的IO狀態(tài),當其中任意一個(gè)或多個(gè)Channel具有可用的IO操作時(shí),該Selector的select()方法將會(huì )返回大于0的整數,該整數值就表示該Selector上有多少個(gè)Channel具有可用的IO操作,并提供了selectedKeys()方法來(lái)返回這些Channel對應的SelectionKey集合。正是通過(guò)Selector,使得服務(wù)器端只需要不斷地調用Selector實(shí)例的select()方法即可知道當前所有Channel是否有需要處理的IO操作。
三、應用范例
服務(wù)端代碼:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
class AsyncServer implements Runnable {
private ByteBuffer r_buff = ByteBuffer.allocate(1024);
private ByteBuffer w_buff = ByteBuffer.allocate(1024);
private static int port = 8848;
public AsyncServer() {
new Thread(this).start();
}
private void info(String str){
System.out.println(str);
}
public void run() {
try {
Selector s = Selector.open(); //// 生成一個(gè)信號監視器
ServerSocketChannel ssc = ServerSocketChannel.open(); // 生成一個(gè)偵聽(tīng)端
ssc.configureBlocking(false);// 將偵聽(tīng)端設為異步方式
// 偵聽(tīng)端綁定到一個(gè)端口
ssc.socket().bind(new InetSocketAddress(port));
ssc.register(s, SelectionKey.OP_ACCEPT,new NetEventHandler());// 設置偵聽(tīng)端所選的異步信號OP_ACCEPT
info(”開(kāi)始啟動(dòng)服務(wù)器”);
while (true) {
int n = s.select(100);
if (n == 0) {// 沒(méi)有指定的I/O事件發(fā)生
continue;
}
Set<SelectionKey> readys = s.selectedKeys();
if(readys.size() == 0){
continue;
}
while (readys.iterator().hasNext()) {
SelectionKey key = readys.iterator().next();
if (key.isAcceptable()) {// 偵聽(tīng)端信號觸發(fā)
info(”偵聽(tīng)端信號觸發(fā)”);
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel sc = server.accept();
sc.configureBlocking(false);
sc.register(s, SelectionKey.OP_READ,new NetEventHandler());
}
if (key.isReadable()) {// 某socket可讀信號
DealwithData(key);
}
readys.iterator().remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public void DealwithData(SelectionKey key) throws IOException {
NetEventHandler eventHandler = (NetEventHandler)key.attachment();
info(”eventHandler:” + eventHandler);
// 由key獲取指定socketchannel的引用
SocketChannel sc = (SocketChannel) key.channel();
r_buff.clear();
int count;
while ((count = sc.read(r_buff)) > 0);
// 將r_buff內容拷入w_buff
r_buff.flip();
w_buff.clear();
w_buff.put(r_buff);
w_buff.flip();
// 將數據返回給客戶(hù)端
EchoToClient(sc);
w_buff.clear();
r_buff.clear();
}
public void EchoToClient(SocketChannel sc) throws IOException {
while (w_buff.hasRemaining())
sc.write(w_buff);
}
public static void main(String args[]) {
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new AsyncServer();
}
}
客戶(hù)端代碼:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
class AsyncClient {
private SocketChannel sc;
private final int MAX_LENGTH = 1024;
private ByteBuffer r_buff = ByteBuffer.allocate(MAX_LENGTH);
private ByteBuffer w_buff = ByteBuffer.allocate(MAX_LENGTH);
private static String host ;
private static int port = 8848;
public AsyncClient() {
try {
InetSocketAddress addr = new InetSocketAddress(host, port);
// 生成一個(gè)socketchannel
sc = SocketChannel.open();
// 連接到server
sc.connect(addr);
while (!sc.finishConnect())
;
System.out.println(”connection has been established!…”);
while (true) {
// 回射消息
String echo;
try {
System.err.println(”Enter msg you’d like to send: “);
BufferedReader br = new BufferedReader(
new InputStreamReader(System.in));
// 輸入回射消息
echo = br.readLine();
// 把回射消息放入w_buff中
w_buff.clear();
w_buff.put(echo.getBytes());
w_buff.flip();
} catch (IOException ioe) {
System.err.println(”sth. is wrong with br.readline() “);
}
// 發(fā)送消息
while (w_buff.hasRemaining())
sc.write(w_buff);
w_buff.clear();
// 進(jìn)入接收狀態(tài)
Rec();
// 間隔1秒
Thread.currentThread().sleep(1000);
}
} catch (IOException ioe) {
ioe.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
public void Rec() throws IOException {
int count;
r_buff.clear();
count = sc.read(r_buff);
r_buff.flip();
byte[] temp = new byte[r_buff.limit()];
r_buff.get(temp);
System.out.println(”reply is ” + count + ” long, and content is: “
+ new String(temp));
}
public static void main(String args[]) {
if (args.length < 1) {// 輸入需有主機名或IP地址
try {
System.err.println(”Enter host name: “);
BufferedReader br = new BufferedReader(new InputStreamReader(
System.in));
host = br.readLine();
} catch (IOException ioe) {
System.err.println(”sth. is wrong with br.readline() “);
}
} else if (args.length == 1) {
host = args[0];
} else if (args.length > 1) {
host = args[0];
port = Integer.parseInt(args[1]);
}
new AsyncClient();
}
}
聯(lián)系客服