前言 Rxjava已经问世有几年的时间,如今的版本迭代也已经到了3.0,这样的任务调度框架其实还有很多, 但是Rxjava的上手可能让然刚开始看的时候, 有点雾水,但是使用使用之后, 你会发现爽的一批,今天来讲解一下RxAndroid的简单实用。
Github地址: https://github.com/ReactiveX/RxAndroid
定义 RxJava 在 GitHub 主页上的自我介绍是
RxJava is a Java VM implementation of Reactive Extensions : a library for composing asynchronous and event-based programs by using observable sequences.
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
说的直白一些就是一个异步任务框架,自动处理任务调度。
好处 方便!
不在需要自己手动切换线程来执行任务, 框架一个方法就可指定执行线程。
简洁!
链式调用的方式,减少了代码的层级嵌套, 让你对于任务的执行流程一目了然。
简单解析 RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。对于事件序列,做出相应的处理,类似我们给view指定点击事件。
使用 框架的使用思想就是创建事件源,创建观察者, 把观察者绑定到事件源上, 就可以处理事件源进行的事件回调。
观察者类 Observer Subscriber 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Observer<String> observer = new Observer<String>() { @Override public void onNext (String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted () { Log.d(tag, "Completed!" ); } @Override public void onError (Throwable e) { Log.d(tag, "Error!" ); } };
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onNext(String s) { Log.d(tag, "Item: " + s); } @Override public void onCompleted() { Log.d(tag, "Completed!"); } @Override public void onError(Throwable e) { Log.d(tag, "Error!"); } };
选择 Observer
和 Subscriber
是完全一样的。它们的区别对于使用者来说主要有两点:
onStart()
: 这是 Subscriber
增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart()
就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe()
方法,具体可以在后面的文中看到。
unsubscribe()
: 这是 Subscriber
所实现的另一个接口 Subscription
的方法,用于取消订阅。在这个方法被调用后,Subscriber
将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed()
先判断一下状态。 unsubscribe()
这个方法很重要,因为在 subscribe()
之后, Observable
会持有 Subscriber
的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause()
onStop()
等方法中)调用 unsubscribe()
来解除引用关系,以避免内存泄露的发生。
事件源类 Observable 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 Observable observable = Observable.create(new Observable.OnSubscribe<String>() { @Override public void call (Subscriber<? super String> subscriber) { subscriber.onNext("Hello" ); subscriber.onNext("Hi" ); subscriber.onNext("Aloha" ); subscriber.onCompleted(); } }); Observable observable = Observable.just("Hello" , "Hi" , "Aloha" ); String[] words = {"Hello" , "Hi" , "Aloha" }; Observable observable = Observable.from(words);
订阅 就是将Observable 和 Observer/Subscriber联系起来
1 2 3 observable.subscribe(observer); observable.subscribe(subscriber);
Observable 回调 Observer/Subscriber 的顺序
不完整回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 Action1<String> onNextAction = new Action1<String>() { @Override public void call (String s) { Log.d(tag, s); } }; Action1<Throwable> onErrorAction = new Action1<Throwable>() { @Override public void call (Throwable throwable) { } }; Action0 onCompletedAction = new Action0() { @Override public void call () { Log.d(tag, "completed" ); } }; observable.subscribe(onNextAction); observable.subscribe(onNextAction, onErrorAction); observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
例子 字符串打印
1 2 3 4 5 6 7 8 String[] names = ...; Observable.from(names) .subscribe(new Action1<String>() { @Override public void call (String name) { Log.d(tag, name); } });
图片加载展示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 int drawableRes = ...;ImageView imageView = ...; Observable.create(new OnSubscribe<Drawable>() { @Override public void call (Subscriber<? super Drawable> subscriber) { Drawable drawable = getTheme().getDrawable(drawableRes)); subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Observer<Drawable>() { @Override public void onNext (Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted () { } @Override public void onError (Throwable e) { Toast.makeText(activity, "Error!" , Toast.LENGTH_SHORT).show(); } });
线程控制 就是指定事件流和订阅者分别在什么线程执行
1 2 3 4 5 6 7 8 9 Observable.just(1 , 2 , 3 , 4 ) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Integer>() { @Override public void call (Integer number) { Log.d(tag, "number:" + number); } });
到这里RxAndroid的简单使用基本已经完成, 简单总结一下, 就是创建事件源,创建观察者, 指定两者的执行线程,事件源里发送事件,观察者处理事件。
进阶 map方法
1 2 3 4 5 6 7 8 9 10 11 12 13 Observable.just("images/logo.png" ) .map(new Func1<String, Bitmap>() { @Override public Bitmap call (String filePath) { return getBitmapFromPath(filePath); } }) .subscribe(new Action1<Bitmap>() { @Override public void call (Bitmap bitmap) { showBitmap(bitmap); } });
说白了就是事件先经过一次处理之后在回调给订阅者
flatmap方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext (Course course) { Log.d(tag, course.getName()); } ... }; Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call (Student student) { return Observable.from(student.getCourses()); } }) .subscribe(subscriber);
这个需求是打印学生的课程名称, 每个学生会上不同课程,通过一次flatmap可以将学生的课程信息回调,避免自己手动写for循环
left方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { return new Subscriber<Integer>() { @Override public void onNext (Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted () { subscriber.onCompleted(); } @Override public void onError (Throwable e) { subscriber.onError(e); } }; } });
提供了只用责任链方式的事件处理
compose方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class LiftAllTransformer implements Observable .Transformer <Integer , String > { @Override public Observable<String> call (Observable<Integer> observable) { return observable .lift1() .lift2() .lift3() .lift4(); } } ... Transformer liftAll = new LiftAllTransformer(); observable1.compose(liftAll).subscribe(subscriber1); observable2.compose(liftAll).subscribe(subscriber2); observable3.compose(liftAll).subscribe(subscriber3); observable4.compose(liftAll).subscribe(subscriber4);
这个方法添加多个left, 并且可以left复用
线程切换
1 2 3 4 5 6 7 8 Observable.just(1 , 2 , 3 , 4 ) .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator) .observeOn(Schedulers.io()) .map(mapOperator2) .observeOn(AndroidSchedulers.mainThread) .subscribe(subscriber);