有时我们为了达到“当某件事情发生时,触发特定逻辑”的效果,但作为触发源又不想关心“谁对我发出的事件感兴趣”,此时可以利用观察者模式的思想进行解耦。
观察者模式
观察者模式很好理解,我们以中土世界的故事作为例子。
代码来自 java-design-patterns。
/**
* 观察者接口
*/
public interface WeatherObserver {
void update(WeatherType currentWeather);
}
/**
* 可以通过实现 WeatherObserver 接口通过此类注册为监听器来观察天气的变化
*/
public class Weather {
private WeatherType currentWeather;
private List<WeatherObserver> observers;
public Weather() {
observers = new ArrayList<>();
currentWeather = WeatherType.SUNNY;
}
public void addObserver(WeatherObserver obs) {
observers.add(obs);
}
public void removeObserver(WeatherObserver obs) {
observers.remove(obs);
}
/**
* 天气随着时间流逝而变化
*/
public void timePasses() {
WeatherType[] enumValues = WeatherType.values();
currentWeather = enumValues[(currentWeather.ordinal() + 1) % enumValues.length];
log.debug("==== 天气变化为【{}】 ====", currentWeather);
notifyObservers();
}
private void notifyObservers() {
for (WeatherObserver obs : observers) {
obs.update(currentWeather);
}
}
}
/**
* 天气类型枚举
*/
public enum WeatherType {
SUNNY("晴天"), RAINY("下雨"), WINDY("大风"), COLD("严寒");
private String desc;
WeatherType(String desc) {
this.desc = desc;
}
public String getDesc() {
return desc;
}
public String toString() {
return this.getDesc();
}
}
/**
* 半兽人
*/
public class Orcs implements WeatherObserver {
public void update(WeatherType currentWeather) {
switch (currentWeather) {
case COLD:
log.debug("半兽人被冰冻住了");
break;
case RAINY:
log.debug("半兽人被淋的浑身湿透");
break;
case SUNNY:
log.debug("阳光灼伤了半兽人们的眼睛");
break;
case WINDY:
log.debug("半兽人的气味在风中几乎消失殆尽");
break;
default:
break;
}
}
}
/**
* 霍比特人
*/
public class Hobbits implements WeatherObserver {
public void update(WeatherType currentWeather) {
switch (currentWeather) {
case COLD:
log.debug("霍比特人在寒冷的天气中颤抖");
break;
case RAINY:
log.debug("霍比特人在雨中寻找庇护所");
break;
case SUNNY:
log.debug("快乐的霍比特人在温暖的阳光下互相致敬");
break;
case WINDY:
log.debug("霍比特人在大风天紧紧地戴上了帽子");
break;
default:
break;
}
}
}
运行测试:
public static void main(String[] args) {
Weather weather = new Weather();
weather.addObserver(new Orcs());
weather.addObserver(new Hobbits());
weather.timePasses();
weather.timePasses();
weather.timePasses();
weather.timePasses();
}
结果如下:
==== 天气变化为【下雨】 ====
半兽人被淋的浑身湿透
霍比特人在雨中寻找庇护所
==== 天气变化为【大风】 ====
半兽人的气味在风中几乎消失殆尽
霍比特人在大风天紧紧地戴上了帽子
==== 天气变化为【严寒】 ====
半兽人被冰冻住了
霍比特人在寒冷的天气中颤抖
==== 天气变化为【晴天】 ====
阳光灼伤了半兽人们的眼睛
快乐的霍比特人在温暖的阳光下互相致敬
回调思想
interface Worker {
void consume(Callback callback);
}
class WorkerImpl implements Worker {
private ExecutorService pool;
// ...
public void consume(Callback callback) {
pool.submit(() -> {
try {
// ...
callback.onSuccess();
} catch (Throwable e) {
callback.onError(e);
}
});
}
}
interface Callback {
void onSuccess();
void onError(Throwable e);
}
new WorkerImpl().consume(new Callback() {
public void onSuccess() {
log.debug("success.");
// ...
}
public void onError(Throwable e) {
log.error("failed.", e);
// ...
}
})
通过传入回调函数,我们的“完成任务后的后续代码”可以适时地被调用,也解决了异步编程中的串行化。
应用
JDK、NodeJS、Netty 等均有回调思想以及观察者模式的应用。
Netty
ChannelHandlerContext
Netty 为我们提供了声明式的编程方式,一个 ChannelInboundHandlerAdapter 可以订阅诸多事件,如 channelRegistered、channelRead、exceptionCaught 等等。在开发 Netty 应用的过程中,我们一般会指定一个 ChannelInitializer 来定制 Channel Pipeline 流水线:
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast("loggingHandler", NettyComponent.HANDLER_SINGLETON)
.addLast.......
}
Pipeline 上是一个双向 ChannelHandlerContext 链表:
protected DefaultChannelPipeline(Channel channel) {
//...
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
以 ChannelInboundHandlerAdapter 的 channelRead()
方法的触发逻辑为例,请求被上一个 ChannelHandlerContext 绑定的 Handler 处理完后(一般是解码器),通过调用 ctx.fireChannelRead(msg)
将请求交给我们自定义的 ChannelHandlerContext:
public ChannelHandlerContext fireChannelRead(final Object msg) {
// findContextInbound 会跳过一些 Handler(@Skip 功能)
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
/*
判断当前上下文的 EventExecutor 是否在本 EventLoop 中,
若不是则交给与 Channel 绑定的那个 EventLoop 的线程去执行(线程绑定 Channel),
否则直接触发本上下文当前的那个 ChannelHandler 的 `channelRegistered()` 方法
*/
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 我们的 ChannelInboundHandler 被回调
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
可以看出,对比上文观察者模式中的例子,我们发现区别只是 Netty 将监听器的“组织方式”从 List<WeatherObserver>
换成了 ChannelHandlerContext 链而已,本质上利用了观察者模式来实现声明式的编程语义。
ChannelPromise
有事我们需要在响应成功写入 ChannelOutboundBuffer 后执行一些回调方法(比如打印日志),若使用 channel.writeAndFlush(msg)
在调用后立即打印日志其实是不准确的,此时并不一定写 buffer 成功了,应该调用其重载的方法传入一个 ChannelPromise:
channel.writeAndFlush(msg, channel.newPromise().addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("success.");
}
}));
这里的 ChannelPromise 其实就是一个 ChannelFutureListener 的注册中心,当异步写入完成后,其上注册的 ChannelFutureListener 会被一一回调,我们查看 ChannelOutboundBuffer 相关方法(这里省略了一些步骤,具体可以看后续的源码解析篇):
public boolean remove() {
// 成功
ChannelPromise promise = e.promise;
// ...
if (!e.cancelled) {
// ...
safeSuccess(promise);
// ...
}
// ...
}
public boolean remove(Throwable cause) {
// 失败
return remove0(cause, true);
}
private boolean remove0(Throwable cause, boolean notifyWritability) {
// ...
ChannelPromise promise = e.promise;
// ...
if (!e.cancelled) {
// ...
safeFail(promise, cause);
// ...
}
// ...
}
若 ChannelPromise 类型为 ChannelProgressivePromise,参见
progress()
方法。
JDK
FutureTask
JDK 提供 Future 接口,定义一种“即将在未来某个时间点完成”的任务结果,Future 是动态的,任务结果将在未来的某个时间点变得可用。
利用 JDK 1.5 的多线程实现,我们可以提交 Callable 类型的任务对象给线程池并返回一个 Future 对象,调用 Future 相应的 API 可以以不同方式(同步、异步)获取结果。
// 将“求 10 个数的累加和”的任务切分成 3 个子任务
MyTask t1 = new MyTask(new Integer[]{1, 5, -10});
MyTask t2 = new MyTask(new Integer[]{43, 2, 89, 2});
MyTask t3 = new MyTask(new Integer[]{11, 23, -9});
ExecutorService pool = Executors.newFixedThreadPool(3);
List<Future<Integer>> futures = null;
try {
futures = pool.invokeAll(Arrays.asList(t1, t2, t3));
} catch (InterruptedException e) {
// ...
}
int sum = 0;
for (Future<Integer> future : futures) {
try {
sum += future.get();
} catch (Exception ignored) {
}
}
log.debug("sum={}", sum);
pool.shutdown();
class MyTask implements Callable<Integer> {
private Integer[] counts;
public MyTask(Integer[] counts) {
this.counts = counts;
}
public Integer call() throws Exception {
Integer sum = 0;
int len = counts.length;
for (int i = 0; i < len; i++) {
sum += counts[i];
}
return sum;
}
}
结果:
sum=157
上面的例子实现了一个简单的利用多核多线程加快计算的例子,在海量数据的情况下是有意义的。但是,我们可以看出,JDK8 之前提供的 FutureTask 并不好用,因为我需要关注结果的调用时机,且当时我并不知道 Future 是否可用,若不可用,那个调用 get()
的线程还会阻塞,这一点不符合“好莱坞原则”,如果能在任务完成之后自动调用 Callback 的话会更优雅一点。
为此,JDK8 提供了 CompletableFuture。
CompletableFuture
比如“异步 new 一个对象打印其内存地址”:
CompletableFuture.supplyAsync(Object::new).thenAccept(System.out::println);
Dubbo 2.7.0 版本开始利用 CompletableFuture 来优化实现全异步调用链,具体可以参见 这篇文章。
弊端
通过回调函数,可以解决一些异步编程问题,但比较容易形成箭头型代码及“回调地狱”:
不过这也有解决方式,如 JS 的链式 Promise,通过每次 then 的调用后新包装一个外层 Promise 解决了编程风格问题,让本身的业务代码读起来像同步调用的顺序一样清晰明了:
new Promise(() => ...)
.then(() => ...)
.then(() => ...)
.then(() => ...)
.then(() => ...);
或者使用 ES6 的 async/await 关键字
Java 的 CompletableFuture 也同理支持类似风格:
CompletableFuture.supplyAsync(Object::new)
.thenAccept(System.out::println)
.thenAccept(...)
.thenAccept(...);
当然,你也可以自行编写 Generator 代码去接收一些“Function”入参,封装“箭头型代码”的生成,让外层看起来舒服一点,限于篇幅,这里就不再展开了。