RxJava 笔记

Table of Contents

zmantra.jpg

本文主要学习RxJava笔记.

简介

ReactiveX 是将"异步"和"事件驱动编程"结合的一个库, 结合的手段是通过 "可观察流"(observable sequences).

Rx 基于"观察者模式", 支持流式的数据和事件以及在其上的各种操作.

Observable

这张图介绍了 Observable 的形式:
observable.png

基本介绍

可以把 Observable 看作是 Iterable 的"push"版本. 对于 Iterable, 消费者通过 iterator 来获取数据. 而 Observable 是采用的"观察者模式", 所以数据生成之后生产者直接 push 给消费者. 这种操作的一个灵活性是 数据的到达即可以是同步的又可以是异步的.

  1. Observable 支持像操作数组一样操作异步数据.
  2. 可以发射单个/多个/无限的事件流.下面是 Observable 和 Iterable 的 对比.

    event Iterable Observable
    retrieve data T next() onNext(T)
    error throws onError(Exception)
    complete !hasNext() onCompleted()
  3. 客户端代码要一直把与 Observable 操作当作异步操作.
  4. callback 虽然和使用, 但是在内嵌多个 callback 的场景中变得 很笨重.
  5. 可以分为"热"和"冷"两种, 热方式会立刻发射生成的数据. 而冷方式会等有 subscriber 注册之后才发射.

支持的操作符

  1. 创建. 包括 Create, Defer, Empty/Never/Throw, From, Interval, Just, Range, Repeat, Start, and Timer
  2. 转化. 包括 Buffer, FlatMap, GroupBy, Map, Scan, and Window.
  3. 过滤. 包括 Debounce, Distinct, ElementAt, Filter, First, IgnoreElements, Last, Sample, Skip, SkipLast, Take, and TakeLast.
  4. 合并. 包括 And/Then/When, CombineLatest, Join, Merge, StartWith, Switch, and Zip.
  5. 条件. 包括 All, Amb, Contains, DefaultIfEmpty, SequenceEqual, SkipUntil, SkipWhile, TakeUntil, and TakeWhile
  6. 计算. 包括 Average, Concat, Count, Max, Min, Reduce, and Sum.
  7. 连接. 包括Connect, Publish, RefCount, and Replay.
  8. 工具类. 包括Delay, Do, Materialize/Dematerialize, ObserveOn, Serialize, Subscribe, SubscribeOn, TimeInterval, Timeout, Timestamp, and Using.

这些操作符有的会返回 Observable序列, 所以可以将这些操作性连在一起 做序列化操作.

操作符

创建类操作符

  • Create

    从头开始创建一个 Observable, 该函数的参数必须能够 接受一个 Observer 作为参数.这样通过该函数创建的 Observable 才能注册 Observer.

    在 Java 中,create()函数接受一个 OnSubscribe 对象.

    Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            try {
                if (!observer.isUnsubscribed()) {
                    for (int i = 1; i < 5; i++) {
                        observer.onNext(i);
                    }
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        }
     } ).subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {
                System.out.println("Next: " + item);
            }
    
            @Override
            public void onError(Throwable error) {
                System.err.println("Error: " + error.getMessage());
            }
    
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }
        });
    
  • Defer

    当有observer注册的时候才创建 Observable. 例如下面的例子, 虽然只创建了一个 defer() 返回值对象, 但是call()函数会调用两次. 说明每次注册一个新的观察者时, 都会生成一个新的 Observable 对象.

    Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                System.out.println("create observable id = " + cont++);
                return Observable.just(value);
            }
        });
    observable.subscribe(glbSubscriber);
    observable.subscribe(glbSubscriber);
    
  • Empty/Never/Throw

    empty()创建一个 Observable, 注册之后立刻调用 onComplete() never()创建一个 Observable, 注册之后永远都不调用 throw()(java叫 error())创建一个 Observable, 注册之后立刻调用 onError()

  • From

    将一系列其他类型(Iterable, array…)转化为 Observable, 一个一个发射出.

    RxJava 支持的类型包括: Iterable, Array, Callable, Future

    Observable.from(names).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            System.out.println("hi:" + s);
        }
    });
    
  • Interval

    间隔性的发射数据. 一直下去,不停止.
    RxJava提供了多个重载版本.

  • Just

    接受一个参数,并原封不动的发射出去, 这与From不同, 后者 会把 array 分解然后一个个的发射.

    RxJava 不支持参数为 array.

  • Range

    发射一段范围内的整数. 接受两个参数:起始值和个数.

  • Repeat

    创建一个 Observable, 重复性的发射数据.

    RxJava 的该操作符实现并不初始化一个 observable, 它需要 在一个 Observable 对象内部调用.

    Observable.just("helo", "wold")
            .repeat(3)
            .subscribe(glbSubscriber);
    
  • TODO Start

    接受一个函数, 然后发射函数的返回值.

    RxJava 中该操作符的实现放在了一个单独的模块 rxjava-async 中

  • Timer

    延迟反射数据.

