Skip to main content

第7章 取消与关闭

Java没有提供任何机制来安全的终止线程,但它提供了中断(interruption)协调机制,能够使一个线程终止另外一个线程当前的工作。

生命周期结束(End-of-Lifecycle)的问题会使任务、服务以及程序的设计和实现等过程变得复杂

任务取消

  • 用户请求取消
  • 有时间限制的操作
  • 应用程序事件
  • 错误
  • 关闭

Java 没有一种安全的抢占式方法来停止线程,只有协作式的机制,使请求取消的任务和代码都遵循一种协商好的协议。

@ThreadSafe
public class PrimeGenerator implements Runnable {
@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<BigInteger>();
// “已请求取消(Cancellation Requested)”标志
private volatile boolean cancelled;

public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronzied (this) {
primes.add(p);
}
}
}

public void cancel() { cancelled = true; }

public synchronized List<BigInteger> get() {
return new ArrayList<BigIntegere>(primes);
}
}

一个仅运行一秒钟的素数生成器

List<BigInteger> aSecondOfPrimes() throws InterruptedException {
PrimeGenerator generator = new PrimeGenerator();
new Thread(generator).start();
try {
SECONDS.sleep(1);
} finally {
generator.cancel();
}
return generator.get();
}

中断

通常,中断是实现取消的最合理的方式

使用取消的任务调用了一个阻塞方法,任务可能永远不会检查取消标志。

不可靠的取消操作将把生产者置于阻塞的操作中

Object.wait, Thread.sleep,Thread.join方法,会不断的轮询监听 interrupted 标志位,发现其设置为true后,会停止阻塞并抛出 InterruptedException异常。以及各种AQS衍生类Lock.lockInterruptibly()等任何声明throws InterruptedException的方法。

class BrokenPrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;

BrokenPrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
queue.put(p = p.nextProbablePrime());
}
} catch (InterruptedException consumed) {}
}

public void cancel() { cancelled = true; }
}

void consumePrimes() throws InterruptedException {
BlockingQueue<BigInteger> primes = ...;
BrokenPrimeProducer producer = new BrokenPrimeProducer(primes);
producer.start();
try {
while (needMorePrimes()) {
consume(primes.take());
}
} finally {
producer.cancel();
}
}

每个线程都有一个 boolean 类型的中断状态,Thread 中包含了中断线程以及查询线程中断状态的方法

public class Thread {
public void interrupte() {}
public boolean isInterrupted() {}
public static boolean interrupted() {}
}

调用 interrupt只是传递了请求中断的消息,在使用静态的 interrupted 时应该小心,因为它会清除当前线程的中断状态。

class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted()) {
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {

}
}
}

public void cancel() { interrupt(); }
}

中断策略

最合理的中断策略是某种形式的线程级(Thread-Level)取消操作或服务级(Service-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。

如果除了将 InterruptedException 传递给调用者外还需要执行其他操作,那么应该在捕捉之后恢复中断状态:Thread.currentThread().interrupt();

由于每个线程拥有各自的中断策略,因此除非知道中断对该线程的含义,否则就不应该中断这个线程

响应中断

两种使用策略处理 InterruptedException

  • 传递异常
  • 恢复中断状态

传递异常:

BlockingQueue<Task> queue;

public Task getNextTask() throws InterruptedExecption {
return queue.take();
}

不可取消的任务在退出前恢复中断

public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
// 重新尝试
}
}
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

在效率和响应性之间进行权衡,选择合适的轮询频率

示例:计时运行

破坏了规则:在中断线程之前,应该了解它的中断策略

private static final ScheduleExcutorService cancelExec = ...;

public static void timeRun(Runnable r, long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
r.run();
}

在专门的线程中中断任务,(依赖限制的 join,因此无法知道是正常退出,还是超时返回)

public static void timeRun(final Runnable r, long timeout, TimeUnit unit) {
class RethrowableTask implements Runnable {
private volatile Throwable t;

public void run() {
try {
r.run();
} catch (Throwable t) {
this.t = t;
}
}

public void rethrow() {
if (t != null) {
throw launderThrowable(t);
}
}
}

RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() {
taskThread.interrupt();
}
}, timeout, unit);
taskThread.join(unit.toMillis(timeout));
task.rethrow();
}

通过 Future 来实现取消

Future 带有一个 boolean 类型的参数 mayInterruptIfRunning,表示取消是否成功。true且任务正在某个线程运行,那么线程能被中断。false那么意味着任务还没启动,就不要运行它

public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutExcepption e) {

} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经结束,那么执行取消操作也不会带来任何影响,如果任务正在执行,那么将被中断
task.cancel(true);
}
}

处理不可中断的阻塞

