Java高并发 - 23.Latch设计模式

在本文中,我们将介绍Latch(门阀)设计模式,该模式指定了一个屏蔽,只有所有的条件都达到满足的时候,门阀才能打开。

CountDownLatch程序实现

1. 无线等待的Latch

首先定义了一个无限等待的抽象类Latch,在Latch抽象类中定义了await方法、countDown方法以及getUnarrived方法,代码注释中有方法的用途介绍。Latch中的limit属性至关重要,当limit降低到0时门阀将会打开。

public abstract class Latch {
    // 用于控制多少个线程完成任务时才能打开门阀
    protected int limit;
    
    // 通过构造函数传入limit
    public Latch(int limit) {
        this.limit = limit;
    }
    
    // 该方法会使得当前线程一直等待,直到所有的线程都完成工作,被阻塞的线程是允许被中断的
    public abstract void await() throws InterruptedException;
    
    // 当任务线程完成工作之后调用该方法使得计数器减一
    public abstract void countDown();
    
    // 获取当前还有多少个线程没有完成任务
    public abstract int getUnarrived();
}

下面的CountDownLatch实现一个无限等待门阀打开的Latch,当limit>0时调用await方法的线程将会进入无限的等待。

public class CountDownLatch extends Latch {
    public CountDownLatch(int limit) {
        super(limit);
    }
    
    @Override
    public void await() throws InterruptedException {
        synchronized(this) {
            // 当limit>0时,当前线程进入阻塞状态
            while (limit > 0) {
                this.wait();
            }
        }
    }
    
    @Override
    public void countDown() {
        synchronized(this) {
            if (limit <= 0)
                throw new IllegalStateException("all of task already arrived");
            // 使limit减一,并且通知阻塞线程
            limit--;
            this.notifyAll();
        }
    }
    
    @Override
    public int getUnarrived() {
        // 返回有多少线程还未完成任务
        // 该返回值不一定准确,只是个评估值
        return limit;
    }
}

下面程序模拟程序猿乘坐不同的交通工具达到城市广场的场景:

/**
 * 程序猿旅游线程
 */
public class ProgrammerTravel extends Thread {
    // 门阀
    private final Latch latch;
    // 程序猿
    private final String programmer;
    // 交通工具 
    private final String transportation;
    
    // 通过构造函数传入latch、programmer和transportation
    public ProgrammerTravel(Latch latch, String programmer, String transportation) {
        this.latch = latch;
        this.programmer = programmer;
        this.transportation = transportation;
    }
    
    @Override
    public void run() {
        System.out.println(programmer
                + " start take the transportation ["
                + transportation + "]");
        try {
            // 程序猿乘坐交通工具花费在路上的时间(模拟)
            TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(programmer +" arrived by "+ transportation);
        // 完成任务时使计数器减一
        latch.countDown();
    }
}

下面是Latch的测试代码:

public static void main(String[] args) throws InterruptedException {
    // 定义Latch,limit为4
    Latch latch = new CountDownLatch(4);
    new ProgrammerTravel(latch, "Alex", "Bus").start();
    new ProgrammerTravel(latch, "Gavin", "Walking").start();
    new ProgrammerTravel(latch, "Jack", "Subway").start();
    new ProgrammerTravel(latch, "Dillon", "Bicycle").start();
    
    // 当前线程(main线程会进入阻塞,直到四个程序猿全部到达目的地)
    latch.await();
    System.out.println("== all of programmer arrived ==");
}

2. 有超时设置的Latch

上节的Latch实现有一个问题,假设Jack在搭地铁途中接到公司领导的来电临时有急事需要处理不能参加此次聚会,或者Gavin睡了懒觉忘记了约定,到达广场的其他人不可能一直等下去。下面我们为Latch增加可超时的功能,在等待了指定时间后,还有人未完成任务,则会收到超时通知。

首先,Latch中增加可超时的抽象方法await(TimeUnit unit, long time):

public abstract void await(TimeUnit unit, long time)
        throws InterruptedException, WaitTimeoutException;

该方法增加了WaitTimeoutException用于通知当前的等待已经超时:

public class WaitTimeoutException {
    public WaitTimeoutException(String message) {
        super(message);
    }
}

另外在CountDownLatch中实现超时功能:

@Override
public void await(TimeUnit unit, long time) 
        throws InterruptedException, WaitTimeoutException {
    if (time <= 0)
        throw new IllegalArgumentException("The time is invalid");
    long remainingNanos = unit.toNanos(time); // 将time转换为纳秒
    // 等待任务将在endNanos纳秒后超时
    final long endNanos = System.nanoTime() + remainingNanos();
    synchronized(this) {
        while (limit > 0) {
            // 如果超时则抛出WaitTimeoutException异常
            if (TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0)
                throw new WaitTimeoutException("The wait time over specify time.");
            // 等待remainingNanos,在等待的过程中有可能会被中断,需要重新计算remainingNanos
            this.wait(TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            remainingNanos = endNanos - System.nanoTime();
        }
    }
}

下面是带超时Latch的测试代码:

public static void main(String[] args) {
    Latch latch = new CountDownLatch(4);
    new ProgrammerTravel(latch, "Alex", "Bus").start();
    new ProgrammerTravel(latch, "Gavin", "Walking").start();
    new ProgrammerTravel(latch, "Jack", "Subway").start();
    new ProgrammerTravel(latch, "Dillon", "Bicycle").start();
    
    try {
        latch.await(TimeUnit.SECONDS, 5);
        System.out.println("== all of programmer arrived ==");
    } catch(WaitTimeoutException e) {
        e.printStackTrace();
    }
}

3. 带回调的Latch

上文实现的Latch当await超时的时候,已完成任务的线程自然正常结束,但是未完成的则不会被中断还会继续执行下去,也就是说CountDownLatch只提供了门阀的功能,并不负责对线程的管理控制,对线程的控制还需要程序员自己控制。我们可以再次对其扩展,增加回调接口用于运行所有子任务完成后的其他任务:

    public CountDownLatch(int limit, Runnable runnable) {
        super(limit);
        this.runnable = runnable;
    }

    @Override
    public void await()
            throws InterruptedException, WaitTimeoutException {
        if (time <= 0)
            throw new IllegalArgumentException("The time is invalid");
        long remainingNanos = unit.toNanos(time);
        final long endNanos = System.nanoTime() + remainingNanos;
        synchronized(this) {
            while (limit > 0) {
                if (TimeUnit.NANOSECONDS.toMillis(remainingNanos) <= 0)
                    throw new WaitTimeoutException("The wait time over specify time.");
                remainingNanos = endNanos - System.nanoTime();
            }
        }
        if (null != runnable) {
            runnable.run();
        }
    }