程序師世界是廣大編程愛好者互助、分享、學習的平台,程序師世界有你更精彩!
首頁
編程語言
C語言|JAVA編程
Python編程
網頁編程
ASP編程|PHP編程
JSP編程
數據庫知識
MYSQL數據庫|SqlServer數據庫
Oracle數據庫|DB2數據庫
 程式師世界 >> 編程語言 >> JAVA編程 >> JAVA綜合教程 >> RxJava前奏之原理分析

RxJava前奏之原理分析

編輯:JAVA綜合教程

RxJava前奏之原理分析


RxJava 之前奏:原理分析

前言:RxJava在國內越來越火,本文只是原理分析的總結。代碼不多,但有些繞,一步一步的推進,對於理解RxJava來說還是十分有好處的。但確實需要很大的毅力去堅持下去看完。

首先我們進入一個例子,關於貓的例子。

我們有個 Web API,能根據給定的查詢請求搜索到整個互聯網上貓的圖片。每個圖片包含可愛指數的參數(描述圖片可愛度的整型值)。我們的任務將會下載到一個貓列表的集合,選擇最可愛的那個,然後把它保存到本地。

首先定義實體類

public class Cat implements Comparable {
    /**
     * 圖片
     */
    String image;

    /**
     * 喜愛度
     */
    int cuteness;

    @Override
    public int compareTo(Cat another) {
        return Integer.compare(cuteness, another.cuteness);
    }

}

定義一個Uri類,用來定義圖片保存的地址

public class Uri {

}

定義Api接口類,主要查詢貓和保存貓的方法。

public interface Api {

    /**
     * 查詢所有貓
     * @param query
     * @return
     */
    List queryCats(String query);

    /**
     * 保存
     * @param cat
     * @return
     */
    Uri store(Cat cat);

}

定義業務類CastsHelp

public class CastsHelp {
    Api api;

    //保存貓
    public Uri saveTheCutestCat(String query) {
        List queryCats = api.queryCats(query);
        Cat cat = findCustest(queryCats);

        Uri savedUri = api.store(cat);
        return savedUri;
    }

    //查詢最受喜愛的貓
    private Cat findCustest(List queryCats) {
        return Collections.max(queryCats);
    }
}

ok,我們的第一個例子寫完了,讓我們回過頭來再仔細看一下這個例子,該業務邏輯類CastsHelp,裡面包含了兩個方法,其中saveTheCutestCat方法裡面,包含了三個組合的方法,最終獲取到保存貓的地址,並返回。這種組合方式即簡單,又直白。一眼看去,清晰,明了。但我們往下看。

異步執行

我們知道,對於一些網絡請求,我們通常都需要進行異步去執行,那麼,對於剛才說saveTheCutestCat方法,我們就不能如此寫,對於api的中的方法,我們需要利用接口回調的方式回傳結果。於是我們修改我們的代碼。

public interface Api {


    interface CatsQueryCallback{
        void onCatListReceived(List cats);
        void onCatsQueryError(Exception e);
    }


    interface StoreCallback{
        void onCateStored(Uri uri);
        void onStoteFailed(Exception e);
    }
    /**
     * 查詢所有貓
     * @param query
     * @return
     */
    List queryCats(String query,CatsQueryCallback castQueryCallback);


    /**
     * 保存
     * @param cat
     * @return
     */
    Uri store(Cat cat,StoreCallback storeCallback);

}

修改了一下api中的方法,因為對於貓和保存貓,分別為網絡請求,和I/O操作,我們不能進行同步操縱,所以添加了兩個接口,分別查詢貓的和保存貓地址的結果的回調。

同時我們需要修改一下我們的業務邏輯類CastsHelp,也需要添加接口回調。

public interface CutestCatCallback{
        void onCutestCatSaved(Uri uri);