由于执行不可中断操作而被阻塞的线程,可使用类似于中断手段来停止线程,但要求我们知道线程阻塞的原因。

  • Java.io 包中的同步 Socket I/O:虽然 InputStream 和 OutputStream 中的 read 和 write 等方法都不会响应中断,但通过关闭底层的套接字,可以使得执行 read 和 write 等方法而被阻塞的线程抛出一个 SocketException

  • Java.io 包中的同步 I/O:当中断一个正在 InterrutibleChannel 上等待的线程时,将抛出 ClosedByInterruptException 并关闭链路(其它在这条链路上阻塞的线程同样抛出)。当关闭一个 InterruptibleChannel 时,将导致所有在链路上阻塞的线程都抛出 AsynchronousCloseException

  • Selector 的异步 I/O:如果一个线程在调用 Selector.select 方法时阻塞了,那么调用 close 和 wakeup 方法会使线程抛出 ClosedSelectorException 并提前返回。

  • 获取某个锁:如果一个线程等待某个内置锁而阻塞时,将无法响应中断。但,Lock 类中提供了 lockInterruptibly 方法,允许等待一个锁的同时仍能响应中断

public class ReaderThread extends Thread {
private final Socket socket;
private final InputStream in;

public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}

public void inturrupt() {
try {
socket.close();
} catch (IOException ignored) {

} finally {
super.interrupt();
}
}

public void run() {
try {
byte[] buf = new byte[BUFSZ];
while (true) {
int count = in.read(buf);
if (count < 0) {
break;
} else if (count > 0) {
processBuffer(buf, count);
}
}
} catch (IOException e) {
// 允许线程退出
}
}
}

采用 newTaskFor 来封装非标准的取消

public interface CancellableTask<T> extends Callable<T> {
void cancel();
RUnableFuture<T> newTask();
}

@ThreadSafe
public class CancellingExecutor extends ThreadPollExecutor {
protected<T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
if (callable instanceof CancellableTask) {
return ((CancellableTask<T>) callable).newTask();
} else {
return super.newTaskFor(callable);
}
}
}

public abstract class SocketUsingTask<T> implements CancellableTask<T> {
@GuardedBy("this") private Socket socket;

protected synchronized void setSocket(Socket s) {
socket = s;
}

public synchronized void cancel() {
try {
if (socket != null) {
socket.close();
}
} catch (IOException ignored) {}
}

public RunnableFuture<T> newTask() {
return new FutureTask<T>(this) {
public boolean cancel(boolean mayInterruptIfRunning) {
try {
SocketUsingTask.this.cancel();
} finally {
return super.cancel(mayInterruptIfRunning);
}
}
};
}
}

处理非正常的线程终止

任何代码都可能抛出 RuntimeException

// 典型的线程池工作者线程结构,存在争议:当线程抛出一个未检查异常时,整个应用程序都会受到影响
public void run() {
Throwable thrown = null;
try {
while (!isInterrupted()) {
runTask(getTaskFromWorkQueue());
} catch (Throwable e) {
thrown = e;
} finally {
threadExited(this, thrown);
}
}
}

Thread API 提供了 UncaughtExceptionHandler 检测某个线程由于未捕获的异常而终止的情况

@FunctionalInterface
public interface UncaughtExceptionHandler {
void uncaughtException(Thread t, Throwable e);
}

要为线程池中的所有线程设置一个 UncaughtExceptionHandler ,需要为 ThreadPoolExecutor 的构造最数提供一个 ThreadFactory

JVM 关闭

JVM 既可以正常关闭(System.exit、SIGINT 信号 、Ctrl+C),也可以强行关闭(Runtime.halt、或在操作系统中杀死 JVM 进程)

关闭钩子

正常关闭中,JVM首先调用所有已注册的关闭钩子(Shutdown Hook)。关闭钩子是通过 Runtime.addShutdownHook 注册的但尚未开始的线程

守护线程

守护线程(Daemon Thread)执行辅助工作,但又不阻碍 JVM 的关闭

终结器

避免使用终结器(finalize),终结器可以在某个由 JVM 管理的线程中运行,任何状态都可能被多个线程访问,必须对其访问操作进行同步。终结器并不能保证它们将在何时运行甚至是否会运行,并且复杂的终结器通常还会在对象上产生巨大的性能开销。要编写正确的终结器是非常困难的。在大多数精况下,通过使 finally 代码块和显式的 close方法,能够比使用终结器更好地管理资源。唯一的例外情况在于 当需要管理对象,并且该对象持有的资源是通过本地方能获得的。基于这些原因以及其他原因,我们要尽量避免编写或使用包含终结器的类(除非是平台库中的类)