欧美性猛交XXXX免费看蜜桃,成人网18免费韩国,亚洲国产成人精品区综合,欧美日韩一区二区三区高清不卡,亚洲综合一区二区精品久久

打開(kāi)APP
userphoto
未登錄

開(kāi)通VIP,暢享免費電子書(shū)等14項超值服

開(kāi)通VIP
Apache Mina Server 2.0 中文參考手冊

Apache Mina Server 2.0 中文參考手冊

 

Apache Mina Server 是一個(gè)網(wǎng)絡(luò )通信應用框架,也就是說(shuō),它主要是對基于TCP/IP、UDP/IP

協(xié)議棧的通信框架(當然,也可以提供JAVA 對象的序列化服務(wù)、虛擬機管道通信服務(wù)等),

Mina 可以幫助我們快速開(kāi)發(fā)高性能、高擴展性的網(wǎng)絡(luò )通信應用,Mina 提供了事件驅動(dòng)、異

步(Mina 的異步IO 默認使用的是JAVA NIO 作為底層支持)操作的編程模型。

Mina 主要有1.x 2.x 兩個(gè)分支,這里我們講解最新版本2.0,如果你使用的是Mina 1.x,

那么可能會(huì )有一些功能并不適用。學(xué)習本文檔,需要你已掌握JAVA IO、JAVA NIO、JAVA

Socket、JAVA 線(xiàn)程及并發(fā)庫(java.util.concurrent.*)的知識。

Mina 同時(shí)提供了網(wǎng)絡(luò )通信的Server 端、Client 端的封裝,無(wú)論是哪端,Mina 在整個(gè)網(wǎng)通

通信結構中都處于如下的位置:

可見(jiàn)Mina API 將真正的網(wǎng)絡(luò )通信與我們的應用程序隔離開(kāi)來(lái),你只需要關(guān)心你要發(fā)送、

接收的數據以及你的業(yè)務(wù)邏輯即可。

同樣的,無(wú)論是哪端,Mina 的執行流程如下所示:

(1.) IoService:這個(gè)接口在一個(gè)線(xiàn)程上負責套接字的建立,擁有自己的Selector,監

聽(tīng)是否有連接被建立。

(2.) IoProcessor:這個(gè)接口在另一個(gè)線(xiàn)程上負責檢查是否有數據在通道上讀寫(xiě),也就是

說(shuō)它也擁有自己的Selector,這是與我們使用JAVA NIO 編碼時(shí)的一個(gè)不同之處,

通常在JAVA NIO 編碼中,我們都是使用一個(gè)Selector,也就是不區分IoService

IoProcessor 兩個(gè)功能接口。另外,IoProcessor負責調用注冊在IoService

的過(guò)濾器,并在過(guò)濾器鏈之后調用IoHandler。

(3.) IoFilter:這個(gè)接口定義一組攔截器,這些攔截器可以包括日志輸出、黑名單過(guò)濾、

數據的編碼(write 方向)與解碼(read 方向)等功能,其中數據的encode decode

是最為重要的、也是你在使用Mina 時(shí)最主要關(guān)注的地方。

(4.) IoHandler:這個(gè)接口負責編寫(xiě)業(yè)務(wù)邏輯,也就是接收、發(fā)送數據的地方。

_______________________________________________________________________________

1. 簡(jiǎn)單的TCPServer

(1.) 第一步:編寫(xiě)IoService

按照上面的執行流程,我們首先需要編寫(xiě)IoService,IoService本身既是服務(wù)端,又是客

戶(hù)端,我們這里編寫(xiě)服務(wù)端,所以使用IoAcceptor 實(shí)現,由于IoAcceptor是與協(xié)議無(wú)關(guān)的,

因為我們要編寫(xiě)TCPServer,所以我們使用IoAcceptor的實(shí)現NioSocketAcceptor,實(shí)際上

底層就是調用java.nio.channels.ServerSocketChannel 類(lèi)。當然,如果你使用了Apache

APR 庫,那么你可以選擇使用AprSocketAcceptor 作為TCPServer 的實(shí)現,據傳說(shuō)Apache APR

庫的性能比JVM 自帶的本地庫高出很多。

那么IoProcessor 是由指定的IoService 內部創(chuàng )建并調用的,我們并不需要關(guān)心。

publicclass MyServer {

main方法:

IoAcceptoracceptor=new NioSocketAcceptor();

acceptor.getSessionConfig().setReadBufferSize(2048);

acceptor.getSessionConfig.setIdleTime(IdleStatus.BOTH_IDLE,10);

acceptor.bind(newInetSocketAddress(9123));

}

這段代碼我們初始化了服務(wù)端的TCP/IP 的基于NIO 的套接字,然后調用IoSessionConfig

設置讀取數據的緩沖區大小、讀寫(xiě)通道均在10 秒內無(wú)任何操作就進(jìn)入空閑狀態(tài)。

(2.) 第二步:編寫(xiě)過(guò)濾器

這里我們處理最簡(jiǎn)單的字符串傳輸,Mina 已經(jīng)為我們提供了TextLineCodecFactory編解碼

器工廠(chǎng)來(lái)對字符串進(jìn)行編解碼處理。

acceptor.getFilterChain().addLast("codec",

newProtocolCodecFilter(

newTextLineCodecFactory(

Charset.forName("UTF-8"),

LineDelimeter.WINDOWS.getValue(),

LineDelimiter.WINDOWS.getValue()

)

)

);

這段代碼要在acceptor.bind()方法之前執行,因為綁定套接字之后就不能再做這些準備工

作了。

這里先不用清楚編解碼器是如何工作的,這個(gè)是后面重點(diǎn)說(shuō)明的內容,這里你只需要清楚,

我們傳輸的以換行符為標識的數據,所以使用了Mina 自帶的換行符編解碼器工廠(chǎng)。

(3.) 第三步:編寫(xiě)IoHandler

這里我們只是簡(jiǎn)單的打印Client 傳說(shuō)過(guò)來(lái)的數據。

public class MyIoHandlerextends IoHandlerAdapter {

//這里我們使用的SLF4J作為日志門(mén)面,至于為什么在后面說(shuō)明。

private final static Logger log =LoggerFactory

.getLogger(MyIoHandler.class);

@Override

public void messageReceived(IoSessionsession, Object message)

throws Exception{

Stringstr = message.toString();

log.info("The message received is[" + str + "]");

if (str.endsWith("quit")) {

session.close(true);

return;

}

}

}

然后我們把這個(gè)IoHandler 注冊到IoService

acceptor.setHandler(newMyIoHandler());

當然這段代碼也要在acceptor.bind()方法之前執行。

然后我們運行MyServer 中的main 方法,你可以看到控制臺一直處于阻塞狀態(tài),此時(shí),我們

telnet127.0.0.1 9123 訪(fǎng)問(wèn),然后輸入一些內容,當按下回車(chē)鍵,你會(huì )發(fā)現數據在

Server 端被輸出,但要注意不要輸入中文,因為Windows 的命令行窗口不會(huì )對傳輸的數據

進(jìn)行UTF-8 編碼。當輸入quit 結尾的字符串時(shí),連接被斷開(kāi)。

這里注意你如果使用的操作系統,或者使用的Telnet 軟件的換行符是什么,如果不清楚,

可以刪掉第二步中的兩個(gè)紅色的參數,使用TextLineCodec 內部的自動(dòng)識別機制。

_______________________________________________________________________________

2. 簡(jiǎn)單的TCPClient

這里我們實(shí)現Mina 中的TCPClient,因為前面說(shuō)過(guò)無(wú)論是Server 端還是Client 端,在Mina

中的執行流程都是一樣的。唯一不同的就是IoService Client端實(shí)現是IoConnector。

(1.) 第一步:編寫(xiě)IoService并注冊過(guò)濾器

publicclass MyClient {

main方法:

IoConnectorconnector=new NioSocketConnector();

connector.setConnectTimeoutMillis(30000);

connector.getFilterChain().addLast("codec",

newProtocolCodecFilter(

newTextLineCodecFactory(

Charset.forName("UTF-8"),

LineDelimiter.WINDOWS.getValue(),

LineDelimiter.WINDOWS.getValue()

)

)

);

connector.connect(newInetSocketAddress("localhost", 9123));

}

(2.) 第三步:編寫(xiě)IoHandler

public class ClientHandlerextends IoHandlerAdapter {

private final static Logger LOGGER =LoggerFactory

.getLogger(ClientHandler.class);

private final Stringvalues;

public ClientHandler(Stringvalues) {

this.values = values;

}

@Override

public void sessionOpened(IoSessionsession) {

session.write(values);

}

}

注冊IoHandler

connector.setHandler(newClientHandler("你好!\r\n 大家好!"));

然后我們運行MyClient,你會(huì )發(fā)現MyServer 輸出如下語(yǔ)句:

Themessage received is [你好!]

Themessage received is [大家好!]

我們看到服務(wù)端是按照收到兩條消息輸出的,因為我們用的編解碼器是以換行符判斷數據是

否讀取完畢的。

_______________________________________________________________________________

3. 介紹MinaTCP的主要接口:

通過(guò)上面的兩個(gè)示例,你應該對Mina 如何編寫(xiě)TCP/IP 協(xié)議棧的網(wǎng)絡(luò )通信有了一些感性的認

識。

(1.)IoService

這個(gè)接口是服務(wù)端IoAcceptor、客戶(hù)端IoConnector的抽象,提供IO 服務(wù)和管理IoSession

的功能,它有如下幾個(gè)常用的方法:

A. TransportMetadata getTransportMetadata()

這個(gè)方法獲取傳輸方式的元數據描述信息,也就是底層到底基于什么的實(shí)現,譬如:nio、

apr 等。

B. void addListener(IoServiceListener listener)

這個(gè)方法可以為IoService 增加一個(gè)監聽(tīng)器,用于監聽(tīng)IoService的創(chuàng )建、活動(dòng)、失效、空

閑、銷(xiāo)毀,具體可以參考IoServiceListener 接口中的方法,這為你參與IoService 的生命

周期提供了機會(huì )。

C. void removeListener(IoServiceListener listener)

這個(gè)方法用于移除上面的方法添加的監聽(tīng)器。

D. void setHandler(IoHandler handler)

這個(gè)方法用于向IoService 注冊IoHandler,同時(shí)有getHandler()方法獲取Handler。

E. Map<Long,IoSession> getManagedSessions()

這個(gè)方法獲取IoService 上管理的所有IoSession,Map key IoSession id。

F. IoSessionConfig getSessionConfig()

這個(gè)方法用于獲取IoSession 的配置對象,通過(guò)IoSessionConfig對象可以設置Socket

接的一些選項。

_______________________________________________________________________________

(2.)IoAcceptor

這個(gè)接口是TCPServer 的接口,主要增加了voidbind()監聽(tīng)端口、void unbind()解除對

套接字的監聽(tīng)等方法。這里與傳統的JAVA 中的ServerSocket不同的是IoAcceptor 可以多

次調用bind()方法(或者在一個(gè)方法中傳入多個(gè)SocketAddress參數)同時(shí)監聽(tīng)多個(gè)端口。

_______________________________________________________________________________

(3.)IoConnector

這個(gè)接口是TCPClient 的接口, 主要增加了ConnectFutureconnect(SocketAddress

remoteAddress,SocketAddress localAddress)方法,用于與Server 端建立連接,第二個(gè)參

數如果不傳遞則使用本地的一個(gè)隨機端口訪(fǎng)問(wèn)Server 端。這個(gè)方法是異步執行的,同樣的,

也可以同時(shí)連接多個(gè)服務(wù)端。

_______________________________________________________________________________

(4.)IoSession

這個(gè)接口用于表示Server 端與Client 端的連接,IoAcceptor.accept()的時(shí)候返回實(shí)例。

這個(gè)接口有如下常用的方法:

A. WriteFuture write(Object message)

這個(gè)方法用于寫(xiě)數據,該操作是異步的。

B. CloseFuture close(boolean immediately)

這個(gè)方法用于關(guān)閉IoSession,該操作也是異步的,參數指定true表示立即關(guān)閉,否則就

在所有的寫(xiě)操作都flush 之后再關(guān)閉。

C. Object setAttribute(Object key,Object value)

這個(gè)方法用于給我們向會(huì )話(huà)中添加一些屬性,這樣可以在會(huì )話(huà)過(guò)程中都可以使用,類(lèi)似于

HttpSession setAttrbute()方法。IoSession 內部使用同步的HashMap 存儲你添加的自

定義屬性。

D. SocketAddress getRemoteAddress()

這個(gè)方法獲取遠端連接的套接字地址。

E. void suspendWrite()

這個(gè)方法用于掛起寫(xiě)操作,那么有void resumeWrite()方法與之配對。對于read()方法同

樣適用。

F. ReadFuture read()

這個(gè)方法用于讀取數據, 但默認是不能使用的, 你需要調用IoSessionConfig

setUseReadOperation(true)才可以使用這個(gè)異步讀取的方法。一般我們不會(huì )用到這個(gè)方法,

因為這個(gè)方法的內部實(shí)現是將數據保存到一個(gè)BlockingQueue,假如是Server 端,因為大

