Java高并发 - 15.监控任务的生命周期

Thread为我们提供了可获取状态,以及判断是否alive的方法,但是这些方法是针对线程本身的,我们无法获得提交的任务的状态。一般情况下想要获得最终结果,我们不得不为Thread和Runnable传入共享变量。但是在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致的隐患。

当观察者模式遇到Thread

观察者模式适合当某个对象发生状态改变需要通知第三方的场景。

1. 接口定义

Observable接口:

public interface Observable {
	// 任务生命周期的枚举类型
	enum Cycle {
		STARTED, RUNNING, DONE, ERROR
	}
	
	// 获取当前任务的生命周期状态
	Cycle getCycle();
	
	// 定义启动线程的方法,主要作用是为了屏蔽Thread的其他方法
	void start();
	
	// 定义线程的打断方法,作用于start方法一样,也是为了屏蔽Thread的其他方法
	void interrupt();
}

TaskLifecycle接口:

public interface TaskLifecycle<T> {
	// 任务启动时会触发onStart方法
	void onStart(Thread thread);
	
	// 任务正在运行时会触发onRunning方法
	void onRunning(Thread thread);
	
	// 任务运行结束时会触发onFinish方法,其中result是任务执行后的结果
	void onFinish(Thread thread, T result);
	
	// 任务执行报错时会触发onError方法
	void onError(Thread thread, Exception e);
	
	// 生命周期接口的空实现(Adapter)
	class EmptyLifecycle<T> implements TaskLifecycle<T> {
		@Override
		public void onStart(Thread thread) {}
		
		@Override
		public void onRunning(Thread thread) {}
		
		@Override
		public void onFinish(Thread thread, T result) {}
		
		@Override
		public void onError(Thread thread, Exception e) {}
	}
}

Task函数接口定义

@FunctionalInterface
public interface Task<T> {
	// 任务执行接口,该接口允许有返回值
	T call();
}

2. ObservableThread实现

ObservableThread是任务监控的关键,它继承自Thread类和Observable接口,并且在构造期间传入Task的具体实现。

public class ObservableThread<T> extends Thread implements Observable {
	private final TaskLifecycle<T> lifecycle;
	private final Task<T> task;
	private Cycle cycle;
	
	// 指定Task的实现,默认情况下使用EmptyLifecycle
	public ObservableThread(Task<T> task) {
		this(new TaskLifecycle.EmptyLifecycle<>(), task);
	}
	
	// 指定TaskLifecycle的同时指定Task
	public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) {
		super();
		// task不允许为null
		if (task == null)
			throw new IllegalArgumentException("The task is required.");
		this.liftcycle = lifecycle;
		this.task = task;
	}
	
	@Override
	public final void run() {
		// 在执行线程逻辑单元的时候,分别触发相应的事件
		this.update(Cycle.STARTED, null, null);
		try {
			this.update(Cycle.RUNNING, null, null);
			T result = this.task.call();
			this.update(Cycle.DONE, result, null);
		} catch(Exception e) {
			this.update(Cycle.ERROR, null, e);
		}
	}
	
	private void update(Cycle cycle, T result, Exception e) {
		this.cycle = cycle;
		if (lifecycle == null)
			return;
		try {
			switch(cycle) {
			case STARTED:
				this.lifecycle.onStart(currentThread());
				break;
			case RUNNING:
				this.lifecycle.onRunning(currentThread());
				break;
			case DONE:
				this.lifecycle.onFinish(currentThread(), result);
				break;
			case ERROR:
				this.lifecycle.onError(currentThread(), e);
				break;
			}
		} catch(Exception ex) {
			if (cycle == Cycle.ERROR) {
				throw ex;
			}
		}
	}
}

用户可以通过实现TaskLifecycle监听感兴趣的事件,在生命周期的某些阶段做成相应动作。

public static void main(String[] args) {
	final TaskLifecycle<String> lifecycle = 
			new TaskLifecycle.EmptyLifecycle<String>() {
		public void onFinish(Thread thread, String result) {
			System.out.println("The result is " + result);
		}
	};
	
	Observable observableThread =
			new ObservableThread<>(lifecycle, ()->{
					try {
					} catch(InterruptedException e) {
					}
					System.out.println("finished done.");
					return "Hello Observer"
			});
	observableThread.start();
}