第7章 取消与关闭

Java没有提供任何机制来安全的终止线程,但它提供了中断(interruption)协调机制,能够使一个线程终止另外一个线程当前的工作。

生命周期结束(End-of-Lifecycle)的问题会使任务、服务以及程序的设计和实现等过程变得复杂

任务取消

  • 用户请求取消
  • 有时间限制的操作
  • 应用程序事件
  • 错误
  • 关闭

Java 没有一种安全的抢占式方法来停止线程,只有协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

@ThreadSafe
public class PrimeGenerator implements Runnable {
  @GuardedBy("this")
  private final List<BigInteger> primes = new ArrayList<BigInteger>();
  // “已请求取消(Cancellation Requested)”标志
  private volatile boolean cancelled;
  
  public void run() {
    BigInteger p = BigInteger.ONE;
    while (!cancelled) {
      p = p.nextProbablePrime();
      synchronzied (this) {
        primes.add(p);
      }
    }
  }
  
  public void cancel() { cancelled = true; }
  
  public synchronized List<BigInteger> get() {
    return new ArrayList<BigIntegere>(primes);
  }
}

一个仅运行一秒钟的素数生成器

List<BigInteger> aSecondOfPrimes() throws InterruptedException {
  PrimeGenerator generator = new PrimeGenerator();
  new Thread(generator).start();
  try {
    SECONDS.sleep(1);
  } finally {
    generator.cancel();
  }
  return generator.get();
}

中断

通常,中断是实现取消的最合理的方式

使用取消的任务调用了一个阻塞方法,任务可能永远不会检查取消标志。

不可靠的取消操作将把生产者置于阻塞的操作中

Object.wait, Thread.sleep,Thread.join方法,会不断的轮询监听 interrupted 标志位,发现其设置为true后,会停止阻塞并抛出 InterruptedException异常。以及各种AQS衍生类Lock.lockInterruptibly()等任何声明throws InterruptedException的方法。

class BrokenPrimeProducer extends Thread {
  private final BlockingQueue<BigInteger> queue;
  private volatile boolean cancelled = false;
  
  BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
    this.queue = queue;
  }
  
  public void run() {
    try {
      BigInteger p = BigInteger.ONE;
      while (!cancelled) {
        queue.put(p = p.nextProbablePrime());
      }
    } catch (InterruptedException consumed) {}
  }
  
  public void cancel() { cancelled = true; }
}

void consumePrimes() throws InterruptedException {
  BlockingQueue<BigInteger> primes = ...;
  BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
  producer.start();
  try {
    while (needMorePrimes()) {
      consume(primes.take());
    }
  } finally {
    producer.cancel();
  }
}

每个线程都有一个 boolean 类型的中断状态,Thread 中包含了中断线程以及查询线程中断状态的方法

public class Thread {
  public void interrupte() {}
  public boolean isInterrupted() {}
  public static boolean interrupted() {}
}

调用 interrupt只是传递了请求中断的消息,在使用静态的 interrupted 时应该小心,因为它会清除当前线程的中断状态。

class PrimeProducer extends Thread {
  private final BlockingQueue<BigInteger> queue;
  
  PrimeProducer(BlockingQueue<BigInteger> queue) {
    this.queue = queue;
  }
  
  public void run() {
    try {
      BigInteger p = BigInteger.ONE;
      while (!Thread.currentThread().isInterrupted()) {
        queue.put(p = p.nextProbablePrime());
      } catch (InterruptedException consumed) {
        
      }
    }
  }
  
  public void cancel() { interrupt(); }
}

中断策略