量的Client 端發(fā)送的數據在Server 端都這么讀取,那么可能會(huì )導致內存泄漏,但對于

Client,可能有的時(shí)候會(huì )比較便利。

G. IoService getService()

這個(gè)方法返回與當前會(huì )話(huà)對象關(guān)聯(lián)的IoService 實(shí)例。

關(guān)于TCP連接的關(guān)閉:

無(wú)論在客戶(hù)端還是服務(wù)端,IoSession 都用于表示底層的一個(gè)TCP連接,那么你會(huì )發(fā)現無(wú)論

Server 端還是Client 端的IoSession 調用close()方法之后,TCP 連接雖然顯示關(guān)閉, 但

主線(xiàn)程仍然在運行,也就是JVM 并未退出,這是因為IoSessionclose()僅僅是關(guān)閉了TCP

的連接通道,并沒(méi)有關(guān)閉Server 端、Client 端的程序。你需要調用IoService dispose()

方法停止Server 端、Client 端。

_______________________________________________________________________________

(5.)IoSessionConfig

這個(gè)方法用于指定此次會(huì )話(huà)的配置,它有如下常用的方法:

A. void setReadBufferSize(int size)

這個(gè)方法設置讀取緩沖的字節數,但一般不需要調用這個(gè)方法,因為IoProcessor 會(huì )自動(dòng)調

整緩沖的大小。你可以調用setMinReadBufferSize()、setMaxReadBufferSize()方法,這

樣無(wú)論IoProcessor 無(wú)論如何自動(dòng)調整,都會(huì )在你指定的區間。

B. void setIdleTime(IdleStatus status,int idleTime)

這個(gè)方法設置關(guān)聯(lián)在通道上的讀、寫(xiě)或者是讀寫(xiě)事件在指定時(shí)間內未發(fā)生,該通道就進(jìn)入空

閑狀態(tài)。一旦調用這個(gè)方法,則每隔idleTime 都會(huì )回調過(guò)濾器、IoHandler中的sessionIdle()

方法。

C. void setWriteTimeout(int time)

這個(gè)方法設置寫(xiě)操作的超時(shí)時(shí)間。

D. void setUseReadOperation(boolean useReadOperation)

這個(gè)方法設置IoSession read()方法是否可用,默認是false。

_______________________________________________________________________________

(6.)IoHandler

這個(gè)接口是你編寫(xiě)業(yè)務(wù)邏輯的地方,從上面的示例代碼可以看出,讀取數據、發(fā)送數據基本

都在這個(gè)接口總完成,這個(gè)實(shí)例是綁定到IoService 上的,有且只有一個(gè)實(shí)例(沒(méi)有給一個(gè)

IoService 注入一個(gè)IoHandler 實(shí)例會(huì )拋出異常)。它有如下幾個(gè)方法:

A. void sessionCreated(IoSession session)

這個(gè)方法當一個(gè)Session 對象被創(chuàng )建的時(shí)候被調用。對于TCP 連接來(lái)說(shuō),連接被接受的時(shí)候

調用,但要注意此時(shí)TCP 連接并未建立,此方法僅代表字面含義,也就是連接的對象

IoSession 被創(chuàng )建完畢的時(shí)候,回調這個(gè)方法。

對于UDP 來(lái)說(shuō),當有數據包收到的時(shí)候回調這個(gè)方法,因為UDP 是無(wú)連接的。

B. void sessionOpened(IoSession session)

這個(gè)方法在連接被打開(kāi)時(shí)調用,它總是在sessionCreated()方法之后被調用。對于TCP 來(lái)

說(shuō),它是在連接被建立之后調用,你可以在這里執行一些認證操作、發(fā)送數據等。

對于UDP 來(lái)說(shuō),這個(gè)方法與sessionCreated()沒(méi)什么區別,但是緊跟其后執行。如果你每

隔一段時(shí)間,發(fā)送一些數據,那么sessionCreated()方法只會(huì )在第一次調用,但是

sessionOpened()方法每次都會(huì )調用。

C. void sessionClosed(IoSession session)

對于TCP 來(lái)說(shuō),連接被關(guān)閉時(shí),調用這個(gè)方法。

對于UDP 來(lái)說(shuō),IoSession close()方法被調用時(shí)才會(huì )毀掉這個(gè)方法。

D. void sessionIdle(IoSession session, IdleStatus status)

這個(gè)方法在IoSession 的通道進(jìn)入空閑狀態(tài)時(shí)調用,對于UDP協(xié)議來(lái)說(shuō),這個(gè)方法始終不會(huì )

被調用。

E. void exceptionCaught(IoSession session, Throwablecause)

這個(gè)方法在你的程序、Mina 自身出現異常時(shí)回調,一般這里是關(guān)閉IoSession。

F. void messageReceived(IoSession session, Objectmessage)

接收到消息時(shí)調用的方法,也就是用于接收消息的方法,一般情況下,message 是一個(gè)

IoBuffer 類(lèi),如果你使用了協(xié)議編解碼器,那么可以強制轉換為你需要的類(lèi)型。通常我們

都是會(huì )使用協(xié)議編解碼器的, 就像上面的例子, 因為協(xié)議編解碼器是

TextLineCodecFactory,所以我們可以強制轉message String 類(lèi)型。

G. void messageSent(IoSession session, Object message)

當發(fā)送消息成功時(shí)調用這個(gè)方法,注意這里的措辭,發(fā)送成功之后,也就是說(shuō)發(fā)送消息是不

能用這個(gè)方法的。

發(fā)送消息的時(shí)機:

發(fā)送消息應該在sessionOpened()、messageReceived()方法中調用IoSession.write()方法

完成。因為在sessionOpened()方法中,TCP 連接已經(jīng)真正打開(kāi),同樣的在messageReceived()

方法TCP 連接也是打開(kāi)狀態(tài),只不過(guò)兩者的時(shí)機不同。sessionOpened()方法是在TCP 連接

建立之后,接收到數據之前發(fā)送;messageReceived()方法是在接收到數據之后發(fā)送,你可

以完成依據收到的內容是什么樣子,決定發(fā)送什么樣的數據。

因為這個(gè)接口中的方法太多,因此通常使用適配器模式的IoHandlerAdapter,覆蓋你所感

興趣的方法即可。

_______________________________________________________________________________

(7.)IoBuffer

這個(gè)接口是對JAVA NIO ByteBuffer 的封裝,這主要是因為ByteBuffer 只提供了對基本

數據類(lèi)型的讀寫(xiě)操作,沒(méi)有提供對字符串等對象類(lèi)型的讀寫(xiě)方法,使用起來(lái)更為方便,另外,

ByteBuffer 是定長(cháng)的,如果想要可變,將很麻煩。IoBuffer 的可變長(cháng)度的實(shí)現類(lèi)似于

StringBuffer。IoBuffer ByteBuffer一樣,都是非線(xiàn)程安全的。本節的一些內容如果不

清楚,可以參考java.nio.ByteBuffer 接口。

這個(gè)接口有如下常用的方法:

A. static IoBuffer allocate(int capacity,booleanuseDirectBuffer)

這個(gè)方法內部通過(guò)SimpleBufferAllocator 創(chuàng )建一個(gè)實(shí)例,第一個(gè)參數指定初始化容量,第

二個(gè)參數指定使用直接緩沖區還是JAVA 內存堆的緩存區,默認為false。

B. void free()

釋放緩沖區,以便被一些IoBufferAllocator 的實(shí)現重用,一般沒(méi)有必要調用這個(gè)方法,除

非你想提升性能(但可能未必效果明顯)。

C. IoBuffer setAutoExpand(boolean autoExpand)

這個(gè)方法設置IoBuffer 為自動(dòng)擴展容量,也就是前面所說(shuō)的長(cháng)度可變,那么可以看出長(cháng)度

可變這個(gè)特性默認是不開(kāi)啟的。

D. IoBuffer setAutoShrink(boolean autoShrink)

這個(gè)方法設置IoBuffer 為自動(dòng)收縮,這樣在compact()方法調用之后,可以裁減掉一些沒(méi)

有使用的空間。如果這個(gè)方法沒(méi)有被調用或者設置為false,你也可以通過(guò)調用shrink()

方法手動(dòng)收縮空間。

E. IoBuffer order(ByteOrder bo)

這個(gè)方法設置是Big Endian 還是Little Endian,JAVA 中默認是Big Endian,C++和其他

語(yǔ)言一般是Little Endian。

F. IoBuffer asReadOnlyBuffer()

這個(gè)方法設置IoBuffer 為只讀的。

G. Boolean prefixedDataAvailable(int prefixLength,int maxDataLength)

這個(gè)方法用于數據的最開(kāi)始的1、2、4 個(gè)字節表示的是數據的長(cháng)度的情況,prefixLentgh

表示這段數據的前幾個(gè)字節(只能是1、2、4 的其中一個(gè))的代表的是這段數據的長(cháng)度,

maxDataLength 表示最多要讀取的字節數。返回結果依賴(lài)于等式

remaining()-prefixLength>=maxDataLength,也就是總的數據-表示長(cháng)度的字節,剩下的字

節數要比打算讀取的字節數大或者相等。

H. String getPrefixedString(int prefixLength,CharsetDecoderdecoder)

如果上面的方法返回true,那么這個(gè)方法將開(kāi)始讀取表示長(cháng)度的字節之后的數據,注意要

保持這兩個(gè)方法的prefixLength 的值是一樣的。

G、H 兩個(gè)方法在后面講到的PrefixedStringDecoder中的內部實(shí)現使用。

IoBuffer 剩余的方法與ByteBuffer 都是差不多的,額外增加了一些便利的操作方法,例如:

IoBuffer putString(String value,CharsetEncoder encoder)可以方便的以指定的編碼方

式存儲字符串、InputStream asInputStream()方法從IoBuffer 剩余的未讀的數據中轉為

輸入流等。

_______________________________________________________________________________

(8.)IoFuture

Mina 的很多操作中,你會(huì )看到返回值是XXXFuture,實(shí)際上他們都是IoFuture 的子類(lèi),

看到這樣的返回值,這個(gè)方法就說(shuō)明是異步執行的,主要的子類(lèi)有ConnectFuture、

CloseFuture 、ReadFuture 、WriteFuture。這個(gè)接口的大部分操作都和

java.util.concurrent.Future 接口是類(lèi)似的,譬如:await()、awaitUninterruptibly()

等,一般我們常用awaitUninterruptibly()方法可以等待異步執行的結果返回。

這個(gè)接口有如下常用的方法:

A. IoFuture addListener(IoFutureListener<?>listener)

這個(gè)方法用于添加一個(gè)監聽(tīng)器, 在異步執行的結果返回時(shí)監聽(tīng)器中的回調方法

operationComplete(IoFuture future),也就是說(shuō),這是替代awaitUninterruptibly()

法另一種等待異步執行結果的方法,它的好處是不會(huì )產(chǎn)生阻塞。

B. IoFuture removeListener(IoFutureListener<?>listener)

這個(gè)方法用于移除指定的監聽(tīng)器。

C. IoSession getSession()

這個(gè)方法返回當前的IoSession。

舉個(gè)例子,我們在客戶(hù)端調用connect()方法訪(fǎng)問(wèn)Server 端的時(shí)候,實(shí)際上這就是一個(gè)異

步執行的方法,也就是調用connect()方法之后立即返回,執行下面的代碼,而不管是否連

接成功。那么如果我想在連接成功之后執行一些事情(譬如:獲取連接成功后的IoSession

對象),該怎么辦呢?按照上面的說(shuō)明,你有如下兩種辦法:

第一種:

ConnectFuturefuture = connector.connect(new InetSocketAddress(

HOSTNAME,PORT));

// 等待是否連接成功,相當于是轉異步執行為同步執行。

future.awaitUninterruptibly();

// 連接成功后獲取會(huì )話(huà)對象。如果沒(méi)有上面的等待,由于connect()方法是異步的,session

可能會(huì )無(wú)法獲取。

session= future.getSession();

第二種:

ConnectFuturefuture = connector.connect(new InetSocketAddress(

HOSTNAME,PORT));

future.addListener(new IoFutureListener<ConnectFuture>() {

@Override

public void operationComplete(ConnectFuturefuture) {

try {

Thread.sleep(5000);

}catch (InterruptedException e) {

e.printStackTrace();

}

IoSessionsession = future.getSession();

System.out.println("++++++++++++++++++++++++++++");

}

});

System.out.println("*************");

為了更好的看清楚使用監聽(tīng)器是異步的,而不是像awaitUninterruptibly()那樣會(huì )阻塞主

線(xiàn)程的執行,我們在回調方法中暫停5 秒鐘,然后輸出+++,在最后輸出***。我們執行代碼

之后,你會(huì )發(fā)現首先輸出***(這證明了監聽(tīng)器是異步執行的),然后IoSession對象Created,

系統暫停5 秒,然后輸出+++,最后IoSession 對象Opened,也就是TCP 連接建立。

_______________________________________________________________________________

4.日志配置:

前面的示例代碼中提到了使用SLF4J 作為日志門(mén)面,這是因為Mina內部使用的就是SLF4J,

