Written by
Yitong Huang
on
on
Java高并发 - 15.监控任务的生命周期
Thread为我们提供了可获取状态,以及判断是否alive的方法,但是这些方法是针对线程本身的,我们无法获得提交的任务的状态。一般情况下想要获得最终结果,我们不得不为Thread和Runnable传入共享变量。但是在多线程的情况下,共享变量将导致资源的竞争从而增加了数据不一致的隐患。
当观察者模式遇到Thread
观察者模式适合当某个对象发生状态改变需要通知第三方的场景。
1. 接口定义
Observable接口:
public interface Observable {
// 任务生命周期的枚举类型
enum Cycle {
STARTED, RUNNING, DONE, ERROR
}
// 获取当前任务的生命周期状态
Cycle getCycle();
// 定义启动线程的方法,主要作用是为了屏蔽Thread的其他方法
void start();
// 定义线程的打断方法,作用于start方法一样,也是为了屏蔽Thread的其他方法
void interrupt();
}
TaskLifecycle接口:
public interface TaskLifecycle<T> {
// 任务启动时会触发onStart方法
void onStart(Thread thread);
// 任务正在运行时会触发onRunning方法
void onRunning(Thread thread);
// 任务运行结束时会触发onFinish方法,其中result是任务执行后的结果
void onFinish(Thread thread, T result);
// 任务执行报错时会触发onError方法
void onError(Thread thread, Exception e);
// 生命周期接口的空实现(Adapter)
class EmptyLifecycle<T> implements TaskLifecycle<T> {
@Override
public void onStart(Thread thread) {}
@Override
public void onRunning(Thread thread) {}
@Override
public void onFinish(Thread thread, T result) {}
@Override
public void onError(Thread thread, Exception e) {}
}
}
Task函数接口定义
@FunctionalInterface
public interface Task<T> {
// 任务执行接口,该接口允许有返回值
T call();
}
2. ObservableThread实现
ObservableThread是任务监控的关键,它继承自Thread类和Observable接口,并且在构造期间传入Task的具体实现。
public class ObservableThread<T> extends Thread implements Observable {
private final TaskLifecycle<T> lifecycle;
private final Task<T> task;
private Cycle cycle;
// 指定Task的实现,默认情况下使用EmptyLifecycle
public ObservableThread(Task<T> task) {
this(new TaskLifecycle.EmptyLifecycle<>(), task);
}
// 指定TaskLifecycle的同时指定Task
public ObservableThread(TaskLifecycle<T> lifecycle, Task<T> task) {
super();
// task不允许为null
if (task == null)
throw new IllegalArgumentException("The task is required.");
this.liftcycle = lifecycle;
this.task = task;
}
@Override
public final void run() {
// 在执行线程逻辑单元的时候,分别触发相应的事件
this.update(Cycle.STARTED, null, null);
try {
this.update(Cycle.RUNNING, null, null);
T result = this.task.call();
this.update(Cycle.DONE, result, null);
} catch(Exception e) {
this.update(Cycle.ERROR, null, e);
}
}
private void update(Cycle cycle, T result, Exception e) {
this.cycle = cycle;
if (lifecycle == null)
return;
try {
switch(cycle) {
case STARTED:
this.lifecycle.onStart(currentThread());
break;
case RUNNING:
this.lifecycle.onRunning(currentThread());
break;
case DONE:
this.lifecycle.onFinish(currentThread(), result);
break;
case ERROR:
this.lifecycle.onError(currentThread(), e);
break;
}
} catch(Exception ex) {
if (cycle == Cycle.ERROR) {
throw ex;
}
}
}
}
用户可以通过实现TaskLifecycle监听感兴趣的事件,在生命周期的某些阶段做成相应动作。
public static void main(String[] args) {
final TaskLifecycle<String> lifecycle =
new TaskLifecycle.EmptyLifecycle<String>() {
public void onFinish(Thread thread, String result) {
System.out.println("The result is " + result);
}
};
Observable observableThread =
new ObservableThread<>(lifecycle, ()->{
try {
} catch(InterruptedException e) {
}
System.out.println("finished done.");
return "Hello Observer"
});
observableThread.start();
}