第14章 构建自定义的同步工具

创建状态依赖类的最简单方式通常是在类库中现有状在载赖类的基础上进行构造

状态依赖性的管理

可阻塞的状态依赖操作的结构

// acquire lock on object state
while (precondition dose not hold) {
    release lock
        wait until precondition might hold
        optionally fail if interrupted on timeout expires
            reacquire lock
}
perform action
    release lock

有界缓存实现的基类

@ThreadSafe
public abstract class BaseBoundedBuffer<V> {
    @GuardedBy("this")
    private final V[] buf;
    @GuradedBy("this")
    private final int tail;
    @GuardedBy("this")
    private final int head;
    @GuardedBy("this")
    private final int count;
    
    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[])new Object[capacity];
    }
    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length) {
            tail = 0;
        }
        ++count;
    }
    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length) {
            head = 0;
        }
        --count;
        return v;
    }
    public synchronized final boolean isFull() {
        return count == buf.length;
    }
    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}

示例:将前提条件的失败传递给调用者

当不满足前提条件时,有界缓存不会执行相应操作

@ThreadSafe
public class GrumpyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    public GrumpyBoundedBuffer(int size) {
        super(size);
    }
    
    public synchronized void put(V v) throws BufferFullException {
        if (isFull()) {
            throw new BufferFullException();
        }
        doPut(v);
    }
    public synchronized V take() throws BufferEmptyException {
        if (isEmpty()) {
            throw new BufferEmptyException();
        }
        return doTake();
    }
}

示例:通过轮询与休眠来实现简单的阻塞

@ThreadSafe
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {
    public SleepyBoundedBuffer(int size) {
        super(size);
    }
    public void put(V v) throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isFull()) {
                    doPut(v);
                    return;
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
    public V take() throws InterruptedException {
        while (true) {
            synchronized (this) {
                if (!isEmpty()) {
                    return doTake();
                }
            }
            Thread.sleep(SLEEP_GRANULARITY);
        }
    }
}

条件队列

使用条件队列实现的有界缓存

@ThreadSafe
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    public BoundedBuffer(int size) {
        super(size);
    }
    
    // 阻塞直到 not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull()) {
            wait();
        }
        doPut(v);
        notfiyAll();
    }
    // 阻塞直到 not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }
        V v = doTake();
        notifyAll();
        return v;
    }
}

使用条件队列

条件队列使构建高效以及高可响应性的状态依赖类变得更容易, 但同时也很容易被不正确地使用。

条件谓词

关键找出对象在哪个条件谓词上等待

每次wait调用都会隐式的与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保持着构成条件谓词的状态变量

过早唤醒

内置条件队列可以与多个条件谓词一起使用。当一个线程由于调用 notifyAll 而醒来时,并不意味该线程正在等待的条件谓词已经变成真了。

所以,每当线程从wait中唤醒时,都必须再次测试条件谓词

void stateDependentMethod() throws InterruptedException {
    synchronized (lock) {
        while (!conditionPredicate()) {
            lock.wait();
        }
    }
}

当使用条件等待时(Object.wait 或 Condition.await)

  • 通常都有一个条件谓词–包括一些对象的状态的测试,线程在执行前必须首先通过这些测试
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试
  • 在一个循环中调用wait
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量
  • 当调用wait,notify,notifyAll等方法时,一定要持有与条件队列相关的锁
  • 在检查条件谓词之后以及开始执行相关的操作之前,不要释放锁

丢失的信号

丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。

通知

每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知

只有同时满足两个条件,才能用单一的notify而不是notifyAll

  • 所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在从wait返回后将执行相同的操作

  • 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。

普遍认可的做法是忧先使用 notifyAll 而不是notify. 虽然 notifyAll 可能比 notiy更低效,但却更容易确保类的行为是正确的。

在每个结程执行一个事件的同时,将出现大量的上下文切换操作以及发生竞争的锁获取操作。(最坏的情况是,在使notifyAll 时将导致 \(O(n^2)\) 次唤醒操作,而实际上只需要 n次唤醒操作就足够了)

public synchronized void put(V v) throws InterruptedException {
    while (ifFull()) {
        wait();
    }
    boolean wasEmpty = isEmpty();
    doPut(v);
    if (wasEmpty) {
        notifyAll();
    }
}

示例:阀门类

使用wait和notifyAll来实现可重新关闭的阀门

@ThreadSafe
public class ThreaadGate {
    @GuardedBy("this")
    private boolean isOpen;
    @GuardedBy("this")
    private int generation;
    
    public synchronized void close() {
        isOpen = false;
    }
    
    public synchronized void open() {
        ++generation;
        isOpen = true;
        notifyAll();
    }
    
    public synchronized void await() throws InterruptedException {
        int arrivalGeneration = generation;
        while (!isOpen && arrivalGeneration == generation) {
            wait();
        }
    }
}