        void onCutestCatFail(Exception e);
    }

    Api api;

    public void saveTheCutestCat(String query,final CutestCatCallback custestCatCallback) {
        List queryCats = api.queryCats(query,new CatsQueryCallback() {

            @Override
            public void onCatsQueryError(Exception e) {
                //查詢貓失敗
                custestCatCallback.onCutestCatFail(e);
            }


            @Override
            public void onCatListReceived(List cats) {
                Cat cat = findCustest(cats);

                api.store(cat, new StoreCallback() {

                    @Override
                    public void onStoteFailed(Exception e) {
                        //保存貓失敗
                        custestCatCallback.onCutestCatFail(e);

                    }

                    @Override
                    public void onCateStored(Uri uri) {
                        custestCatCallback.onCutestCatSaved(uri);
                    }
                });
            }
        });

    }

    private Cat findCustest(List queryCats) {
        return Collections.max(queryCats);
    }

代碼開始復雜了,我們再來分析一下代碼,在CastsHelp的saveTheCutestCat方法,我們傳入了一個url,以及結果的回調,及該方法調用者用來實現的接口回調。
- 首先,調用api的queryCats方法查詢所有的貓,因為該方法是異步的,所以我們傳入一個接口的實現類,該接口包含了兩個回調方法,onCatsQueryError()查詢失敗的方法,在此方法中,我們直接調用調用者實現的接口告訴調用者獲取信息出現異常,onCatListReceived(List cats)方法,表明查詢貓成功,並返回貓列表。
- 我們在onCatListReceived(List cats)方法內,此時查詢貓列表的數據已經返回,我們繼續調用了獲取最喜愛的貓方法findCustest,然後保存貓。store()方法又包含了兩個回調方法,在onStoteFailed()方法中,依然如上。在成功的方法onCateStored()中,我們將獲取到的值通過調用者的實現接口進行回傳給調用者。

好了,我們完成了異步任務的處理,代碼復雜度大大的提升了,而且對於這麼多接口回調,看著都心理難受。

接口回調的封裝

我們加入異步任務之後,整個程序在邏輯上的復雜度,代碼的復雜度上都大大的提高了,但我們發現該接口有些共同點,我們可以進行封裝。
首先,先把之前添加的接口放在一起類比一下:

    //業務邏輯類
    public interface CutestCatCallback{
        void onCutestCatSaved(Uri uri);

        void onCutestCatFail(Exception e);
    }


    //查詢貓
    interface CatsQueryCallback{
        void onCatListReceived(List cats);
        void onCatsQueryError(Exception e);
    }

    //保存貓
    interface StoreCallback{
        void onCateStored(Uri uri);
        void onStoteFailed(Exception e);
    }

看到三個接口,我們會發現,他們都有一個成功的回調和一個失敗的回調,失敗的回調參數一樣,而成功的回調接口不一樣。那麼我們來抽取一下,抽取了一個如下的新接口回調:

public interface Callback {

    void onResult(T result);

    void onError(Exception e);

}

新的接口回調,比較簡單,成功和失敗的方法,添加了泛型,那麼我們把它移入項目中,這樣關於我們的api接口又有所變動:

public interface Api {
    /**
     * 查詢所有貓
     * @param query
     * @return
     */
    void queryCats(String query,Callback castQueryCallback);


    /**
     * 保存
     * @param cat
     * @return
     */
    void store(Cat cat,Callback storeCallback);

}

在看一下我們的業務邏輯類

public class CastsHelp {

    Api api;

    public void saveTheCutestCat(String query,final Callback custestCatCallback) {
        api.queryCats(query, new Callback>() {

            @Override
            public void onResult(List result) {
                Cat cat = findCustest(result);
                api.store(cat, new Callback() {

                    @Override
                    public void onResult(Uri result) {
                        custestCatCallback.onResult(result);
                    }

                    @Override
                    public void onError(Exception e) {
                        custestCatCallback.onError(e);
                    }
                });
            }

            @Override
            public void onError(Exception e) {
                custestCatCallback.onError(e);

            }
        });

    }

