Skip to main content

第6章 任务执行

在线程中执行任务

当围绕“任务执行”设计应用程序结构时,第一步就是要找出清晰的任务边界。大多数服务应用程序都提供了一种自然的边界选择方式:以独立的客户请求为边界。

串行地执行任务

串行处理机制通常都无法提供高吞吐率或快速响应性。

class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
Socket connection = socket.accept();
handleRequest(connection);
}
}
}

显示的为任务创建线程

为每个请求创建一个新的线程来提供服务,从而实现更高的响应性

class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
new Thread(task).start();
}
}
}

无限制创建线程的不足

  • 线程生命周期的开销非常高
  • 资源消耗:线程多余处理器的数量时线程闲置。闲置线程占用更多内存,给垃圾回收器带来压力,竞争 CPU 资源时产生额外开销。
  • 稳定性:OOM

Executor 框架

任务是一组逻辑工作单元,而线程则是使任务异步执行的机制。通过有界队列防止高负荷的应用程序耗尽内存,线程池简化了线程的管理工作,并且java.util.concurrent提供一种灵活的线程池实现作为 Executor 框架的一部分。

public interface Executor {
void execute(Runnable command);
}

Executor 提供了标准的方法将任务的提交过程与执行过程解耦开,并用 Runnable 来表示任务。还提供了对生命周期的支持,以及统计信息收集、应用程序管理机制和性能监视等机制。

Executor 基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程则相当于消费者。

示例:基于 Executor 的 Web 服务器

class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
}

执行策略

通过将任务的提交与执行解耦开来,从而无须太大的困难就可以为某种类型的任务指定和修改执行策略。

线程池

是指管理一组同构工作线程的资源池。工作队列(Work Queue)保存了所有等待执行的任务,工作者线程(Worker Thread)从工作队列中获取一个任务,执行任务,然后返回线程池并等待下一个任务。

通过重用现有的线程,分摊线程创建和销售产生的巨大开销,节省线程创建时间从而提高响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙碌的状态,同时还可以防止过多的线程相互竞争而使应用程序耗尽内存或失败。

  • newFixedThreadPool 固定长度的线程池
  • newCachedThreadPool 可缓存的线程池
  • newSingleThreadExecutor 一个单线程的 Executor
  • newScheduledThreadPool 固定长度的线程池,而且以延迟或定时的方式来执行任务

Executor的生命周期

JVM只有在所有(非守护)线程全部终止后才会退出。为了解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了生命周期管理的方法

public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterrutedException;
}

生命周期三种状态:运行/关闭/已终止。ExecutorService在初始创建时处于运行状态。shutdown将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成。shutdownNow将执行粗暴的关闭过程,并且不再启动队列中尚未开始执行的任务。
在Executor关闭后提交的任务将由“拒绝执行处理器(Rejected Execution Handler)”来处理,它会抛弃任务,或使得execute方法抛出一个未检查的RejectedExecutionException。调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询是否已经终止。通常在调用awaitTermination之后立刻调用shutdown,从而产生同步关闭ExecutorService的效果。

class LifecycleWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS);

public void start() throws IOException {
ServerSocket socket = new ServerSocket(80);
while (!exec.isShutdown()) {
try {
final Socket conn = socket.accept();
exec.execute(new Runnable() {
public void run() { handleRequest(conn); }
})
} catch (RejectedExecutionException e) {
if (!exec.isShutdown) {
log("task submission rejected", e);
}
}
}
}
public void stop() {
exec.shutdown();
}

void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req)) {
stop();
} else {
dispatchRequest(req);
}
}
}

延迟任务与周期任务

Timer类负责管理延迟任务以及周期任务,存在一些缺陷。应该考虑使用ScheduledThreadPoolExecutor来代替他。

Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,将破坏其他TimerTask定时的精确性。

TImer线程并不捕捉异常,如果TimerTask抛出了一个未检查的异常,将终止定时线程。

找出可利用的并行性

Executor框架帮助指定执行策略,如果要使用Executor,必须将任务表述为一个Runnable。

示例:串行的页面渲染器

图像下载过程大部分时间都是在等待I/O操作执行完成,在这期间CPU几乎不做任何工作,串行执行方法并没有充分的利用CPU。

