RxJava 2 版本的 Rxbus

基于 RxJava 的 RxBus 作为一种事件总线,相信许多人都了解一些,Square 的 Otto 也因此弃用,因为现在 RxJava 太火了,用它几行代码就可以写出事件总线。不过大家所熟悉的是基于 RxJava 1.x 版本的,2016 年十月底 RxJava 更新到 2.x 版本了,具体变化请看 What’s different in 2.0,下面总结下适合不同场景的 RxJava 2 版本的 RxBus 写法。

  • 没有背压处理(Backpressure)的 RxBus

  • 有背压处理的 RxBus

  • 有异常处理的 RxBus (订阅者处理事件出现异常也能继续收到事件)

没有背压处理(Backpressure)的 Rxbus

在 RxJava 2.0 之后,io.reactivex.Observable中没有进行背压处理了,如果有大量消息堆积在总线中来不及处理会产生MissingBackpressureException或者OutOfMemoryError,有新的类io.reactivex.Flowable 专门针对背压问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class RxBus {
private final Subject<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishSubject.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Observable<Object> toObservable() {
return mBus;
}
public boolean hasObservers() {
return mBus.hasObservers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}

有背压处理的 RxBus

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import io.reactivex.Flowable;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
public class RxBus {
private final FlowableProcessor<Object> mBus;
private RxBus() {
// toSerialized method made bus thread safe
mBus = PublishProcessor.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Flowable<T> toFlowable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Flowable<Object> toFlowable() {
return mBus;
}
public boolean hasSubscribers() {
return mBus.hasSubscribers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}

有异常处理的 Rxbus

上面的两种 RxBus 在订阅者处理事件出现异常后,订阅者无法再收到事件,这是 RxJava 当初本身的设计原则,但是在事件总线中这反而是个问题。

网上看到使用onErrorResumeNext(Observable.never())的方式,原理是 Observable 发送 OnError 事件时,不执行 Observer 的 onError 方法,再订阅一个不产生事件的 Observable.never(),看起来好像解决了问题。但是 RxBus 本身就基本不会发送 error 事件,而且在 onNext 方法执行出现异常时,还是会走到 onError 方法并且结束订阅关系。

后面我以为基于 JakeWharton 大神写的 RxRelay 的 RxBus 可以解决问题,但是测试之后还是不行,因为 PublishRelay 只是不会发送 error 和 complete 事件而已,没有其他异常处理的逻辑。

于是开始从源码中查找问题的原因,最后查到是 io.reactivex.internal.observers.LambdaObserver 的原因:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
onError(e); // 出现异常会跳到下面的 onError 方法
}
}
}
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED); // 结束订阅关系
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
}

所以我想到的一个方法是,自定义一个 LambdaObserver,在 onError 方法中不会取消订阅。重新写了基于 RxRelay 的 RxBus,订阅的时候使用自定义的 LambdaObserver 封装观察者,这样就相对优雅地实现了有异常处理的 RxBus。

项目地址:https://github.com/JohnnyShieh/RxBus

END
Johnny Shieh wechat
我的公众号,不只有技术,还有咖啡和彩蛋!