RxJava探索一

RxJava是什么?
为什么要学习RxJava?
如何使用RxJava?


RxJava是什么?

RxJava 是一个响应式编程框架,采用观察者设计模式。
响应式编程是一种基于异步数据流概念的编程模式。
数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

为什么要学习RxJava?

换个方式思考:同样是做异步,为什么要选择它,而不选择现成的 AsyncTask / Handler / XXX / …? 因为:

Rxjava 更简洁

它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁。

举个例子

jglz

假如有这样的一个需求(服务端返回了一串包含各种后缀格式的数组url)要求如下:

  • 如果url后缀是apk,就下载apk到指定目录(apk)下
  • 如果url后缀是mp3、wav、aac,就下载音频到指定目录(music/[map3、wav、aac])下
  • 如果url后缀是png、webp、jpg、jpeg格式,就下载图片到指定目录(img/[png、webp、aac、jpg、jpeg])下,并显示在一个图片列表控件上
    (自定义图片集合控件名ImageViews,提供了addBitmap(Bitmap bmp)方法添加图片展示)

需求分析:

  • 已知,服务端返回的url集合
  • 未知,包含各种后缀格式
  • 图片、安装包、音频 需要分3大类
  • 其中图片、音频需分组
  • 其中图片还需要展示在控件上
  • 都需要下载到本地