你也使用SLF4J 可以與之保持一致性。

Mina 如果想啟用日志跟蹤Mina 的運行細節,你可以配置LoggingFilter 過(guò)濾器,這樣你可

以看到Session 建立、打開(kāi)、空閑等一系列細節在日志中輸出,默認SJF4J 是按照DEBUG

級別輸出跟蹤信息的,如果你想給某一類(lèi)別的Mina 運行信息輸出指定日志輸出級別,可以

調用LoggingFilter setXXXLogLevel(LogLevel.XXX)。

例:

LoggingFilterlf = new LoggingFilter();

lf.setSessionOpenedLogLevel(LogLevel.ERROR);

acceptor.getFilterChain().addLast("logger",lf);

這里IoSession 被打開(kāi)的跟蹤信息將以ERROR 級別輸出到日志。

_______________________________________________________________________________

5.過(guò)濾器:

前面我們看到了LoggingFilter、ProtocolCodecFilter兩個(gè)過(guò)濾器,一個(gè)負責日志輸出,

一個(gè)負責數據的編解碼,通過(guò)最前面的Mina 執行流程圖,在IoProcessorIoHandler

間可以有很多的過(guò)濾器,這種設計方式為你提供可插拔似的擴展功能提供了非常便利的方

式,目前的Apache CXF、Apache Struts2 中的攔截器也都是一樣的設計思路。

Mina 中的IoFilter 是單例的,這與CXF、Apache Struts2 沒(méi)什么區別。

IoService 實(shí)例上會(huì )綁定一個(gè)DefaultIoFilterChainBuilder 實(shí)例,

DefaultIoFilterChainBuilder 會(huì )把使用內部的EntryImpl 類(lèi)把所有的過(guò)濾器按照順序連在

一起,組成一個(gè)過(guò)濾器鏈。

DefaultIoFilterChainBuilder 類(lèi)如下常用的方法:

A. void addFirst(String name,IoFilter filter)

這個(gè)方法把過(guò)濾器添加到過(guò)濾器鏈的頭部,頭部就是IoProcessor 之后的第一個(gè)過(guò)濾器。同

樣的addLast()方法把過(guò)濾器添加到過(guò)濾器鏈的尾部。

B. void addBefore(String baseName,String name,IoFilterfilter)

這個(gè)方法將過(guò)濾器添加到baseName 指定的過(guò)濾器的前面,同樣的addAfter()方法把過(guò)濾器

添加到baseName 指定的過(guò)濾器的后面。這里要注意無(wú)論是那種添加方法,每個(gè)過(guò)濾器的名

字(參數name)必須是唯一的。

C. IoFilter remove(Stirng name)

這個(gè)方法移除指定名稱(chēng)的過(guò)濾器,你也可以調用另一個(gè)重載的remove()方法,指定要移除

IoFilter 的類(lèi)型。

D. List<Entry> getAll()

這個(gè)方法返回當前IoService 上注冊的所有過(guò)濾器。

默認情況下,過(guò)濾器鏈中是空的,也就是getAll()方法返回長(cháng)度為0List,但實(shí)際Mina

內部有兩個(gè)隱藏的過(guò)濾器:HeadFilter、TailFilter,分別在List 的最開(kāi)始和最末端,很

明顯,TailFilter 在最末端是為了調用過(guò)濾器鏈之后,調用IoHandler。但這兩個(gè)過(guò)濾器對

你來(lái)說(shuō)是透明的,可以忽略它們的存在。

編寫(xiě)一個(gè)過(guò)濾器很簡(jiǎn)單,你需要實(shí)現IoFilter 接口,如果你只關(guān)注某幾個(gè)方法,可以繼承

IoFilterAdapter 適配器類(lèi)。IoFilter 接口中主要包含兩類(lèi)方法,一類(lèi)是與IoHandler 中的

方法名一致的方法,相當于攔截IoHandler 中的方法,另一類(lèi)是IoFilter的生命周期回調

方法,這些回調方法的執行順序和解釋如下所示:

(1.)init()在首次添加到鏈中的時(shí)候被調用,但你必須將這個(gè)IoFilter

ReferenceCountingFilter 包裝起來(lái),否則init()方法永遠不會(huì )被調用。

(2.)onPreAdd()在調用添加到鏈中的方法時(shí)被調用,但此時(shí)還未真正的加入到鏈。

(3.)onPostAdd()在調用添加到鏈中的方法后被調,如果在這個(gè)方法中有異常拋出,則過(guò)濾

器會(huì )立即被移除,同時(shí)destroy()方法也會(huì )被調用(前提是使用ReferenceCountingFilter

包裝)。

(4.)onPreRemove()在從鏈中移除之前調用。

(5.)onPostRemove()在從鏈中移除之后調用。

(6.)destory()在從鏈中移除時(shí)被調用,使用方法與init()要求相同。

無(wú)論是哪個(gè)方法,要注意必須在實(shí)現時(shí)調用參數nextFilter 的同名方法,否則,過(guò)濾器鏈

的執行將被中斷,IoHandler 中的同名方法一樣也不會(huì )被執行,這就相當于Servlet 中的

Filter 必須調用filterChain.doFilter(request,response)才能繼續前進(jìn)是一樣的道理。

示例:

public class MyIoFilterimplements IoFilter {

@Override

public void destroy()throws Exception {

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%destroy");

}

@Override

public void exceptionCaught(NextFilternextFilter, IoSession

session,

Throwablecause) throws Exception {

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%exceptionCaught");

nextFilter.exceptionCaught(session,cause);

}

@Override

public void filterClose(NextFilternextFilter, IoSession session)

throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterClose");

nextFilter.filterClose(session);

}

@Override

public void filterWrite(NextFilternextFilter, IoSession session,

WriteRequestwriteRequest) throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%filterWrite");

nextFilter.filterWrite(session,writeRequest);

}

@Override

public void init()throws Exception {

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%init");

}

@Override

public void messageReceived(NextFilternextFilter, IoSession

session,

Objectmessage) throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageReceived");

nextFilter.messageReceived(session,message);

}

@Override

public void messageSent(NextFilternextFilter, IoSession session,

WriteRequestwriteRequest) throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%messageSent");

nextFilter.messageSent(session,writeRequest);

}

@Override

public void onPostAdd(IoFilterChainparent, String name,

NextFilternextFilter) throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostAdd");

}

@Override

public void onPostRemove(IoFilterChainparent, String name,

NextFilternextFilter) throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPostRemove");

}

@Override

public void onPreAdd(IoFilterChainparent, String name,

NextFilternextFilter) throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreAdd");

}

@Override

public void onPreRemove(IoFilterChainparent, String name,

NextFilternextFilter) throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%onPreRemove");

}

@Override

public void sessionClosed(NextFilternextFilter, IoSession session)

throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionClosed");

nextFilter.sessionClosed(session);

}

@Override

public void sessionCreated(NextFilternextFilter, IoSession session)

throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionCreated");

nextFilter.sessionCreated(session);

}

@Override

public void sessionIdle(NextFilternextFilter, IoSession session,

IdleStatusstatus) throws Exception {

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionIdle");

nextFilter.sessionIdle(session,status);

}

@Override

public void sessionOpened(NextFilternextFilter, IoSession session)

throws Exception{

System.out.println("%%%%%%%%%%%%%%%%%%%%%%%%%%%sessionOpened");

nextFilter.sessionOpened(session);

}

}

我們將這個(gè)攔截器注冊到上面的TCPServer IoAcceptor的過(guò)濾器鏈中的最后一個(gè):

acceptor.getFilterChain().addLast("myIoFilter",

new ReferenceCountingFilter(new MyIoFilter()));

這里我們將MyIoFilter ReferenceCountingFilter包裝起來(lái),這樣你可以看到init()、

destroy()方法調用。我們啟動(dòng)客戶(hù)端訪(fǎng)問(wèn),然后關(guān)閉客戶(hù)端,你會(huì )看到執行順序如下所示:

init_onPreAdd_onPostAdd_sessionCreated_sessionOpened_messageReceived_filt

erClose_sessionClosed_onPreRemove_onPostRemove_destroy。

IoHandler 的對應方法會(huì )跟在上面的對應方法之后執行,這也就是說(shuō)從橫向(單獨的看一個(gè)

過(guò)濾器中的所有方法的執行順序)上看,每個(gè)過(guò)濾器的執行順序是上面所示的順序;從縱向

(方法鏈的調用)上看,如果有filter1、filter2 兩個(gè)過(guò)濾器,sessionCreated()方法的

執行順序如下所示:

filter1-sessionCreated_filter2-sessionCreated_IoHandler-sessionCreated。

這里你要注意init、onPreAdd、onPostAdd 三個(gè)方法并不是在Server 啟動(dòng)時(shí)調用的,而是

IoSession 對象創(chuàng )建之前調用的,也就是說(shuō)IoFilterChain.addXXX()方法僅僅負責初始化過(guò)

濾器并注冊過(guò)濾器,但并不調用任何方法,包括init()初始化方法也是在IoProcessor 開(kāi)

始工作的時(shí)候被調用。

IoFilter 是單例的,那么init()方法是否只被執行一次呢?這個(gè)是不一定的,因為IoFilter

是被IoProcessor 調用的,而每個(gè)IoService 通常是關(guān)聯(lián)多個(gè)IoProcessor,所以IoFilter

init()方法是在每個(gè)IoProcessor 線(xiàn)程上只執行一次。關(guān)于Mina 的線(xiàn)程問(wèn)題,我們后面

會(huì )詳細討論,這里你只需要清楚,init()destroy()的調用次數與IoProceesor 的個(gè)數有

關(guān),假如一個(gè)IoService 關(guān)聯(lián)了3 個(gè)IoProcessor,有五個(gè)并發(fā)的客戶(hù)端請求,那么你會(huì )看

到三次init()方法被調用,以后將不再會(huì )調用。

Mina中自帶的過(guò)濾器:

過(guò)濾器 說(shuō)明

BlacklistFilter 設置一些IP 地址為黑名單,不允許訪(fǎng)問(wèn)。

BufferedWriteFilter 設置輸出時(shí)像BufferedOutputStream 一樣進(jìn)行緩

沖。

CompressionFilter 設置在輸入、輸出流時(shí)啟用JZlib 壓縮。

ConnectionThrottleFilter 這個(gè)過(guò)濾器指定同一個(gè)IP 地址(不含端口號)上

的請求在多長(cháng)的毫秒值內可以有一個(gè)請求,如果

小于指定的時(shí)間間隔就有連續兩個(gè)請求,那么第

二個(gè)請求將被忽略(IoSession.close())。正如

Throttle 的名字一樣,調節訪(fǎng)問(wèn)的頻率。這個(gè)過(guò)

濾器最好放在過(guò)濾器鏈的前面。

FileRegionWriteFilter 如果你想使用File 對象進(jìn)行輸出,請使用這個(gè)過(guò)

濾器。要注意,你需要使用WriteFuture 或者在

messageSent() 方法中關(guān)閉File 所關(guān)聯(lián)的

FileChannel 通道。

StreamWriteFilter 如果你想使用InputStream 對象進(jìn)行輸出,請使

用這個(gè)過(guò)濾器。要注意,你需要使用WriteFuture

或者在messageSent()方法中關(guān)閉File 所關(guān)聯(lián)的

FileChannel 通道。

NoopFilter 這個(gè)過(guò)濾器什么也不做,如果你想測試過(guò)濾器鏈

是否起作用,可以用它來(lái)測試。

ProfilerTimerFilter 這個(gè)過(guò)濾器用于檢測每個(gè)事件方法執行的時(shí)間,

所以最好放在過(guò)濾器鏈的前面。

ProxyFilter 這個(gè)過(guò)濾器在客戶(hù)端使用ProxyConnector 作為實(shí)

現時(shí),會(huì )自動(dòng)加入到過(guò)濾器鏈中,用于完成代理

功能。

RequestResponseFilter 暫不知曉。

SessionAttributeInitializingFilter 這個(gè)過(guò)濾器在IoSession 中放入一些屬性(Map),

通常放在過(guò)濾器的前面,用于放置一些初始化的

信息。

MdcInjectionFilter 針對日志輸出做MDC 操作,可以參考LOG4J MDC、

NDC 的文檔。

WriteRequestFilter CompressionFilter、RequestResponseFilter

基類(lèi),用于包裝寫(xiě)請求的過(guò)濾器。

還有一些過(guò)濾器,會(huì )在各節中詳細討論,這里沒(méi)有列出,譬如:前面的LoggingFilger 日志

過(guò)濾器。

_______________________________________________________________________________

6.協(xié)議編解碼器:

前面說(shuō)過(guò),協(xié)議編解碼器是在使用Mina 的時(shí)候你最需要關(guān)注的對象,因為在網(wǎng)絡(luò )傳輸的數

據都是二進(jìn)制數據(byte),而你在程序中面向的是JAVA 對象,這就需要你實(shí)現在發(fā)送數據

時(shí)將JAVA 對象編碼二進(jìn)制數據,而接收數據時(shí)將二進(jìn)制數據解碼為JAVA對象(這個(gè)可不是

JAVA 對象的序列化、反序列化那么簡(jiǎn)單的事情)。

