第12章 并发程序的测试

安全性测试:不发生任何错误的行为

活跃性测试:某个良好的行为终究会发生

性能测试:

  • 吞吐量 一组并发任务中已完成任务所占的比例
  • 响应性 请求从出发到完成之间的时间(延迟)
  • 可伸缩性 增加更多资源的情况下(通常指CPU)吞吐量(或者缓解短缺)的提升情况

正确性测试

基于信号量的有界缓存

@ThreadSafe
public class BoundedBuffer<E> {
    private final Semaphore availableItems, availableSpaces;
    @GuardedBy("this")
    private final E[] items;
    @GuardedBy("this")
    private final int putPosition = 0, takePosition = 0;
    
    public BoundedBuffer(int capacity) {
        availableItems = new Semaphore(0);
        availableSpaces = new Semaphore(capacity);
        items = (E[]) new Object[capacity];
    }
    public boolean isEmpty() {
        return availableItems.availablePermits() == 0;
    }
    public boolean isFull() {
        return availableSpace.availablePermits() == 0;
    }
    public void put(E x) throws InterruptedException {
        availableSpace.acquire();
        doInsert(x);
        availableItems.release();
    }
    public E take() throws InterruptedException{
        availableItems.acquire();
        E item = doExtract();
        availableSpace.release();
        return item;
    }
    private synchronized void doInsert(E x) {
        int i = putPosition;
        items[i] = x;
        putPosition = (++i == items.length) ? 0 : i;
    }
    private synchronized E doExtract() {
        int i = takePosition;
        E x = items[i];
        items[i] = null;
        takePosition = (++i == item.length) ? 0 : i;
        return x;
    }
}

基本的单元测试

不变性条件:新建立的缓存应该是空的,插入N个元素到容量N的缓存中应该是满的

class BoundedBufferTest extends TestCase {
    void testIsEmptyWhenConstructed() {
        BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
        assertTure(bb.isEmpty());
        assertFalse(bb.isFull());
    }
    void testIsFullAfterPuts() throws InterruptedException {
        BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
        for (int i = 0; i < 10; i++) {
            bb.put(i);
        }
        assertTrue(bb.isFull());
        assertFalse(bb.isEmpty());
    }
}

对阻塞操作的测试

void testTakeBlockWhenEmpty () {
    final BoundedBuffer<Integer> bb = new BoundedBuffer<Integer>(10);
    Thread taker = new Thread() {
        public void run() {
            try {
                int unused = bb.take();
                fail();
            } catch (InterruptedException success) {}
        }
    };
    try {
        taker.start();
        Thread.sleep(LOCKUP_DETECT_TIMEOUT);
        taker.interrupt();
        take.join(LOCKUP_DETECT_TIMEOUT);
        assertFalse(taker.isAlive());
    } catch (Exception unexception) {
        fail();
    }
}

Thread.getState验证线程能否在一个条件等待上阻塞并不可靠,JVM可以选择通过自旋来实现阻塞。

安全性测试

要开发一个良好的并发测试程序,或许比开发这些程序要测试的类更加困难。

在构建对并发类的安全性测试中,需要解决的关键问题在于,要找出那些容易检查的属性,这些属性在发生错误的情况下极有可能失败,同时又不会使得错误检查代码人为的限制并发性。理想情况下,在测试属性中不需要任何同步机制

避免编译器预先猜测到校验和的值,应该采用随机方式生成测试数据(RNG Random Number Generator)。

与其使用通用RNG,不如使用一些简单的伪随机函数

// 测试生产者-消费者程序
public class PutTakeTest {
    private static final ExecutorService pool = Executors.newCachedThreadPool();
    private final AtomicInteger putSum = new AtomicInteger(0);
    private final AtomicInteger takeSum = new AtomicInteger(0);
    private final CyclicBarrier barrier;
    private final BoundedBuffer<Integer> bb;
    private final int nTrials, nPairs;
    
