Rxjava使用帮助

简介

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Rxjava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序。换句话说就是利用观察者模式实现异步操作的一个库。

链式调用

比如说这么一个情景:
用户输入用户名密码进行登录,同时保存用户信息到本地数据库,之后利用获取的token去服务器获取书籍列表信息,并从书籍信息中找出类型为推理的小说显示到界面。

首先我们使用普通的java来实现,它的代码将如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//登录
login(username, password, new CallBack<User>() {
@Override
public void onSuccess(User data) {
//保存信息到数据库
saveToDao(data);
//获取token
final String token = data.getToken();
//请求书籍列表
getBookList(token, new CallBack<List<Book>>() {
@Override
public void onSuccess(List<Book> data) {
MainActivity.this.runOnUiThread(new Runnable() {
@Override
public void run() {
//对书籍信息进行筛选并显示到UI界面中
for (Book b : data) {
if (b.getType().equals("推理")) {
showDataToScreen(b);
}
}

}
});
}

@Override
public void onFail(String msg) {

}
});
}

@Override
public void onFail(String msg) {

}
});

而如果使用rxjava会变成这样子:

1
2
3
public Observable<User> login(String username, String password) {
...
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
login(username, password)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.map(new Func1<User, String>() {
@Override
public String call(User user) {
saveToDao(user);
return user.getToken();
}
})
.flatMap(new Func1<String, Observable<List<Book>>>() {
@Override
public Observable<List<Book>> call(String s) {
return getBookList(s);
}
})
.flatMap(new Func1<List<Book>, Observable<Book>>() {
@Override
public Observable<Book> call(List<Book> books) {
return Observable.from(books);
}
})
.filter(new Func1<Book, Boolean>() {
@Override
public Boolean call(Book book) {
return book.getType().equals("推理");
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Book>() {
@Override
public void call(Book book) {
showDataToScreen(book);
}
});

是不是变得很舒服,起码看上去简洁多了。
再使用 java8 试试,看上去又不一样了

1
2
3
4
5
6
7
8
9
10
11
12
login(username, password)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.map(user -> {
saveToDao(user);
return user.getToken();
})
.flatMap(s -> getBookList(s))
.flatMap(books -> Observable.from(books))
.filter(book -> book.getType().equals("推理"))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::showDataToScreen);

当一个业务需求很长时,使用平时的方法难免会陷入缩进的陷阱中,等过了一段时间后回头再看自己的代码又必须要重头梳理一遍。而使用rxjava,虽然看起来也是很长,但是一条链式调用就能满足整个需求,很长但很清晰。

介绍

前面也提到过 rxjava 是一个利用观察者模式实现异步操作的库,使用 rxjava 必须要明白一下四个概念(Observable,Observer,subscribe ,事件), Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

Observer —— 观察者

Observer 和 Subscriber(继承自Observer) 都属于观察者,它们决定观察者被触发后所做的操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Subscriber<String> subscriber = new Subscriber<String>() {

@Override
public void onStart() {
super.onStart();
}

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {

}
};

  • onStart() 是在产生 subscribe() 订阅动作之后调用,可以做一些初始化,发生在 onNext() 之前。
  • onCompleted() 是在整个序列发生结束后调用。
  • onError() 是在序列发生错误时发生,它与 onCompleted() 互斥。

Observable —— 被观察者

它决定什么时候触发 Observer , Rxjava 通过 create() 方法来建立一个被观察者(也可以通过操作符建立)。

1
2
3
4
5
6
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber){
subscriber.onNext("cpacm");
}
});

subscribe() —— 订阅

Observable 调用 subscribe() 时就会触发其 call() 方法产生动作。所以记住,所有的动作都是在subscribe()订阅后才开始进行
除去完整的 Subscriber,还有不完整的 Action 可供调用。

1
2
3
4
5
6
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

事件变换

所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。这部分先放着,我们先看看线程操作。

Scheduler

在RxJava 中,Scheduler —— 调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

在 Rxjava 中,可以通过subscribeOn()observeOn() 对线程进行控制。
subscribeOn() 指定被观察者即 Observable 动作发生所在线程。只能指定一次。
observeOn() 控制下一层 subscriber 发生的线程,若想多次切换线程时则可以多次调用。

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定