Mina 中的協(xié)議編解碼器通過(guò)過(guò)濾器ProtocolCodecFilter 構造,這個(gè)過(guò)濾器的構造方法需

要一個(gè)ProtocolCodecFactory,這從前面注冊TextLineCodecFactory的代碼就可以看出來(lái)。

ProtocolCodecFactory 中有如下兩個(gè)方法:

public interface ProtocolCodecFactory {

ProtocolEncodergetEncoder(IoSession session) throws Exception;

ProtocolDecodergetDecoder(IoSession session) throws Exception;

}

因此,構建一個(gè)ProtocolCodecFactory 需要ProtocolEncoder、ProtocolDecoder 兩個(gè)實(shí)例。

你可能要問(wèn)JAVA 對象和二進(jìn)制數據之間如何轉換呢?這個(gè)要依據具體的通信協(xié)議,也就是

Server 端要和Client 端約定網(wǎng)絡(luò )傳輸的數據是什么樣的格式,譬如:第一個(gè)字節表示數據

長(cháng)度,第二個(gè)字節是數據類(lèi)型,后面的就是真正的數據(有可能是文字、有可能是圖片等等),

然后你可以依據長(cháng)度從第三個(gè)字節向后讀,直到讀取到指定第一個(gè)字節指定長(cháng)度的數據。

簡(jiǎn)單的說(shuō),HTTP 協(xié)議就是一種瀏覽器與Web 服務(wù)器之間約定好的通信協(xié)議,雙方按照指定

的協(xié)議編解碼數據。我們再直觀(guān)一點(diǎn)兒說(shuō),前面一直使用的TextLine 編解碼器就是在讀取

網(wǎng)絡(luò )上傳遞過(guò)來(lái)的數據時(shí),只要發(fā)現哪個(gè)字節里存放的是ASCII 10、13 字符(\r、\n),

就認為之前的字節就是一個(gè)字符串(默認使用UTF-8 編碼)。

以上所說(shuō)的就是各種協(xié)議實(shí)際上就是網(wǎng)絡(luò )七層結構中的應用層協(xié)議,它位于網(wǎng)絡(luò )層(IP)、

傳輸層(TCP)之上,Mina 的協(xié)議編解碼器就是讓你實(shí)現一套自己的應用層協(xié)議棧。

_______________________________________________________________________________

(6-1.)簡(jiǎn)單的編解碼器示例:

下面我們舉一個(gè)模擬電信運營(yíng)商短信協(xié)議的編解碼器實(shí)現,假設通信協(xié)議如下所示:

Msip:wap.fetion.com.cn SIP-C/2.0

S:1580101xxxx

R:1889020xxxx

L:21

HelloWorld!

這里的第一行表示狀態(tài)行,一般表示協(xié)議的名字、版本號等,第二行表示短信的發(fā)送號碼,

第三行表示短信接收的號碼,第四行表示短信的字節數,最后的內容就是短信的內容。

上面的每一行的末尾使用ASCII 10\n)作為換行符,因為這是純文本數據,協(xié)議要

求雙方使用UTF-8 對字符串編解碼。

實(shí)際上如果你熟悉HTTP協(xié)議,上面的這個(gè)精簡(jiǎn)的短信協(xié)議和HTTP 協(xié)議的組成是非常像

的,第一行是狀態(tài)行,中間的是消息報頭,最后面的是消息正文。

在解析這個(gè)短信協(xié)議之前,你需要知曉TCP的一個(gè)事項,那就是數據的發(fā)送沒(méi)有規模性,

所謂的規模性就是作為數據的接收端,不知道到底什么時(shí)候數據算是讀取完畢,所以應用層

協(xié)議在制定的時(shí)候,必須指定數據讀取的截至點(diǎn)。一般來(lái)說(shuō),有如下三種方式設置數據讀取

的長(cháng)度:

(1.)使用分隔符,譬如:TextLine 編解碼器。你可以使用\r、\n、NUL 這些ASC II 中的

特殊的字符來(lái)告訴數據接收端,你只要遇見(jiàn)分隔符,就表示數據讀完了,不用在那里傻等著(zhù)

不知道還有沒(méi)有數據沒(méi)讀完???我可不可以開(kāi)始把已經(jīng)讀取到的字節解碼為指定的數據類(lèi)

型了???

(2.)定長(cháng)的字節數,這種方式是使用長(cháng)度固定的數據發(fā)送,一般適用于指令發(fā)送,譬如:數

據發(fā)送端規定發(fā)送的數據都是雙字節,AA表示啟動(dòng)、BB 表示關(guān)閉等等。

(3.)在數據中的某個(gè)位置使用一個(gè)長(cháng)度域,表示數據的長(cháng)度,這種處理方式最為靈活,上面

的短信協(xié)議中的那個(gè)L就是短信文字的字節數,其實(shí)HTTP 協(xié)議的消息報頭中的

Content-Length也是表示消息正文的長(cháng)度,這樣數據的接收端就知道我到底讀到多長(cháng)的

字節數就表示不用再讀取數據了。

相比較解碼(字節轉為JAVA 對象,也叫做拆包)來(lái)說(shuō),編碼(JAVA對象轉為字節,也叫做

打包)就很簡(jiǎn)單了,你只需要把JAVA 對象轉為指定格式的字節流,write()就可以了。

下面我們開(kāi)始對上面的短信協(xié)議進(jìn)行編解碼處理。

第一步,協(xié)議對象:

public class SmsObject{

private Stringsender;// 短信發(fā)送者

private Stringreceiver;// 短信接受者

private Stringmessage;// 短信內容

public StringgetSender() {

return sender;

}

public void setSender(Stringsender) {

this.sender = sender;

}

public StringgetReceiver() {

return receiver;

}

public void setReceiver(Stringreceiver) {

this.receiver = receiver;

}

public StringgetMessage() {

return message;

}

public void setMessage(Stringmessage) {

this.message = message;

}

}

第二步,編碼器:

Mina 中編寫(xiě)編碼器可以實(shí)現ProtocolEncoder,其中有encode()、dispose()兩個(gè)方法需

要實(shí)現。這里的dispose()方法用于在銷(xiāo)毀編碼器時(shí)釋放關(guān)聯(lián)的資源,由于這個(gè)方法一般我

們并不關(guān)心,所以通常我們直接繼承適配器ProtocolEncoderAdapter。

public class CmccSipcEncoderextends ProtocolEncoderAdapter {

private final Charsetcharset;

public CmccSipcEncoder(Charsetcharset) {

this.charset = charset;

}

@Override

public void encode(IoSessionsession, Object message,

ProtocolEncoderOutputout) throws Exception {

SmsObjectsms = (SmsObject) message;

CharsetEncoderce = charset.newEncoder();

IoBufferbuffer = IoBuffer.allocate(100).setAutoExpand(true);

StringstatusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";

Stringsender = sms.getSender();

Stringreceiver = sms.getReceiver();

StringsmsContent = sms.getMessage();

buffer.putString(statusLine+ '\n', ce);

buffer.putString("S: " + sender + '\n', ce);

buffer.putString("R: " + receiver + '\n', ce);

buffer

.putString("L: " + (smsContent.getBytes(charset).length)

+"\n",

ce);

buffer.putString(smsContent,ce);

buffer.flip();

out.write(buffer);

}

}

這里我們依據傳入的字符集類(lèi)型對message 對象進(jìn)行編碼,編碼的方式就是按照短信協(xié)議拼

裝字符串到IoBuffer 緩沖區,然后調用ProtocolEncoderOutputwrite()方法輸出字節

流。這里要注意生成短信內容長(cháng)度時(shí)的紅色代碼,我們使用String 類(lèi)與Byte[]類(lèi)型之間的

轉換方法獲得轉為字節流后的字節數。

解碼器的編寫(xiě)有以下幾個(gè)步驟:

A. encode()方法中的message對象強制轉換為指定的對象類(lèi)型;

B. 創(chuàng )建IoBuffer 緩沖區對象,并設置為自動(dòng)擴展;

C. 將轉換后的message 對象中的各個(gè)部分按照指定的應用層協(xié)議進(jìn)行組裝,并put()

IoBuffer 緩沖區;

D. 當你組裝數據完畢之后,調用flip()方法,為輸出做好準備,切記在write()方法之前,

要調用IoBuffer flip()方法,否則緩沖區的position 的后面是沒(méi)有數據可以用來(lái)輸

出的,你必須調用flip()方法將position 移至0,limit 移至剛才的position。這個(gè)

flip()方法的含義請參看java.nio.ByteBuffer。

E. 最后調用ProtocolEncoderOutput write()方法輸出IoBuffer 緩沖區實(shí)例。

第三步,解碼器:

Mina 中編寫(xiě)解碼器,可以實(shí)現ProtocolDecoder 接口,其中有decode()、finishDecode()、

dispose()三個(gè)方法。這里的finishDecode()方法可以用于處理在IoSession 關(guān)閉時(shí)剩余的

未讀取數據,一般這個(gè)方法并不會(huì )被使用到,除非協(xié)議中未定義任何標識數據什么時(shí)候截止

的約定,譬如:Http 響應的Content-Length 未設定,那么在你認為讀取完數據后,關(guān)閉TCP

連接(IoSession 的關(guān)閉)后,就可以調用這個(gè)方法處理剩余的數據,當然你也可以忽略調

剩余的數據。同樣的,一般情況下,我們只需要繼承適配器ProtocolDecoderAdapter,關(guān)

decode()方法即可。

但前面說(shuō)過(guò)解碼器相對編碼器來(lái)說(shuō),最麻煩的是數據發(fā)送過(guò)來(lái)的規模,以聊天室為例,一個(gè)

TCP 連接建立之后,那么隔一段時(shí)間就會(huì )有聊天內容發(fā)送過(guò)來(lái),也就是decode()方法會(huì )被往

復調用,這樣處理起來(lái)就會(huì )非常麻煩。那么Mina 中幸好提供了CumulativeProtocolDecoder

類(lèi),從名字上可以看出累積性的協(xié)議解碼器,也就是說(shuō)只要有數據發(fā)送過(guò)來(lái),這個(gè)類(lèi)就會(huì )去

讀取數據,然后累積到內部的IoBuffer 緩沖區,但是具體的拆包(把累積到緩沖區的數據

解碼為JAVA 對象)交由子類(lèi)的doDecode()方法完成,實(shí)際上CumulativeProtocolDecoder

就是在decode()反復的調用暴漏給子類(lèi)實(shí)現的doDecode()方法。

具體執行過(guò)程如下所示:

A. 你的doDecode()方法返回true時(shí),CumulativeProtocolDecoder decode()方法會(huì )首

先判斷你是否在doDecode()方法中從內部的IoBuffer 緩沖區讀取了數據,如果沒(méi)有,

則會(huì )拋出非法的狀態(tài)異常,也就是你的doDecode()方法返回true就表示你已經(jīng)消費了

本次數據(相當于聊天室中一個(gè)完整的消息已經(jīng)讀取完畢),進(jìn)一步說(shuō),也就是此時(shí)你

必須已經(jīng)消費過(guò)內部的IoBuffer 緩沖區的數據(哪怕是消費了一個(gè)字節的數據)。如果

驗證過(guò)通過(guò),那么CumulativeProtocolDecoder 會(huì )檢查緩沖區內是否還有數據未讀取,

如果有就繼續調用doDecode()方法,沒(méi)有就停止對doDecode()方法的調用,直到有新

的數據被緩沖。

B. 當你的doDecode()方法返回false 時(shí),CumulativeProtocolDecoder 會(huì )停止對doDecode()

方法的調用,但此時(shí)如果本次數據還有未讀取完的,就將含有剩余數據的IoBuffer

沖區保存到IoSession 中,以便下一次數據到來(lái)時(shí)可以從IoSession中提取合并。如果

發(fā)現本次數據全都讀取完畢,則清空IoBuffer 緩沖區。

簡(jiǎn)而言之,當你認為讀取到的數據已經(jīng)夠解碼了,那么就返回true,否則就返回false。這

個(gè)CumulativeProtocolDecoder 其實(shí)最重要的工作就是幫你完成了數據的累積,因為這個(gè)工

作是很煩瑣的。

