雖然新的 Java I/O 框架( java.nio )能解決 I/O 支持所具有的多數性能問(wèn)題,但是它并沒(méi)有滿(mǎn)足使用字節數組和管道的應用程序內部通信的所有性能需求。本文是分兩部分的系列文章的最后一篇,Java 密碼專(zhuān)家和作家 Merlin Hughes 開(kāi)發(fā)了一組新的流,以補充標準的 Java I/O 字節數組流類(lèi)和管道流類(lèi),在設計中強調以高性能為目標。請到關(guān)于本文的 討論論壇,與作者和其他讀者分享您對本文的看法。(您也可以單擊文章頂部或底部的 討論。)
在 本系列的第一篇文章中,您學(xué)習了解決從只能寫(xiě)出數據的源讀取數據的問(wèn)題的一些不同方法。在可能的解決方案中,我們研究了怎樣使用字節數組流、管道流以及直接處理該問(wèn)題的定制框架。定制方法顯然是最有效率的解決方案;但是,分析其它幾種方法有助于看清標準 Java 流的一些問(wèn)題。具體地說(shuō),字節數組輸出流并不提供可提供對它的內容進(jìn)行只讀訪(fǎng)問(wèn)的高效機制,管道流的性能通常很差。
為了處理這些問(wèn)題,我們將在本文中實(shí)現功能同樣齊全的替換類(lèi),但在實(shí)現時(shí)更強調性能。讓我們先來(lái)簡(jiǎn)要地討論一下同步問(wèn)題,因為它與 I/O 流有關(guān)。
同步問(wèn)題
一般來(lái)說(shuō),我推薦在不是特別需要同步的情況下避免不必要地使用同步。顯然,如果多個(gè)線(xiàn)程需并發(fā)地訪(fǎng)問(wèn)一個(gè)類(lèi),那么這個(gè)類(lèi)需確保線(xiàn)程安全。但是,在許多情況下并不需要并發(fā)的訪(fǎng)問(wèn),同步成了不必要的開(kāi)銷(xiāo)。例如,對流的并發(fā)訪(fǎng)問(wèn)自然是不確定的 ― 您無(wú)法預測哪些數據被先寫(xiě)入,也無(wú)法預測哪個(gè)線(xiàn)程讀了哪些數據 ― 也就是說(shuō),在多數情況下,對流的并發(fā)訪(fǎng)問(wèn)是沒(méi)用的。所以,對所有的流強制同步是不提供實(shí)際好處的花費。如果某個(gè)應用程序要求線(xiàn)程安全,那么通過(guò)應用程序自己的同步原語(yǔ)可以強制線(xiàn)程安全。
事實(shí)上,Collection 類(lèi)的 API 作出了同樣的選擇:在缺省的情況下,set、list 等等都不是線(xiàn)程安全的。如果應用程序想使用線(xiàn)程安全的 Collection,那么它可以使用 Collections 類(lèi)來(lái)創(chuàng )建一個(gè)線(xiàn)程安全的包裝器來(lái)包裝非線(xiàn)程安全的 Collection。如果這種作法是有用的,那么應用程序可以使用完全相同的機制來(lái)包裝流,以使它線(xiàn)程安全;例如, OutputStream out = Streams.synchronizedOutputStream (byteStream) 。請參閱附帶的 源代碼中的 Streams 類(lèi),這是一個(gè)實(shí)現的示例。
所以,對于我所認為的多個(gè)并發(fā)線(xiàn)程無(wú)法使用的類(lèi),我沒(méi)用同步來(lái)為這些類(lèi)提供線(xiàn)程安全。在您廣泛采用這種方式前,我推薦您研究一下 Java 語(yǔ)言規范(Java Language Specification)的 Threads and Locks那一章(請參閱 參考資料),以理解潛在的缺陷;具體地說(shuō),在未使用同步的情況下無(wú)法確保讀寫(xiě)的順序,所以,對不同步的只讀方法的并發(fā)訪(fǎng)問(wèn)可能導致意外的行為,盡管這種訪(fǎng)問(wèn)看起來(lái)是無(wú)害的。
更好的字節數組輸出流
當您需要把未知容量的數據轉儲到內存緩沖區時(shí), ByteArrayOutputStream 類(lèi)是使用效果很好的流。當我為以后再次讀取而存儲一些數據時(shí),我經(jīng)常使用這個(gè)類(lèi)。但是,使用 toByteArray() 方法來(lái)取得對結果數據的讀訪(fǎng)問(wèn)是很低效的,因為它實(shí)際返回的是內部字節數組的副本。對于小容量的數據,使用這種方式不會(huì )有太大問(wèn)題;然而,隨著(zhù)容量增大,這種方式的效率被不必要地降低了。這個(gè)類(lèi)必須復制數據,因為它不能強制對結果字節數組進(jìn)行只讀訪(fǎng)問(wèn)。如果它返回它的內部緩沖區,那么在一般的情況下,接收方無(wú)法保證該緩沖區未被同一數組的另一個(gè)接收方并發(fā)地修改。
StringBuffer 類(lèi)已解決了類(lèi)似的問(wèn)題;它提供可寫(xiě)的字符緩沖區,它還支持高效地返回能從內部字符數組直接讀取的只讀 String 。因為 StringBuffer 類(lèi)控制著(zhù)對它的內部數組的寫(xiě)訪(fǎng)問(wèn),所以它僅在必要時(shí)才復制它的數組;也就是說(shuō),當它導出了 String 且后來(lái)調用程序修改了 StringBuffer 的時(shí)候。如果沒(méi)有發(fā)生這樣的修改,那么任何不必要的復制都不會(huì )被執行。通過(guò)支持能夠強制適當的訪(fǎng)問(wèn)控制的字節數組的包裝器,新的 I/O 框架以類(lèi)似的方式解決了這個(gè)問(wèn)題。
我們可以使用相同的通用機制為需要使用標準流 API 的應用程序提供高效的數據緩沖和再次讀取。我們的示例給出了可替代 ByteArrayOutputStream 類(lèi)的類(lèi),它能高效地導出對內部緩沖區的只讀訪(fǎng)問(wèn),方法是返回直接讀取內部字節數組的只讀 InputStream 。
我們來(lái)看一下代碼。清單 1 中的構造函數分配了初始緩沖區,以存儲寫(xiě)到這個(gè)流的數據。為了存儲更多的數據,該緩沖區將按需自動(dòng)地擴展。
清單 1. 不同步的字節數組輸出流 package org.merlin.io; import java.io.*; /** * An unsynchronized ByteArrayOutputStream alternative that efficiently * provides read-only access to the internal byte array with no * unnecessary copying. * * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org> */ public class BytesOutputStream extends OutputStream { private static final int DEFAULT_INITIAL_BUFFER_SIZE = 8192; // internal buffer private byte[] buffer; private int index, capacity; // is the stream closed? private boolean closed; // is the buffer shared? private boolean shared; public BytesOutputStream () { this (DEFAULT_INITIAL_BUFFER_SIZE); } public BytesOutputStream (int initialBufferSize) { capacity = initialBufferSize; buffer = new byte[capacity]; } |
清單 2 顯示的是寫(xiě)方法。這些方法按需擴展內部緩沖區,然后把新數據復制進(jìn)來(lái)。在擴展內部緩沖區時(shí),我們使緩沖區的大小增加了一倍再加上存儲新數據所需的容量;這樣,為了存儲任何所需的數據,緩沖區的容量成指數地增長(cháng)。為了提高效率,如果您知道您將寫(xiě)入的數據的預期容量,那么您應該指定相應的初始緩沖區的大小。 close() 方法只是設置了一個(gè)合適的標志。
清單 2. 寫(xiě)方法 public void write (int datum) throws IOException { if (closed) { throw new IOException ("Stream closed"); } else { if (index >= capacity) { // expand the internal buffer capacity = capacity * 2 + 1; byte[] tmp = new byte[capacity]; System.arraycopy (buffer, 0, tmp, 0, index); buffer = tmp; // the new buffer is not shared shared = false; } // store the byte buffer[index ++] = (byte) datum; } } public void write (byte[] data, int offset, int length) throws IOException { if (data == null) { throw new NullPointerException (); } else if ((offset < 0) || (offset + length > data.length) || (length < 0)) { throw new IndexOutOfBoundsException (); } else if (closed) { throw new IOException ("Stream closed"); } else { if (index + length > capacity) { // expand the internal buffer capacity = capacity * 2 + length; byte[] tmp = new byte[capacity]; System.arraycopy (buffer, 0, tmp, 0, index); buffer = tmp; // the new buffer is not shared shared = false; } // copy in the subarray System.arraycopy (data, offset, buffer, index, length); index += length; } } public void close () { closed = true; } |
清單 3 中的字節數組抽取方法返回內部字節數組的副本。因為我們無(wú)法防止調用程序把數據寫(xiě)到結果數組,所以我們無(wú)法安全地返回對內部緩沖區的直接引用。
清單 3. 轉換成字節數組 public byte[] toByteArray () { // return a copy of the internal buffer byte[] result = new byte[index]; System.arraycopy (buffer, 0, result, 0, index); return result; } |
當方法提供對存儲的數據的只讀訪(fǎng)問(wèn)的時(shí)候,它們可以安全地高效地直接使用內部字節數組。清單 4 顯示了兩個(gè)這樣的方法。 writeTo() 方法把這個(gè)流的內容寫(xiě)到輸出流;它直接從內部緩沖區進(jìn)行寫(xiě)操作。 toInputStream() 方法返回了可被高效地讀取數據的輸入流。它所返回的 BytesInputStream (這是 ByteArrayInputStream 的非同步替代品。)能直接從我們的內部字節數組讀取數據。在這個(gè)方法中,我們還設置了標志,以表示內部緩沖區正被輸入流共享。這一點(diǎn)很重要,因為這樣做可以防止在內部緩沖區正被共享時(shí)這個(gè)流被修改。
清單 4. 只讀訪(fǎng)問(wèn)方法 public void writeTo (OutputStream out) throws IOException { // write the internal buffer directly out.write (buffer, 0, index); } public InputStream toInputStream () { // return a stream reading from the shared internal buffer shared = true; return new BytesInputStream (buffer, 0, index); } |
可能會(huì )覆蓋共享數據的唯一的一個(gè)方法是顯示在清單 5 中的 reset() 方法,該方法清空了這個(gè)流。所以,如果 shared 等于 true 且 reset() 被調用,那么我們創(chuàng )建新的內部緩沖區,而不是重新設置寫(xiě)索引。
清單 5. 重新設置流 public void reset () throws IOException { if (closed) { throw new IOException ("Stream closed"); } else { if (shared) { // create a new buffer if it is shared buffer = new byte[capacity]; shared = false; } // reset index index = 0; } } } |
更好的字節數組輸入流
用 ByteArrayInputStream 類(lèi)來(lái)提供對內存中的二進(jìn)制數據基于流的讀訪(fǎng)問(wèn)是很理想的。但是,有時(shí)候,它的兩個(gè)設計特點(diǎn)使我覺(jué)得需要一個(gè)替代它的類(lèi)。第一,這個(gè)類(lèi)是同步的;我已講過(guò),對于多數應用程序來(lái)說(shuō)沒(méi)有這個(gè)必要。第二,如果在執行 mark() 前調用它所實(shí)現的 reset() 方法,那么 reset() 將忽略初始讀偏移。這兩點(diǎn)都不是缺陷;但是,它們不一定總是人們所期望的。
清單 6 中的 BytesInputStream 類(lèi)是不同步的較為普通的字節數組輸入流類(lèi)。
清單 6. 不同步的字節數組輸入流 package org.merlin.io; import java.io.*; /** * An unsynchronized ByteArrayInputStream alternative. * * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org> */ public class BytesInputStream extends InputStream { // buffer from which to read private byte[] buffer; private int index, limit, mark; // is the stream closed? private boolean closed; public BytesInputStream (byte[] data) { this (data, 0, data.length); } public BytesInputStream (byte[] data, int offset, int length) { if (data == null) { throw new NullPointerException (); } else if ((offset < 0) || (offset + length > data.length) || (length < 0)) { throw new IndexOutOfBoundsException (); } else { buffer = data; index = offset; limit = offset + length; mark = offset; } } public int read () throws IOException { if (closed) { throw new IOException ("Stream closed"); } else if (index >= limit) { return -1; // EOF } else { return buffer[index ++] & 0xff; } } public int read (byte data[], int offset, int length) throws IOException { if (data == null) { throw new NullPointerException (); } else if ((offset < 0) || (offset + length > data.length) || (length < 0)) { throw new IndexOutOfBoundsException (); } else if (closed) { throw new IOException ("Stream closed"); } else if (index >= limit) { return -1; // EOF } else { // restrict length to available data if (length > limit - index) length = limit - index; // copy out the subarray System.arraycopy (buffer, index, data, offset, length); index += length; return length; } } public long skip (long amount) throws IOException { if (closed) { throw new IOException ("Stream closed"); } else if (amount <= 0) { return 0; } else { // restrict amount to available data if (amount > limit - index) amount = limit - index; index += (int) amount; return amount; } } public int available () throws IOException { if (closed) { throw new IOException ("Stream closed"); } else { return limit - index; } } public void close () { closed = true; } public void mark (int readLimit) { mark = index; } public void reset () throws IOException { if (closed) { throw new IOException ("Stream closed"); } else { // reset index index = mark; } } public boolean markSupported () { return true; } } |
使用新的字節數組流
清單 7 中的代碼演示了怎樣使用新的字節數組流來(lái)解決第一篇文章中處理的問(wèn)題(讀一些壓縮形式的數據):
清單 7. 使用新的字節數組流 public static InputStream newBruteForceCompress (InputStream in) throws IOException { BytesOutputStream sink = new BytesOutputStream (); OutputStream out = new GZIPOutputStream (sink); Streams.io (in, out); out.close (); return sink.toInputStream (); } |
更好的管道流
雖然標準的管道流既安全又可靠,但在性能方面不能令人滿(mǎn)意。幾個(gè)因素導致了它的性能問(wèn)題:
- 對于不同的使用情況,大小為 1024 字節的內部緩沖區并不都適用;對于大容量的數據,該緩沖區太小了。
- 基于數組的操作只是反復調用低效的一個(gè)字節一個(gè)字節地復制操作。該操作本身是同步的,從而導致非常嚴重的鎖爭用。
- 如果管道變空或變滿(mǎn)而在這種狀態(tài)改變時(shí)一個(gè)線(xiàn)程阻塞了,那么,即使僅有一個(gè)字節被讀或寫(xiě),該線(xiàn)程也被喚醒。在許多情況下,線(xiàn)程將使用這一個(gè)字節并立即再次阻塞,這將導致只做了很少有用的工作。
最后一個(gè)因素是 API 提供的嚴格的約定的后果。對于最通用的可能的應用程序中使用的流來(lái)說(shuō),這種嚴格的約定是必要的。但是,對于管道流實(shí)現,提供一種更寬松的約定是可能的,這個(gè)約定犧牲嚴格性以換取性能的提高:
- 僅當緩沖區的可用數據(對阻塞的讀程序而言)或可用空間(對寫(xiě)程序而言)達到指定的某個(gè) 滯后閾值或發(fā)生異常事件(例如管道關(guān)閉)時(shí),阻塞的讀程序和寫(xiě)程序才被喚醒。這將提高性能,因為僅當線(xiàn)程能完成適度的工作量時(shí)它們才被喚醒。
- 只有一個(gè)線(xiàn)程可以從管道讀取數據,只有一個(gè)線(xiàn)程可以把數據寫(xiě)到管道。否則,管道無(wú)法可靠地確定讀程序線(xiàn)程或寫(xiě)程序線(xiàn)程何時(shí)意外死亡。
這個(gè)約定可完全適合典型應用程序情形中獨立的讀程序線(xiàn)程和寫(xiě)程序線(xiàn)程;需要立即喚醒的應用程序可以使用零滯后級別。我們將在后面看到,這個(gè)約定的實(shí)現的操作速度比標準 API 流的速度快兩個(gè)數量級(100 倍)。
我們可以使用幾個(gè)可能的 API 中的一個(gè)來(lái)開(kāi)發(fā)這些管道流:我們可以模仿標準類(lèi),顯式地連接兩個(gè)流;我們也可以開(kāi)發(fā)一個(gè) Pipe 類(lèi)并從這個(gè)類(lèi)抽取輸出流和輸入流。我們不使用這兩種方式而是使用更簡(jiǎn)單的方式:創(chuàng )建一個(gè) PipeInputStream ,然后抽取關(guān)聯(lián)的輸出流。
這些流的一般操作如下:
- 我們把內部數組用作環(huán)緩沖區(請看圖 1):這個(gè)數組中維護著(zhù)一個(gè)讀索引和一個(gè)寫(xiě)索引;數據被寫(xiě)到寫(xiě)索引所指的位置,數據從讀索引所指的位置被讀??;當兩個(gè)索引到達緩沖區末尾時(shí),它們回繞到緩沖區起始點(diǎn)。任一個(gè)索引不能超越另一個(gè)索引。當寫(xiě)索引到達讀索引時(shí),管道是滿(mǎn)的,不能再寫(xiě)任何數據。當讀索引到達寫(xiě)索引時(shí),管道是空的,不能再讀任何數據。
- 同步被用來(lái)確保兩個(gè)協(xié)作線(xiàn)程看到管道狀態(tài)的最新值。Java 語(yǔ)言規范對內存訪(fǎng)問(wèn)的順序的規定是很寬容的,因此,無(wú)法使用無(wú)鎖緩沖技術(shù)。
圖 1. 環(huán)緩沖區 在下面的代碼清單中給出的是實(shí)現這些管道流的代碼。清單 8 顯示了這個(gè)類(lèi)所用的構造函數和變量。您可以從這個(gè) InputStream 中抽取相應的 OutputStream (請看清單 17 中的代碼)。在構造函數中您可以指定內部緩沖區的大小和滯后級別;這是緩沖區容量的一部分,在相應的讀程序線(xiàn)程或寫(xiě)程序線(xiàn)程被立即喚醒前必須被使用或可用。我們維護兩個(gè)變量, reader 和 writer ,它們與讀程序線(xiàn)程和寫(xiě)程序線(xiàn)程相對應。我們用它們來(lái)發(fā)現什么時(shí)候一個(gè)線(xiàn)程已死亡而另一個(gè)線(xiàn)程仍在訪(fǎng)問(wèn)流。
清單 8. 一個(gè)替代的管道流實(shí)現 package org.merlin.io; import java.io.*; /** * An efficient connected stream pair for communicating between * the threads of an application. This provides a less-strict contract * than the standard piped streams, resulting in much-improved * performance. Also supports non-blocking operation. * * @author Copyright (c) 2002 Merlin Hughes <merlin@merlin.org> */ public class PipeInputStream extends InputStream { // default values private static final int DEFAULT_BUFFER_SIZE = 8192; private static final float DEFAULT_HYSTERESIS = 0.75f; private static final int DEFAULT_TIMEOUT_MS = 1000; // flag indicates whether method applies to reader or writer private static final boolean READER = false, WRITER = true; // internal pipe buffer private byte[] buffer; // read/write index private int readx, writex; // pipe capacity, hysteresis level private int capacity, level; // flags private boolean eof, closed, sleeping, nonBlocking; // reader/writer thread private Thread reader, writer; // pending exception private IOException exception; // deadlock-breaking timeout private int timeout = DEFAULT_TIMEOUT_MS; public PipeInputStream () { this (DEFAULT_BUFFER_SIZE, DEFAULT_HYSTERESIS); } public PipeInputStream (int bufferSize) { this (bufferSize, DEFAULT_HYSTERESIS); } // e.g., hysteresis .75 means sleeping reader/writer is not // immediately woken until the buffer is 75% full/empty public PipeInputStream (int bufferSize, float hysteresis) { if ((hysteresis < 0.0) || (hysteresis > 1.0)) throw new IllegalArgumentException ("Hysteresis: " + hysteresis); capacity = bufferSize; buffer = new byte[capacity]; level = (int) (bufferSize * hysteresis); } |
清單 9 中的配置方法允許您配置流的超時(shí)值和非阻塞模式。超時(shí)值的單位是毫秒,它表示阻塞的線(xiàn)程在過(guò)了這段時(shí)間后將被自動(dòng)喚醒;這對于打破在一個(gè)線(xiàn)程死亡的情況下可能發(fā)生的死鎖是必要的。在非阻塞模式中,如果線(xiàn)程阻塞,那么 InterruptedIOException 將被拋出。
清單 9. 管道配置 public void setTimeout (int ms) { this.timeout = ms; } public void setNonBlocking (boolean nonBlocking) { this.nonBlocking = nonBlocking; } |
清單 10 中的讀方法都遵循相當標準的模式:如果我們還沒(méi)有讀線(xiàn)程的引用,那么我們先取得它,然后我們驗證輸入參數,核對流未被關(guān)閉或沒(méi)有異常待處理,確定可以讀取多少數據,最后把數據從內部的環(huán)緩沖區復制到讀程序的緩沖區。清單 12 中的 checkedAvailable() 方法在返回前自動(dòng)地等待,直到出現一些可用的數據或流被關(guān)閉。
清單 10. 讀數據 private byte[] one = new byte[1]; public int read () throws IOException { // read 1 byte int amount = read (one, 0, 1); // return EOF / the byte return (amount < 0) ? -1 : one[0] & 0xff; } public synchronized int read (byte data[], int offset, int length) throws IOException { // take a reference to the reader thread if (reader == null) reader = Thread.currentThread (); // check parameters if (data == null) { throw new NullPointerException (); } else if ((offset < 0) || (offset + length > data.length) || (length < 0)) { // check indices throw new IndexOutOfBoundsException (); } else { // throw an exception if the stream is closed closedCheck (); // throw any pending exception exceptionCheck (); if (length <= 0) { return 0; } else { // wait for some data to become available for reading int available = checkedAvailable (READER); // return -1 on EOF if (available < 0) return -1; // calculate amount of contiguous data in pipe buffer int contiguous = capacity - (readx % capacity); // calculate how much we will read this time int amount = (length > available) ? available : length; if (amount > contiguous) { // two array copies needed if data wrap around the buffer end System.arraycopy (buffer, readx % capacity, data, offset, contiguous); System.arraycopy (buffer, 0, data, offset + contiguous, amount - contiguous); } else { // otherwise, one array copy needed System.arraycopy (buffer, readx % capacity, data, offset, amount); } // update indices with amount of data read processed (READER, amount); // return amount read return amount; } } } public synchronized long skip (long amount) throws IOException { // take a reference to the reader thread if (reader == null) reader = Thread.currentThread (); // throw an exception if the stream is closed closedCheck (); // throw any pending exception exceptionCheck (); if (amount <= 0) { return 0; } else { // wait for some data to become available for skipping int available = checkedAvailable (READER); // return 0 on EOF if (available < 0) return 0; // calculate how much we will skip this time if (amount > available) amount = available; // update indices with amount of data skipped processed (READER, (int) amount); // return amount skipped return amount; } } |
當數據從這個(gè)管道被讀取或數據被寫(xiě)到這個(gè)管道時(shí),清單 11 中的方法被調用。該方法更新有關(guān)的索引,如果管道達到它的滯后級別,該方法自動(dòng)地喚醒阻塞的線(xiàn)程。
清單 11. 更新索引 private void processed (boolean rw, int amount) { if (rw == READER) { // update read index with amount read readx = (readx + amount) % (capacity * 2); } else { // update write index with amount written writex = (writex + amount) % (capacity * 2); } // check whether a thread is sleeping and we have reached the // hysteresis threshold if (sleeping && (available (!rw) >= level)) { // wake sleeping thread notify (); sleeping = false; } } |
在管道有可用空間或可用數據(取決于 rw 參數)前,清單 12 中的 checkedAvailable() 方法一直等待,然后把空間的大小或數據的多少返回給調用程序。在這個(gè)方法內還核對流未被關(guān)閉、管道未被破壞等。
清單 12. 檢查可用性 public synchronized int available () throws IOException { // throw an exception if the stream is closed closedCheck (); // throw any pending exception exceptionCheck (); // determine how much can be read int amount = available (READER); // return 0 on EOF, otherwise the amount readable return (amount < 0) ? 0 : amount; } private int checkedAvailable (boolean rw) throws IOException { // always called from synchronized(this) method try { int available; // loop while no data can be read/written while ((available = available (rw)) == 0) { if (rw == READER) { // reader // throw any pending exception exceptionCheck (); } else { // writer // throw an exception if the stream is closed closedCheck (); } // throw an exception if the pipe is broken brokenCheck (rw); if (!nonBlocking) { // blocking mode // wake any sleeping thread if (sleeping) notify (); // sleep for timeout ms (in case of peer thread death) sleeping = true; wait (timeout); // timeout means that hysteresis may not be obeyed } else { // non-blocking mode // throw an InterruptedIOException throw new InterruptedIOException ("Pipe " + (rw ? "full" : "empty")); } } return available; } catch (InterruptedException ex) { // rethrow InterruptedException as InterruptedIOException throw new InterruptedIOException (ex.getMessage ()); } } private int available (boolean rw) { // calculate amount of space used in pipe int used = (writex + capacity * 2 - readx) % (capacity * 2); if (rw == WRITER) { // writer // return amount of space available for writing return capacity - used; } else { // reader // return amount of data in pipe or -1 at EOF return (eof && (used == 0)) ? -1 : used; } } |
清單 13 中的方法關(guān)閉這個(gè)流;該方法還提供對讀程序或寫(xiě)程序關(guān)閉流的支持。阻塞的線(xiàn)程被自動(dòng)喚醒,該方法還檢查各種其它情況是否正常。
清單 13. 關(guān)閉流 public void close () throws IOException { // close the read end of this pipe close (READER); } private synchronized void close (boolean rw) throws IOException { if (rw == READER) { // reader // set closed flag closed = true; } else if (!eof) { // writer // set eof flag eof = true; // check if data remain unread if (available (READER) > 0) { // throw an exception if the reader has already closed the pipe closedCheck (); // throw an exception if the reader thread has died brokenCheck (WRITER); } } // wake any sleeping thread if (sleeping) { notify (); sleeping = false; } } |
清單 14 中的方法檢查這個(gè)流的狀態(tài)。如果有異常待處理,那么流被關(guān)閉或管道被破壞(也就是說(shuō),讀程序線(xiàn)程或寫(xiě)程序線(xiàn)程已死亡),異常被拋出。
清單 14. 檢查流狀態(tài) private void exceptionCheck () throws IOException { // throw any pending exception if (exception != null) { IOException ex = exception; exception = null; throw ex; // could wrap ex in a local exception } } private void closedCheck () throws IOException { // throw an exception if the pipe is closed if (closed) throw new IOException ("Stream closed"); } private void brokenCheck (boolean rw) throws IOException { // get a reference to the peer thread Thread thread = (rw == WRITER) ? reader : writer; // throw an exception if the peer thread has died if ((thread != null) && !thread.isAlive ()) throw new IOException ("Broken pipe"); } |
當數據被寫(xiě)入這個(gè)管道時(shí),清單 15 中的方法被調用??偟膩?lái)說(shuō),它類(lèi)似于讀方法:我們先取得寫(xiě)程序線(xiàn)程的副本,然后檢查流是否被關(guān)閉,接著(zhù)進(jìn)入把數據復制到管道的循環(huán)。和前面一樣,該方法使用 checkedAvailable() 方法,checkedAvailable() 自動(dòng)阻塞,直到管道中有可用的容量。
清單 15. 寫(xiě)數據 private synchronized void writeImpl (byte[] data, int offset, int length) throws IOException { // take a reference to the writer thread if (writer == null) writer = Thread.currentThread (); // throw an exception if the stream is closed if (eof || closed) { throw new IOException ("Stream closed"); } else { int written = 0; try { // loop to write all the data do { // wait for space to become available for writing int available = checkedAvailable (WRITER); // calculate amount of contiguous space in pipe buffer int contiguous = capacity - (writex % capacity); // calculate how much we will write this time int amount = (length > available) ? available : length; if (amount > contiguous) { // two array copies needed if space wraps around the buffer end System.arraycopy (data, offset, buffer, writex % capacity, contiguous); System.arraycopy (data, offset + contiguous, buffer, 0, amount - contiguous); } else { // otherwise, one array copy needed System.arraycopy (data, offset, buffer, writex % capacity, amount); } // update indices with amount of data written processed (WRITER, amount); // update amount written by this method written += amount; } while (written < length); // data successfully written } catch (InterruptedIOException ex) { // write operation was interrupted; set the bytesTransferred // exception field to reflect the amount of data written ex.bytesTransferred = written; // rethrow exception throw ex; } } } |
如清單 16 所示,這個(gè)管道流實(shí)現的特點(diǎn)之一是寫(xiě)程序可設置一個(gè)被傳遞給讀程序的異常。
清單 16. 設置異常 private synchronized void setException (IOException ex) throws IOException { // fail if an exception is already pending if (exception != null) throw new IOException ("Exception already set: " + exception); // throw an exception if the pipe is broken brokenCheck (WRITER); // take a reference to the pending exception this.exception = ex; // wake any sleeping thread if (sleeping) { notify (); sleeping = false; } } |
清單 17 給出這個(gè)管道的有關(guān)輸出流的代碼。 getOutputStream() 方法返回 OutputStreamImpl ,OutputStreamImpl 是使用前面給出的方法來(lái)把數據寫(xiě)到內部管道的輸出流。OutputStreamImpl 類(lèi)繼承了 OutputStreamEx ,OutputStreamEx 是允許為讀線(xiàn)程設置異常的輸出流類(lèi)的擴展。
清單 17. 輸出流 public OutputStreamEx getOutputStream () { // return an OutputStreamImpl associated with this pipe return new OutputStreamImpl (); } private class OutputStreamImpl extends OutputStreamEx { private byte[] one = new byte[1]; public void write (int datum) throws IOException { // write one byte using internal array one[0] = (byte) datum; write (one, 0, 1); } public void write (byte[] data, int offset, int length) throws IOException { // check parameters if (data == null) { throw new NullPointerException (); } else if ((offset < 0) || (offset + length > data.length) || (length < 0)) { throw new IndexOutOfBoundsException (); } else if (length > 0) { // call through to writeImpl() PipeInputStream.this.writeImpl (data, offset, length); } } public void close () throws IOException { // close the write end of this pipe PipeInputStream.this.close (WRITER); } public void setException (IOException ex) throws IOException { // set a pending exception PipeInputStream.this.setException (ex); } } // static OutputStream extension with setException() method public static abstract class OutputStreamEx extends OutputStream { public abstract void setException (IOException ex) throws IOException; } } |
使用新的管道流
清單 18 演示了怎樣使用新的管道流來(lái)解決上一篇文章中的問(wèn)題。請注意,寫(xiě)程序線(xiàn)程中出現的任何異常均可在流中被傳遞。
清單 18. 使用新的管道流 public static InputStream newPipedCompress (final InputStream in) throws IOException { PipeInputStream source = new PipeInputStream (); final PipeInputStream.OutputStreamEx sink = source.getOutputStream (); new Thread () { public void run () { try { GZIPOutputStream gzip = new GZIPOutputStream (sink); Streams.io (in, gzip); gzip.close (); } catch (IOException ex) { try { sink.setException (ex); } catch (IOException ignored) { } } } }.start (); return source; } |
性能結果
在下面的表中顯示的是這些新的流和標準流的性能,測試環(huán)境是運行 Java 2 SDK,v1.4.0 的 800MHz Linux 機器。性能測試程序與我在上一篇文章中用的相同:
管道流
15KB:21ms;15MB:20675ms
新的管道流
15KB:0.68ms;15MB:158ms
字節數組流
15KB:0.31ms;15MB:745ms
新的字節數組流
15KB:0.26ms;15MB:438ms
與上一篇文章中的性能差異只反映了我的機器中不斷變化的環(huán)境負載。您可以從這些結果中看到,在大容量數據方面,新的管道流的性能遠好于蠻力解決方案;但是,新的管道流的速度仍然只有我們分析的工程解決方案的速度的一半左右。顯然,在現代的 Java 虛擬機中使用多個(gè)線(xiàn)程的開(kāi)銷(xiāo)遠比以前小得多。
結束語(yǔ)
我們分析了兩組可替代標準 Java API 的流的流: BytesOutputStream 和 BytesInputStream 是字節數組流的非同步替代者。因為這些類(lèi)的預期的用例涉及單個(gè)線(xiàn)程的訪(fǎng)問(wèn),所以不采用同步是合理的選擇。實(shí)際上,執行時(shí)間的縮短(最多可縮短 40%)很可能與同步的消滅沒(méi)有多大關(guān)系;性能得到提高的主要原因是在提供只讀訪(fǎng)問(wèn)時(shí)避免了不必要的復制。第二個(gè)示例 PipeInputStream 可替代管道流;為了減少超過(guò) 99% 的執行時(shí)間,這個(gè)流使用寬松的約定、改進(jìn)的緩沖區大小和基于數組的操作。在這種情況下無(wú)法使用不同步的代碼;Java 語(yǔ)言規范排除了可靠地執行這種代碼的可能性,否則,在理論上是可以實(shí)現最少鎖定的管道。
字節數組流和管道流是基于流的應用程序內部通信的主要選擇。雖然新的 I/O API 提供了一些其它選擇,但是許多應用程序和 API 仍然依賴(lài)標準流,而且對于這些特殊用途來(lái)說(shuō),新的 I/O API 并不一定有更高的效率。通過(guò)適當地減少同步的使用、有效地采用基于數組的操作以及最大程度地減少不必要的復制,性能結果得到了很大的提高,從而提供了完全適應標準流框架的更高效的操作。在應用程序開(kāi)發(fā)的其它領(lǐng)域中采用相同的步驟往往能取得類(lèi)似地性能提升。
參考資料
在 developerWorks Java 技術(shù)專(zhuān)區,您能找到有關(guān) Java 編程的每個(gè)方面的許多文章。
關(guān)于作者