    public static void main(String[] args) {
        new PutTakeTest(10, 10, 10000).test();
        pool.shutdown();
    }
    PutTakeTest(int capacity, int npairs, int ntrials) {
        this.bb = new BoundedBuffer<Integer>(capacity);
        this.nTrials = ntrials;
        this.nPairs = npairs;
        this.barrier = new CyclicBarrier(npairs * 2 + 1);
    }
    void test() {
        try {
            for (int i = 0; i < nPairs; i++) {
                pool.execute(new Producer());
                pool.execute(new Consumer());
            }
            barrier.await();
            barrier.await();
            assertEquals(putSum.get, takeSum.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

class Producer implenments Runnable {
    public void run() {
        try {
            int seed = this.hashCode() ^ (int)System.nanoTime();
            int sum = 0;
            barrier.await();
            for (int i = nTrials; i > 0; i--) {
                bb.put(seed);
                sum += seed;
                sedd = xorShift(seed);
            }
            putSum.getAndAdd(sum);
            barrier.await();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

class Consumer implements Runnable {
    public void run() {
        try {
            barrier.await();
            int sum = 0;
            for (int i = nTrials; i > 0; --i) {
                sum += bb.take();
            }
            takeSum.getAndAdd(sum);
            barrier.await();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

资源管理的测试

类中有没有做它不该做的事情,如资源泄露。

class Big {
    double[] data = new double[100000];
}
void testLeak() throws InterruptedException {
    BoundedBuffer<Big> bb = new BoundedBuffer<Big>(CAPACITY);
    int heapSize1 = /* 堆快照 */;
    for (int i = 0; i < CAPACITY; i++) {
        bb.put(new Big());
    }
    for (int i = 0; i < CAPACITY; i++) {
        bb.take();
    }
    int heapSize2 = /* 堆快照 */;
    assertTure(Math.abs(heapSize1 - heapSize2) < THRESHOLD);
}

使用回调

回调函数的执行通常是在对象生命周期的一些已知位置上,并且在这些位置上非常适合判断不变性条件是否被破坏。

class TestingThreadFactory implements ThreadFactory {
    public final AtomicInteger numCreated = new AtomicInteger();
    private final ThreadFactory factory = Executors.defaultThreadFactory();
    
    public Thread newThread(Runnable r) {
        newCreated.incrementAndGet();
        return factory.newThread(r);
    }
}

public void testPoolExpansion() throws InterruptedException {
    int MAX_SIZE = 10;
    ExecutorService exec = Executors.newFixedThreadPool(MAX_SIZE);
    for (int i = 0; i < 10 * MAX_SIZE; i++) {
        exec.execute(new Runnable() {
            try {
            	Thread.sleep(Long.MAX_VALUE);
        	} catch (InterruptedException e) {
            	Thread.currentThread().interrupt();
        	}
        });
    }
    for (int i = 0; i < 20 && threadFactory.numCreated.get() < MAX_SIZE; i++) {
        Thread.sleep(100);
    }
    assertEquals(threadFactory.numCreated.get(), MAX_SIZE);
    exec.shutdown();
}

产生更多的交替操作

有一种有用的方法可以提高交替操作的数量, 以便能更有效地搜索程序的状态空阔: 在访问共享状态的操作中,使用 Thread.yield 将产生更多的上下文切换。〈这项技术的有效性与具体的平台相关,因为 JVM 可以将 Thread.yield 作为一个空操作〈no-op)[JLS 17. 9]。如果使用一个睡眠时间较短的 sleep ,那么虽然更慢些,但却更可靠。

两个账户之间转账操作,在两次更新操作之间,所有账户的综合应该等于零,通过调用yield把一些对执行时序敏感的错误暴露出来。这种方法需要在测试中添加一些调用,并在正式环境中删除这些调用。AOP可以降低这些不便性。

public synchronized void transferCredits(Account from, Account to, int amount) {
    from.setBalance(from.getBalance() - amount);
    if (random.nextInt(1000) > THRESHOLD)
        Thread.yield();
    to.setBalance(to.getBalance() + amount);
}

性能测试

性能测试通常是功能测试的延伸。

在PutTakeTest中增加计时功能

多种算法的比较

响应性衡量

避免性能测试的陷阱

垃圾回收

垃圾回收的执行时序是无法预测的,因此在执行测试时,垃圾回收期可能在任何时刻运行。

第一种策略是,确保垃圾回收操作在测试运行的整个期间都不会运行(可以在调用 JVM 指定-verbose: gc 来判断是否执行〉。第二种策略是,确保垃圾回收操作在测试期间执行多次,这样测试程序就能充分反映运行期间的内存分就与垃圾回收等开销。通常第二策略更好,它要求更长的测试时间,并且更有可能反映实际环境下的性能。

动态编译

在 HotSpot JVM (以及其他现代的 JVM )中将字节码的解释与动态编译结合起来使用。

有一种方式可以防止动态编译对测试结果产生偏差,就是使程序运行足够长的时向(至少数分钟〉,这样编译过程以及解释执行都只是总运行时间的很小一部分。另一种方法是使代码预先运行一段时间并且不测试这段时间内的代同性能,这样在开始计时前代码就已往被完全编译了。

对代码路径的不真实采样

动态编译器可能会针对一个单线程测试程序进行一些专门优化,但只要在真实的应用程序中略微包含一些并行,都会使这略战化不复存在。因此,即使只是想测试单线程的性能,也应该将单线程的性能测试与多线程的性能测试结合在一起。

不真实的竞争程度

要获得有实际意义的结果,在并发性能测试中应该尽量模拟典型应用程序中的线程本地计算量以及并发协调开销。

无用代码消除

优化编译器能找出并消除那些不会对输出结果产生任何影响的无用代码(Dead Code)。由于基准测试通常不会执行任何计算,因此它们很容易在编译器的优化过程中被消除

其他的测试方法

测试的目标不是更多的发现错误,而是提高代码能够按照预期方式工作的可信度。由于找出所有的错误是不现实的,所以质量保证( Quality Assurance, QA )的目标应该是在给定的测试资源下实现最高的可信度。

代码审查

多人参与的代码审查通常是不可替代的

静态分析工具

在一些静态分析工具〈例如,开源的 FindBugs )中包含了许多错误模式检查器,能够检测出多种常见的编码错误,其中许多错误都很容易在测试与代码审查中遗漏。

面向方面的测试技术

AOP可用来确保不变性条件不被破坏,或者与同步策略的某些方面保持一致

分析与检测工具

大多数商业分析工具都支持钱程。这些工具在功能与执行效率上存在着差异,但通常都能给出对程序内部的详细信息