關閉異步通道時,另一個問題在AsynchronousFileChannel
的javadoc中提到:“在通道打開時關閉執行程序服務會導致未指定的行為。” 這是因為close()
上的操作AsynchronousFileChannel
問題的任務是模擬與掛起的I / O操作(在同一個線程池)的故障相關的執行服務AsynchronousCloseException
。 因此,如果您在先前關閉關聯的執行程序服務時在異步文件通道實例上執行close()
,則會得到RejectedExecutionException
。
綜上所述,安全配置文件通道并關閉該通道的建議方法如下:
public class SimpleChannelClose_AsynchronousCloseException {private static final String FILE_NAME = "E:/temp/afile.out";private static AsynchronousFileChannel outputfile;private static AtomicInteger fileindex = new AtomicInteger(0);private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {outputfile = AsynchronousFileChannel.open(Paths.get(FILE_NAME),new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.WRITE, StandardOpenOption.CREATE,StandardOpenOption.DELETE_ON_CLOSE)), pool);List<Future<Integer>> futures = new ArrayList<>();for (int i = 0; i < 10000; i++) {futures.add(outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5));}outputfile.close();pool.shutdown();pool.awaitTermination(60, TimeUnit.SECONDS);for (Future<Integer> future : futures) {try {future.get();} catch (ExecutionException e) {System.out.println("Task wasn't executed!");}}}
}
在第6和7行中定義了定制線程池執行程序服務。在第10至13行中定義了文件通道。在第18至20行中,異步通道以有序方式關閉。 首先關閉通道本身,然后關閉執行程序服務,最后最重要的一點是線程等待線程池執行程序的終止。
盡管這是使用自定義執行程序服務關閉通道的安全方法,但還是引入了新問題。 客戶端提交了異步寫入任務(第16行),并且可能希望確保一旦成功提交了這些任務,這些任務肯定會被執行。 始終不等待Future.get()
返回(第23行),因為在許多情況下,這將導致*異步*文件通道adurdum。 上面的代碼段將返回很多“未執行任務!” 消息導致將寫操作提交到通道后立即關閉通道(第18行)。 為了避免這種“數據丟失”,您可以實現自己的CompletionHandler
并將其傳遞給請求的寫操作。
public class SimpleChannelClose_CompletionHandler {
...public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
...outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5, "", defaultCompletionHandler);
...}private static CompletionHandler<integer, string=""> defaultCompletionHandler = new CompletionHandler<Integer, String>() {@Overridepublic void completed(Integer result, String attachment) {// NOP}@Overridepublic void failed(Throwable exc, String attachment) {System.out.println("Do something to avoid data loss ...");}};
}
CompletionHandler.failed()
方法(第16行)在任務處理期間捕獲任何運行時異常。 您可以在此處實施任何補償代碼,以避免數據丟失。 處理關鍵任務數據時,最好使用CompletionHandler
。 但是*仍然*還有另一個問題。 客戶端可以提交任務,但是他們不知道池是否將成功處理這些任務。 在這種情況下,成功表示已提交的字節實際上到達了目的地(硬盤上的文件)。 如果您想確保所有提交的任務在關閉前都已得到實際處理,則會有些棘手。 您需要一個“優美的”關閉機制,該機制要等到工作隊列為空時才*實際上*先關閉通道和關聯的執行程序服務(使用標準生命周期方法無法實現)。
引入GracefulAsynchronousChannel
我的最后一個片段介紹了GracefulAsynchronousFileChannel
。 您可以在我的Git存儲庫中獲取完整的代碼。 該通道的行為是這樣的:保證處理所有成功提交的寫操作,如果通道準備關閉,則拋出NonWritableChannelException
。 實現該行為需要兩件事。 首先,您需要在ThreadPoolExecutor
的擴展中實現afterExecute()
,該擴展在隊列為空時發送信號。 這就是DefensiveThreadPoolExecutor
所做的。
private class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,LinkedBlockingQueue<Runnable> workQueue, ThreadFactory factory, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory, handler);}/*** "Last" task issues a signal that queue is empty after task processing was completed.*/@Overrideprotected void afterExecute(Runnable r, Throwable t) {if (state == PREPARE) {closeLock.lock(); // only one thread will pass when closer thread is awaiting signaltry {if (getQueue().isEmpty() && state < SHUTDOWN) {System.out.println("Issueing signal that queue is empty ...");isEmpty.signal();state = SHUTDOWN; // -> no other thread can issue empty-signal}} finally {closeLock.unlock();}}super.afterExecute(r, t);}
}
afterExecute()
方法(第12行)在每個處理的任務之后由處理給定任務的線程執行。 該實現在第18行中發送isEmpty
信號。第二個需要您優雅地關閉一個通道的部分是AsynchronousFileChannel
的close()
方法的自定義實現。
/*** Method that closes this file channel gracefully without loosing any data.*/
@Override
public void close() throws IOException {AsynchronousFileChannel writeableChannel = innerChannel;System.out.println("Starting graceful shutdown ...");closeLock.lock();try {state = PREPARE;innerChannel = AsynchronousFileChannel.open(Paths.get(uri),new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.READ)), pool);System.out.println("Channel blocked for write access ...");if (!pool.getQueue().isEmpty()) {System.out.println("Waiting for signal that queue is empty ...");isEmpty.await();System.out.println("Received signal that queue is empty ... closing");} else {System.out.println("Don't have to wait, queue is empty ...");}} catch (InterruptedException e) {Thread.interrupted();throw new RuntimeException("Interrupted on awaiting Empty-Signal!", e);} catch (Exception e) {throw new RuntimeException("Unexpected error" + e);} finally {closeLock.unlock();writeableChannel.force(false);writeableChannel.close(); // close the writable channelinnerChannel.close(); // close the read-only channelSystem.out.println("File closed ...");pool.shutdown(); // allow clean up tasks from previous close() operation to finish safelytry {pool.awaitTermination(1, TimeUnit.MINUTES);} catch (InterruptedException e) {Thread.interrupted();throw new RuntimeException("Could not terminate thread pool!", e);}System.out.println("Pool closed ...");}
}
研究該代碼一段時間。 有趣的位在第11行中,其中的innerChannel
被只讀通道替換。 這將導致任何后續的異步寫入請求均由于NonWritableChannelException
而失敗。 在第16行中, close()
方法等待isEmpty
信號發生。 在上一個寫任務之后發送此信號時, close()
方法將繼續執行有序的關閉過程(第27頁及其后的內容)。 基本上,代碼在文件通道和關聯的線程池之間添加了共享的生命周期狀態。 這樣,兩個對象都可以在關閉過程中進行通信,并避免數據丟失。
這是使用GracefulAsynchronousFileChannel
的日志記錄客戶端。
public class MyLoggingClient {private static AtomicInteger fileindex = new AtomicInteger(0);private static final String FILE_URI = "file:/E:/temp/afile.out";public static void main(String[] args) throws IOException {new Thread(new Runnable() { // arbitrary thread that writes stuff into an asynchronous I/O data sink@Overridepublic void run() {try {for (;;) {GracefulAsynchronousFileChannel.get(FILE_URI).write(ByteBuffer.wrap("Hello".getBytes()),fileindex.getAndIncrement() * 5);}} catch (NonWritableChannelException e) {System.out.println("Deal with the fact that the channel was closed asynchronously ... "+ e.toString());} catch (Exception e) {e.printStackTrace();}}}).start();Timer timer = new Timer(); // asynchronous channel closertimer.schedule(new TimerTask() {public void run() {try {GracefulAsynchronousFileChannel.get(FILE_URI).close();long size = Files.size(Paths.get("E:/temp/afile.out"));System.out.println("Expected file size (bytes): " + (fileindex.get() - 1) * 5);System.out.println("Actual file size (bytes): " + size);if (size == (fileindex.get() - 1) * 5)System.out.println("No write operation was lost!");Files.delete(Paths.get("E:/temp/afile.out"));} catch (IOException e) {e.printStackTrace();}}}, 1000);}
}
客戶端啟動兩個線程,一個線程在無限循環中(第6行以下)發出寫操作。 在處理一秒鐘后,另一個線程異步關閉文件通道(第25 ff行)。 如果運行該客戶端,那么將產生以下輸出:
Starting graceful shutdown ...
Deal with the fact that the channel was closed asynchronously ... java.nio.channels.NonWritableChannelException
Channel blocked for write access ...
Waiting for signal that queue is empty ...
Issueing signal that queue is empty ...
Received signal that queue is empty ... closing
File closed ...
Pool closed ...
Expected file size (bytes): 400020
Actual file size (bytes): 400020
No write operation was lost!
輸出顯示參與線程的有序關閉過程。 日志記錄線程需要處理通道異步關閉的事實。 處理排隊的任務后,將關閉通道資源。 沒有數據丟失,客戶端發出的所有內容均已真正寫入文件目標位置。 在這種正常的關閉過程中,沒有AsynchronousClosedException
或RejectedExecutionException
。
這就是安全關閉異步文件通道的全部方法。 完整的代碼在我的Git存儲庫中 。 希望您喜歡它。 期待您的評論。
翻譯自: https://www.javacodegeeks.com/2012/05/java-7-closing-nio2-file-channels.html