rx java操作符_RxJava操作符(一)
本篇将介绍rxjava中的创建操作、合并操作、过滤操作、条件/布尔操作、聚合操作、转换操作以及变换操作,只针对用法不涉及原理,对RxJava不熟悉的可参考:http://gank.io/post/560e15be2dca930e00da1083
创建操作
create:使用OnSubscrib直接创建一个Observable
Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
subscriber.onNext("item1");
subscriber.onNext("item2");
subscriber.onCompleted();
}
});
from:将数组或集合拆分成具体对象后,转换成发送这些对象的Observable
String[] arr = {"item1", "item2", "item3"};
Observable.from(arr)
.subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("debug", s); //调用多次,分别打印出item1,item2,item3
}
});
just:将一个或多个对象转换成发送这些对象的Obserbable
Observable.just("item1","item2","item3")
.subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("debug", s); //调用多次,分别打印出item1,item2,item3
}
});
empty:创建一个直接通知完成的Observable
error:创建一个直接通知错误的Observable
never:创建一个什么都不做的Observable
Observable observable1 = Observable.empty(); //直接调用onCompleted()方法
Observable observable2 = Observable.error(new RuntimeException()); //直接调用onError()方法
Observable observable3 = Observable.never(); //onNext(),onCompleted(),onError()均不调用
timer:创建一个延时发射数据的Observable
Observable.timer(1000, TimeUnit.MILLISECONDS)
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.d("debug", aLong.toString()); //aLong为0
}
});
interval:创建一个按照给定的时间间隔发射送0开始的整数序列的Obervable
Observable.interval(2, 1, TimeUnit.SECONDS)
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
//等待2秒后开始发射数据,发射的时间间隔为1秒,从0开始计数
}
});
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
//等待1秒后开始发射数据,发射的时间间隔为1秒,从0开始计数
//相当于Observable.interval(1, 1, TimeUnit.SECONDS)
}
});
range:创建一个发射指定范围的整数序列的Observable
Observable.range(3, 4)
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString());//依次发射3,4,5,6,从3开始发射4个数据
}
});
defer:观察者订阅时才创建Observable,每次订阅返回一个新的Observable
Observable.defer(new Func0>() {
@Override
public Observable call() {
return Observable.just("s");
}
}).subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("debug", s); //打印s
}
});
合并操作(用于组合多个Observavle)
concat:按顺序连接多个Observable,注:Observable.concat(a,b)等价于a.concatWith(b)
Observable observable1 = Observable.just(1, 2, 3);
Observable observable2 = Observable.just(4, 5, 6);
Observable.concat(observable1, observable2)
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2,3,4,5,6
}
});
startWith:在数据序列的开头增加一项数据,内部调用concat
Observable.just(1, 2, 3)
.startWith(Observable.just(4, 5)) //添加一个Observable
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印4,5,1,2,3
}
});
Observable.just(1,2,3)
.startWith(4,5) //添加多个数据
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印4,5,1,2,3
}
});
List integers = new ArrayList<>();
integers.add(4);
integers.add(5);
Observable.just(1,2,3)
.startWith(integers) //添加一个集合
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印4,5,1,2,3
}
});
merge / mergeDelayError:将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接。其中mergeDelayError将异常延迟到其它没有错误的Observable发送完毕后才发射。而merge则是一遇到异常将停止发射数据,发送onError通知
merge工作流程
Observable observable1 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super Integer> subscriber) {
subscriber.onNext(1);
SystemClock.sleep(1000);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable observable2 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super Integer> subscriber) {
SystemClock.sleep(500);
subscriber.onNext(4);
subscriber.onNext(5);
SystemClock.sleep(1000);
subscriber.onNext(6);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable.merge(observable1, observable2)
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,4,5,2,3,6
}
});
zip:使用一个函数组合多个Observable发射的数据集合,再发射这个结果。如果多个Observable发射的数据量不一样,则以最少的Observable为标准进行压合
zip工作流程
Observable observable1 = Observable.just(1, 2, 3, 4, 5);
Observable observable2 = Observable.just("A", "B", "C", "D");
Observable.zip(observable1, observable2, new Func2() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("debug", s); //打印1A,2B,3C,4D
}
});
combineLatest:当两个Observable中任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果。类似于zip,但是,不同的是zip只有在每个Observable都发射了数据才工作,而combineLatest任何一个发射了数据都可以工作,每次与另一个Observable最近的数据压合
combineLatest工作流程
Observable observable1 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super Integer> subscriber) {
subscriber.onNext(1);
SystemClock.sleep(500);
subscriber.onNext(2);
SystemClock.sleep(1000);
subscriber.onNext(3);
SystemClock.sleep(300);
subscriber.onNext(4);
SystemClock.sleep(500);
subscriber.onNext(5);
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable observable2 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
SystemClock.sleep(300);
subscriber.onNext("A");
SystemClock.sleep(300);
subscriber.onNext("B");
SystemClock.sleep(500);
subscriber.onNext("C");
subscriber.onNext("D");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.computation());
Observable.combineLatest(observable1, observable2, new Func2() {
@Override
public String call(Integer integer, String s) {
return integer + s;
}
}).subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("debug", s); //打印1A,2A,2B,2C,2D,3D,4D,5D
}
});
过滤操作
filter:过滤数据
Observable.just(1, 2, 3, 4)
.filter(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer % 2 == 0; //过滤偶数
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印2,4
}
});
ofType:过滤指定类型数据
Observable.just(1, "2", 3, "4")
.ofType(Integer.class) //过滤整形数据
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,3
}
});
take:只发射前n项数据或者一定时间内的数据(无需考虑索引越界问题,配合interval操作符可作为定时器使用)
Observable.just(1, 2, 3, 4)
.take(2) //只发射前2项
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2
}
});
Observable.interval(1, TimeUnit.SECONDS)
.take(3, TimeUnit.SECONDS) //只发射3秒内的数据
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
//打印0,1(打印出来的并不是相像中的0,1,2,应该与代码代码执行的时间有关,使用时需要注意!)
Log.d("debug", aLong.toString());
}
});
takeLast:只发射最后的N项数据或者一定时间内的数据(无需考虑索引越界问题)
Observable.just(1, 2, 3, 4)
.takeLast(3) //只发射后3项
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印2,3,4
}
});
Observable.interval(1, TimeUnit.SECONDS)
.take(10) //每1秒发射一个数据,发射10秒
.takeLast(3, TimeUnit.SECONDS) //只发射最后3秒的数据
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.d("debug", aLong.toString()); //打印6,7,8,9(同样存在些许误差,使用时需注意!)
}
});
Observable.interval(1, TimeUnit.SECONDS)
.take(10)
.takeLast(2, 3, TimeUnit.SECONDS) //只发射最后3秒内的最后2个数据
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.d("debug", aLong.toString()); //打印8,9
}
});
takeFirst:只发射满足条件的第一项(其实就是filter+take)
Observable.just(1, 2, 3, 4)
.takeFirst(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer > 1;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印2
}
});
*first / firstOrDefault:只发射第一项或者满足条件的第一项数据,其中firstOrDefault可以指定默认值(建议使用firstOrDefault,找不到对应元素时first会报异常)
Observable.just(1, 2, 3)
.first() //发射第一项
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1
}
});
Observable.just(1, 2, 3, 4)
.first(new Func1() { //发射大于2的第一项
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3
}
});
Integer[] arr = {};
Observable.from(arr)
.firstOrDefault(2) //发射第一项,没有可发射的数据时,发射默认值2
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印2
}
});
last / lastOrDefault:只发射最后一项或满足条件的最后一项,其中lastOrDefault可以指定默认值
Observable.just(1, 2, 3)
.last() //发射最后一项
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3
}
});
Observable.just(1, 2, 3, 4)
.last(new Func1() { //发射大于2的最后一项
@Override
public Boolean call(Integer integer) {
return integer > 2;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印4
}
});
Integer[] arr = {};
Observable.from(arr)
.lastOrDefault(2) //发射最后一项,没有可发射的数据时,发射默认值2
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印2
}
});
skip:跳过开始的n项数据或者一定时间内的数据(与take类似)
Observable.just(1, 2, 3, 4)
.skip(2) //跳过前2项
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3,4
}
});
Observable.interval(1, TimeUnit.SECONDS)
.take(5)
.skip(3, TimeUnit.SECONDS) //跳过前3秒
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.d("debug", aLong.toString()); //打印2,3,4,同样存在误差!
}
});
skipLast:跳过最后的n项数据或一定时间内的数据
Observable.just(1, 2, 3, 4)
.skipLast(2) //跳过最后2项
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2
}
});
Observable.interval(1, TimeUnit.SECONDS)
.take(7)
.skipLast(3, TimeUnit.SECONDS) //跳过最后3秒
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.d("debug", aLong.toString()); //打印0,1,2,同样存在误差!
}
});
elementAt / elementAtOrDefault:发射某一项数据,其中elementAtOrDefault可以指定索引越界时发射的默认值
Observable.just(1, 2, 3, 4)
.elementAt(2) //发射索引为2的数据
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3
}
});
Observable.just(1, 2, 3)
.elementAtOrDefault(4, 5) //发射索引为4的数据,索引越界时发射默认数据5
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印5
}
});
ignoreElements:丢弃所有数据,只发射错误或正常终止的通知,即只触发观察者的onError()或onCompleted()方法
distinct:过滤重复数据,可指定判定唯一的标准
Observable.just(1, 1, 2, 3, 2, 4)
.distinct()
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2,3,4
}
});
Observable.just(1, 1, 2, 3, 2, 4)
//根据发射的数据生成对应的key,通过key值来判断唯一,如果两个数据的key相同,则只发射第一个
.distinct(new Func1() {
@Override
public Integer call(Integer integer) {
//奇数对应的key为1,偶数对应的key为2
if (integer % 2 == 0) {
return 2;
}
return 1;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2
}
});
distinctUntilChanged:过滤掉连续重复的数据,可指定判定唯一的标准
Observable.just(1, 1, 2, 3, 2, 4)
.distinctUntilChanged()
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2,3,2,4
}
});
Observable.just(1, 1, 2, 3, 2, 4)
//根据发射的数据生成对应的key,通过key值来判断唯一,如果两个数据的key相同,则只发射第一个
.distinctUntilChanged(new Func1() {
@Override
public Integer call(Integer integer) {
//奇数对应的key为1,偶数对应的key为2
if (integer % 2 == 0) {
return 2;
}
return 1;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2,3,2
}
});
Observable.just(1, 1, 2, 3, 2, 4)
//传入比较器的方式
.distinctUntilChanged(new Func2() {
@Override
public Boolean call(Integer integer, Integer integer2) {
return integer % 2 == integer2 % 2; //同为奇数或偶数返回true
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2,3,2
}
});
throttleFirst:定期发射Observable在该时间段发射的第一项数据
Observable.interval(0, 500, TimeUnit.MILLISECONDS)
.take(10) //每500毫秒发射一次数据,发射10次
.throttleFirst(1000, TimeUnit.MILLISECONDS) //每1秒发射该秒内发射数据中的第一项数据
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
//打印0,2,5,8(即第一秒发射0,1,第二秒发射2,3,4,第三秒发射5,6,7,第四秒发射8,9),同样存在误差!
Log.d("debug", aLong.toString());
}
});
throttleWithTimeout / debounce(两者使用及效果相同):发射数据时,如果两次数据的发射间隔小于指定时间,就会丢弃前一次的数据,直到指定时间内都没有新数据发射时才进行发射
Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super Integer> subscriber) { //依次发射1-6,发射间隔不同
subscriber.onNext(1);
SystemClock.sleep(500);
subscriber.onNext(2);
SystemClock.sleep(500);
subscriber.onNext(3);
SystemClock.sleep(1000);
subscriber.onNext(4);
SystemClock.sleep(1000);
subscriber.onNext(5);
SystemClock.sleep(500);
subscriber.onNext(6);
subscriber.onCompleted();
}
}).throttleWithTimeout(700, TimeUnit.MILLISECONDS) //指定最小发射间隔时间为700毫秒
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3,4,6
}
});
sample / throttleLast(两者使用及效果相同):定期发射Observable在该时间段发射的最后一项数据,与throttleFirst相反
Observable.interval(0, 500, TimeUnit.MILLISECONDS)
.take(10) //每500毫秒发射一次数据,发射10次
.throttleLast(1000, TimeUnit.MILLISECONDS) //每1秒发射该秒内发射数据中的最后一项数据
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
//打印1,3,5,7,9(即第一秒发射0,1,第二秒发射2,3,第三秒发射4,5,第四秒发射6,7,第五秒发射8,9)
Log.d("debug", aLong.toString());
}
});
timeout:如果指定时间内没有发射任何数据,就发射一个异常或者使用备用的Observavle
Observable.timer(5, TimeUnit.SECONDS)
.timeout(3, TimeUnit.SECONDS) //超时则发射异常
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.d("debug", "onError()"); //抛出异常
}
@Override
public void onNext(Long aLong) {
Log.d("debug", aLong.toString());
}
});
Observable.timer(5, TimeUnit.SECONDS)
.timeout(3, TimeUnit.SECONDS, Observable.just(2L)) //设置备用Observable
.subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Log.d("debug", "onError()");
}
@Override
public void onNext(Long aLong) {
Log.d("debug", aLong.toString()); //发射备用Observable,打印2
}
});
条件 / 布尔操作
all:判断所有数据中是否都满足某个条件
Observable.just(1, 2, 3, 4)
.all(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer < 5;//所有项都小于5
}
})
.subscribe(new Action1() {
@Override
public void call(Boolean aBoolean) {
Log.d("debug", aBoolean.toString()); //打印true
}
});
Observable.just(1, 2, 3, 4)
.all(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer > 2;//所有项都大于2
}
})
.subscribe(new Action1() {
@Override
public void call(Boolean aBoolean) {
Log.d("debug", aBoolean.toString()); //打印false
}
});
exists:判断是否存在数据项满足某个条件
Observable.just(1, 2, 3, 4)
.exists(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer > 2; //存在某项大于2
}
})
.subscribe(new Action1() {
@Override
public void call(Boolean aBoolean) {
Log.d("debug", aBoolean.toString()); //打印true
}
});
contains:判断所有数据中是否包含指定的数据(内部调用exists)
Observable.just(1, 2, 3, 4)
.contains(2) //是否包含2
.subscribe(new Action1() {
@Override
public void call(Boolean aBoolean) {
Log.d("debug", aBoolean.toString()); //打印true
}
});
sequenceEqual:判断两个Observable发射的数据是否相同(数据,发射顺序,终止状态)
Observable.sequenceEqual(Observable.just(1, 2, 3), Observable.just(1, 2, 3))
.subscribe(new Action1() {
@Override
public void call(Boolean aBoolean) {
Log.d("debug", aBoolean.toString()); //打印true
}
});
isEmpty:用于判断Observable是否没有发射任何数据(发射null返回为false)
Observable.from(new ArrayList()) //集合中没有数据
.isEmpty()
.subscribe(new Action1() {
@Override
public void call(Boolean aBoolean) {
Log.d("debug", aBoolean.toString()); //打印true
}
});
Observable.empty()
.isEmpty()
.subscribe(new Action1() {
@Override
public void call(Boolean aBoolean) {
Log.d("debug", aBoolean.toString()); //打印true
}
});
amber:指定多个Observable,只允许第一个开始发射数据的Observable发射全部数据,其他Observable将会被会忽略
Observable observable1 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super Integer> subscriber) {
SystemClock.sleep(500); //延迟500毫秒
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onCompleted();
}
}.subscribeOn(Schedulers.computation())); //指定为新的线程
Observable observable2 = Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
subscriber.onNext("a");
subscriber.onNext("b");
subscriber.onCompleted();
}
});
Observable.amb(observable1, observable2)
.subscribe(new Action1() {
@Override
public void call(Serializable serializable) {
Log.d("debug", serializable.toString()); //打印a,b
}
});
switchIfEmpty:如果原始Observable正常终止后仍没有发射任何数据,就使用备用的Observable
Observable.from(new ArrayList())
.switchIfEmpty(Observable.just(1, 2))
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2
}
});
defaultIfEmpty:如果原始Observable正常终止后仍没有发射任何数据,就发射一个默认值(内部调用switchIfEmpty)
Observable.from(new ArrayList())
.defaultIfEmpty(1)
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1
}
});
takeUntil:当发射的数据满足某个条件后(包含该数据),或者第二个Observable发射了一项数据或发射了一个终止通知时(观察者接受不到第二个Observable发射的数据),终止第一个Observable发送数据
Observable.just(1, 2, 3, 4)
.takeUntil(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer == 3;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", "just" + integer.toString()); //打印1,2,3
}
});
Observable.interval(0, 500, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.computation())
.takeUntil(Observable.timer(1200, TimeUnit.MILLISECONDS))
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.d("debug", aLong.toString()); //打印0,1,2
}
});
takeWhile:当发射的数据对应某个条件为false时(不包含该数据),Observable终止发送数据
Observable.just(1, 2, 3, 4)
.takeWhile(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer != 3;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印1,2
}
});
skipUnit:丢弃Observable发射的数据,直到第二个Observable开始发射数据或者发射一个终止通知时
Observable.interval(0, 500, TimeUnit.MILLISECONDS)
.take(5)
.subscribeOn(Schedulers.computation())
.skipUntil(Observable.timer(1200, TimeUnit.MILLISECONDS))
.subscribe(new Action1() {
@Override
public void call(Long aLong) {
Log.d("debug", aLong.toString()); //打印3,4
}
});
skipWhile:丢弃Observable发射的数据,直到一个指定的条件不成立(不丢弃条件数据)
Observable.just(1, 2, 3, 4)
.skipWhile(new Func1() {
@Override
public Boolean call(Integer integer) {
return integer < 3;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印3,4
}
});
聚合操作
reduce:用一个函数接收Observable发射的数据,将函数的计算结果作为下次计算的参数,最后输出结果。
Observable.just(1, 2, 3, 4)
.reduce(new Func2() {
@Override
public Integer call(Integer integer, Integer integer2) {
Log.d("debug", "integer1:" + integer + ",integer2:" + integer2);
return integer + integer2; //求和操作
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", "result:" + integer);
}
});
/**
* 日志输出
* integer1:1,integer2:2
* integer1:3,integer2:3
* integer1:6,integer2:4
* result:10
*/
collect:用于将数据收集到一个可变的数据结构(如List,Map)
Observable.just(1, 2, 3, 4)
.collect(new Func0>() {
@Override
public List call() {
return new ArrayList(); //创建List用于收集数据
}
}, new Action2, Integer>() {
@Override
public void call(List integers, Integer integer) {
integers.add(integer); //将数据添加到List中
}
})
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", integers.toString()); //打印[1, 2, 3, 4]
}
});
Observable.just(1, 2, 3, 4)
.collect(new Func0>() {
@Override
public Map call() {
return new HashMap(); //创建Map用于收集数据
}
}, new Action2, Integer>() {
@Override
public void call(Map integerStringMap, Integer integer) {
integerStringMap.put(integer, "value" + integer); //将数据添加到Map中
}
})
.subscribe(new Action1>() {
@Override
public void call(Map integerStringMap) {
//打印{4=value4, 1=value1, 3=value3, 2=value2},注:HashMap保存的数据是无序的
Log.d("debug", integerStringMap.toString());
}
});
count / countLong:计算发射的数量,内部调用的是reduce
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", "integer:" + integer.toString()); //打印4
}
});
转换操作
toList:将Observable发射的所有数据收集到一个列表中,返回这个列表
Observable.just(1, 2, 3, 4)
.toList()
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", integers.toString()); //打印[1, 2, 3, 4]
}
});
toSortedList:将Observable发射的所有数据收集到一个有序列表中,返回这个列表
Observable.just(3, 2, 5, 4, 1)
.toSortedList() //默认升序排序
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", integers.toString()); //打印[1, 2, 3, 4, 5]
}
});
Observable.just(3, 2, 5, 4, 1)
.toSortedList(new Func2() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer2 - integer; //自定义排序规则(倒序)
}
})
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", integers.toString()); //打印[5, 4, 3, 2, 1]
}
});
toMap:将序列数据转换为一个Map,根据数据项生成key和value
Observable.just(1, 2, 3, 4)
.toMap(new Func1() { //根据数据项生成key,value为原始数据
@Override
public String call(Integer integer) {
return "key:" + integer;
}
})
.subscribe(new Action1>() {
@Override
public void call(Map stringIntegerMap) {
Log.d("debug", stringIntegerMap.toString()); //打印{key:4=4, key:2=2, key:1=1, key:3=3}
}
});
Observable.just(1, 2, 3, 4)
.toMap(new Func1() { //根据数据项生成key和value
@Override
public String call(Integer integer) {
return "key:" + integer;
}
}, new Func1() {
@Override
public String call(Integer integer) {
return "value:" + integer;
}
})
.subscribe(new Action1>() {
@Override
public void call(Map stringStringMap) {
Log.d("debug", stringStringMap.toString()); //打印{key:4=value:4, key:2=value:2, key:1=value:1, key:3=value:3}
}
});
Observable.just(1, 2, 3, 4)
.toMap(new Func1() { //根据数据项生成key和value,创建指定类型的Map
@Override
public String call(Integer integer) {
return "key:" + integer;
}
}, new Func1() {
@Override
public String call(Integer integer) {
return "value:" + integer;
}
}, new Func0>() {
@Override
public Map call() {
return new LinkedHashMap(); //LinkedHashMap保证存取顺序相同
}
})
.subscribe(new Action1>() {
@Override
public void call(Map stringStringMap) {
Log.d("debug", stringStringMap.toString()); //打印{key:1=value:1, key:2=value:2, key:3=value:3, key:4=value:4}
}
});
toMultiMap:类似toMap,不同的地方在于map的value是一个集合,使一个key可以映射多个value,多用于分组
Observable.just(1, 2, 1, 4)
.toMultimap(new Func1() { //根据数据项生成key,value为原始数据
@Override
public String call(Integer integer) {
return "key:" + integer;
}
})
.subscribe(new Action1>>() {
@Override
public void call(Map> stringCollectionMap) {
Log.d("debug", stringCollectionMap.toString()); //打印{key:4=[4], key:2=[2], key:1=[1, 1]}
}
});
Observable.just(1, 2, 1, 4)
.toMap(new Func1() {
@Override
public String call(Integer integer) {
return "key:" + integer;
}
})
.subscribe(new Action1>() {
@Override
public void call(Map stringIntegerMap) {
Log.d("debug", stringIntegerMap.toString()); //打印{key:4=4, key:2=2, key:1=1}
}
});
变换操作
map:对Observable发射的每一项数据都应用一个函数进行变换
Observable.just(1, 2, 3, 4)
.map(new Func1() {
@Override
public String call(Integer integer) {
return "item:" + integer;
}
})
.subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("debug", s); //打印item:1,item:2,item:3,item:4
}
});
cast:在发射之前强制将Observable发射的所有数据转换为指定类型(父类强转为子类)
List list = new ArrayList();
Observable.just(list)
.cast(ArrayList.class) //将List强转为ArrayList
.subscribe(new Action1() {
@Override
public void call(ArrayList arrayList) {
}
});
flatMap:将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部使用marge合并(可用于一对多转换或多对多转换,也可用于网络请求的嵌套)
Observable.just(1, 2, 3)
.flatMap(new Func1>() {
@Override
public Observable call(Integer integer) {
return Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super Integer> subscriber) {
subscriber.onNext(integer * 10);
subscriber.onNext(integer * 100);
subscriber.onCompleted();
}
});
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印10,100,20,200,30,300
}
});
flatMapIterable:和flatMap作用一样,只不过生成的是Iterable而不是Observable
Observable.just(1, 2, 3)
.flatMapIterable(new Func1>() {
@Override
public Iterable call(Integer integer) {
return Arrays.asList(integer * 10, integer * 100);
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", integer.toString()); //打印10,100,20,200,30,300
}
});
concatMap:类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射
switchMap:和flatMap很像,将Observable发射的数据变换为Observables集合,当原始Observable发射一个新的数据(Observable)时,它将取消订阅前一个Observable
Observable.interval(0, 500, TimeUnit.MILLISECONDS) //每500毫秒发射一次
.take(4)
.switchMap(new Func1>() {
@Override
public Observable call(Long aLong) {
return Observable.create(new Observable.OnSubscribe() {
@Override
public void call(Subscriber super String> subscriber) {
subscriber.onNext(aLong + "A");
SystemClock.sleep(800); //延迟800毫秒
subscriber.onNext(aLong + "B");
subscriber.onCompleted();
}
}).subscribeOn(Schedulers.newThread());
}
})
.subscribe(new Action1() {
@Override
public void call(String s) {
Log.d("debug", s); //打印0A,1A,2A,3A,3B
}
});
与reduce很像,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
Observable.just(1, 2, 3, 4)
.scan(new Func2() {
@Override
public Integer call(Integer integer, Integer integer2) {
Log.d("debug", "integer1:" + integer + ",integer2:" + integer2);
return integer + integer2;
}
})
.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", "result:" + integer);
}
});
/**
* 日志输出
* result:1
* integer1:1,integer2:2
* result:3
* integer1:3,integer2:3
* result:6
* integer1:6,integer2:4
* result:10
*/
groupBy:将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每个Observable发射一组不同的数据(类似于toMultiMap)
Observable.just(1, 2, 3, 4)
.groupBy(new Func1() {
@Override
public String call(Integer integer) { //根据数据项生成key
return integer % 2 == 0 ? "偶数" : "奇数";
}
})
.subscribe(new Action1>() {
@Override
public void call(GroupedObservable o) {
o.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", o.getKey() + ":" + integer); //打印奇数:1,偶数:2,奇数:3,偶数:4
}
});
}
});
Observable.just(1, 2, 3, 4)
.groupBy(new Func1() {
@Override
public String call(Integer integer) { //根据数据项生成key
return integer % 2 == 0 ? "偶数" : "奇数";
}
}, new Func1() {
@Override
public Integer call(Integer integer) { //根据数据项生成value
return integer * 10;
}
})
.subscribe(new Action1>() {
@Override
public void call(GroupedObservable o) {
o.subscribe(new Action1() {
@Override
public void call(Integer integer) {
Log.d("debug", o.getKey() + ":" + integer); //打印奇数:10,偶数:20,奇数:30,偶数:40
}
});
}
});
buffer:定期从Observable收集数据到一个集合,然后将这些数据集合打包发射
Observable.just(1, 2, 3, 4, 5)
.buffer(3, 1) //count:表示从当前指针位置开始打包3个数据项到集合中,skip:表示指针向后移1位,
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", "skip" + integers.toString()); //打印[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
}
});
Observable.just(1, 2, 3, 4, 5)
.buffer(3) //每3个打包成一个集合,内部就是.buffer(3,3)
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", integers.toString()); //打印[1, 2, 3],[4, 5]
}
});
Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.take(5)
.buffer(250, TimeUnit.MILLISECONDS, 2) //将每250毫秒内发射的数据收集到多个集合中,每个集合最多存放2个数据
.subscribe(new Action1>() {
@Override
public void call(List longs) {
//打印[0, 1],[2],[3, 4],[]
Log.d("debug", "count:" + longs.toString());
}
});
Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.take(5)
.buffer(250, TimeUnit.MILLISECONDS) //将每250毫秒内发射的数据收集到一个集合中,集合不限制大小
.subscribe(new Action1>() {
@Override
public void call(List longs) {
Log.d("debug", longs.toString()); //打印[0, 1, 2],[3, 4]
}
});
Observable.interval(0, 100, TimeUnit.MILLISECONDS)
.take(5)
//从指定时间节点开始,将该节点后250毫秒内发射的数据收集的一个集合中,初始节点为0,每发射一次集合,
//节点的时间增加150毫秒,即下一次收集数据从150毫秒开始,收集150毫秒到400毫秒之间发射的数据
.buffer(250, 150, TimeUnit.MILLISECONDS)
.subscribe(new Action1>() {
@Override
public void call(List longs) {
Log.d("debug", longs.toString()); //打印[0, 1, 2],[2,3],[4]
}
});
window:定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项,类似于buffer,buffer发射的是集合,而window发射的是Observable
Observable.just(1, 2, 3, 4, 5)
.window(3, 1)
.subscribe(new Action1>() {
@Override
public void call(Observable integerObservable) {
integerObservable.toList() //将所有数据搜集成一个集合,便于观察
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", integers.toString()); //打印[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
}
});
}
});
Observable.just(1, 2, 3, 4, 5)
.window(3) //相当于window(3,3)
.subscribe(new Action1>() {
@Override
public void call(Observable integerObservable) {
integerObservable.toList() //将所有数据搜集成一个集合,便于观察
.subscribe(new Action1>() {
@Override
public void call(List integers) {
Log.d("debug", integers.toString()); //打印[1, 2, 3],[4, 5]
}
});
}
});
//剩下其余重载方法也与buffer基本一样,不重复了
篇幅有限,第一部分介绍到这里