转化类操作符

  • Buffer

    收集到一定数量之后才发射全部收集到的数据.

    RxJava 把收集到的数据放到一个 List 中

  • TODO FlatMap

    flatMap()接受一个能够返回 Observable 的参数. 这样, 该参数会将 flatMap()的调用者(原始 Observable)里面的值转化为一个个的 Observable, 然后再把这些 Observable 合成一个 Observable.

  • GroupBy

    把一个 observable 分成根据条件几个 observable, 每个 observable 都有 一个 key.

    RxJava 实现了该操作符, 会返回一个 GroupedObservable 类.

    public void groupBy() {
        Integer[] integers = new Integer[]{4, 2, 16, 6, 1, 20, 5};
        Observable.from(integers)
                .groupBy(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        if(integer % 4 == 0) {
                            return "can";
                        } else {
                            return "can't";
                        }
                    }
                })
                .subscribe(new Subscriber<GroupedObservable<String, Integer>>() {
                    @Override
                    public void onCompleted() {
    
                    }
    
                    @Override
                    public void onError(Throwable e) {
    
                    }
    
                    @Override
                    public void onNext(GroupedObservable<String, Integer> objectIntegerGroupedObservable) {
                        String key = objectIntegerGroupedObservable.getKey();
                        if("can".equals(key)) {
                            objectIntegerGroupedObservable.subscribe(glbIntSub);
                        }else{
                            objectIntegerGroupedObservable.subscribe(glbIntSub2);
                        }
                    }
                });
    }
    
  • Map

    转化发射的值为另一个值发射. 该函数接受一个函数作为参数 用来转化 item.

    Observable.just(2, 5, 8).map(new Func1<Integer, String>() {
        @Override
        public String call(Integer integer) {
            return "map " + integer;
        }
    }).subscribe(glbSubscriber);
    
  • Scan

    该操作符接受一个函数, 该函数将源 Observable 发射的数据 转化为另外的数据, 并基于该转化的数据和源 Observable 的下一个 数据生成自己下一个数据.

    Observable.just(1, 9, 8, 8, 02, 06)
    .scan(new Func2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer integer, Integer integer2) {
            return integer * integer;
        }
    }).subscribe(glbIntSub);
    
  • TODO Window

    把发射的元素分成几块, 每块都通过 observable 发射.

    RxJava: 下面的代码有问题, 如果注册一个全局的 subscriber, 则只会输出第一个 window 的内容. 第二块代码是输出正确的代码. 猜测应该是全局的 subscriber 调用了onComplete()导致.

            Observable.just(1, 9, 8, 3, 1, 6, 4)
                    .window(2)
                    .subscribe(new Subscriber<Observable<Integer>>() {
                        @Override
                        public void onCompleted() { System.out.println(this + " ends"); }
    
                        @Override
                        public void onError(Throwable e) { }
    
                        @Override
                        public void onNext(Observable<Integer> integerObservable) {
                            System.out.println("onNext called " + integerObservable);
                            integerObservable.subscribe(glbIntSub);
                        }
                    });
    
    //修复版
            Observable.just(1, 9, 8, 3, 1, 6, 4)
                    .window(4)
                    .subscribe(new Subscriber<Observable<Integer>>() {
                        @Override
                        public void onCompleted() { System.out.println(this + " ends"); }
    
                        @Override
                        public void onError(Throwable e) { }
    
                        @Override
                        public void onNext(Observable<Integer> integerObservable) {
                            System.out.println("onNext called " + integerObservable);
                            integerObservable.subscribe(new Subscriber<Integer>() {
                                @Override
                                public void onCompleted() {
                                    System.out.println();
                                }
    
                                @Override
                                public void onError(Throwable e) {
    
                                }
    
                                @Override
                                public void onNext(Integer integer) {
                                    System.out.print(integer + ", ");
                                }
                            });
                        }
                    });
    

过滤类操作符

  • TODO Debounce

    接受一个 timeout 值, 在 timeout 结束之前所产生的源 Observable 的值都会被抛弃.

  • Distinct

    只发射没发射过的项目, 已经发射过的会被过滤掉.

  • ElementAt

    接受一个 index 值参数, 只发射第 index 个参数(下标从1开始).

  • Filter

    只发射通过 filter 函数的元素.

  • First

    只发射第一个元素.

  • IgnoreElements

    忽略所有元素

  • Last

    发射最后一个元素

  • TODO Sample
  • Skip/SkipLast

    跳过前/后 N 个元素.

  • Take/TakeLast

    只拿前/后 N 个元素.

Subscribe

Subscribe即是Observable 又是 Observer.

AsyncSubject

当源 Observable 结束之后, 该 Subject 会将源 Observable 的最后一个 item 发射出来. 即源 Observable 的结束会激活 该 Subject 的发射/结束动作. 后续所有的 observer 都会得到 同样的值.

BehaviorSubject

当一个 Observer 注册到该 Subject 时,该 Subject 会 开始发射最近刚发射过的 item,及后面生成的 item. 即一个 Observer 得到从"它注册时间的上一个 item + 之后开始的序列".

PublishSubject

只发射注册时间之后的 item.

ReplaySubject

每一个 Observer 都发射所有的 item.

RxJava 在 Android 中应用

RxBus

Created At <2016-04-18 Mon 22:15> by Luis Xu. Email: xuzhengchaojob@gmail.com