携带结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式。但它不能返回值或抛出受检查的异常。

Callable是一种更好的抽象:它认为主入口点(cal)将返回一个值,并可能抛出一个异常。

Runnable和Callable描述的都是抽象的计算任务。

Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。

public interface Callable<V> {
V call() throws Exception;
}

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException, TimeoutException;
}

ExecutorService中的所有submit方法都将返回一个Future,从而将一个Runnable或Callable提交给Executor,并得到一个Future用来获取任务的执行结果或者取消任务。还可以显示的为某个指定的Runnable或Callable实例化一个FutureTask。

protected <T> RunnableFuture<T> newTaskFor(Callable<T> task) {
return new FutureTask<T>(task);
}

在将Runnable或Callable提交到Executor的过程中,包含了一个安全发布过程,即将Runnable或Callable从提交线程发布到最终执行任务的过程。

示例:使用Future实现页面渲染器

将渲染分解为两个任务:渲染文本,下载图像。(CPU密集型,IO密集型)

在异构任务并行化中存在的局限

只有当大量相互独立且同构的任务可以并发处理时,才能体现出将程序的工作负载分配到多个任务中带来的真正性能提升。

CompletionService:Executor与BlockingQueue

ExecutorCompletionService在构造函数中创建一个BlockQueue来保存计算完成的结果

private class QueueingFuture<V> extends FutureTask<V> {
QueueingFuture(Callable<V> c) { supre(c); }
QueueingFuture(Runnable t, V r) { super(t, r); }

protected void done() {
completionQueue.add(this);
}
}

示例:使用CompletionService实现页面渲染器

通过CompletionService从两个方面来提高页面渲染器的性能:缩短总运行时间以及提高响应性。为每一幅图像的下载都创建一个独立任务,并在线程池中执行。通过从CompletionService中获取结果以及使每张图片在下载完成后立刻显示出来,能使用户获得一个更加动态和更高响应性的用户界面。

public class Renderer {
private final ExecutorService executor;

Renderer(ExecutorService executor) {
this.executor = executor;
}

void renderPage(CharSequeue source) {
List<ImageInfo> info = scanImageInfo(source);
CompletionService<ImageData> completionService = new ExecutorCompletionService<ImageData>(executor);
for (final ImageInfo imageInfo: info) {
completionService.submit(new Callable<ImageData>() {
public ImageData call() {
return imageInfo.downloadImage();
}
});
}
reunderText(source);

try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take();
ImageData imageData = f.get();
renderImage(imageData);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw launderThrowable(e.getCause());
}
}
}

为任务设置时限

当任务超时时应该立刻停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。

Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
Page page = renderPageBody();
Ad ad;
try {
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cannel(true);
}
page.setAd(ad);
return page;
}

示例:旅行预订门户网站

invokeAll 创建n个任务,将其提交到一个线程池,保留n个Future,并使用限时的get方法通过Future串行地获取每一个结果

private class QuoteTask implements Callable<TravelQuote> {
private final TravelCompany company;
private final TravelInfo travelInfo;

public TravelQuote call() throws Exception {
return company.solicitQuote(travelInfo);
}
}
public List<TravelQuote> getRankedTravelQuotes(
TravelInfo travelInfo,
Set<TravelCompany> companies,
Comparator<TravelQuote> ranking,
long time,
TimeUnit unit) throws InterruptedException {

List<QuoteTask> tasks = new ArrayList<QuoteTask>();
for (TravelCompany company: companies) {
tasks.add(new QuoteTask(company, travelInfo));
}
List<Future<TravelQuote>> futures = exec.invokeAll(tasks, time, unit);

List<TravelQuote> quotes = new ArrayList<TravelQuote>(tasks.size());
Iterator<QuoteTask> taskIter = tasks.iterator();
for (Future<TravelQuote> f : futures) {
QuoteTask task = taskIter.next();
try {
quotes.add(f.get());
} catch (ExecutionException e) {
quotes.add(task.getFailureQuote(e.getCause()));
} catch (CancellationException e) {
quotes.add(task.getTimeoutQuote(e));
}
}
Collections.sort(quotes, ranking);
return quotes;
}