第7章 取消与关闭

在任务与执行策略之间的隐性耦合

有些类型的任务需要明确的指明执行策略:

  • 依赖性任务。如果提交线程池的任务需要依赖其他任务,那么就隐含的给执行策略带来了约束,此时应小心的避免产生活跃性问题
  • 使用线程封闭机制的任务。单线程的 Executor 能够保证任务不会并发执行
  • 对响应时间敏感的任务。
  • 使用 ThreadLocal 的任务。线程池的线程中不应该使用 ThreadLocal 在任务之间传递值

只有当任务是相同类型并且相互独立时,线程池的性能才能达到最佳。

线程饥饿死锁

在单线程 Executor 中任务发生死锁

public class ThreadDeaklock {
  ExecutorService exec = Executors.newSingleThreadExecutor();
  
  public class RenderPageTask implements Callable<String> {
    public String call() throws Exception {
      Future<String> header, footer;
      header = exec.submit(new LoadFileTask("header.html"));
      footer = exec.submit(new LoadFileTask("footer.html"));
      String page = renderBody();
      // 将发生死锁 -- 由于任务在等待子任务的结果
      return header.get() + page + footer.get();
    }
  }
   public void submitTask(){
       executorService.submit(new RenderPageTask());
   }
}

运行时间较长的任务

如果线程池中线程的数量远小于在稳定状态下执行时间较长的任务的数量,那么到最后可能所有的线程都会运行这些执行时间过长的任务,从而影响整体的响应性。

如果线程池总是充满了阻塞任务,也有可能线程池规模较小

设置线程池大小

对于计算密集型的任务,在拥有 \(N_{cpu}\) 个处理器的系统上,当线程池的大小为 \(N_{cpu} + 1\) 时,通常能实现最优的利用率。即使当计算密集型的线程偶尔由于页缺失故障或其它原因而暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。

对于包含 IO 操作或其它阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。 $$ N_{cpu}=number\ of\ CPUs \

U_{cpu}=target\ CPU\ utilization,\ 0<=U_{cpu}<=1 \

\frac{W}{C}={ratio\ of\ wait\ to\ compute\ time}\

N_{thread}=N_{cpu}U_{cpu}(1+\frac{W}{C}) $$ 获得 CPU 数目

int N_CPUS = Runtime.getRuntime().availableProcessors();

配置 ThreadPoolExecutor

ThreadPollExecutor 通用构造函数

public ThreadPollExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAvlieTime,
                          TimeUnit unit,
                          BlockingQueue<Runable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandle handle) { ... }

线程的创建与销毁

线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。

newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,且不会超时。

newCachedThreadPool工厂方法将线程池的最大大小设置为Interge.MAX_VALUE,而将基本大小设置为0,超时设为一分钟,可以被无线扩展,并且需求降低时自动收缩。

管理队列任务

如果新情求的到达速率越过了钱程池的处理速率,那么新到来的请求将累积起来,可能会耗尽资源。甚至再耗尽内存之前,响应性能也将随着任务队列的增长而变得越来越糟。

ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列/有界队列和同步移交(synchronnous Handoff)。

newFixedThreadPool和newSingleThreadExecutor在默认情况下使用一个无界的LinkedBlockingQueue,队列无限制的增加。

ArrayBlockingQueue有界队列是一种更稳妥的资源管理策略,有助于避免资源耗尽的情况。当队列填满后,有许多饱和策略(Saturation Policy)可以解决这个问题。

对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入 SynchronousQueue 中,必续有另一个线程正在等待接受这个元素。newCachedThreadPool使用了SynchronousQueue。

当使用像 LinkedBlockingQueue ArrayBlockingQueue 这样的 FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如提想进一步任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。

饱和策略

当有界队列被填满后,饱和策略开始发挥作用。 Thre dPoolExecutor 的楼辑策略可以通过Z调用 setRejectedExecutionHandler 来修改。JDK实现:AbortPolicy, CallerRunsPolicy, DiscardPolicy, DiscardOldestPolicy

中止Abort策略是默认的饱和策略,将抛出未检查的RejectedExecutionException。

抛弃Discard策略会悄悄的抛弃该任务

抛弃最旧(Discard-Oldest)的策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。不要和优先级队列一起使用。

调用者运行(Caller-Runs)策略实现了一种调节机制,该策略不会抛弃任务,也不会抛出异常,而是将某些任务退回到调用者,从而降低新任务的流量

创建一个固定大小的线程池,并采用有界队列以及调用者运行饱和策略

ThreadPoolExecutor executor = new ThreadPoolExecutor(N_THREADS, N_THREADS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

使用Semaphore来控制任务的提交速率

@ThreadSafe
public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;
    
    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }
    
    public void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
               public void run() {
                   try {
                       command.run();
                   } finally {
                       semaphore.release();
                   }
               } 
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

线程工厂

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

自定义的线程工厂

public class MyThreadFactory implements ThreadFactory {
    private final String poolName;
    
    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }
    
    public Thread newThread(Runnable runnable) {
        return new MyAppThread(runnable, poolName);
    }
}

定制Thread基类

public class MyAppThread extends Thread {
    
}

在调用构造函数后再定制ThreadPoolExecutor

可以通过设置函数( Setter )来修改大多数传递错它的构造函数的参数〈倒如线程池的基本大小、最大大小、存活时间、线程工厂以及拒绝执行处理器(Rejected Execution Handler )

ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor) {
    ((ThreadPoolExecutor) exec).setCorePoolSize(10);
} else {
    throw new AssertionError("Oops, bad assumption");
}

扩展ThreadPoolExecutor

子类化中改写的方法: beforeExecute,afterExecute, terminated

public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool"); 
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();
    
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.line(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }
    
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.line(String.format("Thread %s: end %s, time=%dns", t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }
    
    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time=%dns", totalTIme.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

递归算法的并行化

将串行执行转换为并行执行

void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements) {
        exec.execute(new Runnable() {
            public void run() { process(e); }
        })
    }
}

等待通过并行方式计算的结果

public <T> Collection<T> getParallelResults(List<Node<T>> nodes) throws InterruptedException {
    ExecutorService exec = Executors.newCachedThreadPool();
    Queue<T> resultQueue = new ConcurrentLinkedQueue<T>();
    paralleRecursive(exec, nodes, resultQueue);
    exec.shutdown();
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
    return resultQueue;
}