    private Cat findCustest(List queryCats) {
        return Collections.max(queryCats);
    }

邏輯上本有太大的變動,但在代碼上,我們將之前定義的三個接口,統一改成了我們新定義的Callback接口。代碼簡潔了一些,同時也便於理解了一些。

再次進階

在上面我們的Demo中,我們通過定義一個泛型回調,大大降低了代碼量,但還是存在一個問題,及我們每次調用這個方法,都需要傳入兩個參數,分別為url和接口回調。如果,我們能不能只請求一個參數url,同時返回一個對象,該對象包含了我們所需要的回調信息。

那麼我們在思考一下,我們需要的是一個對象,該對象包含了一個只有接口回調為參數的方法,我們稱之為AsyncJob。

public abstract class AsyncJob {

    public abstract void start(Callback callback);

}

我們讓查詢的方法等的返回值為該對象,則返回的該對象中調用start方法即可開始執行回調。

因為我們的Api中的兩個方法是通過接口回掉來進行的,所以我們需要對Api類進行包裝。ApiWrapper.

/**
 * 接口的包裝類 備注人: Alex_MaHao
 * 
 * @date 創建時間:2016年2月26日 上午10:11:28
 */
public class ApiWrapper {
    Api api;

    public AsyncJob> queryCats(final String query) {
        return new AsyncJob>() {

            @Override
            public void start(final Callback> callback) {
                api.queryCats(query, new Callback>() {

                    @Override
                    public void onResult(List result) {
                        callback.onResult(result);

                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);

                    }
                });
            }
        };

    }