public class CmccSipcDecoderextends CumulativeProtocolDecoder {

private final Charsetcharset;

public CmccSipcDecoder(Charsetcharset) {

this.charset = charset;

}

@Override

protected boolean doDecode(IoSession session, IoBuffer in,

ProtocolDecoderOutputout) throws Exception {

IoBufferbuffer = IoBuffer.allocate(100).setAutoExpand(true);

CharsetDecodercd = charset.newDecoder();

int matchCount= 0;

StringstatusLine = "",sender = "", receiver = "", length = "",

sms= "";

int i= 1;

while (in.hasRemaining()){

byte b= in.get();

buffer.put(b);

if (b== 10 && i < 5) {

matchCount++;

if (i== 1) {

buffer.flip();

statusLine= buffer.getString(matchCount, cd);

statusLine= statusLine.substring(0,

statusLine.length()- 1);

matchCount= 0;

buffer.clear();

}

if (i== 2) {

buffer.flip();

sender= buffer.getString(matchCount, cd);

sender= sender.substring(0, sender.length() - 1);

matchCount= 0;

buffer.clear();

}

if (i== 3) {

buffer.flip();

receiver= buffer.getString(matchCount, cd);

receiver= receiver.substring(0, receiver.length() -

1);

matchCount= 0;

buffer.clear();

}

if (i== 4) {

buffer.flip();

length= buffer.getString(matchCount, cd);

length= length.substring(0, length.length() - 1);

matchCount= 0;

buffer.clear();

}

i++;

}else if (i == 5) {

matchCount++;

if (matchCount== Long.parseLong(length.split(": ")[1]))

{

buffer.flip();

sms= buffer.getString(matchCount, cd);

i++;

break;

}

}else {

matchCount++;

}

}

SmsObjectsmsObject = new SmsObject();

smsObject.setSender(sender.split(": ")[1]);

smsObject.setReceiver(receiver.split(": ")[1]);

smsObject.setMessage(sms);

out.write(smsObject);

return false;

}

}

我們的這個(gè)短信協(xié)議解碼器使用\nASCII 10 字符)作為分解點(diǎn),一個(gè)字節一個(gè)字節的

讀取,那么第一次發(fā)現\n 的字節位置之前的部分,必然就是短信協(xié)議的狀態(tài)行,依次類(lèi)推,

你就可以解析出來(lái)發(fā)送者、接受者、短信內容長(cháng)度。然后我們在解析短信內容時(shí),使用獲取

到的長(cháng)度進(jìn)行讀取。全部讀取完畢之后, 然后構造SmsObject 短信對象, 使用

ProtocolDecoderOutput write()方法輸出,最后返回false,也就是本次數據全部讀取

完畢,告知CumulativeProtocolDecoder 在本次數據讀取中不需要再調用doDecode()方法

了。

這里需要注意的是兩個(gè)狀態(tài)變量i、matchCount,i 用于記錄解析到了短信協(xié)議中的哪一行

\n),matchCount 記錄在當前行中讀取到了哪一個(gè)字節。狀態(tài)變量在解碼器中經(jīng)常被使用,

我們這里的情況比較簡(jiǎn)單,因為我們假定短信發(fā)送是在一次數據發(fā)送中完成的,所以狀態(tài)變

量的使用也比較簡(jiǎn)單。假如數據的發(fā)送被拆成了多次(譬如:短信協(xié)議的短信內容、消息報

頭被拆成了兩次數據發(fā)送),那么上面的代碼勢必就會(huì )存在問(wèn)題,因為當第二次調用

doDecode()方法時(shí),狀態(tài)變量i、matchCount勢必會(huì )被重置,也就是原來(lái)的狀態(tài)值并沒(méi)有被

保存。那么我們如何解決狀態(tài)保存的問(wèn)題呢?

答案就是將狀態(tài)變量保存在IoSession 中或者是Decoder實(shí)例自身,但推薦使用前者,因為

雖然Decoder 是單例的,其中的實(shí)例變量保存的狀態(tài)在Decoder實(shí)例銷(xiāo)毀前始終保持,但

Mina 并不保證每次調用doDecode()方法時(shí)都是同一個(gè)線(xiàn)程(這也就是說(shuō)第一次調用

doDecode()IoProcessor-1 線(xiàn)程,第二次有可能就是IoProcessor-2 線(xiàn)程),這就會(huì )產(chǎn)生

多線(xiàn)程中的實(shí)例變量的可視性(Visibility,具體請參考JAVA 的多線(xiàn)程知識)問(wèn)題。IoSession

中使用一個(gè)同步的HashMap 保存對象,所以你不需要擔心多線(xiàn)程帶來(lái)的問(wèn)題。

使用IoSession 保存解碼器的狀態(tài)變量通常的寫(xiě)法如下所示:

A. 在解碼器中定義私有的內部類(lèi)Context,然后將需要保存的狀態(tài)變量定義在Context

存儲。

B. 在解碼器中定義方法獲取這個(gè)Context 的實(shí)例,這個(gè)方法的實(shí)現要優(yōu)先從IoSession

獲取Context。

具體代碼示例如下所示:

// 上下文作為保存狀態(tài)的內部類(lèi)的名字,意思很明顯,就是讓狀態(tài)跟隨上下文,在整個(gè)調

用過(guò)程中都可以被保持。

publicclass XXXDecoder extends CumulativeProtocolDecoder{

privatefinal AttributeKey CONTEXT =

newAttributeKey(getClass(), "context" );

publicContext getContext(IoSession session){

Contextctx=(Context)session.getAttribute(CONTEXT);

if(ctx==null){

ctx=newContext();

session.setAttribute(CONTEXT,ctx);

}

}

privateclass Context {

//態(tài)變

}

}

注意這里我們使用了Mina 自帶的AttributeKey 類(lèi)來(lái)定義保存在IoSession 中的對象的鍵

值,這樣可以有效的防止鍵值重復。另外,要注意在全部處理完畢之后,狀態(tài)要復位,譬如:

聊天室中的一條消息讀取完畢之后,狀態(tài)變量要變?yōu)槌跏贾?,以便下次處理時(shí)重新使用。

第四步,編解碼工廠(chǎng):

public class CmccSipcCodecFactoryimplements ProtocolCodecFactory {

private final CmccSipcEncoderencoder;

private final CmccSipcDecoderdecoder;

public CmccSipcCodecFactory(){

this(Charset.defaultCharset());

}

public CmccSipcCodecFactory(CharsetcharSet) {

this.encoder = new CmccSipcEncoder(charSet);

this.decoder = new CmccSipcDecoder(charSet);

}

@Override

public ProtocolDecodergetDecoder(IoSession session) throws

Exception{

return decoder;

}

@Override

public ProtocolEncodergetEncoder(IoSession session) throws

Exception{

return encoder;

}

}

實(shí)際上這個(gè)工廠(chǎng)類(lèi)就是包裝了編碼器、解碼器,通過(guò)接口中的getEncoder()、getDecoder()

方法向ProtocolCodecFilter 過(guò)濾器返回編解碼器實(shí)例,以便在過(guò)濾器中對數據進(jìn)行編解碼

處理。

第五步,運行示例:

下面我們修改最一開(kāi)始的示例中的MyServer、MyClient 的代碼,如下所示:

acceptor.getFilterChain().addLast(

"codec",

new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset

.forName("UTF-8"))));

connector.getFilterChain().addLast(

"codec",

new ProtocolCodecFilter(new

CmccSipcCodecFactory(

Charset.forName("UTF-8"))));

然后我們在ClientHandler 中發(fā)送一條短信:

public void sessionOpened(IoSessionsession) {

SmsObjectsms = new SmsObject();

sms.setSender("15801012253");

sms.setReceiver("18869693235");

sms.setMessage("你好!Hello World!");

session.write(sms);

}

最后我們在MyIoHandler 中接收這條短信息:

public void messageReceived(IoSessionsession, Object message)

throws Exception{

SmsObjectsms = (SmsObject) message;

log.info("The message received is[" +sms.getMessage() + "]");

}

你會(huì )看到Server 端的控制臺輸出如下信息:

Themessage received is [你好!Hello World!]

_______________________________________________________________________________

(6-2.)復雜的解碼器:

下面我們講解一下如何在解碼器中保存狀態(tài)變量,也就是真正的實(shí)現上面所說(shuō)的Context。

我們假設這樣一種情況,有兩條短信:

Msip:wap.fetion.com.cn SIP-C/2.0

S:1580101xxxx

R:1889020xxxx

L: 21

Hello World!

M sip:wap.fetion.com.cn SIP-C/2.0

S: 1580101xxxx

R: 1889020xxxx

L: 21

Hello World!

他們按照上面的顏色標識發(fā)送,也就是說(shuō)紅色部分、藍色部分、綠色部分分別發(fā)送(調用三

IoSession.write()方法),那么如果你還用上面的CmccSipcDecoder,將無(wú)法工作,因為

第一次數據流(紅色部分)發(fā)送過(guò)取時(shí),數據是不完整的,無(wú)法解析出一條短信息,當二次

數據流(藍色部分)發(fā)送過(guò)去時(shí),已經(jīng)可以解析出第一條短信息了,但是第二條短信還是不

完整的,需要等待第三次數據流(綠色部分)的發(fā)送。

注意:由于模擬數據發(fā)送的規模性問(wèn)題很麻煩,所以這里采用了這種極端的例子說(shuō)明問(wèn)題,

雖不具有典型性,但很能說(shuō)明問(wèn)題,這就足夠了,所以不要追究這種發(fā)送消息是否在真實(shí)環(huán)

境中存在,更不要追究其合理性。

CmccSispcDecoder 類(lèi)改為如下的寫(xiě)法:

public class CmccSipcDecoderextends CumulativeProtocolDecoder {

private final Charsetcharset;

private final AttributeKeyCONTEXT = new AttributeKey(getClass(),

"context");

public CmccSipcDecoder(Charsetcharset) {

this.charset = charset;

}

@Override

protected boolean doDecode(IoSession session, IoBuffer in,

ProtocolDecoderOutputout) throws Exception {

Contextctx = getContext(session);

CharsetDecodercd = charset.newDecoder();

int matchCount= ctx.getMatchCount();

int line= ctx.getLine();

IoBufferbuffer = ctx.innerBuffer;

StringstatusLine = ctx.getStatusLine(),

sender= ctx.getSender(),

receiver= ctx.getReceiver(),

length= ctx.getLength(),

sms= ctx.getSms();

while (in.hasRemaining()){

byte b= in.get();

matchCount++;

buffer.put(b);

if (line< 4 && b == 10) {

if (line== 0) {

buffer.flip();

statusLine= buffer.getString(matchCount, cd);

statusLine= statusLine.substring(0,

statusLine.length()- 1);

matchCount= 0;

buffer.clear();

ctx.setStatusLine(statusLine);

}

if (line== 1) {

buffer.flip();

sender= buffer.getString(matchCount, cd);

sender= sender.substring(0, sender.length() - 1);

matchCount= 0;

buffer.clear();

ctx.setSender(sender);

}

if (line== 2) {

buffer.flip();

receiver= buffer.getString(matchCount, cd);

receiver= receiver.substring(0, receiver.length() -

1);

matchCount= 0;

buffer.clear();

ctx.setReceiver(receiver);

}

if (line== 3) {

buffer.flip();

length= buffer.getString(matchCount, cd);

length= length.substring(0, length.length() - 1);

matchCount= 0;

buffer.clear();

ctx.setLength(length);

}

line++;

}else if (line == 4) {

if (matchCount== Long.parseLong(length.split(": ")[1]))

{

buffer.flip();

sms= buffer.getString(matchCount, cd);

ctx.setSms(sms);

// 由于下面的break,這里需要調用else外面的兩行代碼

ctx.setMatchCount(matchCount);

ctx.setLine(line);

break;

}

}

ctx.setMatchCount(matchCount);

ctx.setLine(line);

}

if (ctx.getLine()== 4

&&Long.parseLong(ctx.getLength().split(": ")[1]) == ctx

.getMatchCount()){

SmsObjectsmsObject = new SmsObject();

smsObject.setSender(sender.split(": ")[1]);

smsObject.setReceiver(receiver.split(": ")[1]);

smsObject.setMessage(sms);

out.write(smsObject);

ctx.reset();

return true;

}else {

return false;

}

}

private ContextgetContext(IoSession session) {

Contextcontext = (Context) session.getAttribute(CONTEXT);

if (context== null) {

context= new Context();

session.setAttribute(CONTEXT, context);

}

return context;

}

private class Context{

private final IoBufferinnerBuffer;

private StringstatusLine = "";

private Stringsender = "";

private Stringreceiver = "";

private Stringlength = "";

private Stringsms = "";

public Context(){

innerBuffer =IoBuffer.allocate(100).setAutoExpand(true);

}

private int matchCount =0;

private int line = 0;

public int getMatchCount(){

return matchCount;

}

public void setMatchCount(int matchCount) {

this.matchCount = matchCount;

}

public int getLine(){

return line;

}

public void setLine(int line) {

this.line = line;

}

public StringgetStatusLine() {

return statusLine;

}

public void setStatusLine(StringstatusLine) {

this.statusLine = statusLine;

}

public StringgetSender() {

return sender;

}

public void setSender(Stringsender) {

this.sender = sender;

}

public StringgetReceiver() {

return receiver;

}

public void setReceiver(Stringreceiver) {

this.receiver = receiver;

}

public StringgetLength() {

return length;

}

public void setLength(Stringlength) {

this.length = length;

}

public StringgetSms() {

return sms;

}

public void setSms(Stringsms) {

this.sms = sms;

}

public void reset(){

this.innerBuffer.clear();

this.matchCount = 0;

this.line = 0;

this.statusLine = "";

this.sender = "";

this.receiver = "";

this.length = "";

this.sms = "";

}

}

}

這里我們做了如下的幾步操作:

(1.) 所有記錄狀態(tài)的變量移到了Context 內部類(lèi)中,包括記錄讀到短信協(xié)議的哪一行的

