java.util.concurrent 包包含了几个能帮助人们管理相互合作的线程集的类。这些机制具有为线程之间的共用集结点模式提供的“预置功能”。如果有一个相互合作的线程集满足这些行为模式之一,那么应该直接重用合适的库类而不要试图提供手工的锁与条件的集合。
- 同步工具类可以是任何一个对象,只要它根据自身的状态来协调线程的控制流
阻塞队列
可以作为同步工具类,其他类型的同步工具还有信号量(Semaphore)
,栅栏(Barrier)
以及闭锁(Latch)
- 所有同步工具类都包含一些特定的结构化属性:
- 封装了一些状态(决定线程是继续执行还是等待)
- 提供一些方法对状态进行操作
- 另一些方法用于高效地等待同步工具进入到预期状态
类 | 它能做什么 | 说明 |
---|---|---|
CyclicBarrier | 允许线程集等待直至其中预定数目的线程到达一个公共障栅,然后可以选择执行一个处理障栅的工作 | 当大量的线程需要在它们的结果可用之前完成时 |
Phaser | 类似于循环障栅,不过有一个可变的计数 | Java SE 7 中引入 |
CountDownLatch | 允许线程集等待直到计数器减为 0 | 当一个或多个线程需要等待直到指定数目的事件发生 |
Exchanger | 允许两个线程在要交换的对象准备好时交换对象 | 当两个线程工作在同一个数据结构的两个实例上的时候,一个向实例添加数据而另一个从实例清除数据 |
Semaphore | 允许线程集等待直到被允许继续运行为止 | 限制访问资源的线程总数。如果许可数是 1,常常阻塞线程直到另一个线程给出许可为止 |
SynchronousQueue | 允许一个线程把对象交给另一个线程 | 在没有显式同步的情况下,当两个线程准备好将一个对象从一个线程传递到另一个时 |
1 信号量(Semaphore)
信号量(Semaphore)可以控同时访问的线程个数。
计数信号量(Counting Semaphore)
用来控制同时访问某个特点资源的操作数量,或者同时执行某个指定操作的数量Semaphore
中管理着一组虚拟的许可,许可的初始数量可通过构造函数来指定
在执行操作时可以先获得许可,并在使用后release
释放许可给信号量
如果没有许可,那么acquire
将阻塞直到有许可Sequence
可以用于实现资源池,也可以将任何一种容器变成有界阻塞容器
方法
- acquire() :获取一个许可,如果没有就阻塞等待
- release() :释放一个许可。
- tryAcquire():非阻塞
- availablePermits():得到可用的许可数目
private Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
Thread.currentThread().getName()
semaphore.release();
2 闭锁(CountDownLatch)
CountDownLatch 让一个线程集等待直到计数变为 0。倒计时门栓是一次性的。一旦计数为 0,就不能在重用了。
- 闭锁相当于一扇门:
- 在闭锁到达结束状态之前,这扇门一直是关闭的,并且没有任何线程能通过,当到达结束状态时,这扇门就会打开并允许所有线程通过
- 当闭锁到达结束状态后,将不会再改变状态,因此这扇门将永远保持打开状态
- 闭锁可以用来确保某些活动直到其他活动都完成后才能执行
- 闭锁状态包括:
- 一个计数器,该计数器被初始化为一个正数,表示需要等待的事件数量
countDown
方法递减计数器,表示有一个事件已经发生了await
方法等待计数器达到零,表示所有需要等待的事件都已经发生- 如果计数器的值非零(或者等待的线程中断/超时),那么
await
会一直阻塞到计数器为零
方法
- await():线程挂起,直到count值为0
- countDown():将count值减1
final CountDownLatch latch = new CountDownLatch(2);
latch.countDown();
latch.await();
3 栅栏(CyclicBarrier)
CyclicBarrier 类实现让一组线程等待至某个状态(barrier)之后再全部同时执行。当所有等待线程都被释放以后,CyclicBarrier可以被重用。
栅栏
与闭锁
的关键区别在于,所有线程必须都到达栅栏位置才能继续执行
闭锁
用于等待事件,而栅栏
用于等待其他线程
- 如果对
await
的调用超时,或者await
阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await
调用都终止并抛出BrokenBarrierException
- 在模拟程序中通常需要使用栅栏(eg.等待所有玩家加载完毕开启游戏)
- 另一种形式的栅栏是
Exchanger
,它是一种两方(Two-Party)栅栏
,各方在栅栏位置上交换数据
当双方执行不对称时,Exchanger
非常有用
方法
- await():线程挂起,直至所有线程都到达 barrier 状态
//构造一个障栅,并给出参与的线程数
CyclicBarrier barrier = new CyclicBarrier(nthreads);
//每一个线程做一些工作,完成后在障栅上调用 await
public void run(){
doWork();
brrier.await();
...
}
如果任何一个在障栅上等待的线程离开了障栅,那么障栅就被破坏了。在这种情况下,所有其他线程的 await 方法抛出 BrokenBarrierException 异常。那些已经在等待的线程立即终止 await 的调用。
3.1 可选的障栅
可以提供一个可选的障栅(barrier action),当所有线程到达障栅的时候就会执行这一动作。
Runnable barrierAction = ...;
CyclicBarrier barrier = new CyclicBarrier(nthreads, barrierAction);
该动作可以收集那些单个线程的运行结果。
Phaser 类增加了更大的灵活性,允许改变不同阶段中参与线程的个数。
4 交换器(Exchanger)
当两个线程在同一数据缓冲区的两个实例上工作的时候,就可以使用交换器(Exchanger)。典型的情况是,一个线程向缓冲区填入数据,另一个线程消耗这些数据。当它们都完成以后,相互交换缓冲区。
5 同步队列(SynchronousQueue)
同步队列是一种将生产者与消费者线程配对的机制。当一个线程调用 SynchronousQueue 的 put 方法时,它会阻塞直到另一个线程调用 take 方法为止,反之亦然。与 Exchanger 的情况不同,数据仅仅沿一个方向传递,从生产者到消费者。
即使 SynchronousQueue 类实现了 BlockingQueue 接口,概念上讲,它依然不是一个队列。它没有包含任何元素,它的 size 方法总是返回 0。 对于许多线程问题,可以通过使用一个或多个队列以优雅且安全的方式将其形式化。
5.1 队列方法
- add 添加一个元素,如果队列满则抛错
- element 返回队列的头元素,如果队列空则抛错
- offer 添加一个元素返回true,如果队列满则返回false
- peek 返回队列头元素,如果队列空,返回null
- poll 移出并返回队列头元素,如果队列空,返回null
- put 添加一个元素,如果队列满则阻塞
- remove 移出并返回队列头元素,如果空则抛错
- take 移出并返回队列头元素,如果空则阻塞
5.2 八种队列
- ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
- LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
- PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
- DelayQueue:一个使用优先级队列实现的无界阻塞队列。
- SynchronousQueue:一个不存储元素的阻塞队列。
- LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
- LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
- LinkedTransferQueue:1.7 增加的,允许生产者线程等待,直到消费者准备就绪。
5.3 阻塞队列
阻塞队列(BlockingQueue)提供了可阻塞的put
和take
方法,以及支持定时的offer
和poll
方法
- 类库中包含了
BlockingQueue
的多种实现:LinkedBlockingQueue
和ArrayBlockingQueue
是 FIFO 队列,比同步 List 拥有更好的并发性能PriorityBlockingQueue
是按优先级排序的队列SynchronousQueue
不是一个真正的队列,不会为元素维护储存空间,它只维护一组等待着把元素加入或移出队列的线程- 实现了“直接交付”,从而降低了从
生产者
移动到消费者
的延迟,当交付被接受时,它就知道消费者已经得到了任务 - 仅当有足够多的消费者,并且总有一个消费者准备好获取交付的工作时,才适合使用“同步队列”
- 实现了“直接交付”,从而降低了从
1)串行线程封闭
可变对象
,生产者——消费者
与阻塞队列
一起,促进了串行线程封闭,将对象所有权从生产者
交付给消费者
封闭对象
只能由单个线程拥有,但可以通过安全地发布对象来"转移"所有权,在转移后,另一个线程独占该对象访问权限- 例如线程池
2)双端队列与工作密取
Deque
是一个双端队列
,实现了在队列头和队列尾的高效插入和移除。具体实现包括ArrayDeque
和LinkedBlockingDeque
- 工作密取(Work Stealing):每个
消费者
都有各自的双端队列
。如果一个消费者
完成了自己双端队列
中的全部工作,那它就可以从其他消费者
的双端队列
末尾秘密地获取工作 - 密取模式比传统的
生产者——消费者
模式具有更高的可伸缩性
,因为工作者线程不会在单个共享的任务队列上发生竞争
6 自定义同步工具
创建状态依赖类的最简单方法通常是在类库中现有状态依赖类的基础上进行构造。但如果类库没有提供你需要的功能,那么还可以使用 Java 语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显式的 Condition 对象以及 AbstractQueuedSynchronizer 框架。
6.1 状态依赖性的管理
并发程序中,依赖状态的操作可以一直阻塞直到可以继续执行,这比使它们先失败再实现起来要更为方便且更不易出错。
条件队列
条件队列使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。
正如每个 Java 对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且 Object 中的 wait、not 的和 not 的 A11 方法就构成了内部条件队列的 API。
如果某个功能无法通过“轮询和休眠”来实现,那么使用条件队列也无法实现,但条件队列使得在表达和管理状态依赖性时更加简单和高效。
6.2 使用条件队列
条件队列使构建高效以及高可响应性的状态依赖类变得更容易,但同时也很容易被不正确地使用。要尽量基于 LinkedBlockingQueue、Latch、Semaphore 和 FutureTask 等类来构造程序。
1)条件谓词
将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。
每一次 wait 调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的 wait 时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件为此的状态变量。
2)过早唤醒
每当线程从 wait 中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。
当使用条件等待时(例如 Object.wait 或 Condition.await):
- 通常都有一个条件谓词——包括一些对象状态的测试,线程在执行前必须首先通过这些测试
- 在调用 wait 前测试条件谓词,并且从 wait 中返回时再次测试
- 在一个循环中调用 wait
- 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量
- 当调用 wait、notify 或 notifyAll 等方法时,一定要持有与条件队列相关的锁。
- 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁
3)丢失的信号
丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。
4)通知
每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。
大多数情况下应该优先选择 notifyAll 而不是单个的 notify。
只有同时满足一下两个条件时,才能用单一的 notify 而不是 notufyAll:
- 所有等待线程的类型相同;
- 单进单出。
5)子类的安全问题
要想支持子类化,那么在设计类时需要保证:如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。
对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写人正式文档),要么完全阻止子类参与到等待和通知等过程中。另外一种选择就是完全禁止子类化。
6)封装条件队列
通常,应该把条件队列封装起来
而线程安全类的最常见设计模式,建议使用对象的内置锁来保护对象自身的状态
可设计为使用私有的锁对象和条件队列
7)入口协议与出口协议
入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列。
6.3 显式的 Condition 对象
1)内置条件队列的缺陷
每个内置锁只能有一个相关联的条件队列,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。无法满足在使用 notifyAll 时所有等待线程为同一类型的需求
2)Condition
对于每个 Lock,可以有任意数量的 Condition 对象。调用 Lock 的 Lock.newCondition 方法创建Condition。
Condition 比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以使可中断的或不可中断的、基于时限的等待,以及公平或非公平的队列操作
在 Condition 对象中,与 wait、notify 和 notifyAll 方法对应的分别是 await、signal 和 signalAll。
3)选择
在使用显式的 Condition 和内置条件队列之间进行选择时,与在 ReentrantLock 和 synchronized 之间进行选择是一样的:如果需要一些高级功能,例如使用公平的队列操作或者 在每个锁上对应多个等待线程集,那么应该优先使用 Condition 而不是内置条件队列。
6.4 Synchronizer 剖析
AQS(AbstractQueuedSynchronizer),是 JDK 下提供的一套用于实现基于 FIFO 等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子 int 值来表示状态的同步器的基类。
基于 AQS 构建的有 ReentrantLock 、Semaphore、 CountDownLatch、 ReentrantReadWriteLock、Synchronousueue 和 FutureTask。
基于 AQS 来构建同步器能带来许多好处。它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
6.5 AbstractQueuedSynchronizer(AQS)
大多数开发者都不会直接使用 AQS,标准同步器类的集合能够满足绝大多数情况的需求。但如果能了解标唯同步器类的实现方式,那么对于理解它们的工作原理是非常有帮助的。
在基于 AQS 构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。其次,就是更新同步器的状态。
java.util.concurrent 中的所有同步器类都没有直接扩展 AQS,而是都将它们的相应功能委托给私有的 AQS 子类来实现。
6.6 java.util.concurrent 同步器类中的 AQS
1)ReentrantLock
ReentrantLock 将同步状态用于保存锁获取操作的次数,并且还维护一个 owner 变量来保存当前所有者线程的标识符。
2)Semaphore 与 CountDownLatch
Semaphore 将 AQS 的同步状态用于保存当前可用许可的数量。
CountDownLatch 在同步状态中保存的是当前的计数值。
3)FutureTask
在 FutureTask 中,AQS 同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。
FutureTask 还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。
此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。
4)ReentrantReadWriteLock
基于 AQS 实现的 ReentrantReadWriteLock,单个 AQS 子类同时管理读取加锁和写入加锁。分别使用了一个 16 位的状态来表示写入锁和读取锁的计数。写入锁为独占锁,读取锁为共享锁。