
Java 7: Closing NIO.2 file channels without losing data

Closing an asynchronous file channel properly can be surprisingly difficult. If you submitted I/O tasks to the asynchronous channel, you want to be sure that those tasks are actually executed. This turns out to be a tricky requirement on asynchronous channels for several reasons. The default channel group uses daemon threads as worker threads, which isn’t a good choice, because these threads are simply abandoned when the JVM exits. If you use a custom thread pool executor with non-daemon threads instead, you need to manage the lifecycle of that thread pool yourself. If you don’t, its threads stay alive after the main thread exits. The JVM then never exits at all, and all you can do is kill it.
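If you go the custom-executor route, the lifecycle management described above can look like the following minimal sketch. It assumes that you own the ExecutorService; the class NonDaemonChannelPool and the helpers nonDaemonFactory() and writeThenShutdown() are hypothetical names used only for illustration. The worker threads are explicitly non-daemon, and the pool is shut down in a finally block after the channel has been closed, so the JVM can exit normally.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class NonDaemonChannelPool {

    // Hypothetical helper: named worker threads that are explicitly non-daemon.
    static ThreadFactory nonDaemonFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread t = new Thread(runnable, prefix + "-" + counter.incrementAndGet());
            t.setDaemon(false); // the JVM waits for these threads before it exits
            return t;
        };
    }

    // Writes one record, closes the channel, then shuts down the pool we own.
    static boolean writeThenShutdown() throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1, nonDaemonFactory("afc-worker"));
        Path file = Files.createTempFile("afc", ".out");
        try (AsynchronousFileChannel channel = AsynchronousFileChannel.open(
                file, EnumSet.of(StandardOpenOption.WRITE), pool)) {
            channel.write(ByteBuffer.wrap("Hello".getBytes()), 0).get();
        } finally {
            // try-with-resources has already closed the channel; only now stop the executor.
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
            Files.deleteIfExists(file);
        }
        return pool.isTerminated();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("terminated=" + writeThenShutdown());
    }
}
```

Forgetting the finally block is exactly the situation the paragraph warns about: the non-daemon worker would keep the JVM alive after main() returns.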

Another issue when closing asynchronous channels is mentioned in the javadoc of AsynchronousFileChannel: “Shutting down the executor service while the channel is open results in unspecified behavior.” This is because the close() operation on AsynchronousFileChannel submits tasks to the associated executor service that simulate the failure of pending I/O operations (in that same thread pool) with an AsynchronousCloseException. Hence, you’ll get a RejectedExecutionException if you call close() on an asynchronous file channel instance after you have already shut down the associated executor service.
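Because the javadoc leaves the wrong ordering unspecified, the sketch below does not call AsynchronousFileChannel.close() at all. It only shows the executor-side mechanism the paragraph relies on, assuming the same single-threaded ThreadPoolExecutor as the example that follows: with the default AbortPolicy, any task handed to an executor after shutdown() is rejected with a RejectedExecutionException. RejectAfterShutdown is a hypothetical class name.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectAfterShutdown {

    // Returns true if the executor refuses a task submitted after shutdown().
    static boolean rejectsAfterShutdown() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        try {
            // Stands in for the clean-up task that close() would hand to the executor.
            pool.execute(() -> System.out.println("never runs"));
            return false;
        } catch (RejectedExecutionException e) {
            return true; // default AbortPolicy
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("rejected=" + rejectsAfterShutdown());
    }
}
```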

With all that said, the proposed way to safely configure the file channel and shut it down goes like this:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SimpleChannelClose_AsynchronousCloseException {

  private static final String FILE_NAME = "E:/temp/afile.out";
  private static AsynchronousFileChannel outputfile;
  private static AtomicInteger fileindex = new AtomicInteger(0);
  // custom executor service with a single (non-daemon) worker thread
  private static ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());

  public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
   // open the channel on top of the custom executor service
   outputfile = AsynchronousFileChannel.open(
     Paths.get(FILE_NAME),
     new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.WRITE,
       StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE)), pool);
   List<Future<Integer>> futures = new ArrayList<>();
   for (int i = 0; i < 10000; i++) {
    futures.add(outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5));
   }
   outputfile.close(); // 1. close the channel first (pending writes fail)
   pool.shutdown(); // 2. then shut down the executor service
   pool.awaitTermination(60, TimeUnit.SECONDS); // 3. wait until the pool has terminated
   for (Future<Integer> future : futures) {
    try {
     future.get();
    } catch (ExecutionException e) {
     System.out.println("Task wasn't executed!");
    }
   }
  }
}

The custom thread pool executor service is the static pool field, a ThreadPoolExecutor with a single worker thread. The file channel is opened with AsynchronousFileChannel.open(), which receives that pool as its executor service. After the write loop, the asynchronous channel is closed in an orderly manner: first the channel itself is closed, then the executor service is shut down, and last but not least the main thread awaits the termination of the thread pool executor.

Although this is a safe way to close a channel with a custom executor service, it introduces a new issue. The client submitted asynchronous write tasks (the outputfile.write() calls in the loop) and may want to be sure that, once they’ve been submitted successfully, those tasks will definitely be executed. Always waiting for Future.get() to return isn’t an option, because in many cases this would defeat the purpose of *asynchronous* file channels. The snippet above prints lots of “Task wasn’t executed!” messages, because the channel is closed immediately after the write operations were submitted to it. To avoid such ‘data loss’ you can implement your own CompletionHandler and pass it to the requested write operation.

public class SimpleChannelClose_CompletionHandler {
...
 public static void main(String[] args) throws InterruptedException, IOException, ExecutionException {
...
   outputfile.write(ByteBuffer.wrap("Hello".getBytes()), fileindex.getAndIncrement() * 5, "", defaultCompletionHandler);
...
 }

 private static CompletionHandler<Integer, String> defaultCompletionHandler = new CompletionHandler<Integer, String>() {
  @Override
  public void completed(Integer result, String attachment) {
   // NOP
  }

  @Override
  public void failed(Throwable exc, String attachment) {
   System.out.println("Do something to avoid data loss ...");
  }
 };
}

The CompletionHandler.failed() method is invoked when a write operation fails, for example with an AsynchronousCloseException because the channel was closed while the write was still pending. You can implement any compensation code there to avoid data loss. When you work on mission-critical data, it may be a good idea to use CompletionHandlers. But *still* there’s another issue. The clients can submit tasks, but they don’t know whether the pool will successfully process them. Successful in this context means that the submitted bytes actually reach their destination (the file on the hard disk). If you want to be sure that all submitted tasks are actually processed before closing, it gets a little trickier. You need a ‘graceful’ closing mechanism that waits until the work queue is empty *before* it actually closes the channel and the associated executor service (this isn’t possible using the standard lifecycle methods).
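The compensation idea can be sketched as follows, under stated assumptions: WriteRequest, pendingRetries and RetryingWriteHandler are hypothetical names, and the code only parks failed writes in a queue. Actually replaying them (for example on a freshly opened channel) is left out. Using the request itself as the attachment gives failed() everything it needs for a retry. main() calls the handler directly to simulate one successful and one failed write.

```java
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.CompletionHandler;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryingWriteHandler {

    // Hypothetical attachment: everything needed to replay a write later.
    static final class WriteRequest {
        final long position;
        final byte[] data;

        WriteRequest(long position, byte[] data) {
            this.position = position;
            this.data = data;
        }
    }

    // Failed writes are parked here; replaying them is not shown.
    static final Queue<WriteRequest> pendingRetries = new ConcurrentLinkedQueue<>();
    static final AtomicInteger completedWrites = new AtomicInteger();

    static final CompletionHandler<Integer, WriteRequest> HANDLER =
            new CompletionHandler<Integer, WriteRequest>() {
                @Override
                public void completed(Integer bytesWritten, WriteRequest request) {
                    // A short write (bytesWritten < request.data.length) would need handling too.
                    completedWrites.incrementAndGet();
                }

                @Override
                public void failed(Throwable exc, WriteRequest request) {
                    // Compensation: keep the data instead of silently dropping it.
                    pendingRetries.add(request);
                }
            };

    public static void main(String[] args) {
        // Simulate the callbacks a channel would make for one good and one failed write.
        HANDLER.completed(5, new WriteRequest(0, "Hello".getBytes()));
        HANDLER.failed(new AsynchronousCloseException(), new WriteRequest(5, "Hello".getBytes()));
        System.out.println("completed=" + completedWrites.get() + ", toRetry=" + pendingRetries.size());
    }
}
```

With a real channel, the handler would be passed as channel.write(ByteBuffer.wrap(req.data), req.position, req, HANDLER).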

Introducing GracefulAsynchronousFileChannel

My last snippets introduce the GracefulAsynchronousFileChannel. You can get the complete code here in my Git repository. The channel behaves like this: it guarantees to process all successfully submitted write operations, and it throws a NonWritableChannelException once the channel is preparing to shut down. Implementing that behaviour takes two things. First, you need to override afterExecute() in an extension of ThreadPoolExecutor so that it sends a signal when the queue is empty. This is what DefensiveThreadPoolExecutor does.

private class DefensiveThreadPoolExecutor extends ThreadPoolExecutor {

 public DefensiveThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
   LinkedBlockingQueue<Runnable> workQueue, ThreadFactory factory, RejectedExecutionHandler handler) {
  super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, factory, handler);
 }

 /**
  * "Last" task issues a signal that queue is empty after task processing was completed.
  */
 @Override
 protected void afterExecute(Runnable r, Throwable t) {
  if (state == PREPARE) {
   closeLock.lock(); // only one thread will pass when closer thread is awaiting signal
   try {
    if (getQueue().isEmpty() && state < SHUTDOWN) {
     System.out.println("Issueing signal that queue is empty ...");
     isEmpty.signal();
     state = SHUTDOWN; // -> no other thread can issue empty-signal
    }
   } finally {
    closeLock.unlock();
   }
  }
  super.afterExecute(r, t);
 }
}

The afterExecute() method is executed after each processed task, by the thread that processed that task. The implementation sends the isEmpty signal (isEmpty.signal()) when the queue is empty while the channel is in the PREPARE state. The second part you need to gracefully close a channel is a custom implementation of the close() method of AsynchronousFileChannel.

/**
 * Method that closes this file channel gracefully without losing any data.
 */
@Override
public void close() throws IOException {
 AsynchronousFileChannel writeableChannel = innerChannel;
 System.out.println("Starting graceful shutdown ...");
 closeLock.lock();
 try {
  state = PREPARE;
  innerChannel = AsynchronousFileChannel.open(Paths.get(uri),
    new HashSet<StandardOpenOption>(Arrays.asList(StandardOpenOption.READ)), pool);
  System.out.println("Channel blocked for write access ...");
  if (!pool.getQueue().isEmpty()) {
   System.out.println("Waiting for signal that queue is empty ...");
   isEmpty.await();
   System.out.println("Received signal that queue is empty ... closing");
  } else {
   System.out.println("Don't have to wait, queue is empty ...");
  }
 } catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt status
  throw new RuntimeException("Interrupted on awaiting Empty-Signal!", e);
 } catch (Exception e) {
  throw new RuntimeException("Unexpected error", e);
 } finally {
  closeLock.unlock();
  writeableChannel.force(false);
  writeableChannel.close(); // close the writable channel
  innerChannel.close(); // close the read-only channel
  System.out.println("File closed ...");
  pool.shutdown(); // allow clean up tasks from previous close() operation to finish safely
  try {
   pool.awaitTermination(1, TimeUnit.MINUTES);
  } catch (InterruptedException e) {
   Thread.currentThread().interrupt(); // restore the interrupt status
   throw new RuntimeException("Could not terminate thread pool!", e);
  }
  System.out.println("Pool closed ...");
 }
}

Study that code for a while. The interesting bit is right after state = PREPARE, where innerChannel gets replaced by a read-only channel. That causes any subsequent asynchronous write request to fail with a NonWritableChannelException. If the work queue is not empty, the close() method then waits for the isEmpty signal (isEmpty.await()). When this signal is sent after the last write task, close() continues with an orderly shutdown procedure in the finally block: it forces and closes the writable channel, closes the read-only channel and finally shuts down the pool. Basically, the code adds a shared lifecycle state across the file channel and the associated thread pool. That way both objects can communicate during the shutdown procedure and avoid data loss.
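The read-only trick relies on documented AsynchronousFileChannel behaviour: write() throws a NonWritableChannelException if the channel was not opened for writing, and it does so immediately instead of queueing a task. The following minimal sketch checks exactly that, independently of the graceful channel. ReadOnlySwapDemo is a hypothetical name, and it uses a temporary file instead of the E:/temp path.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ReadOnlySwapDemo {

    // Returns true if a write on a READ-only asynchronous channel is refused up front.
    static boolean writeIsRejected() throws Exception {
        Path file = Files.createTempFile("graceful", ".out");
        try (AsynchronousFileChannel readOnly =
                     AsynchronousFileChannel.open(file, StandardOpenOption.READ)) {
            readOnly.write(ByteBuffer.wrap("Hello".getBytes()), 0);
            return false; // not reached if the channel behaves as documented
        } catch (NonWritableChannelException e) {
            return true; // thrown by write() itself, before anything is queued
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("rejected=" + writeIsRejected());
    }
}
```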

Here is a logging client that uses the GracefulAsynchronousFileChannel.

public class MyLoggingClient {
 private static AtomicInteger fileindex = new AtomicInteger(0);
 private static final String FILE_URI = "file:/E:/temp/afile.out";

 public static void main(String[] args) throws IOException {
  new Thread(new Runnable() { // arbitrary thread that writes stuff into an asynchronous I/O data sink

     @Override
     public void run() {
      try {
       for (;;) {
        GracefulAsynchronousFileChannel.get(FILE_URI).write(ByteBuffer.wrap("Hello".getBytes()),
          fileindex.getAndIncrement() * 5);
       }
      } catch (NonWritableChannelException e) {
       System.out.println("Deal with the fact that the channel was closed asynchronously ... "
         + e.toString());
      } catch (Exception e) {
       e.printStackTrace();
      }
     }
    }).start();

  Timer timer = new Timer(); // asynchronous channel closer
  timer.schedule(new TimerTask() {
   public void run() {
    try {
     GracefulAsynchronousFileChannel.get(FILE_URI).close();
     long size = Files.size(Paths.get("E:/temp/afile.out"));
     System.out.println("Expected file size (bytes): " + (fileindex.get() - 1) * 5);
     System.out.println("Actual file size (bytes): " + size);
     if (size == (fileindex.get() - 1) * 5)
      System.out.println("No write operation was lost!");
     Files.delete(Paths.get("E:/temp/afile.out"));
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }, 1000);


 }
}

The client starts two threads. One thread issues write operations in an infinite loop (the Runnable passed to new Thread(...)). The other thread, driven by a Timer, closes the file channel asynchronously after one second of processing (the TimerTask). If you run that client, it produces the following output:

Starting graceful shutdown ...
Deal with the fact that the channel was closed asynchronously ... java.nio.channels.NonWritableChannelException
Channel blocked for write access ...
Waiting for signal that queue is empty ...
Issueing signal that queue is empty ...
Received signal that queue is empty ... closing
File closed ...
Pool closed ...
Expected file size (bytes): 400020
Actual file size (bytes): 400020
No write operation was lost!

The output shows the orderly shutdown procedure of the participating threads. The logging thread has to deal with the fact that the channel was closed asynchronously. After the queued tasks are processed, the channel resources are closed. No data was lost: everything that the client issued was actually written to the destination file. There are no AsynchronousCloseExceptions or RejectedExecutionExceptions in such a graceful closing procedure.
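If you want to experiment without the repository, here is a condensed, self-contained sketch of the same goal. Note that it swaps in a different technique: instead of inspecting the executor's work queue from afterExecute() as DefensiveThreadPoolExecutor does, it counts in-flight writes under a ReentrantLock, and close() awaits a Condition until that count reaches zero. A closing flag plays the role of the read-only channel swap by throwing NonWritableChannelException. InFlightGracefulChannel and runDemo() are hypothetical names, not the API of the author's GracefulAsynchronousFileChannel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.nio.channels.NonWritableChannelException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.EnumSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical wrapper, not the author's GracefulAsynchronousFileChannel.
public class InFlightGracefulChannel {

    private final AsynchronousFileChannel channel;
    private final ExecutorService pool;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition idle = lock.newCondition();
    private int inFlight = 0;        // guarded by lock
    private boolean closing = false; // guarded by lock

    // Every accepted write ends in exactly one of these callbacks.
    private final CompletionHandler<Integer, Void> done = new CompletionHandler<Integer, Void>() {
        @Override public void completed(Integer bytesWritten, Void att) { finished(); }
        // Compensation code (e.g. a retry queue) could go into failed().
        @Override public void failed(Throwable exc, Void att) { finished(); }
    };

    public InFlightGracefulChannel(Path file, ExecutorService pool) throws IOException {
        this.pool = pool;
        this.channel = AsynchronousFileChannel.open(file,
                EnumSet.of(StandardOpenOption.WRITE, StandardOpenOption.CREATE), pool);
    }

    public void write(ByteBuffer src, long position) {
        lock.lock();
        try {
            if (closing) throw new NonWritableChannelException(); // like the read-only swap
            inFlight++;
        } finally {
            lock.unlock();
        }
        try {
            channel.write(src, position, null, done);
        } catch (RuntimeException e) {
            finished(); // the write was never accepted, so undo the count
            throw e;
        }
    }

    private void finished() {
        lock.lock();
        try {
            if (--inFlight == 0) idle.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public void close() throws IOException, InterruptedException {
        lock.lock();
        try {
            closing = true;
            while (inFlight > 0) idle.await(); // releases the lock while waiting
        } finally {
            lock.unlock();
        }
        channel.force(false);
        channel.close(); // nothing is pending any more
        pool.shutdown(); // only now is it safe to stop the executor
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }

    // Writes `writes` five-byte records, closes gracefully and returns the file size.
    static long runDemo(int writes) throws Exception {
        Path file = Files.createTempFile("graceful", ".out");
        try {
            InFlightGracefulChannel out =
                    new InFlightGracefulChannel(file, Executors.newFixedThreadPool(1));
            for (int i = 0; i < writes; i++) {
                out.write(ByteBuffer.wrap("Hello".getBytes()), i * 5L);
            }
            out.close();
            return Files.size(file);
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("size=" + runDemo(10_000));
    }
}
```

If every write completes in full, runDemo(10_000) should report 50000 bytes. Counting completions in the CompletionHandler does not depend on how a particular JDK implementation schedules file I/O on the executor, which is the main reason for choosing it in this sketch.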

That’s all in terms of safely closing asynchronous file channels. The complete code is here in my Git repository. I hope you’ve enjoyed it a little. Looking forward to your comments.

Reference: “Java 7: Closing NIO.2 file channels without losing data” from our JCG partner Niklas.

Ilias Tsagklis

Ilias is a software developer turned online entrepreneur. He is co-founder and Executive Editor at Java Code Geeks.