RxJava2相关知识学习之一

最近心态控制不好,不断陷入自我否定中,负能量较多,也忘事大。很久没有动力写文章了。
鉴于RxJava2在项目中使用比较频繁。今天学习RxJava2的一些语法相关知识吧。此篇是开篇,希望可以一直坚持写下去!!!


RxJava2提供的一些运算符基类

  • io.reactivex.Flowable:以下特点
  • io.reactivex.Observable:0..N流动,没有背压,
  • io.reactivex.Single:正好1个项目的流程或错误 (只执行一次(onNext onCompletable) onError 互斥执行)
  • io.reactivex.Completable:没有项目但只有完成或错误信号的流程, (onCompletable onError 互斥执行)
  • io.reactivex.Maybe:没有项目,只有一个项目或错误的流程。 (Maybe是Single和Completable的组合体)

术语介绍

Upstream(上游), downstream(下游)

比如以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
~~~
* 针对operator2()方法,operator1()就是上游数据,operator3()就是下游。

### Objects in motion(运动中的数据)

emission, emits, item, event, signal, data and message 都表示沿数据流传播的对象。

### Backpressure(背压)

简单说:就是上游数据发射 和 下游数据处理;有时间差。这个过程的处理。通常表现为由于临时缓冲或需要跳过丢弃数据而增加的内存使用量,Flowable类被指定为支持背压。

### Assembly time(组装时间)

通过应用各种中间运算符来准备数据流发生在所谓的汇编时间中
示例代码:
~~~java
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v* v)
.filter(v -> v % 3 == 0)
;

此时,数据尚未流动,并且没有发生任何副作用。

Subscription time (订阅时间)

当subscribe()在流上调用时,这是一个临时状态,在内部建立处理步骤链

1
flow.subscribe(System.out::println)

Runtime (运行)

这是流量主动发出项目,错误或完成信号时的状态

1
2
3
4
5
6
7
8
9
10
11
Observable.create(emitter -> {
while (!emitter.isDisposed()) {//是否有取消订阅
long time = System.currentTimeMillis();
emitter.onNext(time);//发射当前时间
if (time % 2 != 0) {//如果余数不为0执行 onError方法
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);

Simple background computation (简单示例)

RxJava的一个常见用例是在后台线程上运行一些计算或网络请求并在UI线程上显示结果(或错误)

示例代码:

1
2
3
4
5
6
7
8
9
10
11
import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

其实RxJava的反应类型是不可变的; 每个方法调用都返回一个Flowable带有添加行为的new

为了证明这一点,以上代码可替换为:

1
2
3
4
5
6
7
8
9
10
11
12
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

Schedulers (线程切换)

RxJava运算符是不能直接使用Threads或ExecutorServices的。

  • Schedulers.computation(): 在后台运行固定数量的专用线程上的计算密集型工作。大多数异步操作符都将此作为默认值Scheduler。

  • Schedulers.io(): 在动态变化的线程集上运行类似I / O或阻塞操作。

  • Schedulers.single(): 以顺序和FIFO方式在单个线程上运行。 先进先出方式

  • Schedulers.trampoline(): 在其中一个参与线程中以顺序和FIFO方式运行,通常用于测试目的。 处于调用时的当前线程

  • 其他1:以上都可以在所有JVM平台,但一些特定的平台,如Android,都有自己的典型的Scheduler已定义:AndroidSchedulers.mainThread(),SwingScheduler.instance()或JavaFXSchedulers.gui()

  • 其他2: 可以选择将现有Executor(及其子类型ExecutorService) 使用 Schedulers.from(Executor) 方法来使用指定线程池

注意事项:
默认情况下Scheduler在守护程序线程上运行。这意味着一旦Java主线程退出,它们都会停止并且后台计算可能永远不会发生。

Concurrency within a flow (流中的并发)

RxJava中的流程本质上是顺序的,分为可以彼此同时运行的处理阶段

如下代码:

1
2
3
4
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);

此示例流程在计算时将数字得到1到10的平方。

顺序的含义是: lambda v -> v * v并不是为了这个流而并行运行; 它在一个接一个的同一计算线程上接收值1到10。

Parallel processing(并行处理)

并行处理数字1到10的示例:

1
2
3
4
5
6
7
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);

实际上,RxJava中的并行性意味着运行独立流并将其结果合并回单个流中。

注意:

  • flatMap不保证任何顺序
  • concatMap 一次映射并运行一个内部流程
  • concatMapEager 它会“同时”运行所有内部流程,但输出流程将按创建内部流程的顺序排列。

或者,Flowable.parallel()操作符和ParallelFlowable类型帮助实现相同的并行处理模式。

1
2
3
4
5
6
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);