Observable.doOnSubscribe():它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。在 doOnSubscribe()的后面跟一个 subscribeOn() ,就能指定准备工作的线程了。

Subject = Observable + Observer

如标题所示,Subject既可以像Observer订阅一个Observable,也可以像Observable发射一个新的数据。rxjava中提供了四种Subject

PublishSubject

利用Subject实现传统的Observable + Observer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}

@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}

@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");

与之前的Observable不一样,调用subscribe()并不会发送数据,因为没有数据可以发送,所以subject会一直等待,等待subject调用onNext手动发送数据来触发。Rxbus 就是利用 subject 的这个特性来实现的。

BehaviorSubject

简单的说,BehaviorSubject会首先向他的订阅者发送截至订阅前最新的一个数据对象(或初始值),然后正常发送订阅后的数据流。

1
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);

ReplaySubject

ReplaySubject会缓存它所订阅的所有数据,向任意一个订阅它的观察者重发:

1
ReplaySubject<Integer> replaySubject = ReplaySubject.create();

AsyncSubject

当Observable完成时AsyncSubject只会发布最后一个数据给已经订阅的每一个观察者。

1
AsyncSubject<Integer> asyncSubject = AsyncSubject.create();

Operator —— 操作符(事件变换)

以上的介绍是使用 Rxjava 要了解的基础,而事件变换才是 rxjava 的精髓。

原理

操作符包括Schedulers的线程控制大部分是基于基础变换方法 lift(Operator)加上 Operator 变换器 实现。

1
2
3
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
}

可以看到,lift返回了一个新的 Observable , 而原来的 onSubscribe 作为参数和 operator 一起传入到新的 Observable 中。
Operator 会通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的变换代码以实现变换。

基本的实现思想:执行变换之后,会返回一个新的Observable,而这个新的Observable会负责接收旧的Observable,并通过Operator进行转化。
一个将String转变为Integer的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 将事件序列中的 Integer 对象转换为 String 对象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});

延伸

Retrofit的结合

1
2
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'

Retrofit的请求,返回一个 Observable

1
2
3
4
@GET(HttpUtil.WIKI)
Observable<WikiBean> getWiki(
@Query("wiki_id") long wiki_id

);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
getWiki(wiki_id)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ApiResponse<RadioDetailData>>() {
@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(WikiBean wiki) {
//do something
}
});

RxBinding

RxJava binding APIs for Android’s UI widgets.

一套基于Android平台的Binging API,举个例子:

1
2
3
4
5
6
7
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件
.subscribe(new Action1<ViewClickEvent>() {
@Override
public void call(ViewClickEvent event) {
// Click handling
}
});

RxBus

使用 eventbus 或者 otto 的思想用 rxjava 来替代。实现思路可以参考 rxbus这篇文章。

Rxjava 各类操作符实例

总算到了最重要的部分了,这一部分用来演示常用的操作符,尽量覆盖完全。

创建类Obervable

create

create
从新建造一个 Observable 以便调用 Observer,举个例子

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("cpacm");
}
});
observable.subscribe(new Action1<String>() {
@Override
public void call(String msg) {
System.out.println(msg);
}
});
// 输出:cpacm

defer

延迟 Observable 的创建直到被订阅,举个例子
defer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public Observable<String> createObservable() {
System.out.println("create observable");
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("cpacm");
}
});
return observable;
}

public void defer() {
Observable<String> observable = Observable.defer(new Func0<Observable<String>>() {
@Override
public Observable<String> call() {
System.out.println("defer");
return createObservable();
}
});
System.out.println("subscribe");
observable.subscribe(System.out::println);
}

/*输出:
subscribe
defer
create observable
cpacm
*/

Empty/Never/Throw

  • Empty:创建一个 Observable 不发送数据,正常结束;
  • Never:创建一个 Observable 不发送数据,不会终止;
  • Throw:创建一个 Observable 不发送数据,终止时发出一个错误。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    Observable.empty().subscribe(new Subscriber<Object>() {
    @Override
    public void onCompleted() {
    System.out.println("empty:complete");
    }

    @Override
    public void onError(Throwable e) {
    System.out.println("empty:error");
    }

    @Override
    public void onNext(Object o) {
    System.out.println("empty:next");
    }
    });

    Observable.never().subscribe(new Subscriber<Object>() {
    @Override
    public void onCompleted() {
    System.out.println("never:complete");
    }

    @Override
    public void onError(Throwable e) {
    System.out.println("never:error");
    }

    @Override
    public void onNext(Object o) {
    System.out.println("never:next");
    }
    });

    Observable.error(new NullPointerException()).subscribe(new Subscriber<Object>() {
    @Override
    public void onCompleted() {
    System.out.println("error:complete");
    }

    @Override
    public void onError(Throwable e) {
    System.out.println("error:error");
    }

    @Override
    public void onNext(Object o) {
    System.out.println("error:next");
    }
    });
    /*输出:
    empty:complete
    error:error
    */

