- Phaser
- 主要机制
- Registration(注册机制)
- Synchronization(同步机制)
- Arrival(到达机制)
- Waiting(等待机制)
- Termination(终止机制)
- Tiering(分层结构)
- Monitoring(监控)
- 组成
- 内部类QNode
- 成员变量
- 构造函数
- 核心方法
- 方法说明
- 案例
- 动态修改线程数+重入性(单个Phaser多个阶段)
与其他屏障的情况不同,在Phaser上注册同步的线程数量可能会随着时间而变化。 可以随时注册任务(使用register、bulkRegister或建立初始所需线程数量的构造函数),并且可以选择在任何到达时取消注册(使用arriveAndDeregister)。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。 (但是,您可以通过子类化此类来引入此类簿记。)
动态性-支持屏障所需线程的数量
Synchronization(同步机制)像CyclicBarrier一样,Phaser可能会被反复等待。 方法arriveAndAwaitAdvance具有类似于CyclicBarrier.await的效果。 每一代Phaser都有一个相关的阶段编号(phase)。 阶段编号从零开始,当所有线程到达屏障时前进,在达到Integer.MAX_VALUE后重置0。 阶段编号的使用可以通过任何注册方可以调用的两种方法,在到达阶段和等待其他阶段时独立控制操作:
重入性-一个Phaser可以支持多个阶段同步操作
Arrival(到达机制)方法arrive和到arriveAndDeregister记录到达。 这些方法不会阻塞,而是返回相关的到达阶段编号; 即,到达应用的Phaser的相位编号。 当给定阶段的最后一方到达时,将执行一个可选操作并且该阶段前进。 这些动作由触发阶段推进的一方执行,并通过覆盖方法onAdvance(int, int) 进行安排,该方法也支持控制终止。 覆盖此方法与向CyclicBarrier提供屏障操作类似,但更灵活。
方法awaitAdvance需要一个指示到达阶段编号的参数,并在Phaser前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier的类似构造不同,即使等待线程被中断,方法awaitAdvance也会继续等待。 可中断和超时版本也可用,但在任务等待中断或超时时遇到的异常不会改变Phaser的状态。 如有必要,您可以在这些异常的处理程序中执行任何相关的恢复,通常是在调用forceTermination之后。 在ForkJoinPool中执行的任务也可以使用Phaser,这将确保在其他人被阻塞等待阶段推进时执行任务有足够的并行性。
Phaser可以进入终止状态,可以使用方法isTerminated进行检查。 终止后,所有同步方法立即返回,无需等待提前,返回值为负数。 同样,在终止时尝试注册也无效。 当onAdvance的调用返回 true 时触发终止。 如果取消注册导致注册方的数量变为零,则默认实现返回 true。 如下图所示,当Phaser控制具有固定迭代次数的动作时,通常可以方便地覆盖此方法以在当前阶段数达到阈值时导致终止。 方法forceTermination也可用于突然释放等待线程并允许它们终止。
Phaser可以分层(即以树型结构构建)以减少争用。 可以改为设置具有大量参与方的Phaser,否则这些Phaser将经历沉重的同步争用成本,以便子Phaser组共享一个共同的父级。 这可能会大大增加吞吐量,即使它会产生更大的每次操作开销。
在分层Phaser树中,自动管理子移相器与其父级的注册和注销。 每当子移相器的注册方数量变为非零时(如在Phaser(Phaser, int)构造函数、register或bulkRegister中建立的那样),子Phaser将向其父移相器注册。 每当注册方的数量由于调用到达和取消注册而变为零时,子Phaser就会从其父Phaser中注销。
层次性-将多个Phaser树形结构组织起来,通过牺牲操作的开销增加吞吐量。
虽然同步方法只能由注册方调用,但Phaser的当前状态可以由任何调用者监视。 在任何给定时刻,总共有getRegisteredParties方,其中getArrivedParties已到达当前阶段 (getPhase)。 当剩余的 (getUnarrivedParties) 方到达时,阶段前进。 这些方法返回的值可能反映瞬态状态,因此通常对同步控制没有用处。 方法toString以一种便于非正式监控的形式返回这些状态查询的快照。
// 代表等待队列的 Treiber 堆栈的等待节点
static final class QNode implements ForkJoinPool.ManagedBlocker {
final Phaser phaser;
final int phase;
final boolean interruptible;
final boolean timed;
boolean wasInterrupted;
long nanos;
final long deadline;
volatile Thread thread; // nulled to cancel wait
QNode next;
QNode(Phaser phaser, int phase, boolean interruptible,
boolean timed, long nanos) {
this.phaser = phaser;
this.phase = phase;
this.interruptible = interruptible;
this.nanos = nanos;
this.timed = timed;
this.deadline = timed ? System.nanoTime() + nanos : 0L;
thread = Thread.currentThread();
}
public boolean isReleasable() {
if (thread == null)
return true;
if (phaser.getPhase() != phase) {
thread = null;
return true;
}
if (Thread.interrupted())
wasInterrupted = true;
if (wasInterrupted && interruptible) {
thread = null;
return true;
}
if (timed) {
if (nanos > 0L) {
nanos = deadline - System.nanoTime();
}
if (nanos 0L)
LockSupport.parkNanos(this, nanos);
return isReleasable();
}
}
成员变量
// 同步状态位
private volatile long state;
// 此移相器的父级,如果没有则为 null
private final Phaser parent;
// 移相器树的根。 如果不在树中,则等于此值。
private final Phaser root;
// 偶数阶段单向链表
private final AtomicReference evenQ;
// 奇数阶段单向链表
private final AtomicReference oddQ;
state状态位的含义,如下图 所有状态更新都通过 CAS 执行,除了子相位器的初始注册(即具有非 null 父级的子相位器)。在这种(相对罕见的)情况下,我们在首次向其父级注册时使用内置同步来锁定。子相位器的相位可以滞后于其祖先的相位,直到它被实际访问——参见方法 reconcileState。
在Phaser中使用奇偶两个单向链表来实现阻塞队列,降低操作的冲突。
使用给定的父级和已注册的未到达方数量创建一个新的移相器。 当给定的父节点不为空并且给定的参与方数量大于零时,此子移相器将向其父节点注册。
/**
* @param parent 父移相器
* @param parties 进入下一阶段所需的参与方数量
*/
public Phaser(Phaser parent, int parties) {
// 无符号右移16位--验证parties是否在合理范围内
if (parties >>> PARTIES_SHIFT != 0)
throw new IllegalArgumentException("Illegal number of parties");
// 阶段0
int phase = 0;
this.parent = parent;
if (parent != null) {
// 获取父阶段的根节点
final Phaser root = parent.root;
this.root = root;
// 偶数head
this.evenQ = root.evenQ;
// 奇数head
this.oddQ = root.oddQ;
// 成员量不为0,需要注册一个阶段
if (parties != 0)
phase = parent.doRegister(1);
}
// 不存在实际父阶段器
else {
this.root = this;
this.evenQ = new AtomicReference();
this.oddQ = new AtomicReference();
}
// 设置同步状态值--最终二进制位数含义和分布,如上图所示。
this.state = (parties == 0) ? (long)EMPTY :
((long)phase > PHASE_SHIFT);
// 此时年代小于0 说明年代取消或损坏 不能再上面注册参与者线程,跳出无限循环
if (phase PHASE_SHIFT)) !=
(int)(s >>> PHASE_SHIFT) &&
!UNSAFE.compareAndSwapLong
(this, stateOffset, s,
s = (((long)phase PARTIES_SHIFT) == 0) ? EMPTY :
((s & PARTIES_MASK) | p))))))
s = state;
}
return s;
}
internalAwaitAdvance(int phase, QNode node)
阻塞等待phase到下一代
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
// 释放上一代
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) > PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}
releaseWaiters(int phase)
private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
// 0是偶数,通过与1进行位与运算判断用那个链表
AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;
// 遍历栈 开始唤醒链表里面的线程,
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
arrive()和arriveAndDeregister() 都是表示到达了变相器,都不需要等他其他线程到达后,才能继续执行。主要由doArrive(int adjust)实现。
doArrive(int adjust)
//
private int doArrive(int adjust) {
// 获取根结点
final Phaser root = this.root;
// 无限循环
for (;;) {
// 如果没有父结点,那么状态不需要动,否则需要重新协调状态
long s = (root == this) ? state : reconcileState();
// 获取阶段数
int phase = (int)(s >>> PHASE_SHIFT);
if (phase > PARTIES_SHIFT;
// 如果此线程是根结点
if (root == this) {
// 检查是否终止此phaser
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
// 初始化状态
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
// 阶段+1
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase > PHASE_SHIFT);
if (phase >> PARTIES_SHIFT;
if (onAdvance(phase, nextUnarrived))
n |= TERMINATION_BIT;
else if (nextUnarrived == 0)
n |= EMPTY;
else
n |= nextUnarrived;
int nextPhase = (phase + 1) & MAX_PHASE;
n |= (long)nextPhase > PHASE_SHIFT); // terminated
releaseWaiters(phase);
return nextPhase;
}
}
}
int awaitAdvance(int phase)
public int awaitAdvance(int phase) {
final Phaser root = this.root;、
//
long s = (root == this) ? state : reconcileState();
// 获取阶段数
int p = (int)(s >>> PHASE_SHIFT);
//
if (phase = 0) {
// 将根结点的终止标识置为已终止状态
if (UNSAFE.compareAndSwapLong(root, stateOffset,
s, s | TERMINATION_BIT)) {
// signal all threads 唤醒所有线程
releaseWaiters(0); // Waiters on evenQ 偶数链表
releaseWaiters(1); // Waiters on oddQ 奇数链表
return;
}
}
}
案例
动态修改线程数+重入性(单个Phaser多个阶段)
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class PhaserDemo {
private static Random random = new Random(System.currentTimeMillis());
public static void main(String[] args) {
Phaser phaser = new Phaser(5);
ExecutorService e = Executors.newFixedThreadPool(5);
// 第一阶段的 4个子线程
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
for (int i = 0; i {
try {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
}
// 第一个阶段的 主线程,第一阶段所需5个线程都满足了。
phaser.arriveAndAwaitAdvance();
// 此时phaser的第一个阶段已经完成。phase加1 开始第二阶段开始
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
// 此时第二阶段只需要6个线程,通过register增加一个线程
phaser.register();
// 第二阶段的 5个子线程
for (int i = 0; i {
try {
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
TimeUnit.SECONDS.sleep(random.nextInt(5));
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});
}
// 第二阶段的 主线程,第二阶段所需6个线程都满足了。
phaser.arriveAndAwaitAdvance();
// 第三阶段的 主线程,第二阶段所需6个线程都满足了。
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "now phase=" + phaser.getPhase() + ", exit.");
// 强制终止phaser
phaser.forceTermination();
System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "Phaser Termination Status is " + phaser.isTerminated() + ", exit.");
e.shutdown();
}
}
执行结果
[17:58:26-main] go to phase=0.
[17:58:26-pool-1-thread-1] phase=0 start.
[17:58:26-pool-1-thread-2] phase=0 start.
[17:58:26-pool-1-thread-4] phase=0 start.
[17:58:26-pool-1-thread-3] phase=0 start.
[17:58:26-pool-1-thread-4] phase=0 end.
[17:58:26-pool-1-thread-3] phase=0 end.
[17:58:28-pool-1-thread-1] phase=0 end.
[17:58:30-pool-1-thread-2] phase=0 end.
[17:58:30-main] go to phase=1.
[17:58:30-pool-1-thread-4] phase=1 start.
[17:58:30-pool-1-thread-3] phase=1 start.
[17:58:30-pool-1-thread-1] phase=1 start.
[17:58:30-pool-1-thread-2] phase=1 start.
[17:58:30-pool-1-thread-2] phase=1 end.
[17:58:30-pool-1-thread-5] phase=1 start.
[17:58:31-pool-1-thread-4] phase=1 end.
[17:58:32-pool-1-thread-5] phase=1 end.
[17:58:33-pool-1-thread-1] phase=1 end.
[17:58:34-pool-1-thread-3] phase=1 end.
[17:58:34-main] now phase=2, exit.
[17:58:34-main] Phaser Termination Status is true, exit.
Phaser主要用来解决什么问题?
Phaser与CyclicBarrier和CountDownLatch的区别是什么?
如果用CountDownLatch来实现Phaser的功能应该怎么实现?
Phaser运行机制是什么样的?
