Skip to main content

第15章 原子变量与非阻塞同步机制

Semaphore和CurrentLinkedQueue都提供了比synchronized机制更高性能和可伸缩性,原子变量和非阻塞的同步机制

锁的劣势

与锁相比,volatile变量时一种更轻量级的同步机制,局限:虽然他们提供了相似的可见性保证,但不能用于构建原子的复合操作。++i包含了3个独立的操作

Counter是线程安全的,并且在没有竞争的情况下能运行得很好。但在竞争的情况下,其性能由于上下文的切换的开销和调度延迟而降低。如果锁的时间非常短,那么在当不恰当的时间请求锁时,使线程休眠将付出很高的代价。

锁定还存在其他一些缺点:当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行(缺页错误,调度延迟)那么所需要锁的线程都无法执行下去。如果被阻塞的线程的优先级高,而持有锁的线程优先级较低(优先级反转 Priority Inversion)高优先级的线程可以抢先执行,但仍然需要等待锁被释放

锁定方式对于细粒度的操作时一种高开销的机制

硬件对并发的支持

独占锁时一项悲观技术,它假设最坏的情况,并且只有在确保其他线程不会造成干扰的情况下能执行下去

对于细粒度的操作,乐观锁更高效

比较并交换

在大多数处理器架构〈包括 IA32和Spare )中采用的方法是实现一个比较井交换( CAS)指令。

非阻塞的计数器

基于CAS实现的非阻塞计数器

@ThreadSafe
public class CasCounter {
private SimulatedCAS value;

public int getValue() {
return value.get();
}
public int increment() {
int v;
do {
v = value.get();
}
while (v != value.compareAndSwap(v, v + 1));
return v + 1;
}
}

当竞争程序不高时,基于CAS的计数器在性能上远远超过了基于锁的计数器,CAS在大多数情况下都能成功执行,因此硬件能够正确预测while循环中的分支,从而把复杂控制逻辑的开销降至最低

CAS主要缺点时,它将使调用者处理竞争问题(通过重试,回退,放弃),而在锁中能自动处理竞争问题(线程在获得锁之前将一直阻塞)

在多CPU系统中CAS需要10到150个时钟周期的开销

在大多数处理器上,在无竞争的锁获取和释放的快速代码路径上的开销,大约时CAS开销的两倍

JVM对CAS的支持

Java 5.0 中引入了底层的支持,在 int,long和对象的引用等类型上都公开了CAS 操作,并且 JVM 把它们编译为底层硬件提供的最有效方法。在支持 CAS 的平台上,运行时把它们编译为相应的〈多条〉机器指令。在最坏的情况下,如果不支持 CAS 指令,那么 JVM 将使用自旋锁。

原子变量类

原子变量比锁的粒度更细,量级更轻,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上,这是你获得的粒度最细的情况

标量类(Scalar),更新器类,数组类,复合变量类

原子变量是一种“更好的volatile

通过CAS来维持包含多个变量的不变性条件

public class CasNumberRange {
@Immutable
private static class IntPair {
final int lower;
final int upper;
}
private final AtomicReference<IntPair> values = new AtomicReference<IntPair>(new IntPair(0, 0));

public int getLower() {
return values.get().lower;
}
public int getUpper() {
return values.get().upper;
}
public void setLower(int i) {
while (true) {
IntPair oldv = values.get();
if (i > oldv.upper) {
throw new IllegalArgumentException("Can't set lower to " + i + " > upper");
}
if (values.compareAndSet(oldv, newv))
return;
}
}
}

性能比较:锁与原子变量

基于ReentrantLock实现的随机数生成器

@ThreadSafe
public class ReentrantLockPseudoRandom extends PseudoRandom {
private final Lock lock = new ReentrantLock(false);
private int seed;

ReentrantLockPseudoRandom(int seed) {
this.seed = seed;
}
public int nextInt(int n) {
lock.lock();
try {
int s = seed;
seed = calculateNext(s);
int remainder = s % n;
return remainder > 0 ? remainder : remainder + n;
} finally {
lock.unlock();
}
}
}

基于AtomicInterger实现的随机数生成器

@ThreadSafe
public class AtomicPseudoRandom extends PseudoRandom {
private AtomicInterge seed;

AtomicPseudoRandom(int seed) {
this.seed = new Atomicinteger(seed);
}
public int nextInt(int n) {
while (true) {
int s = seed.get();
int nextSeed = calculateNext(s);
if (seed.compareAndSet(s, nextSeed)) {
int remainder = s % n;
return remainder > 0 ? remainder : remainder + n;
}
}
}
}

锁与原子变量在不同竞争程度上的性能差异主是很好地说明了各自的优势和劣势。在中低程度的竞争下,原子变量能提供更高的可掉缩性,而在高强度的竟争下,锁能够更有效避免竞争

非阻塞算法

非阻塞的栈

创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性。在链式容器类(例如队列〉中,有时候无须将状态转换操作表示为对节点链接的修改,也无须使用AtomicReference 来表示每个必须采用原子操作来更新的链接。

程序清单15-6 使用Treiber算法构造的非阻塞栈

@ThreadSafe
public class ConcurrentStack<E> {
AtomicReferenct<Node<E>> top = new AtomicReference<Node<E>>();

public void push(E item) {
Node<E> newHead = new Node<E>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}
public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (oldHead == null) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}
private static class Node<E> {
public final E item;
public Node<E> next;

public Node(E item) {
this.item = item;
}
}
}

非阻塞的链表

非阻塞算法中的插入算法

@ThreadSafe
public class LinkedQueue<E> {
private static class Node<E> {
final E item;
final AtomicReference<Node<E>> next;

public Node(E item, Node<E> next) {
this.item = item;
this.next = new AtomicReference<Node<E>>(next);
}
}
private final Node<E> dummy = new Node<E>(null, null);
private final AtomicReference<Node<E>> head = new AtomicReference<Node<E>>(dummy);
private final AtomicReference<Node<E>> tail = new AtomicReference<Node<E>>(dummy);

public boolean put(E item) {
Node<E> newNode = new Node<E>(item, null);
while (true) {
Node<E> curTail = tail.get();
Node<E> tailNext = curTail.next.get();
if (curTail == tail.get()) {
if (curTail != null) {
// 队列处于中间状态,推进尾节点
tail.compareAndSet(curTail, tailNext);
} else {
// 处于稳定状态,尝试插入新节点
if (curTail.next.compareAndSet(null, newNode)) {
// 插入成功,尝试推进尾节点
tail.compareAndSet(curTail, newNode);
return true;
}
}
}
}
}
}

原子的域更新器

在ConcurrentLinkedQueue中使用原子的域更新器

private class Node<E> {
private final E item;
private volatile Node<E> next;

public Node(E item) {
this.item = item;
}
}
private static AtomicReferenceFieldUpdate<Node, Node> nextUpdater = AtomicReferenceFieldUPdater.newUpdater(Node.class, Node.class, "next");

原子的域更新器类表示现有 volatile 域的一种基于反射的“视圈”,从而能够在已有volatile 域上使用 CAS 。

ABA问题

不是更新某个引用的值,雨是更新两个值,包括一个引用和一个版本号。