首页 > 安全资讯 >

RxJava学习(二)——RxJava使用场景总结

16-09-12

MainActivity——RxJava基础用法 simplestAty——RxJava简单实例 timer——延时操作 interval——周期性操作

11个RxJava的实例集合

类名一一对应内容,分别是:

  • MainActivity——RxJava基础用法
  • simplestAty——RxJava简单实例
  • timer——延时操作
  • interval——周期性操作
  • twoexample——两个RxJava初始化demo
  • schedulePeriodically——使用RxJava做轮询请求
  • merge——合并两个数据源
  • map_flatmap——RxJava实现变换
  • foreach——实现数组,List遍历
  • concat——实现比较一堆item,一旦满足后面的item不执行的功能
  • lifecycle——回收Observable以防止内存泄漏
  •  

代码已上传GitHub,地址请点击这里,求start求watch,当然也欢迎putrequest。

本文对这些实例做代码分析以及讲解:

MainActivity:

最基础的RxJava执行过程,观察者,被观察者,以及订阅的体现。

 

/**
 * 基本的观察者,被观察者创建方式以及实现订阅。
 */
public class MainActivity extends AppCompatActivity {
    String tag = "MainActivity";

    //定义观察者
    Observer observer = new Observer() {
        @Override
        public void onCompleted() {
            Log.d(tag,"Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.d(tag,"Error!");
        }

        @Override
        public void onNext(String s) {
            Log.d(tag,"Item: " + s);
        }
    };

    //实现了Observer的抽象类,对其进行了一些扩展,基本使用方式完全一样。
    Subscriber subscriber = new Subscriber() {

        @Override
        public void onCompleted() {
            Log.d(tag,"Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.d(tag,"Error!");
        }

        @Override
        public void onNext(String s) {
            Log.d(tag,"Item: " + s);
        }
    };

    //被观察者  使用onCreate创建
    Observable observable = Observable.create(new Observable.OnSubscribe(){
        @Override
        public void call(Subscriber subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("whale");
            subscriber.onNext("nangua");
            subscriber.onCompleted();
        }
    });

    //被观察者 使用just创建
    Observable observable1 = Observable.just("Hello","Hi","Aloha");

    //被观察者 使用数组传入创建
    String[] words = {"Hello","Hi","Aloha"};
    Observable observable2 = Observable.from(words);

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        //订阅
        observable.subscribe(subscriber);
        //observable.subscribe(observer);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
    }
}
simplestAty:

 

不完整回调ActionX类的使用方式,该类一共有Action0、Action1...到Action9,每个类对应不同的参数设置以及构造方法,FuncX方法同理。

 

/**
 * subscribe()支持的不完整回调
 * Created by jiangzn on 16/9/8.
 */
public class simplestAty extends Activity {
    String tag = "xiaojingyu";
    Action1 onNextAction = new Action1() {
        @Override
        public void call(String s) {
            Log.d(tag,s);
        }
    };
    Action1 onErrorAction = new Action1() {
        @Override
        public void call(Throwable throwable) {
            //错误处理
        }
    };
    Action0 onCompleteAction = new Action0() {
        @Override
        public void call() {
            Log.d(tag,"completed");
        }
    };
    @Override
    public void onCreate(Bundle savedInstanceState, PersistableBundle persistentState) {
        super.onCreate(savedInstanceState, persistentState);
        Observable observable = Observable.just("hello");
        observable.subscribe(onNextAction,onErrorAction,onCompleteAction);
    }
}
timer:

 

执行延时操作,在一些需要等待执行的场景中使用:

 

Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer() {
                    @Override
                    public void onCompleted() {
                        MyLog.d("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        MyLog.d("error");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        MyLog.d("啪啪啪!");
                    }
                });
interval:

 

做周期性操作,比如轮回显示什么的。

 

  Observable.interval(2, TimeUnit.SECONDS)
                .subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
                        MyLog.d("completed");
                    }

                    @Override
                    public void onError(Throwable e) {
                        MyLog.d("error");
                    }

                    @Override
                    public void onNext(Long aLong) {
                        MyLog.d("啪啪啪");
                    }
                });
twoexample:

 

两个实践制定线程方法的小栗子,重要的是如下两句:

 

subscribeOn(Schedulers.io()) //指定OnSubscribe被激活时处在的线程,事件产生线程
.observeOn(AndroidSchedulers.mainThread())  //Subscriber所运行的线程,事件消费的线程
记住subscribeOn和observeOn的区别和用法,特别重要!!这是RxJava实现异步操作的基础!

 

 

/**
 * 两个小栗子
 * a打印字符串数组
 * b由id取得突破并显示  设置执行的线程
 * Created by jiangzn on 16/9/8.
 */