from

从列表中发出数据,并按顺序发射
from

1
2
3
String[] name = new String[]{"c","p","a","c","m"};
Observable.from(name).subscribe(System.out::println);
//输出:c p a c m

interval

轮询,每隔一定时间重新发射
interval

1
2
Observable.interval(1, TimeUnit.SECONDS).subscribe(System.out::println);
//输出:0 1 2 3 4 ... 间隔1秒

just

单次发射
just

1
2
Observable.just("cpacm").subscribe(System.out::println);
//输出:cpacm

just()方法可以传入一到九个参数,它们会按照传入的参数的顺序来发射它们。just()方法也可以接受列表或数组,就像from()方法,但是它不会迭代列表发射每个值,它将会发射整个列表。通常,当我们想发射一组已经定义好的值时会用到它。

Range

在指定范围内发射数据,range(int start,int count)
range

1
2
Observable.range(2,5).subscribe(System.out::println);
//输出:2,3,4,5,6

repeat

重复发射数据,可填写数字控制重复次数
repeat

1
2
3
//重复发射3次
Observable.just("cpacm").repeat(3).subscribe(System.out::println);
//输出:cpacm cpacm cpacm

start

需要额外模块,以后再补充

timer

一定时间后发射数据,然后完成
timer

1
2
Observable.timer(3,TimeUnit.SECONDS).subscribe(System.out::println);
// 3秒后输出:0

变换类Obervable

Buffer

将一个个数据按照count和skip组装成一组数据再发射。也可以通过其他buffer方式进行组装。
buffer1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 20; i++) {
list.add(i);
}
Observable.from(list).buffer(4, 3).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
System.out.println(integers);
}
});
/*
输出:
[0, 1, 2, 3]
[3, 4, 5, 6]
[6, 7, 8, 9]
[9, 10, 11, 12]
[12, 13, 14, 15]
[15, 16, 17, 18]
[18, 19]
* */

Map

RxJava的map函数接收一个指定的Func对象然后将它应用到每一个由Observable发射的值上

1
2
3
4
5
6
7
8
9
10
11
12
13
14
 Observable.range(0, 5)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer integer) {
return String.valueOf(integer);
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
// 输出: 0 1 2 3 4

FlatMap

flatmap

注意:这里返回是乱序的

在 FlatMap 中可以生成新的 Observable 来替代原来的。

1
2
3
4
5
6
7
public class Line{
public Integer[] points;

public Line() {
points = new Integer[]{1,2,3,4,5};
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Line line = new Line();
Observable.just(line)
.flatMap(new Func1<Line, Observable<Integer>>() {
@Override
public Observable<Integer> call(Line line) {
return Observable.from(line.points);
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
// 输出: 1 2 3 4 5

ConcatMap

concatMap
ConcatMap 与 FlatMap 相比是有序的发射数据
.concatMap()与.flatMap()的比较

SwitchMap


switchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。

GroupBy

GroupBy
groupBy操作符是对源Observable产生的结果进行分组,形成一个类型为GroupedObservable的结果集,GroupedObservable中存在一个方法为getKey(),可以通过该方法获取结果集的Key值(类似于HashMap的key)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
List<String> list = new ArrayList<>();
for (int i = 0; i < 10; i++) {
list.add(String.valueOf(i));
}
Observable.from(list)
.groupBy(new Func1<String, Integer>() {
@Override
public Integer call(String s) {
return Integer.valueOf(s) % 3;//分成3组
}
})
.subscribe(new Action1<GroupedObservable<Integer, String>>() {
@Override
public void call(GroupedObservable<Integer, String> integerStringGroupedObservable) {
integerStringGroupedObservable.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("key:" + integerStringGroupedObservable.getKey() + ",value:" + s);
}
});
}
});
//输出:
/* key:0,value:0
* key:1,value:1
* key:2,value:2
* key:0,value:3
* key:1,value:4
* key:2,value:5
* key:0,value:6
* key:1,value:7
* key:2,value:8
* key:0,value:9
* */

Scan

scan
scan操作符对一个序列的数据应用一个函数,并将这个函数的结果发射出去作为下个数据应用这个函数时候的第一个参数使用,有点类似于递归操作

1
2
3
4
5
6
7
8
Observable.range(1, 10).scan(new Func2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer sum, Integer integer) {
return sum + integer;
}
}).subscribe(System.out::println);
//输出: 1 3 6 10 15 21 28 36 45 55
//tip: 1、第一个数不会调用scan中的方法;2、参数sum是上一个返回的数,integer是新传入的数

window

window
RxJava的window()函数和buffer()很像,但是它发射的是Observable而不是列表.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Observable.range(0,10)
.window(3,3)
.subscribe(new Action1<Observable<Integer>>() {
@Override
public void call(Observable<Integer> integerObservable) {
System.out.println("-----group");
integerObservable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println("value:"+ integer);
}
});
}
});
/* 输出:
* -----group
* value:0
* value:1
* value:2
* -----group
* value:3
* value:4
* value:5
* -----group
* value:6
* value:7
* value:8
* -----group
* value:9
* */