    public AsyncJob store(final Cat cat) {
        return new AsyncJob() {

            @Override
            public void start(final Callback callback) {
                api.store(cat, new Callback() {

                    @Override
                    public void onResult(Uri result) {
                        callback.onResult(result);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }

        };
    }

}

我們看一下我們的ApiWrapper中的方法,因為兩個方法比較類似,我們只看queryCats方法,理解一下即可。如果單純的看邏輯,會發現比較繞,我們看一下第一個方法應該怎麼使用。

        //調用方法,返回包含我們回調信息的asyncJob類
        AsyncJob> async = new ApiWrapper().queryCats("query");

        //啟動回調方法,獲取回調結果
        async.start(new Callback>() {

            @Override
            public void onResult(List result) {
                //所有貓的集合
            }

            @Override
            public void onError(Exception e) {
                //錯誤回調
            }
        });

可以看到,我們的queryCats()傳入一個url地址,返回一個包含我們回調接口的對象,這時候,我們只需要調用start()方法,即可獲取到我們的回調結果。

那麼,同時我們也需要修改一下CastsHelp中的代碼

public class CastsHelp {

    ApiWrapper apiWrapper;

    public AsyncJob saveTheCutestCat(final String query) {
        return new AsyncJob() {

            @Override
            public void start(final Callback callback) {
                apiWrapper.queryCats(query).start(new Callback>() {

                    @Override
                    public void onResult(List result) {
                        Cat cat = findCustest(result);

                        apiWrapper.store(cat).start(new Callback() {

                            @Override
                            public void onResult(Uri result) {
                                callback.onResult(result);
                            }

                            @Override
                            public void onError(Exception e) {
                                callback.onError(e);
                            }
                        });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });;
            }
        };

    }

    private Cat findCustest(List queryCats) {
        return Collections.max(queryCats);
    }
}

該方法也只是把我們的方法的返回值對象做了一下轉換,修改了其返回值對象為我們的AsyncJob。

我們把該方法在進行整理,於是就有了如下代碼

public class CastsHelp {

    ApiWrapper apiWrapper;

    public AsyncJob saveTheCutestCat(final String query) {

        //貓列表數據的接口回調
        final AsyncJob> catsListAsyncJob = apiWrapper
                .queryCats(query);

        //查詢最喜愛的貓
        final AsyncJob cutestCatAsyncJob = new AsyncJob() {

            @Override
            public void start(final Callback callback) {
                catsListAsyncJob.start(new Callback>() {

                    @Override
                    public void onResult(List result) {
                        callback.onResult(findCustest(result));

                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };

        //獲取保存的url地址的回掉對象並返回
        AsyncJob storedUriAsyncJob = new AsyncJob() {

            @Override
            public void start(final Callback callback) {
                cutestCatAsyncJob.start(new Callback() {

                    @Override
                    public void onResult(Cat result) {
                        apiWrapper.store(result).start(callback);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };

        return storedUriAsyncJob;

    }

    private Cat findCustest(List queryCats) {
        return Collections.max(queryCats);
    }
}

map和flatMap

在我們的CastsHelp的saveTheCutestCat()方法中,有一個查詢喜愛都最高的貓的列表,那麼我們能不能這樣理解呢,從輸入和輸出來看,我們期望將一個AsyncJob

public interface Func {
    R call(T t);
}

該接口很簡單,傳入T,通過call方法返回R。

我們在AsyncJob中添加map方法,先看代碼

public  AsyncJob map(final Func func){
        final AsyncJob source = this;

        return new AsyncJob(){

            @Override
            public void start(final Callback callback) {
                source.start(new Callback() {

                    @Override
                    public void onResult(T result) {
                        //調用者實現的call方法
                        R r = func.call(result);

                        callback.onResult(r);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }

        };

    }

我們看一下這個方法,在之前我們這樣分析,我們需要的是將AsyncJob

public  AsyncJob map(final Func,Cat> func){
}

ok,我們在深入的看一下map方法,可以看到我們返回的對象,無非就是將當前的AsyncJob對象的回調信息與我們返回的信息所綁定,同時在onResult方法中將數據類型進行轉換。

那麼,在CastsHelp中的查詢最喜愛的貓的方法,可改成如下所示:

    final AsyncJob cutestCatAsyncJob = catsListAsyncJob.map(new Func, Cat>() {

            @Override
            public Cat call(List t) {
                return findCustest(t);
            }
        });

map對象,在轉換的時候是將一個對象轉換為另一個對象,通過的是返回值的方式,業績是同步的方式進行的,那麼我們會不會有另一種情況呢,即我們的call方法返回的也是一個回調信息,及也是異步的呢。

這種情況也是存在的,我們看一下我們的保存貓的方法,該方法是一個異步的任務,他不能直接返回值,而是返回一個回調的信息,我們看一下保存貓的方法,public AsyncJob store(final Cat cat),很明顯,他不能向我們之前查詢喜愛都最高的貓時直接獲取Cat對象,並調用onResult方法並返回,他是一個回調對象,我們需要做的是將返回的回調對象和我們轉換過後的相綁定。這時候我們定義了flatMap();

public  AsyncJob flatMap(final Func> func ){
        final AsyncJob source = this;
        return new AsyncJob(){

            @Override
            public void start(final Callback callback) {
                source.start(new Callback() {

                    @Override
                    public void onResult(T result) {
                        AsyncJob mapped = func.call(result);

                        mapped.start(callback);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);

                    }
                });

            }

        };
    }

該方法將返回的Func回調對象與我們的flatMap對象綁定。那麼,保存貓的方式也可以修改如下:

    AsyncJob storedUriAsyncJob = cutestCatAsyncJob.flatMap(new Func>() {

            @Override
            public AsyncJob call(Cat t) {
                // TODO Auto-generated method stub
                return apiWrapper.store(t);
            }
        });

最後,貼上CastHelps的完整代碼,


    public AsyncJob saveTheCutestCat(final String query) {

        //查詢貓
        final AsyncJob> catsListAsyncJob = apiWrapper
                .queryCats(query);

        //獲取最喜愛的貓
        final AsyncJob cutestCatAsyncJob = catsListAsyncJob.map(new Func, Cat>() {

            @Override
            public Cat call(List t) {
                return findCustest(t);
            }
        });

        //獲取地址
        AsyncJob storedUriAsyncJob = cutestCatAsyncJob.flatMap(new Func>() {

            @Override
            public AsyncJob call(Cat t) {
                return apiWrapper.store(t);
            }
        });

        return storedUriAsyncJob;

    }
map和flatMap的總結
相同點:都是通過Func對象的Call方法,讓調用者來實現不同數據的對象轉換 不同點:Call方法轉換的目標結果不同,導致對結果的處理不同。如果是同步的轉換,不涉及接口回掉,異步等,map處理。如果Call方法返回的是一個回調對象,那麼我們需要去綁定,所以使用flatMap.

細心的可以看到我們saveTheCutestCat方法還可以精簡


    public AsyncJob saveTheCutestCat(final String query) {

        return apiWrapper
                .queryCats(query)
                .map(new Func, Cat>() {

                    @Override
                    public Cat call(List t) {


                        return findCustest(t);
                    }
                }).flatMap(new Func>() {

                    @Override
                    public AsyncJob call(Cat t) {

                        return apiWrapper.store(t);
                    }
                });

    }

RxJava 的引入

在引入RxJava 之前,我們之前寫了那麼多的代碼勢必要使用一下。那麼怎麼使用呢:

public static void main(String[] args) {
        CastsHelp castsHelp = new CastsHelp();
        castsHelp.saveTheCutestCat("ss").start(new Callback() {

            @Override
            public void onResult(Uri result) {
                //Uri 地址對象
            }

            @Override
            public void onError(Exception e) {
                //異常
            }
        });
    }

ok,開始和RxJava進行類比。

AsyncJob類比Observable
CallBack類比Observer
start()類比 subscribe();

那麼我們開始對我的類進行修改,在這之前需要導入rxjava的jar包

public class ApiWrapper {
    Api api;

    public Observable> queryCats(final String query) {

        return Observable.create(new OnSubscribe>() {

            @Override
            public void call(final Subscriber> arg0) {
                api.queryCats(query, new CatsQueryCallback() {

                    @Override
                    public void onCatsQueryError(Exception e) {
                        arg0.onError(e);
                    }

                    @Override
                    public void onCatListReceived(List cats) {
                        arg0.onNext(cats);
                    }
                });
            }
        });
    }

    public Observable store(final Cat cat) {
        return Observable.create(new OnSubscribe() {

            @Override
            public void call(final Subscriber subscriber) {
                api.store(cat, new StoreCallback() {

                    @Override
                    public void onStoteFailed(Exception e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onCateStored(Uri uri) {
                        subscriber.onNext(uri);
                    }
                });
            }
        });
    }

}

public class CastsHelp {

    ApiWrapper apiWrapper;

    public Observable saveTheCutestCat(final String query) {
        return apiWrapper
                .queryCats(query)
                .map(new Func1, Cat>() {

                        @Override
                        public Cat call(List cats) {

                            return findCustest(cats);
                        }
                    })
                .flatMap(new Func1>() {

                        @Override
                        public Observable call(Cat cat) {

                            return apiWrapper.store(cat);
                        }
                    });

    }

    private Cat findCustest(List queryCats) {
        return Collections.max(queryCats);
    }
}
public class Test {
    public static void main(String[] args) {
        CastsHelp castsHelp = new CastsHelp();
        castsHelp.saveTheCutestCat("ss").subscribe(new Observer() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable arg0) {

            }

            @Override
            public void onNext(Uri arg0) {

            }
        });
    }
}

RxJava的原理已經理解和整理完畢。下面就開進入到了RxJava的整理與總結了。沒多少代碼,就不共享了。

長路漫漫啊。加油!!!

  1. 上一頁:
  2. 下一頁:
Copyright © 程式師世界 All Rights Reserved