Dependent sub-flows (依赖子流程)

flatMap是一个强大的操作员,在很多情况下都有帮助。例如,给定一个返回a的服务Flowable,我们想要使用第一个服务发出的值调用另一个服务:

1
2
3
4
5
6
7
8
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand
-> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
)
.subscribe();

实际运用场景: 比如一个接口获取列表,然后通过列表中的每个id,获取每个id对应的数据展示在另一个列表上。

Continuations (延续)

有时,当项目可用时,人们希望对其执行一些依赖计算。这有时被称为延续,并且取决于应该发生什么以及涉及什么类型,可能涉及各种操作员来完成。

打个比方:

数据源.
转换数据.
过滤数据.
等等操作.订阅

Dependent (依赖型)

一个数据拿到,传递给下一个数据,一层一层。
示例:

1
2
3
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
1
2
3
4
5
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)

Non-dependent (非依赖型)

第一个源/数据流的结果是无关紧要的,并且人们希望继续使用准独立的另一个源。

示例:

1
2
3
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)//关键代码,这呃ignored没有用
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);

通过使用Completable作为调解器及其运算符andThen来恢复其他内容,有一种方式更具表现力(并且还有更低的开销):

1
2
3
4
sourceObservable
.ignoreElements() // 关键代码 returns Completable
.andThen(someSingleSource) // 类似中转了
.map(v -> v.toString())

sourceObservable和之间唯一的依赖关系someSingleSource是前者应该正常完成,以便后者被消费。

Deferred-dependent (递延依赖)

有时,前一个序列和新序列之间存在隐含的数据依赖关系,由于某种原因,它不会流经“常规通道”。人们倾向于写下如下的延续:

1
2
3
4
5
6
7
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);

不好的是,这种结果打印出来的示0。

是因为在数据流尚未运行时Single.just(count.get())在汇编时进行评估。
在主源完成时,我们需要将这个Single源的评估推迟到运行时

1
2
3
4
5
6
7
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);

或者

1
2
3
4
5
6
7
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);

Type conversions (输入转化次数)

有时,源或服务返回的类型与应该使用它的流不同。例如,在上面的库存示例中,getDemandAsync可以返回一个Single。如果代码示例保持不变,这将导致编译时错误(但是,通常会出现关于缺少过载的误导性错误消息)。

在这种情况下,通常有两个选项来修复转换:1)转换为所需类型 或 2)查找并使用支持不同类型的特定运算符的重载。

个人理解:

  • 比如一个列表需要返回列表的id
  • 通过列表id获取内容列表 (此时,内容可能不是个列表,就一个内容数据。)

每个反应基类都具有可以执行此类转换的运算符,包括协议转换,以匹配其他类型。以下表格显示了可用的转换选项:

说明
1:当将多值源转换为单值源时,应该决定应该将多个源值中的哪一个视为结果。

2:Observable转入Flowable需要一个额外的决定:如何处理潜在的无约束流源Observable?有通过几个可用的策略(如缓冲,下降,保持最新的)BackpressureStrategy参数,或者通过标准的Flowable运营商,如onBackpressureBuffer,onBackpressureDrop,onBackpressureLatest这也让背压行为的进一步定制。

3:当只有(最多)一个源项目时,背压没有问题,因为它可以一直存储,直到下游准备好消耗。