最合理的中断策略是某种形式的线程级(Thread-Level)取消操作或服务级(Service-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。

如果除了将 InterruptedException 传递给调用者外还需要执行其他操作,那么应该在捕捉之后恢复中断状态:Thread.currentThread().interrupt();

由于每个线程拥有各自的中断策略,因此除非知道中断对该线程的含义,否则就不应该中断这个线程

响应中断

两种使用策略处理 InterruptedException

  • 传递异常
  • 恢复中断状态

传递异常:

BlockingQueue<Task> queue;

public Task getNextTask() throws InterruptedExecption {
  return queue.take();
}

不可取消的任务在退出前恢复中断

public Task getNextTask(BlockingQueue<Task> queue) {
  boolean interrupted = false;
  try {
    while (true) {
      try {
        return queue.take();
      } catch (InterruptedException e) {
        interrupted = true;
        // 重新尝试
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}

在效率和响应性之间进行权衡,选择合适的轮询频率

示例:计时运行

破坏了规则:在中断线程之前,应该了解它的中断策略

private static final ScheduleExcutorService cancelExec = ...;

public static void timeRun(Runnable r, long timeout, TimeUnit unit) {
  final Thread taskThread = Thread.currentThread();
  cancelExec.schedule(new Runnable() {
    public void run() {
      taskThread.interrupt();
    }
  }, timeout, unit);
  r.run();
}

在专门的线程中中断任务,(依赖限制的 join,因此无法知道是正常退出,还是超时返回)

public static void timeRun(final Runnable r, long timeout, TimeUnit unit) {
  class RethrowableTask implements Runnable {
    private volatile Throwable t;
    
    public void run() {
      try {
        r.run();
      } catch (Throwable t) {
        this.t = t;
      }
    }
    
    public void rethrow() {
      if (t != null) {
        throw launderThrowable(t);
      }
    }
  }
  
  RethrowableTask task = new RethrowableTask();
  final Thread taskThread = new Thread(task);
  taskThread.start();
  cancelExec.schedule(new Runnable() {
    public void run() {
      taskThread.interrupt();
    }
  }, timeout, unit);
  taskThread.join(unit.toMillis(timeout));
  task.rethrow();
}

通过 Future 来实现取消

Future 带有一个 boolean 类型的参数 mayInterruptIfRunning,表示取消是否成功。true且任务正在某个线程运行,那么线程能被中断。false那么意味着任务还没启动,就不要运行它

public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
  Future<?> task = taskExec.submit(r);
  try {
    task.get(timeout, unit);
  } catch (TimeoutExcepption e) {
    
  } catch (ExecutionException e) {
    throw launderThrowable(e.getCause());
  } finally {
    // 如果任务已经结束,那么执行取消操作也不会带来任何影响,如果任务正在执行,那么将被中断
    task.cancel(true);
  }
}

处理不可中断的阻塞

由于执行不可中断操作而被阻塞的线程,可使用类似于中断手段来停止线程,但要求我们知道线程阻塞的原因。

  • Java.io 包中的同步 Socket I/O:虽然 InputStream 和 OutputStream 中的 read 和 write 等方法都不会响应中断,但通过关闭底层的套接字,可以使得执行 read 和 write 等方法而被阻塞的线程抛出一个 SocketException

  • Java.io 包中的同步 I/O:当中断一个正在 InterrutibleChannel 上等待的线程时,将抛出 ClosedByInterruptException 并关闭链路(其它在这条链路上阻塞的线程同样抛出)。当关闭一个 InterruptibleChannel 时,将导致所有在链路上阻塞的线程都抛出 AsynchronousCloseException
  • Selector 的异步 I/O:如果一个线程在调用 Selector.select 方法时阻塞了,那么调用 close 和 wakeup 方法会使线程抛出 ClosedSelectorException 并提前返回。
  • 获取某个锁:如果一个线程等待某个内置锁而阻塞时,将无法响应中断。但,Lock 类中提供了 lockInterruptibly 方法,允许等待一个锁的同时仍能响应中断
public class ReaderThread extends Thread {
  private final Socket socket;
  private final InputStream in;
  
  public ReaderThread(Socket socket) throws IOException {
    this.socket = socket;
    this.in = socket.getInputStream();
  }
  
  public void inturrupt() {
    try {
      socket.close();
    } catch (IOException ignored) {
      
    } finally {
      super.interrupt();
    }
  }
  
  public void run() {
    try {
      byte[] buf = new byte[BUFSZ];
      while (true) {
        int count = in.read(buf);
        if (count < 0) {
          break;
        } else if (count > 0) {
          processBuffer(buf, count);
        }
      }
    } catch (IOException e) {
      // 允许线程退出
    }
  }
}

采用 newTaskFor 来封装非标准的取消

public interface CancellableTask<T> extends Callable<T> {
  void cancel();
  RUnableFuture<T> newTask();
}

@ThreadSafe
public class CancellingExecutor extends ThreadPollExecutor {
  protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    if (callable instanceof CancellableTask) {
      return ((CancellableTask<T>) callable).newTask();
    } else {
      return super.newTaskFor(callable);
    }
  }
}

public abstract class SocketUsingTask<T> implements CancellableTask<T> {
  @GuardedBy("this") private Socket socket;
  
  protected synchronized void setSocket(Socket s) {
    socket = s;
  }
  
  public synchronized void cancel() {
    try {
      if (socket != null) {
        socket.close();
      }
    } catch (IOException ignored) {}
  }
  
  public RunnableFuture<T> newTask() {
    return new FutureTask<T>(this) {
      public boolean cancel(boolean mayInterruptIfRunning) {
        try {
          SocketUsingTask.this.cancel();
        } finally {
          return super.cancel(mayInterruptIfRunning);
        }
      }
    };
  }
}

处理非正常的线程终止

任何代码都可能抛出 RuntimeException

// 典型的线程池工作者线程结构,存在争议:当线程抛出一个未检查异常时,整个应用程序都会受到影响
public void run() {
  Throwable thrown = null;
  try {
    while (!isInterrupted()) {
      runTask(getTaskFromWorkQueue());
    } catch (Throwable e) {
      thrown = e;
    } finally {
      threadExited(this, thrown);
    }
  }
}

Thread API 提供了 UncaughtExceptionHandler 检测某个线程由于未捕获的异常而终止的情况

@FunctionalInterface
public interface UncaughtExceptionHandler {
  void uncaughtException(Thread t, Throwable e);
}

要为线程池中的所有线程设置一个 UncaughtExceptionHandler ,需要为 ThreadPoolExecutor 的构造最数提供一个 ThreadFactory

JVM 关闭

JVM 既可以正常关闭(System.exit、SIGINT 信号 、Ctrl+C),也可以强行关闭(Runtime.halt、或在操作系统中杀死 JVM 进程)

关闭钩子

正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是通过 Runtime.addShutdownHook 注册的但尚未开始的线程

守护线程

守护线程(Daemon Thread)执行辅助工作,但又不阻碍 JVM 的关闭

终结器

避免使用终结器(finalize),终结器可以在某个由 JVM 管理的线程中运行,任何状态都可能被多个线程访问,必须对其访问操作进行同步。终结器并不能保证它们将在何时运行甚至是否会运行,并且复杂的终结器通常还会在对象上产生巨大的性能开销。要编写正确的终结器是非常困难的。在大多数精况下,通过使 finally 代码块和显式的 close方法,能够比使用终结器更好地管理资源。唯一的例外情况在于 当需要管理对象,并且该对象持有的资源是通过本地方能获得的。基于这些原因以及其他原因,我们要尽量避免编写或使用包含终结器的类(除非是平台库中的类)