Phaser的使用

Phaser提供动态增减parties(屏障点)计数,这点币CyclicBarrier类操作parties更加方便,通过若干个方法来控制多个线程之间同步运行的效果,还可以实现针对某一个线程取消同步运行的效果,而且支持在指定屏障处等待,在等待时还支持中断或非中断等功能。对线程并发进行分组同步控制时,它比CyclicBarrier类功能更加强大,更建议使用。

简单示例:

public class App {
    public static void main(String[] args) {
        final Phaser phaser = new Phaser(3);
        Runnable r1 = () -> {
            System.out.println(Thread.currentThread().getName() + " A1 begin=" + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A1 end=" + System.currentTimeMillis());
            System.out.println(Thread.currentThread().getName() + " A2 begin=" + System.currentTimeMillis());
            phaser.arriveAndAwaitAdvance();
            System.out.println(Thread.currentThread().getName() + " A2 end=" + System.currentTimeMillis());
        };
        Runnable r2 = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " A1 begin=" + System.currentTimeMillis());
                Thread.sleep(2000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + " A1 end=" + System.currentTimeMillis());
                System.out.println(Thread.currentThread().getName() + " A2 begin=" + System.currentTimeMillis());
                Thread.sleep(2000);
                phaser.arriveAndAwaitAdvance();
                System.out.println(Thread.currentThread().getName() + " A2 end=" + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        new Thread(r1, "A").start();
        new Thread(r1, "B").start();
        new Thread(r2, "C").start();
    }
}

运行结果:

A A1 begin=1525358412447
B A1 begin=1525358412447
C A1 begin=1525358412447
C A1 end=1525358414450
C A2 begin=1525358414451
B A1 end=1525358414451
A A1 end=1525358414451
B A2 begin=1525358414451
A A2 begin=1525358414451
C A2 end=1525358416456
B A2 end=1525358416456
A A2 end=1525358416456

移除代码中的18-21行的内容

// System.out.println(Thread.currentThread().getName() + " A2 begin=" + System.currentTimeMillis());
// Thread.sleep(2000);
// phaser.arriveAndAwaitAdvance();
// System.out.println(Thread.currentThread().getName() + " A2 end=" + System.currentTimeMillis());

运行结果:
[image]
运行结果
从运行结果看,因为某一个线程达到屏障点,导致另外两个线程一直等待。

为了解决上面的问题,不再继续向下一个屏障点执行的线程调用
arriveAndDeregister(),可以实现将当前线程退出该屏障,并且将屏障点(parties)值减1.
将上述Runnable r2代码改为:

Runnable r2 = () -> {
    try {
        System.out.println(Thread.currentThread().getName() + " A1 begin=" + System.currentTimeMillis());
        Thread.sleep(2000);
        System.out.println("A:" + phaser.getRegisteredParties());
        phaser.arriveAndDeregister();
        System.out.println("B:" + phaser.getRegisteredParties());
        System.out.println(Thread.currentThread().getName() + " A1 end=" + System.currentTimeMillis());
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
};

运行结果:

A A1 begin=1525359548165
C A1 begin=1525359548165
B A1 begin=1525359548165
A:3
B:2
C A1 end=1525359550168
A A1 end=1525359550168
B A1 end=1525359550168
A A2 begin=1525359550169
B A2 begin=1525359550169
B A2 end=1525359550169
A A2 end=1525359550169

方法介绍

arriveAndAwaitAdvance()当前线程已经到达屏障,在此等待条件满足后继续向后执行
arriveAndDeregister()当前线程退出该屏障,并且将屏障点(parties)值减1
getRegisteredParties()当前phaser注册的屏障点数
register()没执行一次该方法就动态添加一个parties值
getPhase()获取已经到达第几个屏障
onAdvance()通过新的屏障时调用
// 该方法返回true表示不等待类,Phaser呈无效/销毁状态
// 该方法返回false表示Phaser继续工作
bulkRegister()批量增加parties值
getArrivedParties()获取已经被使用的parties个数
getUnarrivedParties()获取未被使用的parties个数
arrive()使到达屏障的线程数加1,且当前线程不在屏障处等待,直接向下面的代码继续运行,并且Phaser到达的线程达到parties时会重置计数
awaitAdvance(int phase)如果传入的phase值和当前getPhase()方法返回值一样,则在屏障处等待,否则继续向下运行。类似于旁观者的作用,当观察的条件满足了就等待(旁观),如果条件不满足,则程序向下继续运行
awaitAdvance(int phase)不可中断
awaitAdvanceInterruptibly(int phase)可中断的
awaitAdvanceInterruptibly(int phase, long timeout, TimeUnit unit)指定最大的等待时间
forceTermination()是Phaser对象的屏障功能失效,并且处于屏障处等待的线程继续执行,并不抛出一场
isTerminated()判断Phaser对象是否已经呈销毁状态