新聞中心
前言
前段時(shí)間寫了一篇對(duì)協(xié)程的一些理解,里面提到了不管是協(xié)程還是callback,本質(zhì)上其實(shí)提供的是一種異步無阻塞的編程模式;并且介紹了java中對(duì)異步無阻賽這種編程模式的支持,主要提到了Future和CompletableFuture;之后有同學(xué)在下面留言提到了RxJava,剛好最近在看微服務(wù)設(shè)計(jì)這本書,里面提到了響應(yīng)式擴(kuò)展(Reactive extensions,Rx),而RxJava是Rx在JVM上的實(shí)現(xiàn),所有打算對(duì)RxJava進(jìn)一步了解。

RxJava簡介
RxJava的官網(wǎng)地址:https://github.com/ReactiveX/RxJava,
其中對(duì)RxJava進(jìn)行了一句話描述:RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
大意就是:一個(gè)在Java VM上使用可觀測(cè)的序列來組成異步的、基于事件的程序的庫。
更詳細(xì)的說明在Netflix技術(shù)博客的一篇文章中描述了RxJava的主要特點(diǎn):
- 易于并發(fā)從而更好的利用服務(wù)器的能力。
- 易于有條件的異步執(zhí)行。
- 一種更好的方式來避免回調(diào)地獄。
- 一種響應(yīng)式方法。
與CompletableFuture對(duì)比
之前提到CompletableFuture真正的實(shí)現(xiàn)了異步的編程模式,一個(gè)比較常見的使用場(chǎng)景:
CompletableFuturefuture = CompletableFuture.supplyAsync(耗時(shí)函數(shù)); Future f = future.whenComplete((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println("other...");
下面用一個(gè)簡單的例子來看一下RxJava是如何實(shí)現(xiàn)異步的編程模式:
bservableobservable = Observable.just(1, 2) .subscribeOn(Schedulers.io()).map(new Func1 () { @Override public Long call(Integer t) { try { Thread.sleep(1000); //耗時(shí)的操作 } catch (InterruptedException e) { e.printStackTrace(); } return (long) (t * 2); } }); observable.subscribe(new Subscriber () { @Override public void onCompleted() { System.out.println("onCompleted"); } @Override public void onError(Throwable e) { System.out.println("error" + e); } @Override public void onNext(Long result) { System.out.println("result = " + result); } }); System.out.println("other...");
Func1中以異步的方式執(zhí)行了一個(gè)耗時(shí)的操作,Subscriber(觀察者)被訂閱到Observable(被觀察者)中,當(dāng)耗時(shí)操作執(zhí)行完會(huì)回調(diào)Subscriber中的onNext方法。
其中的異步方式是在subscribeOn(Schedulers.io())中指定的,Schedulers.io()可以理解為每次執(zhí)行耗時(shí)操作都啟動(dòng)一個(gè)新的線程。
結(jié)構(gòu)上其實(shí)和CompletableFuture很像,都是異步的執(zhí)行一個(gè)耗時(shí)的操作,然后在有結(jié)果的時(shí)候主動(dòng)告訴我結(jié)果。那我們還需要RxJava干嘛,不知道你有沒有注意,上面的例子中其實(shí)提供2條數(shù)據(jù)流[1,2],并且處理完任何一個(gè)都會(huì)主動(dòng)告訴我,當(dāng)然這只是它其中的一項(xiàng)功能,RxJava還有很多好用的功能,在下面的內(nèi)容會(huì)進(jìn)行介紹。
異步觀察者模式
上面這段代碼有沒有發(fā)現(xiàn)特別像設(shè)計(jì)模式中的:觀察者模式;首先提供一個(gè)被觀察者Observable,然后把觀察者Subscriber添加到了被觀察者列表中;
RxJava中一共提供了四種角色:Observable、Observer、Subscriber、Subjects
Observables和Subjects是兩個(gè)被觀察者,Observers和Subscribers是觀察者;
當(dāng)然我們也可以查看一下源碼,看一下jdk中的Observer和RxJava的Observer
jdk中的Observer:
public interface Observer {
void update(Observable o, Object arg);
}RxJava的Observer:
public interface Observer{ void onCompleted(); void onError(Throwable e); void onNext(T t); }
同時(shí)可以發(fā)現(xiàn)Subscriber是implements Observer的:
public abstract class Subscriberimplements Observer , Subscription
可以發(fā)現(xiàn)RxJava中在Observer中引入了2個(gè)新的方法:onCompleted()和onError()
onCompleted():即通知觀察者Observable沒有更多的數(shù)據(jù),事件隊(duì)列完結(jié)
onError():在事件處理過程中出異常時(shí),onError()會(huì)被觸發(fā),同時(shí)隊(duì)列自動(dòng)終止,不允許再有事件發(fā)出。
正是因?yàn)镽xJava提供了同步和異步兩種方式進(jìn)行事件的處理,個(gè)人覺得異步的方式更能體現(xiàn)RxJava的價(jià)值,所以這里給他命名為異步觀察者模式。
好了,下面正式介紹RxJava的那些靈活的操作符,這里僅僅是簡單的介紹和簡單的實(shí)例,具體用在什么場(chǎng)景下,會(huì)在以后的文章中介紹
Maven引入
io.reactivex rxjava 1.2.4
創(chuàng)建Observable
1.create()創(chuàng)建一個(gè)Observable,并為它定義事件觸發(fā)規(guī)則
Observableobservable = Observable .create(new Observable.OnSubscribe () { @Override public void call(Subscriber super Integer> observer) { for (int i = 0; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } }); observable.subscribe(new Observer () {...});
2.from()可以從一個(gè)列表中創(chuàng)建一個(gè)Observable,Observable將發(fā)射出列表中的每一個(gè)元素
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items); observable.subscribe(new Observer () {...});
3.just()將傳入的參數(shù)依次發(fā)送出來
Observableobservable = Observable.just(1, 2, 3); observable.subscribe(new Observer () {...});
過濾Observable
1.filter()來過濾我們觀測(cè)序列中不想要的值
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).filter( new Func1 () { @Override public Boolean call(Integer t) { return t == 1; } }); observable.subscribe(new Observer () {...});
2.take()和taskLast()分別取前幾個(gè)元素和后幾個(gè)元素
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).take(3); observable.subscribe(new Observer () {...});
Observableobservable = Observable.from(items).takeLast(2);
3.distinct()和distinctUntilChanged()
distinct()過濾掉重復(fù)的值
Listitems = new ArrayList (); items.add(1); items.add(10); items.add(10); Observable observable = Observable.from(items).distinct(); observable.subscribe(new Observer () {...});
distinctUntilChanged()列發(fā)射一個(gè)不同于之前的一個(gè)新值時(shí)讓我們得到通知
Listitems = new ArrayList (); items.add(1); items.add(100); items.add(100); items.add(200); Observable observable = Observable.from(items).distinctUntilChanged(); observable.subscribe(new Observer () {...});
4.first()和last()分別取***個(gè)元素和***一個(gè)元素
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } // Observable observable = Observable.from(items).first(); Observable observable = Observable.from(items).last(); observable.subscribe(new Observer () {...});
5.skip()和skipLast()分別從前或者后跳過幾個(gè)元素
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } // Observable observable = Observable.from(items).skip(2); Observable observable = Observable.from(items).skipLast(2); observable.subscribe(new Observer () {...});
6.elementAt()取第幾個(gè)元素進(jìn)行發(fā)射
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).elementAt(2); observable.subscribe(new Observer () {...});
7.sample()指定發(fā)射間隔進(jìn)行發(fā)射
Listitems = new ArrayList (); for (int i = 0; i < 50000; i++) { items.add(i); } Observable observable = Observable.from(items).sample(1,TimeUnit.MICROSECONDS); observable.subscribe(new Observer () {...});
8.timeout()設(shè)定的時(shí)間間隔內(nèi)如果沒有得到一個(gè)值則發(fā)射一個(gè)錯(cuò)誤
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).timeout(1,TimeUnit.MICROSECONDS); observable.subscribe(new Observer () {...onError()...});
9.debounce()在一個(gè)指定的時(shí)間間隔過去了仍舊沒有發(fā)射一個(gè),那么它將發(fā)射***的那個(gè)
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).debounce(1,TimeUnit.MICROSECONDS); observable.subscribe(new Observer () {...});
轉(zhuǎn)換Observable
1.map()接收一個(gè)指定的Func對(duì)象然后將它應(yīng)用到每一個(gè)由Observable發(fā)射的值上
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).map( new Func1 () { @Override public Integer call(Integer t) { return t * 2; } }); observable.subscribe(new Observer () {...});
2.flatMap()函數(shù)提供一種鋪平序列的方式,然后合并這些Observables發(fā)射的數(shù)據(jù)
final Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(3)); Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).flatMap( new Func1 >() { @Override public Observable extends Integer> call(Integer t) { List items = new ArrayList (); items.add(t); items.add(99999); return Observable.from(items).subscribeOn(scheduler); } }); observable.subscribe(new Observer () {...});
重要的一點(diǎn)提示是關(guān)于合并部分:它允許交叉。這意味著flatMap()不能夠保證在最終生成的Observable中源Observables確切的發(fā)射
順序。
3.concatMap()函數(shù)解決了flatMap()的交叉問題,提供了一種能夠把發(fā)射的值連續(xù)在一起的鋪平函數(shù),而不是合并它們。
示例代碼同上,將flatMap替換為concatMap,輸出的結(jié)果來看是有序的
4.switchMap()和flatMap()很像,除了一點(diǎn):每當(dāng)源Observable發(fā)射一個(gè)新的數(shù)據(jù)項(xiàng)(Observable)時(shí),它將取消訂閱并停止監(jiān)視之前那個(gè)數(shù)據(jù)項(xiàng)產(chǎn)生的Observable,并開始監(jiān)視當(dāng)前發(fā)射的這一個(gè)。
示例代碼同上,將flatMap替換為switchMap,輸出的結(jié)果只剩***一個(gè)值
5.scan()是一個(gè)累積函數(shù),對(duì)原始Observable發(fā)射的每一項(xiàng)數(shù)據(jù)都應(yīng)用一個(gè)函數(shù),計(jì)算出函數(shù)的結(jié)果值,并將該值填充回可觀測(cè)序列,等待和下一次發(fā)射的數(shù)據(jù)一起使用。
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable observable = Observable.from(items).scan( new Func2 () { @Override public Integer call(Integer t1, Integer t2) { System.out.println(t1 + "+" + t2); return t1 + t2; } }); observable.subscribe(new Observer () {...});
6.groupBy()來分組元素
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable > observable = Observable .from(items).groupBy(new Func1 () { @Override public Integer call(Integer t) { return t % 3; } }); observable.subscribe(new Observer >() { @Override public void onNext(final GroupedObservable t) { t.subscribe(new Action1 () { @Override public void call(Integer value) { System.out.println("key:" + t.getKey()+ ", value:" + value); } }); });
7.buffer()函數(shù)將源Observable變換一個(gè)新的Observable,這個(gè)新的Observable每次發(fā)射一組列表值而不是一個(gè)一個(gè)發(fā)射。
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable > observable = Observable.from(items).buffer(2); observable.subscribe(new Observer
>() {...});
8.window()函數(shù)和 buffer()很像,但是它發(fā)射的是Observable而不是列表
Listitems = new ArrayList (); for (int i = 0; i < 5; i++) { items.add(i); } Observable > observable = Observable.from(items).window(2); observable.subscribe(new Observer >() { @Override public void onNext(Observable t) { t.subscribe(new Action1 () { @Override public void call(Integer t) { System.out.println("this Action1 = " + this+ ",result = " + t); } }); //onCompleted和onError });
9.cast()它將源Observable中的每一項(xiàng)數(shù)據(jù)都轉(zhuǎn)換為新的類型,把它變成了不同的Class
Listitems = new ArrayList (); items.add(new Son()); items.add(new Son()); items.add(new Father()); items.add(new Father()); Observable observable = Observable.from(items).cast(Son.class); observable.subscribe(new Observer () {...}); class Father { } class Son extends Father { }
組合Observables
1.merge()方法將幫助你把兩個(gè)甚至更多的Observables合并到他們發(fā)射的數(shù)據(jù)項(xiàng)里
Listitems1 = new ArrayList (); for (int i = 0; i < 5; i++) { items1.add(i); } List items2 = new ArrayList (); for (int i = 5; i < 10; i++) { items2.add(i); } Observable observable1 = Observable.from(items1); Observable observable2 = Observable.from(items2); Observable observableMerge = Observable.merge(observable1,observable2); observable.subscribe(new Observer () {...});
2.zip()合并兩個(gè)或者多個(gè)Observables發(fā)射出的數(shù)據(jù)項(xiàng),根據(jù)指定的函數(shù) Func* 變換它們,并發(fā)射一個(gè)新值
Listitems1 = new ArrayList (); for (int i = 0; i < 5; i++) { items1.add(i); } List items2 = new ArrayList (); for (int i = 5; i < 10; i++) { items2.add(i); } Observable observable1 = Observable.from(items1); Observable observable2 = Observable.from(items2); Observable observableZip = Observable.zip(observable1, observable2, new Func2 () { @Override public Integer call(Integer t1, Integer t2) { return t1 * t2; } }); observable.subscribe(new Observer () {...});
3.combineLatest()把兩個(gè)Observable產(chǎn)生的結(jié)果進(jìn)行合并,這兩個(gè)Observable中任意一個(gè)Observable產(chǎn)生的結(jié)果,都和另一個(gè)Observable***產(chǎn)生的結(jié)果,按照一定的規(guī)則進(jìn)行合并。
Observableobservable1 = Observable.interval(1000,TimeUnit.MILLISECONDS); Observable observable2 = Observable.interval(1000,TimeUnit.MILLISECONDS); Observable.combineLatest(observable1, observable2, new Func2 () { @Override public Long call(Long t1, Long t2) { System.out.println("t1 = " + t1 + ",t2 = " + t2); return t1 + t2; } }).subscribe(new Observer () {...}); Thread.sleep(100000);
4.join()類似combineLatest(),但是join操作符可以控制每個(gè)Observable產(chǎn)生結(jié)果的生命周期,在每個(gè)結(jié)果的生命周期內(nèi),可以與另一個(gè)Observable產(chǎn)生的結(jié)果按照一定的規(guī)則進(jìn)行合并
Observableobservable1 = Observable.interval(1000, TimeUnit.MILLISECONDS); Observable observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS); observable1.join(observable2, new Func1 >() { @Override public Observable call(Long t) { System.out.println("left=" + t); return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS); } }, new Func1 >() { @Override public Observable call(Long t) { System.out.println("right=" + t); return Observable.just(t).delay(1000, TimeUnit.MILLISECONDS); } }, new Func2 () { @Override public Long call(Long t1, Long t2) { return t1 + t2; } }).subscribe(new Observer () { @Override public void onCompleted() { System.out.println("Observable completed"); } @Override public void onError(Throwable e) { System.out.println("Oh,no! Something wrong happened!"); } @Override public void onNext(Long t) { System.out.println("[result=]" + t); } }); Thread.sleep(100000);
5.switchOnNext()把一組Observable轉(zhuǎn)換成一個(gè)Observable,對(duì)于這組Observable中的每一個(gè)Observable所產(chǎn)生的結(jié)果,如果在同一個(gè)時(shí)間內(nèi)存在兩個(gè)或多個(gè)Observable提交的結(jié)果,只取***一個(gè)Observable提交的結(jié)果給訂閱者
Observable> observable = Observable.interval(2, TimeUnit.SECONDS) .map(new Func1 >() { @Override public Observable call(Long aLong) { return Observable.interval(1, TimeUnit.MILLISECONDS).take(5); } }).take(2); Observable.switchOnNext(observable).subscribe(new Observer () {...}); Thread.sleep(1000000);
6.startWith()在Observable開始發(fā)射他們的數(shù)據(jù)之前,startWith()通過傳遞一個(gè)參數(shù)來先發(fā)射一個(gè)數(shù)據(jù)序列
Observable.just(1000, 2000).startWith(1, 2).subscribe(new Observer() {...});
總結(jié)
本文主要對(duì)rxjava進(jìn)行了簡單的介紹,從異步編程這個(gè)角度對(duì)rxjava進(jìn)行了分析;并且針對(duì)Observable的過濾,轉(zhuǎn)換,組合的API進(jìn)行了簡單的介紹,當(dāng)然我們更關(guān)心的是rxjava有哪些應(yīng)用場(chǎng)景。
標(biāo)題名稱:快速了解異步編程 RxJava
當(dāng)前URL:http://www.dlmjj.cn/article/coechch.html


咨詢
建站咨詢