[非Rxjava实现](https://gist.github.com/luck-fc/d8c129bb599bcf2a72af5fe2a70886f3) **VS** [Rxjava实现](https://gist.github.com/luck-fc/f4e2506160c30481df2eb680b57e9170)

如何使用RxJava?

Rxjava 有两个不同的版本 即(Rxjava1.x 和 Rxjava2.x),官方推荐使用的是2.x

这里讲解最常用的:创建操作、变换操作、过滤操作。

引入项目

这里直接贴出RxJava RxAndroid的开源地址

代码: Github:

https://github.com/ReactiveX/RxJava

https://github.com/ReactiveX/RxAndroid

引入依赖:

  • RxJava1.x
    1
    2
    compile 'io.reactivex:rxandroid:1.2.1'
    compile 'io.reactivex:rxjava:1.1.6'
  • RxJava2.x
    1
    2
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'

以下讲解的是基于 Rxjava1.x 版本

创建操作
方法名 调度器类型 作用
just DEFAULT 将一个或多个对象,转换成发射这个或这些对象的一个Observable
from DEFAULT 将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
repeat Schedulers.trampoline( ) 创建一个重复发射指定数据或数据序列的Observable
repeatWhen DEFAULT 创建一个重复发射指定数据或数据序列的Observable,它依赖于另一个Observable发射的数据
create DEFAULT 使用一个函数从头创建一个Observable
defer DEFAULT 只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable
range DEFAULT 创建一个发射指定范围的整数序列的Observable
interval Schedulers.computation( ) 创建一个按照给定的时间间隔发射整数序列的Observable
timer Schedulers.computation( ) 创建一个按照给定的时间间隔发射整数序列的Observable
empty DEFAULT 创建一个什么都不做直接通知完成的Observable
error DEFAULT 创建一个什么都不做直接通知错误的Observable
never DEFAULT 创建一个不发射任何数据的Observable
变换操作
方法名 调度器类型 作用
map DEFAULT 对序列的每一项都应用一个函数来变换Observable发射的数据序列
flatMap DEFAULT 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
concatMap DEFAULT 将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
switchMap DEFAULT 将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
scan DEFAULT 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
groupBy DEFAULT 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据
buffer Schedulers.computation( ) “它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
window Schedulers.computation( ) 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
cast DEFAULT 在发射之前强制将Observable发射的所有数据转换为指定类型
过滤操作
方法名 调度器类型 作用
filter DEFAULT 过滤数据
takeLast DEFAULT 只发射最后的N项数据
last DEFAULT 只发射最后的一项数据
lastOrDefault DEFAULT 只发射最后的一项数据,如果Observable为空就发射默认值
takeLastBuffer DEFAULT 将最后的N项数据当做单个数据发射
skip DEFAULT 跳过开始的N项数据
skipLast DEFAULT 跳过最后的N项数据
take DEFAULT 只发射开始的N项数据
first() and takeFirst DEFAULT 只发射第一项数据,或者满足某种条件的第一项数据
firstOrDefault DEFAULT 只发射第一项数据,如果Observable为空就发射默认值
elementAt DEFAULT 发射第N项数据
elementAtOrDefault DEFAULT 发射第N项数据,如果Observable数据少于N项就发射默认值
sample() or throttleLast DEFAULT 定期发射Observable最近的数据
throttleFirst Schedulers.computation( ) 定期发射Observable发射的第一项数据
throttleWithTimeout() or debounce DEFAULT 只有当Observable在指定的时间后还没有发射数据时,才发射一个数据
timeout Schedulers.immediate() 如果在一个指定的时间段后还没发射数据,就发射一个异常
distinct DEFAULT 过滤掉重复数据
distinctUntilChanged DEFAULT 过滤掉连续重复的数据
ofType DEFAULT 只发射指定类型的数据
ignoreElements DEFAULT 丢弃所有的正常数据,只发射错误或完成通知

我们需要特别注意的!

subscribeOn 和 observeOn 的区别

可能我们刚刚接触rxjava会有这样的同感 (subscribeon observeon 简直傻傻分不清啊!)
以下总结或许可以拯救你:

1
2
3
4
5
subscribeOn指定的是上游发送事件的线程
observeOn指定的是下游接收事件的线程
对于同一个上游,多次调用subscribeOn只有第一次的有效, 其余的会被忽略.
每调用一次observeOn下游的线程就会切换一次.
observeOn之后,调用subscribeOn无效

实例分析,如果我们有一段这样的序列

1
2
3
4
5
6
7
8
9
10
11
Observable
.map // 操作1
.flatMap // 操作2
.subscribeOn(io)
.map //操作3
.flatMap //操作4
.observeOn(main)
.map //操作5
.flatMap //操作6
.subscribeOn(io) //!!特别注意
.subscribe(handleData)

假设这里我们是在主线程上调用这段代码,
那么

  • 操作1,操作2是在io线程上,因为之后subscribeOn切换了线程
  • 操作3,操作4也是在io线程上,因为在subscribeOn切换了线程之后,并没有发生改变。
  • 操作5,操作6是在main线程上,因为在他们之前的observeOn切换了线程。
  • 特别注意那一段,对于操作5和操作6是无效的

这里引用了Gemini的文章迷之RxJava —— subscribeOn 和 observeOn 的区别中的总结。

调度器类型解释

同样我们刚刚接触rxjava会有这样的同感 (调度器这多,都有何种含义呢?)

调度器类型 效果
Schedulers.io( ) 用于IO密集型任务,如异步阻塞IO操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器
Schedulers.computation( ) 用于计算任务,使用的固定的线程池,大小为 CPU 核数。 这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算,事件循环或和回调处理。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
Schedulers.from(executor) 使用指定的Executor作为调度器
Schedulers.immediate( ) 在当前线程立即开始执行任务,相当于不指定线程。这是默认的 Scheduler
Schedulers.trampoline( ) 当其它排队的任务完成后,在当前线程排队开始执行
Schedulers.newThread() 总是启用新线程,并在新线程执行操作。
AndroidSchedulers.mainThread() Android专用的,指定的操作将在 Android 主线程运行,涉及到更新UI的操作必须用此线程
界面退出一定要记得unsubscribe

它表示取消订阅,也就是说它调用后Subscriber对象不再接收传过来的事件。
当然它还有一个更重要的作用就是释放掉Observable的引用,以避免内存泄漏的威胁。

1
2
3
if (Subscriber != null && !Subscriber.isUnsubscribed()) {
Subscriber.unsubscribe();
}
那些不明白的RxJava

RxJava2相关学习demo资料

RxJava2-Android-Samples

其他RxJava相关学习资料

本章讲解了对于Rxjava的认识、了解
罗列了Rxjava1.x版本(创建操作、变换操作、过滤操作)一系列方法的作用
注意事项