Using an overload with the desired type (使用所需类型的重载)

许多经常使用的运算符具有可以处理其他类型的重载。这些通常以目标类型的后缀命名:

操作者 重载
flatMap flatMapSingle,flatMapMaybe,flatMapCompletable,flatMapIterable
concatMap concatMapSingle,concatMapMaybe,concatMapCompletable,concatMapIterable
switchMap switchMapSingle,switchMapMaybe,switchMapCompletable

这些运算符具有后缀而不是简单地具有不同签名的相同名称的原因是类型擦除。Java不考虑诸如operator(Function<T, Single>)和operator(Function<T, Maybe>)不同的签名(与C#不同),并且由于擦除,两个operators最终将作为具有相同签名的重复方法。

Operator naming conventions (运算符命名约定)

编程中的命名是最困难的事情之一,因为名称不会长,表达,捕捉和容易记忆。不幸的是,目标语言(以及预先存在的约定)在这方面可能不会提供太多帮助(不可用的关键字,类型擦除,类型模糊等)

Unusable keywords (不可用的关键字)

在原始的Rx.NET中,调用发出单个项然后完成的运算符Return(T)。由于Java约定是以小写字母开始一个方法名称,因此这将return(T)是Java中的关键字,因此不可用。因此,RxJava选择命名此运算符just(T)。Switch对于必须命名的运算符存在相同的限制switchOnNext。另一个例子是Catch命名onErrorResumeNext。

Type erasure (键入擦除)

许多期望用户提供返回反应类型的函数的操作符不能被重载,因为围绕Function<T, X>这种方法的类型擦除将这种方法签名为重复。RxJava选择通过将类型附加为后缀来命名此类运算符:

1
2
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities (输入歧义)

即使某些操作符没有类型擦除的问题,它们的签名也可能变得模棱两可,特别是如果使用Java 8和lambdas。例如,有几种重载concatWith将各种其他反应基类型作为参数(为了在底层实现中提供方便和性能优势):

1
2
Flowable < T > concatWith(Publisher <?extends T > other);
Flowable < T > concatWith(SingleSource <?extends T > other);

两者Publisher并SingleSource显示为功能接口(类型的具有一个抽象方法),并可以鼓励用户尝试提供lambda表达式:

1
2
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

不幸的是,这种方法不起作用,并且该示例根本不打印2。实际上,从版本2.1.10开始,它甚至都没有编译,因为至少concatWith存在4个重载,并且编译器发现上面的代码不明确。
在这种情况下的用户可能想要推迟一些计算直到someSource完成,因此正确的明确运算符应该是defer:

1
2
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有时,会添加一个后缀以避免可能编译但在流中产生错误类型的逻辑歧义:

1
2
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);

当函数接口类型作为类型参数参与时,这也会变得模糊不清T。

错误处理

数据流可能会失败,此时会将错误发送给消费者。但有时候,多个源可能会失败,此时可以选择是否等待所有源完成或失败。为了表明这个机会,许多运算符名称后缀为DelayError单词(而其他运算符名称在其重载之一中具有一个delayError或一个delayErrors布尔标志):

1
2
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

当然,各种后缀可能会一起出现:

1
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

基类与基类型

类型 接口 消费者
0..N背压 Flowable Publisher1 Subscriber
0..N无界限 Observable ObservableSource2 Observer
1个元素或错误 Single SingleSource SingleObserver
0..1元素或错误 Maybe MaybeSource MaybeObserver
0元素或错误 Completable CompletableSource CompletableObserver

说明:

1.它org.reactivestreams.Publisher是外部Reactive Streams库的一部分。它是通过由Reactive Streams规范管理的标准化机制与其他反应库交互的主要类型。
2.接口的命名约定是附加Source到半传统的类名。有没有FlowableSource因为Publisher是由无流库提供的(和子类型就不会与互操作或者帮助)。但是,这些接口在Reactive Streams规范的意义上并不是标准的,并且目前仅针对RxJava。

参考来源