line。每一行讀取了多少個(gè)字節的matchCount,還有記錄解析好的狀態(tài)行、發(fā)送者、

接受者、短信內容、累積數據的innerBuffer 等。這樣就可以在數據不能完全解碼,

等待下一次doDecode()方法的調用時(shí),還能承接上一次調用的數據。

(2.) doDecode()方法中主要的變化是各種狀態(tài)變量首先是從Context 中獲取,然后操

作之后,將最新的值setXXX()Context 中保存。

(3.) 這里注意doDecode()方法最后的判斷,當認為不夠解碼為一條短信息時(shí),返回

false,也就是在本次數據流解碼中不要再調用doDecode()方法;當認為已經(jīng)解碼

出一條短信息時(shí),輸出短消息,然后重置所有的狀態(tài)變量,返回true,也就是如果

本次數據流解碼中還有沒(méi)解碼完的數據,繼續調用doDecode()方法。

下面我們對客戶(hù)端稍加改造,來(lái)模擬上面的紅、藍、綠三次發(fā)送聊天短信息的情況:

MyClient

ConnectFuturefuture = connector.connect(new InetSocketAddress(

HOSTNAME,PORT));

future.awaitUninterruptibly();

session= future.getSession();

for (int i = 0; i < 3; i++) {

SmsObjectsms = new SmsObject();

session.write(sms);

System.out.println("****************"+ i);

}

這里我們?yōu)榱朔奖阊菔?,不?span lang="EN-US">IoHandler 中發(fā)送消息,而是直接在MyClient 中發(fā)送,你要

注意的是三次發(fā)送都要使用同一個(gè)IoSession,否則就不是從同一個(gè)通道發(fā)送過(guò)去的了。

CmccSipcEncoder

public void encode(IoSessionsession, Object message,

ProtocolEncoderOutputout) throws Exception {

SmsObjectsms = (SmsObject) message;

CharsetEncoderce = charset.newEncoder();

StringstatusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";

Stringsender = "15801012253";

Stringreceiver = "15866332698";

StringsmsContent = "你好!HelloWorld!";

IoBufferbuffer = IoBuffer.allocate(100).setAutoExpand(true);

buffer.putString(statusLine+ '\n', ce);

buffer.putString("S:" + sender + '\n', ce);

buffer.putString("R:" + receiver + '\n', ce);

buffer.flip();

out.write(buffer);

IoBuffer buffer2 = IoBuffer.allocate(100).setAutoExpand(true);

buffer2.putString("L: " + (smsContent.getBytes(charset).length)

+ "\n",ce);

buffer2.putString(smsContent, ce);

buffer2.putString(statusLine + '\n', ce);

buffer2.flip();

out.write(buffer2);

IoBuffer buffer3 = IoBuffer.allocate(100).setAutoExpand(true);

buffer3.putString("S: " + sender + '\n', ce);

buffer3.putString("R: " + receiver + '\n', ce);

buffer3.putString("L: " + (smsContent.getBytes(charset).length)

+ "\n",ce);

buffer3.putString(smsContent, ce);

buffer3.putString(statusLine + '\n', ce);

buffer3.flip();

out.write(buffer3);

}

上面的這段代碼要配合MyClient來(lái)操作,你需要做的是在MyClient中的紅色輸出語(yǔ)句處設置

斷點(diǎn),然后第一調用時(shí)CmccSipcEncoder中注釋掉藍、綠色的代碼,也就是發(fā)送兩條短信息的

第一部分(紅色的代碼),依次類(lèi)推,也就是MyClient的中的三次斷點(diǎn)中,分別執行

CmccSipcEncoder中的紅、藍、綠三段代碼,也就是模擬兩條短信的三段發(fā)送。

你會(huì )看到Server端的運行結果是:當MyClient第一次到達斷點(diǎn)時(shí),沒(méi)有短信息被讀取到,當

MyClient第二次到達斷點(diǎn)時(shí),第一條短信息輸出,當MyClient第三次到達斷點(diǎn)時(shí),第二條短

信息輸出。

Mina中自帶的解碼器:

解碼器 說(shuō)明

CumulativeProtocolDecoder 累積性解碼器,上面我們重點(diǎn)說(shuō)明了這個(gè)解

碼器的用法。

SynchronizedProtocolDecoder 這個(gè)解碼器用于將任何一個(gè)解碼器包裝為一

個(gè)線(xiàn)程安全的解碼器,用于解決上面說(shuō)的每

次執行decode()方法時(shí)可能線(xiàn)程不是上一次

的線(xiàn)程的問(wèn)題,但這樣會(huì )在高并發(fā)時(shí),大大

降低系統的性能。

TextLineDecoder 按照文本的換行符( Windows:\r\n 、

Linux:\n、Mac:\r)解碼數據。

PrefixedStringDecoder 這個(gè)類(lèi)繼承自CumulativeProtocolDecoder

類(lèi),用于讀取數據最前端的1、2、4 個(gè)字節

表示后面的數據長(cháng)度的數據。譬如:一個(gè)段

數據的前兩個(gè)字節表示后面的真實(shí)數據的長(cháng)

度,那么你就可以用這個(gè)方法進(jìn)行解碼。

_______________________________________________________________________________

(6-3.)多路分離的解碼器:

假設一段數據發(fā)送過(guò)來(lái)之后,需要根據某種條件決定使用哪個(gè)解碼器,而不是像上面的例子,

固定使用一個(gè)解碼器,那么該如何做呢?

幸好Mina 提供了org.apache.mina.filter.codec.demux包來(lái)完成這種多路分離

Demultiplexes)的解碼工作,也就是同時(shí)注冊多個(gè)解碼器,然后運行時(shí)依據傳入的數據

決定到底使用哪個(gè)解碼器來(lái)工作。所謂多路分離就是依據條件分發(fā)到指定的解碼器,譬如:

上面的短信協(xié)議進(jìn)行擴展,可以依據狀態(tài)行來(lái)判斷使用1.0 版本的短信協(xié)議解碼器還是2.0

版本的短信協(xié)議解碼器。

下面我們使用一個(gè)簡(jiǎn)單的例子,說(shuō)明這個(gè)多路分離的解碼器是如何使用的,需求如下所示:

(1.) 客戶(hù)端傳入兩個(gè)int 類(lèi)型的數字,還有一個(gè)char 類(lèi)型的符號。

(2.) 如果符號是+,服務(wù)端就是用1 號解碼器,對兩個(gè)數字相加,然后把結果返回給客戶(hù)

端。

(3.) 如果符號是-,服務(wù)端就使用2 號解碼器,將兩個(gè)數字變?yōu)橄喾磾?,然后相加,把結

果返回給客戶(hù)端。

Demux 開(kāi)發(fā)編解碼器主要有如下幾個(gè)步驟:

A. 定義Client 端、Server端發(fā)送、接收的數據對象。

B. 使用Demux 編寫(xiě)編碼器是實(shí)現MessageEncoder<T>接口,T 是你要編碼的數據對象,這

個(gè)MessageEncoder 會(huì )在DemuxingProtocolEncoder中調用。

C. 使用Demux 編寫(xiě)編碼器是實(shí)現MessageDecoder接口,這個(gè)MessageDecoder 會(huì )在

DemuxingProtocolDecoder 中調用。

D. DemuxingProtocolCodecFactory 中調用addMessageEncoder()、addMessageDecoder()

方法組裝編解碼器。

MessageEncoder的接口如下所示:

public interface MessageEncoder<T> {

void encode(IoSessionsession, T message, ProtocolEncoderOutput out)

throws Exception;

}

你注意到消息編碼器接口與在ProtocolEncoder 中沒(méi)什么不同,區別就是Object message

被泛型具體化了類(lèi)型,你不需要手動(dòng)的類(lèi)型轉換了。

MessageDecoder的接口如下所示:

public interface MessageDecoder {

static MessageDecoderResultOK = MessageDecoderResult.OK;

static MessageDecoderResultNEED_DATA =

MessageDecoderResult.NEED_DATA;

static MessageDecoderResultNOT_OK = MessageDecoderResult.NOT_OK;

MessageDecoderResultdecodable(IoSession session, IoBuffer in);

MessageDecoderResultdecode(IoSession session, IoBuffer in,

ProtocolDecoderOutputout) throws Exception;

void finishDecode(IoSessionsession, ProtocolDecoderOutput out)

throws Exception;

}

(1.)decodable()方法有三個(gè)返回值,分別表示如下的含義:

A. MessageDecoderResult.NOT_OK:表示這個(gè)解碼器不適合解碼數據,然后檢查其它解碼

器,如果都不滿(mǎn)足會(huì )拋異常;

B. MessageDecoderResult.NEED_DATA:表示當前的讀入的數據不夠判斷是否能夠使用這

個(gè)解碼器解碼,然后再次調用decodable()方法檢查其它解碼器,如果都是NEED_DATA,

則等待下次輸入;

C. MessageDecoderResult.OK: 表示這個(gè)解碼器可以解碼讀入的數據, 然后則調用

MessageDecoder decode()方法。

這里注意decodable()方法對參數IoBuffer in 的任何操作在方法結束之后,都會(huì )復原,也就是

你不必擔心在調用decode()方法時(shí),position 已經(jīng)不在緩沖區的起始位置。這個(gè)方法相當于

是預讀取,用于判斷是否是可用的解碼器。

(2.)decode()方法有三個(gè)返回值,分別表示如下的含義:

A. MessageDecoderResult.NOT_OK:表示解碼失敗,會(huì )拋異常;

B. MessageDecoderResult.NEED_DATA:表示數據不夠,需要讀到新的數據后,再次調用

decode()方法。

C. MessageDecoderResult.OK:表示解碼成功。

代碼演示:

(1.)客戶(hù)端發(fā)送的數據對象:

public class SendMessage{

private int i = 0;

private int j = 0;

private char symbol ='+';

public char getSymbol(){

return symbol;

}

public void setSymbol(char symbol) {

this.symbol = symbol;

}

public int getI(){

return i;

}

public void setI(int i) {

this.i = i;

}

public int getJ(){

return j;

}

public void setJ(int j) {

this.j = j;

}

}

(2.)服務(wù)端發(fā)送的返回結果對象:

public class ResultMessage{

private int result =0;

public int getResult(){

return result;

}

public void setResult(int result) {

this.result = result;

}

}

(3.)客戶(hù)端使用的SendMessage的編碼器:

public class SendMessageEncoderimplements MessageEncoder<SendMessage>

{

@Override

public void encode(IoSessionsession, SendMessage message,

ProtocolEncoderOutputout) throws Exception {

IoBufferbuffer = IoBuffer.allocate(10);

buffer.putChar(message.getSymbol());

buffer.putInt(message.getI());

buffer.putInt(message.getJ());

buffer.flip();

out.write(buffer);

}

}

這里我們的SendMessage、ResultMessage 中的字段都是用長(cháng)度固定的基本數據類(lèi)型,這樣

IoBuffer 就不需要自動(dòng)擴展了,提高性能。按照一個(gè)char、兩個(gè)int 計算,這里的IoBuffer

只需要10 個(gè)字節的長(cháng)度就可以了。

(4.)服務(wù)端使用的SendMessage1號解碼器:

public class SendMessageDecoderPositiveimplements MessageDecoder {

@Override

public MessageDecoderResultdecodable(IoSession session, IoBuffer in)

{

if (in.remaining()< 2)

return MessageDecoderResult.NEED_DATA;

else {

char symbol= in.getChar();

if (symbol== '+') {

return MessageDecoderResult.OK;

}else {

return MessageDecoderResult.NOT_OK;

}

}

}

@Override

public MessageDecoderResultdecode(IoSession session, IoBuffer in,

ProtocolDecoderOutputout) throws Exception {

SendMessagesm = new SendMessage();

sm.setSymbol(in.getChar());

sm.setI(in.getInt());

sm.setJ(in.getInt());

out.write(sm);

return MessageDecoderResult.OK;

}

@Override

public void finishDecode(IoSessionsession, ProtocolDecoderOutput

out)

throws Exception{

// undo

}

}

因為客戶(hù)端發(fā)送的SendMessage 的前兩個(gè)字節(char)就是符號位,所以我們在decodable()

方法中對此條件進(jìn)行了判斷,之后讀到兩個(gè)字節,并且這兩個(gè)字節表示的字符是+時(shí),才認

為這個(gè)解碼器可用。

(5.)服務(wù)端使用的SendMessage2號解碼器:

public class SendMessageDecoderNegativeimplements MessageDecoder {

@Override

public MessageDecoderResultdecodable(IoSession session, IoBuffer in)

{

if (in.remaining()< 2)

return MessageDecoderResult.NEED_DATA;

else {

char symbol= in.getChar();

if (symbol== '-') {

return MessageDecoderResult.OK;

}else {

return MessageDecoderResult.NOT_OK;

}

}

}

@Override

public MessageDecoderResultdecode(IoSession session, IoBuffer in,

ProtocolDecoderOutputout) throws Exception {

SendMessagesm = new SendMessage();

sm.setSymbol(in.getChar());

sm.setI(-in.getInt());

sm.setJ(-in.getInt());

out.write(sm);

return MessageDecoderResult.OK;

}

@Override

public void finishDecode(IoSessionsession, ProtocolDecoderOutput

out)

throws Exception{

// undo

}

}

