基于 RxJava 的 RxBus 作为一种事件总线,相信许多人都了解一些,Square 的 Otto 也因此弃用,因为现在 RxJava 太火了,用它几行代码就可以写出事件总线。不过大家所熟悉的是基于 RxJava 1.x 版本的,2016 年十月底 RxJava 更新到 2.x 版本了,具体变化请看 What’s different in 2.0,下面总结下适合不同场景的 RxJava 2 版本的 RxBus 写法。
没有背压处理(Backpressure)的 RxBus
有背压处理的 RxBus
有异常处理的 RxBus (订阅者处理事件出现异常也能继续收到事件)
没有背压处理(Backpressure)的 Rxbus
在 RxJava 2.0 之后,io.reactivex.Observable
中没有进行背压处理了,如果有大量消息堆积在总线中来不及处理会产生MissingBackpressureException
或者OutOfMemoryError
,有新的类io.reactivex.Flowable
专门针对背压问题。
|
|
有背压处理的 RxBus
|
|
有异常处理的 Rxbus
上面的两种 RxBus 在订阅者处理事件出现异常后,订阅者无法再收到事件,这是 RxJava 当初本身的设计原则,但是在事件总线中这反而是个问题。
网上看到使用onErrorResumeNext(Observable.never())
的方式,原理是 Observable 发送 OnError 事件时,不执行 Observer 的 onError 方法,再订阅一个不产生事件的 Observable.never()
,看起来好像解决了问题。但是 RxBus 本身就基本不会发送 error 事件,而且在 onNext 方法执行出现异常时,还是会走到 onError 方法并且结束订阅关系。
后面我以为基于 JakeWharton 大神写的 RxRelay 的 RxBus 可以解决问题,但是测试之后还是不行,因为 PublishRelay 只是不会发送 error 和 complete 事件而已,没有其他异常处理的逻辑。
于是开始从源码中查找问题的原因,最后查到是 io.reactivex.internal.observers.LambdaObserver
的原因:
|
|
所以我想到的一个方法是,自定义一个 LambdaObserver,在 onError 方法中不会取消订阅。重新写了基于 RxRelay 的 RxBus,订阅的时候使用自定义的 LambdaObserver 封装观察者,这样就相对优雅地实现了有异常处理的 RxBus。