Java高并发 - 19.future设计模式

Future设计模式提供了一种凭据式的解决方案。

1. 接口定义

Future接口

Future提供了获取计算结果和判断任务是否完成的两个接口,其中获取计算结果将会导致调用阻塞(在任务还没完成的情况下)。

public interface Future<T> {
	// 返回计算后的结果,该方法会陷入阻塞状态
	T get() throws InterruptedException;
	
	// 判断任务是否已经被执行完成
	boolean done();
}

FutureService接口

FutureService主要用于提交任务,提交的任务主要有两种,第一种不需要返回值,第二种则需要获得最终的计算结果。FutureService接口中提供了对FutureServiceImpl构建的工厂方法。JDK8中不仅支持default方法还支持静态方法,JDK9甚至还支持接口私有方法。

public interface FutureService<IN, OUT> {
	// 提交不需要返回值的任务,Future.get方法返回的将会是null
	Future<?> submit(Runnable runnable);
	
	// 提交需要返回值的任务,其中Task接口替代了Runnable接口
	Future<OUT> submit(Task<IN, OUT> task, IN input);
	
	// 使用静态方法创建一个FutureService的实现
	static <T, R> FutureService<T, R> newService() {
		return new FutureServiceImpl<>();
	}
}

Task接口

Task接口主要是提供给调用者实现计算逻辑用的,可以接受一个参数并且返回最终的计算结果。

@FunctionalInterface
public interface Task<IN, OUT> {
	// 给定一个参数,经过计算返回结果
	OUT get(IN input);
}

2. 程序实现

FutureTask

FutureTask是Future的一个实现,除了实现Future中定义的get()和done()方法,还额外增加了protected的finish方法,用于接收任务被完成的通知。

public class FutureTask<T> implements Future<T> {
	// 计算结果
	private T result;
	// 任务是否完成
	private boolean isDone = false;
	// 定义对象锁
	private final Object Lock = new Object();
	
	@Override
	public T get() throws InterruptedException {
		synchronized (LOCK) {
			// 当任务还没完成,调用get方法会被挂起而进入阻塞
			while(!isDone) {
				LOCK.wait();
			}
			// 返回最终计算结果
			return result;
		}
	}
	
	// finish方法用于为FutureTask设置计算结果
	protected void finish(T result) {
		synchronized (LOCK) {
			// balking设计模式
			if (isDone) {
				return;
			}
			// 计算完成,为result指定结果,并且将isDone设为true,同时唤醒阻塞中的线程
			this.result = result;
			this.isDone = true;
			LOCK.notifyAll();
		}
	}
	
	//
	@Override
	public boolean done() {
		return isDone;
	}
}

FutureServiceImpl

FutureServiceImpl的主要作用在于当提交任务时创建一个新的线程来受理该任务,进而达到任务异步执行的效果。

public class FutureServiceImpl<IN, OUT> implements FutureService<IN, OUT> {
	// 为执行的线程指定名字前缀
	private final static String FUTURE_THREAD_PREFIX = "FUTURE-";
	
	private final AtomicInteger nextCounter = new AtomicInteger(0);
	
	private String getNextName() {
		return FUTURE_THREAD_PREFIX + nextCounter.getAndIncrement();
	}
	
	@Override
	public Future<?> submit(Runnable runnable) {
		final FutureTask<Void> future = new FutureTask<>();
		new Thread(() -> {
			runnable.run();
			// 任务执行结束后,将null作为结果传给future
			future.finish(null);
		}, getNextName()).start();
		return future;
	}
	
	@Override
	public Future<OUT> submit(Task<IN, OUT> task, IN input) {
		final FutureTask<OUT> future = new FutureTask<>();
		new Thread(() -> {
			OUT result = task.get(input);
			// 任务执行结束后,将真实的结果通过finish方法传递给future
			future.finish(result);
		}, getNextName()).start();
		return future;
	}
}

在FutureServiceImpl的submit方法中,分别启动新的线程运行任务,起到了异步的作用,在任务最终运行成功后,会通知FutureTask任务已完成。

Future的使用

1. 提交无返回值任务

// 定义不需要返回值的FutureService
FutureService<Void, Void> service = FutureService.newService();
// submit方法为立即返回的方法
Future<?> future = service.submit(() -> {
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch(InterruptedException e) {
		e.printStackTrace();
	}
	System.out.println("I am finish done.");
});
// get方法会阻塞当前线程
future.get();

2. 提交有返回值任务

// 定义有返回值的FutureService
FutureService<String, Integer> service = FutureService.newService();
// submit方法会立即返回
Future<Integer> future = service.submit(input -> {
	try {
		TimeUnit.SECONDS.sleep(10);
	} catch(InterruptedException e) {
		e.printStackTrace();
	}
	return input.length();
}, "Hello");
// get方法会阻塞当前线程,最终会返回计算的结果
System.out.println(future.get());

当前的Future设计模式,提交任务时不会进入任何阻塞,但是当调用者需要获取结果的时候,还是有可能陷入阻塞直到任务完成。

增强FutureService使其支持回调

使用任务完成时回调的机制可以让调用者不再进行显式地通过get的方式获得数据而导致阻塞。对FutureService接口稍作修改,可以在提交任务的时候将回调接口一并注入。

// 增加回调接口Callback,当任务执行结束之后,Callback会得到执行
@Override
public Future<OUT> submit(Task<IN, OUT> task, IN input, Callback<OUT> callback) {
	final FutureTask<OUT> future = new FutureTask<>();
	new Thread(() -> {
		OUT result = task.get(input);
		future.finish(result);
		// 执行回调接口
		if (null != callback)
			callback.call(result);
	}, getNextName()).start();
	return future;
}

修改后的submit方法,增加一个Callback参数,主要用于接受并处理任务的计算结果。当提交的任务执行完成后,会将结果传递给Callback接口进行进一步的执行,这样在提交任务之后不再会因为通过get方法获得结果而陷入阻塞。

@FunctionalInterface
public interface Callback<T> {
	// 任务完成后会调用该方法,其中T为任务执行后的结果
	void call(T t);
}