RxJava2 相关运算符使用示例

本篇文章主要介绍RxJava2中的运算符。
暂时主要包含:map, zip, take, reduce, flatMap, filter,
buffer, skip, merge, defer ,concat, replay ,range
以下示例并不代表该方法所有用法,可在使用的时候查看相关重载方法。
相信有更多惊喜,RxJava的封装对于数据的处理,可以说封装得非常完善了。

强烈推荐使用,目前示例并不全,后续遇到其他方法的使用再慢慢添加


引入 rxjava2 库

  • 添加以下文件到app build.gradle

    1
    2
    3
    4
    //rxjava
    compile 'io.reactivex.rxjava2:rxjava:2.1.1'
    //rxandroid 如需使用rxandroid
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

    运算符示例介绍

    创建操作

  • defer 当订阅者订阅时才创建Observable,内部通过OnSubscribeDefer在订阅时调用Func0创建Observable。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Observable.defer(new Func0<Observable<Integer>>() {
    @Override
    public Observable<Integer> call() {
    return Observable.just(1,2,3,5);
    }
    }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    Log.i(tag,""+integer);
    }
    });
  • range 根据初始值n和数目m,发射m个从n开始的整数序列

    1
    2
    3
    4
    5
    Observable.range(2, 2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    Log.d(TAG, "" + integer); //2,3
    }
  • 更多创建Observable的方法

变换操作

  • map 转换Observable发出的数据的数据类型。(map(new Function(A,B))即:将发射的B数据类型转换为A数据类型)

    1
    2
    3
    4
    5
    6
    7
    getObservable()//这里最初发射的数据类型为List<ApiUser>
    .map(new Function<List<ApiUser>, List<User>>() {//经过map之后,可将List<ApiUser>转换为List<User>
    @Override
    public List<User> apply(List<ApiUser> apiUsers) throws Exception {
    return Utils.convertApiUserListToUserList(apiUsers);
    }
    })
  • flatMap 将发射的数据变换为一个单独的Observables,内部采用merge合并。(以下示例将最初发生的数字,转换成了一个个Observable,最后merge为一个Observable

    1
    2
    3
    4
    5
    6
    7
    Observable.just(1,2,3,4,5)
    .flatMap(new Function<Integer, Observable<String>>() {
    @Override
    public Observable<String> apply(Integer integer) throws Exception {
    return Observable.just("我是数据:"+integer);
    }
    });
  • 更多变化操作的各种操作符

组合

错误处理

辅助操作符

条件操作符

math 操作符

rxjava-async模块

To将Observable转换为另一个对象或数据结构

BlockingObservable 子类相关

ConnectableObservable 和它的子类以及它们的操作符

处理字符串序列和流的特殊操作符

Observable的辅助操作符

聚合操作

  • reduce 将发射的数据一个求和,发射最终结果给订阅者

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    Observable.from(new Integer[]{1,2,3,4,5,6,7,8,9,10}).reduce(new Func2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer x, Integer y) {
    return x+y; // 1+2+3+4+5+6+7+8+9+10
    }
    }).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
    Log.d(TAG,"result="+ integer); // result = 55
    }
    });

    过滤操作

  • filter 过滤Observable发出这些数据 (以下示例就是满足 integer % 2 == 0的数据可以发射出去)

    1
    2
    3
    4
    5
    6
    7
    Observable.just(1, 2, 3, 4, 5, 6)
    .filter(new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) throws Exception {
    return integer % 2 == 0;//满足为true的数据可以继续发射给订阅者
    }
    });
  • buffer 将发射的数据收集到指定长度的集合中,然后将一个个满足指定长度的集合一个个发射出去。(注:数据发射结束,最后一条数据不满足指定长度也不会发给订阅者)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // 示例1
    Observable.just(1,2,3,4,5)
    .buffer(2)
    .subscribe(new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> integers) {
    //[1,2]
    //[3,4]
    //[5]
    }
    });
    //示例2
    RxView.clickEvents(mButton)
    .buffer(2, TimeUnit.SECONDS)
    .subscribe(new Subscriber<List<ViewClickEvent>>() {
    @Override
    public void onCompleted() {}
    @Override
    public void onError(Throwable e) {}
    @Override
    public void onNext(List<ViewClickEvent> viewClickEvents) {
    if (viewClickEvents.size() > 0) {
    Toast.makeText(MainActivity.this, "2秒内点击了" + viewClickEvents.size() + "次", Toast.LENGTH_SHORT).show();
    } else {
    }
    }
    });
    ~~~
    * take 只发射开始的N项数据或者一定时间内的数据。内部通过OperatorTake和OperatorTakeTimed过滤数据
    ~~~java
    Observable.just(1,2,3,4,5)
    .take(3)//发射前三个数据项
    .take(100, TimeUnit.MILLISECONDS)//发射100ms内的数据
  • skip 跳过前面n项,发射后面的数据(take是只发射前n项,skip相反)

    1
    2
    3
    Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9)
    .skip(6)
    // 7 8 9
  • 更多用于过滤和选择Observable发射的各种操作符

合并操作