过滤类Obervable

Debounce

debounce
触发后等待一段时间后发射,期间被重新触发则重置时间开始新的等待。

使用场景:搜索框输入文字后需要在一段时间内无操作后访问服务器获取数据。

Distinct

distinct
对序列使用distinct()函数去掉重复的数据

ElementAt/elementAtOrDefault

elementAt()函数仅从一个序列中发射第n个元素然后就完成了。如果我们想查找第五个元素但是可观测序列只有三个元素可供发射时该怎么办?我们可以使用elementAtOrDefault()。

filter

RxJava让我们使用filter()方法来过滤我们观测序列中不想要的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.range(0,10)
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) {
return integer%2==0;
}
})
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(integer);
}
});
//输出:0 2 4 6 8

First/last

first()方法和last()方法很容易弄明白。它们从Observable中只发射第一个元素或者最后一个元素。

ignoreElements

忽略所有的数据,只在终止时有反应

Sample

在Observable后面加一个sample(),我们将创建一个新的可观测序列,它将在一个指定的时间间隔里由Observable发射最近一次的数值

take/takeLast

take()函数用整数N来作为一个参数,从原始的序列中发射前N个元素,然后完成。
takeLast,从原始的序列中发射后N个元素,然后完成。

skip/skipLast

跳过前面或者后N个元素

组合类Obervable

And/Then/When

属于rxjava-joins模块

combineLatest

combineLatest
将两个队列通过fun方法联合,注意联合的两个数据分别为各个队列当前最后发射的数据。(若其中一个队列未发射数据,fun方法不会发生)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
Observable<Long> oa = Observable.interval(2, TimeUnit.SECONDS).map(new Func1<Long, Long>() {
@Override
public Long call(Long aLong) {
return 100 + aLong;
}
});
Observable<Long> ob = Observable.interval(3, TimeUnit.SECONDS);
Observable
.combineLatest(oa, ob, new Func2<Long, Long, String>() {
@Override
public String call(Long aLong, Long aLong2) {
Long value = aLong + aLong2;
return "oa:" + aLong + ",ob:" + aLong2 + ",value:" + value;
}
})
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
/**
* 输出:
* oa:100,ob:0,value:100
* oa:101,ob:0,value:101
* oa:102,ob:0,value:102
* oa:102,ob:1,value:103
* oa:103,ob:1,value:104
* oa:103,ob:2,value:105
* oa:104,ob:2,value:106
* oa:105,ob:2,value:107
* oa:105,ob:3,value:108
* ...
* */

join

