观察者模式和回调思想

有时我们为了达到“当某件事情发生时,触发特定逻辑”的效果,但作为触发源又不想关心“谁对我发出的事件感兴趣”,此时可以利用观察者模式的思想进行解耦。

观察者模式

观察者模式很好理解,我们以中土世界的故事作为例子。

代码来自 java-design-patterns

image-20191104163538838

/**
 * 观察者接口
 */
public interface WeatherObserver {
  void update(WeatherType currentWeather);
}
/**
 * 可以通过实现 WeatherObserver 接口通过此类注册为监听器来观察天气的变化
 */
@Slf4j
public class Weather {

  private WeatherType currentWeather;
  private List<WeatherObserver> observers;

  public Weather() {
    observers = new ArrayList<>();
    currentWeather = WeatherType.SUNNY;
  }

  public void addObserver(WeatherObserver obs) {
    observers.add(obs);
  }

  public void removeObserver(WeatherObserver obs) {
    observers.remove(obs);
  }

  /**
   * 天气随着时间流逝而变化
   */
  public void timePasses() {
    WeatherType[] enumValues = WeatherType.values();
    currentWeather = enumValues[(currentWeather.ordinal() + 1) % enumValues.length];
    log.debug("==== 天气变化为【{}】 ====", currentWeather);
    notifyObservers();
  }

  private void notifyObservers() {
    for (WeatherObserver obs : observers) {
      obs.update(currentWeather);
    }
  }
}
/**
 * 天气类型枚举
 */
public enum WeatherType {

  SUNNY("晴天"), RAINY("下雨"), WINDY("大风"), COLD("严寒");

  private String desc;

  WeatherType(String desc) {
    this.desc = desc;
  }

  public String getDesc() {
    return desc;
  }

  @Override
  public String toString() {
    return this.getDesc();
  }
}
/**
 * 半兽人
 */
@Slf4j
public class Orcs implements WeatherObserver {

  @Override
  public void update(WeatherType currentWeather) {
    switch (currentWeather) {
      case COLD:
        log.debug("半兽人被冰冻住了");
        break;
      case RAINY:
        log.debug("半兽人被淋的浑身湿透");
        break;
      case SUNNY:
        log.debug("阳光灼伤了半兽人们的眼睛");
        break;
      case WINDY:
        log.debug("半兽人的气味在风中几乎消失殆尽");
        break;
      default:
        break;
    }
  }
}
/**
 * 霍比特人
 */
@Slf4j
public class Hobbits implements WeatherObserver {

  @Override
  public void update(WeatherType currentWeather) {
    switch (currentWeather) {
      case COLD:
        log.debug("霍比特人在寒冷的天气中颤抖");
        break;
      case RAINY:
        log.debug("霍比特人在雨中寻找庇护所");
        break;
      case SUNNY:
        log.debug("快乐的霍比特人在温暖的阳光下互相致敬");
        break;
      case WINDY:
        log.debug("霍比特人在大风天紧紧地戴上了帽子");
        break;
      default:
        break;
    }
  }
}

运行测试:

public static void main(String[] args) {
  Weather weather = new Weather();
  weather.addObserver(new Orcs());
  weather.addObserver(new Hobbits());

  weather.timePasses();
  weather.timePasses();
  weather.timePasses();
  weather.timePasses();
}

结果如下:

==== 天气变化为【下雨】 ====
半兽人被淋的浑身湿透
霍比特人在雨中寻找庇护所
==== 天气变化为【大风】 ====
半兽人的气味在风中几乎消失殆尽
霍比特人在大风天紧紧地戴上了帽子
==== 天气变化为【严寒】 ====
半兽人被冰冻住了
霍比特人在寒冷的天气中颤抖
==== 天气变化为【晴天】 ====
阳光灼伤了半兽人们的眼睛
快乐的霍比特人在温暖的阳光下互相致敬

回调思想

image-20191104163538838

interface Worker {
    void consume(Callback callback);
}
class WorkerImpl implements Worker {
    private ExecutorService pool;
    
    // ...
    
    @Override
    public void consume(Callback callback) {
        pool.submit(() -> {
            try {
                // ...
                callback.onSuccess();
            } catch (Throwable e) {
                callback.onError(e);
            }
        });
    }
}
interface Callback {
    void onSuccess();
  
    void onError(Throwable e);
}
new WorkerImpl().consume(new Callback() {
    @Override
    public void onSuccess() {
        log.debug("success.");
        // ...
    }

    @Override
    public void onError(Throwable e) {
        log.error("failed.", e);
        // ...
    }
})

通过传入回调函数,我们的“完成任务后的后续代码”可以适时地被调用,也解决了异步编程中的串行化。

应用

JDK、NodeJS、Netty 等均有回调思想以及观察者模式的应用。

Netty

ChannelHandlerContext

Netty 为我们提供了声明式的编程方式,一个 ChannelInboundHandlerAdapter 可以订阅诸多事件,如 channelRegistered、channelRead、exceptionCaught 等等。在开发 Netty 应用的过程中,我们一般会指定一个 ChannelInitializer 来定制 Channel Pipeline 流水线:

protected void initChannel(SocketChannel ch) {
    ch.pipeline().addLast("loggingHandler", NettyComponent.HANDLER_SINGLETON)
      .addLast.......
}

Pipeline 上是一个双向 ChannelHandlerContext 链表:

protected DefaultChannelPipeline(Channel channel) {
    //...
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

以 ChannelInboundHandlerAdapter 的 channelRead() 方法的触发逻辑为例,请求被上一个 ChannelHandlerContext 绑定的 Handler 处理完后(一般是解码器),通过调用 ctx.fireChannelRead(msg) 将请求交给我们自定义的 ChannelHandlerContext:

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // findContextInbound 会跳过一些 Handler(@Skip 功能)
    invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
    return this;
}

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    /*
      判断当前上下文的 EventExecutor 是否在本 EventLoop 中,
      若不是则交给与 Channel 绑定的那个 EventLoop 的线程去执行(线程绑定 Channel),
      否则直接触发本上下文当前的那个 ChannelHandler 的 `channelRegistered()` 方法
     */
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // 我们的 ChannelInboundHandler 被回调
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

可以看出,对比上文观察者模式中的例子,我们发现区别只是 Netty 将监听器的“组织方式”从 List<WeatherObserver> 换成了 ChannelHandlerContext 链而已,本质上利用了观察者模式来实现声明式的编程语义。

ChannelPromise

有事我们需要在响应成功写入 ChannelOutboundBuffer 后执行一些回调方法(比如打印日志),若使用 channel.writeAndFlush(msg) 在调用后立即打印日志其实是不准确的,此时并不一定写 buffer 成功了,应该调用其重载的方法传入一个 ChannelPromise:

channel.writeAndFlush(msg, channel.newPromise().addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        log.debug("success.");
    }
}));

这里的 ChannelPromise 其实就是一个 ChannelFutureListener 的注册中心,当异步写入完成后,其上注册的 ChannelFutureListener 会被一一回调,我们查看 ChannelOutboundBuffer 相关方法(这里省略了一些步骤,具体可以看后续的源码解析篇):

public boolean remove() {
    // 成功
    ChannelPromise promise = e.promise;
    // ...
    if (!e.cancelled) {
        // ...
        safeSuccess(promise);
        // ...
    }
    // ...
}

public boolean remove(Throwable cause) {
    // 失败
    return remove0(cause, true);
}

private boolean remove0(Throwable cause, boolean notifyWritability) {
    // ...
    ChannelPromise promise = e.promise;
    // ...
    if (!e.cancelled) {
        // ...
        safeFail(promise, cause);
        // ...
    }
    // ...
}

若 ChannelPromise 类型为 ChannelProgressivePromise,参见 progress() 方法。

JDK

FutureTask

JDK 提供 Future 接口,定义一种“即将在未来某个时间点完成”的任务结果,Future 是动态的,任务结果将在未来的某个时间点变得可用。

利用 JDK 1.5 的多线程实现,我们可以提交 Callable 类型的任务对象给线程池并返回一个 Future 对象,调用 Future 相应的 API 可以以不同方式(同步、异步)获取结果。

// 将“求 10 个数的累加和”的任务切分成 3 个子任务
MyTask t1 = new MyTask(new Integer[]{1, 5, -10});
MyTask t2 = new MyTask(new Integer[]{43, 2, 89, 2});
MyTask t3 = new MyTask(new Integer[]{11, 23, -9});

ExecutorService pool = Executors.newFixedThreadPool(3);
List<Future<Integer>> futures = null;
try {
    futures = pool.invokeAll(Arrays.asList(t1, t2, t3));
} catch (InterruptedException e) {
    // ...
}

int sum = 0;
for (Future<Integer> future : futures) {
    try {
        sum += future.get();
    } catch (Exception ignored) {
    }
}
log.debug("sum={}", sum);
pool.shutdown();
class MyTask implements Callable<Integer> {
    private Integer[] counts;

    public MyTask(Integer[] counts) {
        this.counts = counts;
    }

    @Override
    public Integer call() throws Exception {
        Integer sum = 0;
        int len = counts.length;
        for (int i = 0; i < len; i++) {
            sum += counts[i];
        }
        return sum;
    }
}

结果:

sum=157

上面的例子实现了一个简单的利用多核多线程加快计算的例子,在海量数据的情况下是有意义的。但是,我们可以看出,JDK8 之前提供的 FutureTask 并不好用,因为我需要关注结果的调用时机,且当时我并不知道 Future 是否可用,若不可用,那个调用 get() 的线程还会阻塞,这一点不符合“好莱坞原则”,如果能在任务完成之后自动调用 Callback 的话会更优雅一点。

为此,JDK8 提供了 CompletableFuture。

CompletableFuture

比如“异步 new 一个对象打印其内存地址”:

CompletableFuture.supplyAsync(Object::new).thenAccept(System.out::println);

Dubbo 2.7.0 版本开始利用 CompletableFuture 来优化实现全异步调用链,具体可以参见 这篇文章

弊端

通过回调函数,可以解决一些异步编程问题,但比较容易形成箭头型代码及“回调地狱”:

image-20200307214638809

不过这也有解决方式,如 JS 的链式 Promise,通过每次 then 的调用后新包装一个外层 Promise 解决了编程风格问题,让本身的业务代码读起来像同步调用的顺序一样清晰明了:

new Promise(() => ...)
    .then(() => ...)
    .then(() => ...)
    .then(() => ...)
    .then(() => ...);

或者使用 ES6 的 async/await 关键字

Java 的 CompletableFuture 也同理支持类似风格:

CompletableFuture.supplyAsync(Object::new)
                .thenAccept(System.out::println)
                .thenAccept(...)
                .thenAccept(...);

当然,你也可以自行编写 Generator 代码去接收一些“Function”入参,封装“箭头型代码”的生成,让外层看起来舒服一点,限于篇幅,这里就不再展开了。