(6.)服務(wù)端使用的ResultMessage的編碼器:

public class ResultMessageEncoderimplements

MessageEncoder<ResultMessage>{

@Override

public void encode(IoSessionsession, ResultMessage message,

ProtocolEncoderOutputout) throws Exception {

IoBufferbuffer = IoBuffer.allocate(4);

buffer.putInt(message.getResult());

buffer.flip();

out.write(buffer);

}

}

(7.)客戶(hù)端使用的ResultMessage的解碼器:

public class ResultMessageDecoderimplements MessageDecoder {

@Override

public MessageDecoderResultdecodable(IoSession session, IoBuffer in)

{

if (in.remaining()< 4)

return MessageDecoderResult.NEED_DATA;

else if (in.remaining()== 4)

return MessageDecoderResult.OK;

else

return MessageDecoderResult.NOT_OK;

}

@Override

public MessageDecoderResultdecode(IoSession session, IoBuffer in,

ProtocolDecoderOutputout) throws Exception {

ResultMessagerm = new ResultMessage();

rm.setResult(in.getInt());

out.write(rm);

return MessageDecoderResult.OK;

}

@Override

public void finishDecode(IoSessionsession, ProtocolDecoderOutput

out)

throws Exception{

// undo

}

}

(8.)組裝這些編解碼器的工廠(chǎng):

public class MathProtocolCodecFactoryextends

DemuxingProtocolCodecFactory{

public MathProtocolCodecFactory(boolean server) {

if (server){

super.addMessageEncoder(ResultMessage.class,

ResultMessageEncoder.class);

super.addMessageDecoder(SendMessageDecoderPositive.class);

super.addMessageDecoder(SendMessageDecoderNegative.class);

}else {

super

.addMessageEncoder(SendMessage.class,

SendMessageEncoder.class);

super.addMessageDecoder(ResultMessageDecoder.class);

}

}

}

這個(gè)工廠(chǎng)類(lèi)我們使用了構造方法的一個(gè)布爾類(lèi)型的參數,以便其可以在Server 端、Client

端同時(shí)使用。我們以Server 端為例,你可以看到調用兩次addMessageDecoder()方法添加

1 號、2 號解碼器,其實(shí)DemuxingProtocolDecoder 內部在維護了一個(gè)MessageDecoder

數組,用于保存添加的所有的消息解碼器,每次decode()的時(shí)候就調用每個(gè)MessageDecoder

decodable()方法逐個(gè)檢查,只要發(fā)現一個(gè)MessageDecoder不是對應的解碼器,就從數

組中移除,直到找到合適的MessageDecoder,如果最后發(fā)現數組為空,就表示沒(méi)找到對應

MessageDecoder,最后拋出異常。

(9.)Server端:

public class Server{

public static void main(String[] args) throws Exception{

IoAcceptoracceptor = new NioSocketAcceptor();

LoggingFilterlf = new LoggingFilter();

acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,

5);

acceptor.getFilterChain().addLast("logger", lf);

acceptor.getFilterChain().addLast("codec",

new ProtocolCodecFilter(new

MathProtocolCodecFactory(true)));

acceptor.setHandler(new ServerHandler());

acceptor.bind(new InetSocketAddress(9123));

}

}

(10.)Server端使用的IoHandler

public class ServerHandlerextends IoHandlerAdapter {

private final static Logger log =LoggerFactory

.getLogger(ServerHandler.class);

@Override

public void sessionIdle(IoSessionsession, IdleStatus status)

throws Exception{

session.close(true);

}

@Override

public void messageReceived(IoSessionsession, Object message)

throws Exception{

SendMessagesm = (SendMessage) message;

log.info("The message received is [" + sm.getI() +" "

+sm.getSymbol() + " " +sm.getJ() + " ]");

ResultMessagerm = new ResultMessage();

rm.setResult(sm.getI()+ sm.getJ());

session.write(rm);

}

}

(11.)Client端:

public class Client{

public static void main(String[] args) throws Throwable{

IoConnectorconnector = new NioSocketConnector();

connector.setConnectTimeoutMillis(30000);

connector.getFilterChain().addLast("logger", new

LoggingFilter());

connector.getFilterChain().addLast("codec",

new ProtocolCodecFilter(new

MathProtocolCodecFactory(false)));

connector.setHandler(new ClientHandler());

connector.connect(new InetSocketAddress("localhost", 9123));

}

}

(12.)Client端的IoHandler

public class ClientHandlerextends IoHandlerAdapter {

private final static Logger LOGGER =LoggerFactory

.getLogger(ClientHandler.class);

@Override

public void sessionOpened(IoSessionsession) throws Exception{

SendMessagesm = new SendMessage();

sm.setI(100);

sm.setJ(99);

sm.setSymbol('+');

session.write(sm);

}

@Override

public void messageReceived(IoSessionsession, Object message) {

ResultMessagers = (ResultMessage) message;

LOGGER.info(String.valueOf(rs.getResult()));

}

}

你嘗試改變(12.)中的紅色代碼中的正負號,會(huì )看到服務(wù)端使用了兩個(gè)不同的解碼器對其進(jìn)

行處理。

_______________________________________________________________________________

7.線(xiàn)程模型配置:

Mina 中的很多執行環(huán)節都使用了多線(xiàn)程機制,用于提高性能。Mina 中默認在三個(gè)地方使用

了線(xiàn)程:

(1.) IoAcceptor

這個(gè)地方用于接受客戶(hù)端的連接建立,每監聽(tīng)一個(gè)端口(每調用一次bind()方法),都啟用

一個(gè)線(xiàn)程,這個(gè)數字我們不能改變。這個(gè)線(xiàn)程監聽(tīng)某個(gè)端口是否有請求到來(lái),一旦發(fā)現,則

創(chuàng )建一個(gè)IoSession 對象。因為這個(gè)動(dòng)作很快,所以有一個(gè)線(xiàn)程就夠了。

(2.) IoConnector

這個(gè)地方用于與服務(wù)端建立連接,每連接一個(gè)服務(wù)端(每調用一次connect()方法),就啟

用一個(gè)線(xiàn)程,我們不能改變。同樣的,這個(gè)線(xiàn)程監聽(tīng)是否有連接被建立,一旦發(fā)現,則創(chuàng )建

一個(gè)IoSession 對象。因為這個(gè)動(dòng)作很快,所以有一個(gè)線(xiàn)程就夠了。

(3.) IoProcessor

這個(gè)地方用于執行真正的IO 操作,默認啟用的線(xiàn)程個(gè)數是CPU 的核數+1,譬如:?jiǎn)?span lang="EN-US">CPU

核的電腦,默認的IoProcessor 線(xiàn)程會(huì )創(chuàng )建3 個(gè)。這也就是說(shuō)一個(gè)IoAcceptor 或者

IoConnector 默認會(huì )關(guān)聯(lián)一個(gè)IoProcessor 池,這個(gè)池中有3 個(gè)IoProcessor。因為IO 操作

耗費資源,所以這里使用IoProcessor 池來(lái)完成數據的讀寫(xiě)操作,有助于提高性能。這也就

是前面說(shuō)的IoAccetor、IoConnector 使用一個(gè)Selector,而IoProcessor 使用自己?jiǎn)为毜?span lang="EN-US">

Selector 的原因。

那么為什么IoProcessor 池中的IoProcessor 數量只比CPU 的核數大1 呢?因為IO 讀寫(xiě)操

作是耗費CPU 的操作,而每一核CPU 同時(shí)只能運行一個(gè)線(xiàn)程,因此IoProcessor 池中的

IoProcessor 的數量并不是越多越好。

這個(gè)IoProcessor 的數量可以調整,如下所示:

IoAcceptoracceptor=new NioSocketAcceptor(5);

IoConnectorconnector=new NioSocketConnector(5);

這樣就會(huì )將IoProcessor 池中的數量變?yōu)?span lang="EN-US">5 個(gè),也就是說(shuō)可以同時(shí)處理5 個(gè)讀寫(xiě)操作。

還記得前面說(shuō)過(guò)Mina 的解碼器要使用IoSession 保存狀態(tài)變量,而不是Decoder 本身,這

是因為Mina 不保證每次執行doDecode()方法的都是同一個(gè)IoProcessor 這句話(huà)嗎?其實(shí)這

個(gè)問(wèn)題的根本原因是IoProcessor 是一個(gè)池,每次IoSession進(jìn)入空閑狀態(tài)時(shí)(無(wú)讀些數據

發(fā)生),IoProcessor 都會(huì )被回收到池中,以便其他的IoSession使用,所以當IoSession

從空閑狀態(tài)再次進(jìn)入繁忙狀態(tài)時(shí),IoProcessor 會(huì )再次分配給其一個(gè)IoProcessor 實(shí)例,而

此時(shí)已經(jīng)不能保證還是上一次繁忙狀態(tài)時(shí)的那個(gè)IoProcessor 了。

你還會(huì )發(fā)現IoAcceptor 、IoConnector 還有一個(gè)構造方法,你可以指定一個(gè)

java.util.concurrent.Executor 類(lèi)作為線(xiàn)程池對象,那么這個(gè)線(xiàn)程池對象是做什么用的

呢?其實(shí)就是用于創(chuàng )建(1.)、(2.)中的用于監聽(tīng)是否有TCP 連接建立的那個(gè)線(xiàn)程,默認情況

下,使用Executors.newCachedThreadPool()方法創(chuàng )建Executor 實(shí)例,也就是一個(gè)無(wú)界的

線(xiàn)程池(具體內容請參看JAVA 的并發(fā)庫)。大家不要試圖改變這個(gè)Executor的實(shí)例,也就

是使用內置的即可,否則可能會(huì )造成一些莫名其妙的問(wèn)題,譬如:性能在某個(gè)訪(fǎng)問(wèn)量級別時(shí),

突然下降。因為無(wú)界線(xiàn)程池是有多少個(gè)Socket 建立,就分配多少個(gè)線(xiàn)程,如果你改為

Executors 的其他創(chuàng )建線(xiàn)程池的方法,創(chuàng )建了一個(gè)有界線(xiàn)程池,那么一些請求將無(wú)法得到及

時(shí)響應,從而出現一些問(wèn)題。

下面我們完整的綜述一下Mina 的工作流程:

(1.) IoService 實(shí)例創(chuàng )建的時(shí)候,同時(shí)一個(gè)關(guān)聯(lián)在IoService 上的IoProcessor 池、

線(xiàn)程池也被創(chuàng )建;

(2.) IoService 建立套接字(IoAcceptor bind()或者是IoConnector connect()

方法被調用)時(shí),IoService 從線(xiàn)程池中取出一個(gè)線(xiàn)程,監聽(tīng)套接字端口;

(3.) IoService 監聽(tīng)到套接字上有連接請求時(shí),建立IoSession 對象,從IoProcessor

池中取出一個(gè)IoProcessor 實(shí)例執行這個(gè)會(huì )話(huà)通道上的過(guò)濾器、IoHandler;

(4.) 當這條IoSession 通道進(jìn)入空閑狀態(tài)或者關(guān)閉時(shí),IoProcessor 被回收。

上面說(shuō)的是Mina 默認的線(xiàn)程工作方式,那么我們這里要講的是如何配置IoProcessor 的多

線(xiàn)程工作方式。因為一個(gè)IoProcessor 負責執行一個(gè)會(huì )話(huà)上的所有過(guò)濾器、IoHandler,也

就是對于IO 讀寫(xiě)操作來(lái)說(shuō),是單線(xiàn)程工作方式(就是按照順序逐個(gè)執行)。假如你想讓某個(gè)

事件方法(譬如:sessionIdle()、sessionOpened()等)在單獨的線(xiàn)程中運行(也就是非

IoProcessor 所在的線(xiàn)程),那么這里就需要用到一個(gè)ExecutorFilter 的過(guò)濾器。

你可以看到IoProcessor 的構造方法中有一個(gè)參數是java.util.concurrent.Executor,也

就是可以讓IoProcessor 調用的過(guò)濾器、IoHandler中的某些事件方法在線(xiàn)程池中分配的線(xiàn)

程上獨立運行,而不是運行在IoProcessor 所在的線(xiàn)程。

例:

acceptor.getFilterChain().addLast("exceutor",new ExecutorFilter());

我們看到是用這個(gè)功能,簡(jiǎn)單的一行代碼就可以了。那么ExecutorFilter 還有許多重載的

構造方法,這些重載的有參構造方法,參數主要用于指定如下信息:

(1.) 指定線(xiàn)程池的屬性信息,譬如:核心大小、最大大小、等待隊列的性質(zhì)等。

你特別要關(guān)注的是ExecutorFilter 內部默認使用的是OrderedThreadPoolExecutor作為線(xiàn)

程池的實(shí)現,從名字上可以看出是保證各個(gè)事件在多線(xiàn)程執行中的順序(譬如:各個(gè)事件方

法的執行是排他的,也就是不可能出現兩個(gè)事件方法被同時(shí)執行;messageReceived()總是

sessionClosed() 方法之前執行), 這是因為多線(xiàn)程的執行是異步的, 如果沒(méi)有