public class twoexample extends Activity {
    String tag = "xiaojingyu";
    String[] names = {"a","b","c"};
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.test2);
        Log.d(tag,"MainActivity");
        // a();
         b();
    }

    private void b() {
        final int drawableRes = R.drawable.aa;
        final ImageView imageView = (ImageView) findViewById(R.id.iv_test2);
        Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                Log.d(tag,"call:" + Thread.currentThread().getName());
                //打印结果:call:RxCachedThreadScheduler-1...
                Drawable drawable = getResources().getDrawable(drawableRes);
                subscriber.onNext(drawable);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.io()) //指定OnSubscribe被激活时处在的线程,事件产生线程
                .observeOn(AndroidSchedulers.mainThread())  //Subscriber所运行的线程,事件消费的线程
                .subscribe(new Subscriber() {
            @Override
            public void onCompleted() {
                Log.d(tag,"Completed!");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(tag,e.getMessage());
            }

            @Override
            public void onNext(Drawable drawable) {
                imageView.setImageDrawable(drawable);
                Log.d(tag,"加载线程:" + Thread.currentThread().getName());
                //打印结果:加载线程:main
            }
        });
    }
    private void a() {
        Observable.from(names)
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        Log.d(tag,s);
                    }
                });
    }
}
schedulePeriodically:

 

轮询请求,在一些需要反复需要获取数据的场景,比如过几十分钟更新一下天气数据之类的。

 

   Observable.create(new Observable.OnSubscribe(
        ) {
            @Override
            public    void call(final Subscriber subscriber) {
                    Schedulers.io().createWorker()  //指定在io线程执行
                            .schedulePeriodically(new Action0() {
                                @Override
                                public void call() {
                                    subscriber.onNext("doNetworkCallAndGetStringResult");
                                }
                            }, 2000, 1000, TimeUnit.MILLISECONDS);//初始延迟,polling延迟
                }

        })
                .subscribe(new Action1() {
            @Override
            public void call(String s) {
                MyLog.d("polling..." + s);
            }
        });
merge:

 

简而言之就是合并操作

稍微复杂一点说就是合并两个数据源的数据,但是不保证顺序,这个顺序只能是事件产生的顺序。

 

/**
 * Merge
 * 使用merge合并两个数据源
 * 例如一组数据来自网络,一组数据来自文件,需要合并数据并一起显示的情况
 *
 * 可以理解为拼接两个Observable的输出,不保证顺序,按照事件产生的顺序发送给订阅者
 * Created by jiangzn on 16/9/8.
 */
public class merge extends Activity {
    String tag = "xiaojingyu";
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.test3);
        Observable.merge(getDataFromFile()  ,getDataFromNet())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber() {
                    @Override
                    public void onCompleted() {
                        Log.d(tag,"done loading all data");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(tag,"error");
                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(tag,"merge:" + s);
                    }
                });
    }
    private Observable getDataFromFile() {
        String[] strs = {"filedata1","filedata2","filedata3","filedata4"};
        Observable temp = Observable.from(strs);
        return temp;
    }

    private Observable getDataFromNet() {
        String[] strs = {"netdata1","netdata2","netdata3","netdata4"};
        Observable temp = Observable.from(strs);
        return temp;
    }
}
map_flatmap:

 

代码比较直观,就是比如传入一个路径给我返回一个bitmap。

map:

 

Observable.just("images/logo.png")  //输入类型
        .map(new Func1() {
            @Override
            public Bitmap call(String filepath) {
                return getBitmapFromPath(filepath);
            }
        })
        .subscribe(new Action1() {
            @Override
            public void call(Bitmap bitmap) {
                showBitmap(bitmap);
            }
        });

 

flatmap:

 

   Observable.from(students)
                .flatMap(new Func1>() {
                    @Override
                    public Observable call(Student student) {
                        MyLog.d(student.name);
                        return Observable.from(student.getCourses());
                    }
                }).subscribe(subscriber);
foreach:

 

实现对数组或者list的遍历。在需要异步执行的情况下还是比for循环好用的。

     Observable.from(names)
                .subscribe(new Action1() {
                    @Override
                    public void call(String s) {
                        MyLog.d(s);
                    }
                });
lifecycle:

 

使用一个compositesubscription来管理我们的observable,以防止内存泄漏。

 

public class lifecycle extends Activity{

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);

    }

    private CompositeSubscription mCompositeSubscription
            = new CompositeSubscription();

    private void doSomething() {
        mCompositeSubscription.add( Observable.just("Hello, World!")
                .subscribe(new Action1(){
                    @Override
                    public void call(String s) {
                        MyLog.d(s);
                    }
                }));
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeSubscription.unsubscribe();
        //注意! 一旦你调用了 CompositeSubscription.unsubscribe(),
        // 这个CompositeSubscription对象就不可用了, 如果你还想使用CompositeSubscription,
        // 就必须在创建一个新的对象了。
    }
}
最后看完如果还不够过瘾的话,可以从Github上下载下来然后运行跑一跑体验一下哦~

这里是传送门:点我送屠龙宝刀

相关文章
最新文章
热点推荐