子类的安全问题

如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。

封装条件队列

通常,我们应该把条件队列封装起来,因而除了使用条件队列的类,就不能在其他地方坊访问它。

入口协议与出口协议

人口协议和出口协议( Entry and Exit Protocols )”来描述wait notify方法的正确使用。对于每个依赖状态的操件,以及每个修改其他操作依赖状态的操作,都应该定义一个入口协议和出口协议。入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如是,则通知相关的条件队列。

显式的Condition对象

Condition也是一种广义的内置条件队列

public interface Condition {
    void await() throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    long awaitNanos(long nanosTImeout) throws InterruptedException;
    void awaitUninterruptibly();
    boolean awaitUntil(Date deadline) throws InterruptedException;;
    
    void signal();
    void signalAll();
}

Condition比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待/条件等待可以时可中断或不可中断,基于时限的等待,以及公平的或非公平的队列操作

@ThreadSafe
public class ConditionBoundedBuffer<T> {
    protected final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    @GuardedBy("lock")
    private final T[] items = (T[])new Object[BUFFER_SIZE];
    @GuardedBy("lock")
    private int tail, head, count;
    
    // 阻塞直到notFull
    public void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[tail] = x;
            if (++tail == items.length) {
                tail = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    // 阻塞直到notEmpty
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            T x = items[head];
            items[head] = null;
            if (++head == items.length) {
                head = 0;
            }
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

Synchronized剖析

AQS 是一个用于构建锁和同步器的框架,许多同步器都可以通过 AQS 很容易并且高效地构造出来。

使用Lock来实现信号量

@ThreadSafe
public class SemaphoreOnLock{
    private final Lock lock = new ReentrantLock();
    private final Condition permitsAvailable = lock.newCondition();
    @GuardedBy("this")
    private int permits;
    
    SemaphoreOnLock(int initialPermits) {
        lock.lock();
        try {
            permits = initialPermits;
        } finally {
            lock.unlock();
        }
    }
    
    public void acquire() throws InterruptedException {
        lock.lock();
        try {
            while (permits <= 0) {
                permitsAvailable.await();
                --permits;
            } finally {
                lock.unlock();
            }
        }
    }
    
    public void release() {
        lock.lock();
        try {
            ++permits;
            permitsAvailable.signal();
        } finally {
            lock.unlock();
        }
    }
}

AbstractQueuedSynchronizer

AQS中获取和释放操作的标准形式

boolean acquire() throws InterruptedException {
    while (当前状态不允许获取操作) {
        if (需要阻塞获取请求) {
            如果当前线程不在队列中则将其插入队列
            阻塞当前线程
        } else {
            返回失败
        }
    }
    可能更新同步器的状态
    如果线程位于队列中则将其移除队列
    返回成功
}

void release() {
    更新同步器的状态
    if (新的状态允许某个被阻塞的线程获取成功) 
        接触队列中一个或多个线程的阻塞状态
}

使用AbstractQueueSynchronizer实现的二元闭锁

@ThreadSafe
public class OneShotLatch{
    private final Sync sync = new Sync();
    
    public void signal() {
        sync.releaseShared(0);
    }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }
    private class Sync extends AbstractQueueSynchronizer {
        protected int tryAcquireShared(int ignored) {
            return (getState() == 1) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int ignored) {
            setState(1);
            return true;
        }
    }
}

java.until.concurrent同步器类中的AQS

ReentrantLock

ReentrantLock 只支持强占方式的获取操作,因此它实现了tryAcquire tryRelease和isHeldExclusively

基于非公平的ReentrantLock实现tryAcquire

protected boolean tryAcquire(int ignored) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, 1)) {
            owner = current;
            return true;
        }
    } else if (current == owner) {
        setState(c + 1);
        return true;
    }
    return false;
}

Semaphore与CountDownLatch

Semaphore中的tryAcquireShared与tryRleaseShared

protected int tryAcquireShared(int acquires) {
    while (true) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 || compareAndSetState(available, remaining)) {
            return remaining;
        }
    }
}
protected boolean tryReleaseShared(int releases) {
    while (true) {
        int p = getState();
        if (compareAndSetState(p, p + releases)) {
            return true;
        }
    }
}

FutureTask

FutureTask 中, AQS 同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。 FutureTask 还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。此外,它还维护了一个引用,指向正在执行计算任务的线程〈如果它当前处于运行状态〉,因而如果任务取消,该线程会中断。

ReentrantReadWriteLock

ReadWriteLock 接口表示存在两个锁:一个读取锁和一个写入锁,但在基于 AQS 实现ReentrantReadWriteLock 中,单个 AQS子类将同时管理读取加锁如写入加锁。 ReentrantReadWriteLock 使用了一个 16 位的状态来表示在写入锁的计数,并且使用了另…个 16 位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。