join
如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Observable<Long> oa = Observable.interval(1, TimeUnit.SECONDS).map(aLong -> 1000 + aLong);
Observable<Long> ob = Observable.interval(1, TimeUnit.SECONDS);
oa.join(ob, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new Func1<Long, Observable<Long>>() {
@Override
public Observable<Long> call(Long aLong) {
return Observable.timer(2, TimeUnit.SECONDS);
}
}, new Func2<Long, Long, String>() {
@Override
public String call(Long aLong, Long aLong2) {
return "oa:" + String.valueOf(aLong) + ",ob:" + String.valueOf(aLong2);
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println(s);
}
});
// 解释:
// join有4个参数,oa.join(ob,leftFunc,rightFunc,resultFunc),leftFunc和rightFunc都会返回一个Observable.
// oa对应leftFunc,从oa进入leftFunc的Observable到被发射,我们称其为oa时间段
// ob对应rightFunc,同理ob也有一个时间段
// 假设oa时间段,此时oa数据为1000,ob队列发射了0,1两个数据,那么最后rightFunc会被调用2次(排列组合),其分别为
// 1000 0
// 1000 1
// 同理,ob时间段,此时ob数据为0,oa队列发射了1000,1001两个数据,那么最后rightFunc会被调用2次(排列组合),为
// 1000 0
// 1001 0

merge

merge
合并多个序列,根据时间排序
mergeDelayError:它能从一个Observable中继续发射数据即便是其中有一个抛出了错误

zip

zip
zip() 合并两个或者多个Observables发射出的数据项,根据指定的函数Func* 变换它们,并发射一个新值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Observable<Long> oa = Observable.interval(2, TimeUnit.SECONDS).map(aLong -> 1000 + aLong);
Observable<Long> ob = Observable.interval(4, TimeUnit.SECONDS);
Observable.zip(oa, ob, new Func2<Long, Long, String>() {
@Override
public String call(Long aLong, Long aLong2) {
return "oa:" + String.valueOf(aLong) + ",ob:" + String.valueOf(aLong2);
}
}).subscribe(System.out::println);
/**
* 输出:
* oa:1000,ob:0
* oa:1001,ob:1
* oa:1002,ob:2
* oa:1003,ob:3
* oa:1004,ob:4
* ...
* */

//tip: 必定一一对应进行压缩合并

其他操作符

materialize( ) — 将Observable转换成一个通知列表convert an Observable into a list of Notifications
dematerialize( ) — 将上面的结果逆转回一个Observable
timestamp( ) — 给Observable发射的每个数据项添加一个时间戳
serialize( ) — 强制Observable按次序发射数据并且要求功能是完好的
cache( ) — 记住Observable发射的数据序列并发射相同的数据序列给后续的订阅者
observeOn( ) — 指定观察者观察Observable的调度器
subscribeOn( ) — 指定Observable执行任务的调度器
doOnEach( ) — 注册一个动作,对Observable发射的每个数据项使用
doOnCompleted( ) — 注册一个动作,对正常完成的Observable使用
doOnError( ) — 注册一个动作,对发生错误的Observable使用
doOnTerminate( ) — 注册一个动作,对完成的Observable使用,无论是否发生错误
doOnSubscribe( ) — 注册一个动作,在观察者订阅时使用
doOnUnsubscribe( ) — 注册一个动作,在观察者取消订阅时使用
finallyDo( ) — 注册一个动作,在Observable完成时使用
delay( ) — 延时发射Observable的结果
delaySubscription( ) — 延时处理订阅请求
timeInterval( ) — 定期发射数据
using( ) — 创建一个只在Observable生命周期存在的资源
single( ) — 强制返回单个数据,否则抛出异常
singleOrDefault( ) — 如果Observable完成时返回了单个数据,就返回它,否则返回默认数据
toFuture( ), toIterable( ), toList( ) — 将Observable转换为其它对象或数据结构

StrictMode

StrictMode帮助我们侦测敏感的活动,如我们无意的在主线程执行磁盘访问或者网络调用。正如你所知道的,在主线程执行繁重的或者长时的任务是不可取的。因为Android应用的主线程时UI线程,它被用来处理和UI相关的操作:这也是获得更平滑的动画体验和响应式App的唯一方法。

1
2
3
4
if (BuildConfig.DEBUG) {
StrictMode.setThreadPolicy(new StrictMode.ThreadPolicy.Builder().detectAll().penaltyLog().build());
StrictMode.setVmPolicy(new StrictMode.VmPolicy.Builder().detectAll().penaltyLog().build());
}

参考文章

给 Android 开发者的 RxJava 详解
reactivex官方文档