OrderedThreadPoolExecutor 來(lái)保證IoHandler 中的方法的調用順序,可能會(huì )出現嚴重的問(wèn)

題。但是如果你的代碼確實(shí)沒(méi)有依賴(lài)于IoHandler 中的事件方法的執行順序,那么你可以使

UnorderedThreadPoolExecutor 作為線(xiàn)程池的實(shí)現。

因此,你也最好不要改變默認的Executor 實(shí)現,否則,事件的執行順序就會(huì )混亂,譬如:

messageReceived()、messageSent()方法被同時(shí)執行。

(2.) 哪些事件方法被關(guān)注,也就哪些事件方法用這個(gè)線(xiàn)程池執行。

線(xiàn)程池可以異步執行的事件類(lèi)型是位于IoEventType 中的九個(gè)枚舉值中除了

SESSION_CREATED 之外的其余八個(gè),這說(shuō)明Session 建立的事件只能與IoProcessor 在同一

個(gè)線(xiàn)程上執行。

public enum IoEventType{

SESSION_CREATED,

SESSION_OPENED,

SESSION_CLOSED,

MESSAGE_RECEIVED,

MESSAGE_SENT,

SESSION_IDLE,

EXCEPTION_CAUGHT,

WRITE,

CLOSE,

}

默認情況下,沒(méi)有配置關(guān)注的事件類(lèi)型,有如下六個(gè)事件方法會(huì )被自動(dòng)使用線(xiàn)程池異步執行:

IoEventType.EXCEPTION_CAUGHT,

IoEventType.MESSAGE_RECEIVED,

IoEventType.MESSAGE_SENT,

IoEventType.SESSION_CLOSED,

IoEventType.SESSION_IDLE,

IoEventType.SESSION_OPENED

其實(shí)ExecutorFilter 的工作機制很簡(jiǎn)單,就是在調用下一個(gè)過(guò)濾器的事件方法時(shí),把其交

Executor execute(Runnablerunnable)方法來(lái)執行,其實(shí)你自己在IoHandler 或者某

個(gè)過(guò)濾器的事件方法中開(kāi)啟一個(gè)線(xiàn)程,也可以完成同樣的功能,只不過(guò)這樣做,你就失去了

程序的可配置性,線(xiàn)程調用的代碼也會(huì )完全耦合在代碼中。但要注意的是絕對不能開(kāi)啟線(xiàn)程

讓其執行sessionCreated()方法。

如果你真的打算使用這個(gè)ExecutorFilter,那么最好想清楚它該放在過(guò)濾器鏈的哪個(gè)位置,

針對哪些事件做異步處理機制。一般ExecutorFilter 都是要放在ProtocolCodecFilter 過(guò)

濾器的后面,也就是不要讓編解碼運行在獨立的線(xiàn)程上,而是要運行在IoProcessor 所在的

線(xiàn)程,因為編解碼處理的數據都是由IoProcessor 讀取和發(fā)送的,沒(méi)必要開(kāi)啟新的線(xiàn)程,否

則性能反而會(huì )下降。一般使用ExecutorFilter 的典型場(chǎng)景是將業(yè)務(wù)邏輯(譬如:耗時(shí)的數

據庫操作)放在單獨的線(xiàn)程中運行,也就是說(shuō)與IO 處理無(wú)關(guān)的操作可以考慮使用

ExecutorFilter 來(lái)異步執行。

_______________________________________________________________________________

8. Spring集成:

Mina Spring 集成很簡(jiǎn)單,我們這里以Server 端為例,如下所示:

<!--構造協(xié)議編解碼過(guò)濾器 -->

<bean id="cmccSipcCodecFilter"

class="org.apache.mina.filter.codec.ProtocolCodecFilter">

<constructor-arg>

<bean

class="com.cmcc.fetionwap.ms.mina.codec.sipc.CmccSipcCodecFactory">

<constructor-arg index="0" type="java.nio.charset.Charset">

<value>UTF-8</value>

</constructor-arg>

</bean>

</constructor-arg>

</bean>

<!--構造日志過(guò)濾器 -->

<bean id="cmccSipcloggingFilter"

class="org.apache.mina.filter.logging.LoggingFilter"/>

<!--構造過(guò)濾器鏈-->

<bean id="cmccSipcFilterChainBuilder"

class="org.apache.mina.core.filterchain.DefaultIoFilterChainBuild

er">

<property name="filters">

<map>

<entry key="cmccSipcloggingFilter"

value-ref="cmccSipcloggingFilter" />

<entry key="cmccSipcCodecFilter"

value-ref="cmccSipcCodecFilter" />

</map>

</property>

</bean>

<!--構造屬性編輯器 -->

<bean

class="org.springframework.beans.factory.config.CustomEditorConfigure

r">

<property name="customEditors">

<map>

<entry key="java.net.SocketAddress">

<bean

class="org.apache.mina.integration.beans.InetSocketAddressEditor"/>

</entry>

<entry key="java.nio.charset.Charset">

<bean

class="com.cmcc.fetionwap.ms.mina.beans.CharsetEditor"/>

</entry>

</map>

</property>

</bean>

<!--構造Server-->

<bean id="cmccSipcIoAcceptor"

class="org.apache.mina.transport.socket.nio.NioSocketAcceptor"

init-method="bind" destroy-method="unbind">

<property name="defaultLocalAddress" value=":9123" />

<property name="handler" ref="sipcServerHandler" />

<property name="filterChainBuilder"

ref="cmccSipcFilterChainBuilder"/>

</bean>

這里我們看到與Spring 集成只不過(guò)是把代碼方式的寫(xiě)法按照編寫(xiě)順序改成了XML 的形式,

無(wú)外乎沒(méi)有脫離Spring 使用構造器、setXXX()的注入屬性的方法。這里你唯一需要注意的

就是“屬性編輯器”,這是因為一些屬性在XML 中寫(xiě)為String 類(lèi)型,但實(shí)際JAVA 類(lèi)型中要

求注入的是一個(gè)其他的對象類(lèi)型,你需要對此做出轉換。譬如:上面的“構造Server 端”

的參數defaultLocalAddress我們傳入的是一個(gè)字符串,但實(shí)際上NioSocketAcceptor

中需要的是一個(gè)InetSocketAddress,這里就需要一個(gè)編輯器將XML 中注入的字符串構造為

InetSocketAddress 對象。在Mina 自帶的包org.apache.mina.integration.beans 中提供

了很多的屬性編輯器,如果你發(fā)現某個(gè)屬性的編輯器沒(méi)有提供,可以自行編寫(xiě),譬如:上面

的協(xié)議編解碼過(guò)濾器中的字符集實(shí)際需要的是一個(gè)java.nio.charset.Charset,因此我編

寫(xiě)了如下的屬性編輯器:

public class CharsetEditorextends PropertyEditorSupport {

private Objectvalue;

@Override

public void setAsText(Stringtext) throws IllegalArgumentException

{

if (text!= null)

this.value = Charset.forName(text);

else

this.value = Charset.forName("UTF-8");

}

@Override

public ObjectgetValue() {

return value;

}

}

我們看到這里完成了UTF-8 這個(gè)字符串到java.nio.charset.Charset的轉換。具體的關(guān)于

Spring 的屬性編輯器的內容,請參看Spring 的文檔。

_______________________________________________________________________________

9. JMX集成:

Mina 提供org.apache.mina.integration.jmx 包可以讓你管理Mina 中的一些對象,我們舉

例如下所示:

MBeanServermBeanServer = ManagementFactory.getPlatformMBeanServer();

IoServiceMBeanacceptorMBean = new IoServiceMBean(acceptor);

ObjectNameacceptorName = new ObjectName(acceptor.getClass()

.getPackage().getName()+":type=acceptor,name="+

acceptor.getClass().getSimpleName());

mBeanServer.registerMBean(acceptorMBean,acceptorName);

通過(guò)這段代碼,我們就將IoAcceptor 的實(shí)例acceptor納入到JMX 的管理,我們運行Server

端,然后我們啟用JConsole,連接MyServer 所在的JVM 進(jìn)程,如下圖所示:

我們看到NioSocketAcceptor 成為了JMX 的受管組件,你可以在這里查看并修改每一個(gè)屬性,

調用每一個(gè)方法,譬如:你單擊dispose 按鈕,會(huì )發(fā)現MyServer所在的進(jìn)程被結束掉,因

NioSocketAcceptor 所關(guān)聯(lián)的所有資源都被銷(xiāo)毀。

除了上面演示所用的IoServiceMBean 之外,Mina 還未我們提供了IoFilterMBean、

IoSessionMBean、ObjectMBean 分別用于將IoFilter、IoSession 以及普通的JAVA Bean

對象包裝為JMX 的受管組件。

_______________________________________________________________________________

10. UDP協(xié)議的示例:

Mina 中使用UDP 協(xié)議與TCP 協(xié)議沒(méi)什么太大的區別,只不過(guò)是實(shí)現類(lèi)換成了

XXXDatagramYYY,這就是使用框架的好處,統一了編程模型,所以我們只是簡(jiǎn)單的看一下例

子。

(1.)UDPServer

public class UDPServer{

public static void main(String[] args) throws Exception{

IoAcceptoracceptor = new NioDatagramAcceptor();

acceptor.setHandler(new UDPServerHandler());

acceptor.getFilterChain().addLast("logger", new

LoggingFilter());

((DatagramSessionConfig)acceptor.getSessionConfig())

.setReuseAddress(true);

acceptor.bind(new InetSocketAddress(9122));

}

}

我們這里使用UDP 協(xié)議綁定了9122 端口,同時(shí)設置了這個(gè)端口可以被重用。

(2.) UDPServerHandler

public class UDPServerHandlerextends IoHandlerAdapter {

private final static Logger log =LoggerFactory

.getLogger(UDPServerHandler.class);

public void messageReceived(IoSessionsession, Object message)

throws Exception{

IoBufferbuffer = (IoBuffer) message;

Stringmsg = buffer.getString(3,

Charset.forName("UTF-8").newDecoder());

log.info("The message received is[" + msg + "]");

}

@Override

public void sessionClosed(IoSessionsession) throws Exception{

log.debug("******************* SessionClosed!");

}

@Override

public void sessionCreated(IoSessionsession) throws Exception{

log.debug("******************* SessionCreated!");

}

@Override

public void sessionIdle(IoSessionsession, IdleStatus status)

throws Exception{

log.debug(session+ "*******************Session Idle!");

}

@Override

public void sessionOpened(IoSessionsession) throws Exception{

log.debug("******************* SessionOpened!");

}

@Override

public void exceptionCaught(IoSessionsession, Throwable cause)

throws Exception{

log.error(cause.getMessage());

session.close(false);

}

@Override

public void messageSent(IoSessionsession, Object message) throws

Exception{

log.debug("*******************messageSent!");

}

}

這里我們在messageReceived()中接收數據,為了簡(jiǎn)單,我們這里不使用編解碼過(guò)濾器,因

message 轉換為IoBuffer,稍后你會(huì )看到客戶(hù)端只發(fā)送了三個(gè)英文字母,所以這里我們

讀取三個(gè)字節就可以了。

(3.) UDPClient

public class UDPClient{

public static void main(String[] args) throws Exception{

IoConnectorconnector = new NioDatagramConnector();

connector.setHandler(new UDPClientHandler());

ConnectFutureconnFuture = connector.connect(new

InetSocketAddress(

"localhost", 9122));

connFuture.awaitUninterruptibly();

IoSessionsession = connFuture.getSession();

IoBufferbuffer = IoBuffer.allocate(3);

buffer.putString("OK!",

Charset.forName("UTF-8").newEncoder());

buffer.flip();

WriteFuturefuture = session.write(buffer);

future.awaitUninterruptibly();

connector.dispose();

}

}

UDPClientHandler 的代碼全都是空實(shí)現,因為我們在main()方法中發(fā)送數據。

運行程序,你會(huì )發(fā)現數據發(fā)送、接收成功。__

本站僅提供存儲服務(wù),所有內容均由用戶(hù)發(fā)布,如發(fā)現有害或侵權內容,請點(diǎn)擊舉報。
打開(kāi)APP,閱讀全文并永久保存 查看更多類(lèi)似文章
猜你喜歡
類(lèi)似文章
深入理解Apache Mina (3)---- 與IoHandler相關(guān)的幾個(gè)類(lèi)
mina 之初體驗
Apache MINA實(shí)戰之 對象傳輸
Mina編程的兩個(gè)注意點(diǎn)
java中文件操作大全
Java RandomAccessFile的使用
更多類(lèi)似文章 >>
生活服務(wù)
分享 收藏 導長(cháng)圖 關(guān)注 下載文章
綁定賬號成功
后續可登錄賬號暢享VIP特權!
如果VIP功能使用有故障,
可點(diǎn)擊這里聯(lián)系客服!

聯(lián)系客服

欧美性猛交XXXX免费看蜜桃,成人网18免费韩国,亚洲国产成人精品区综合,欧美日韩一区二区三区高清不卡,亚洲综合一区二区精品久久