赞
踩
在学习RxJava前首先需要了解ReactiveX,因为RxJava是ReactiveX的一种Java的实现形式
ReactiveX官网对于自身的介绍是:
An API for asynchronous programmingwith observable streams
异步编程设计中基于可观察的事件流的接口编程
总结就是异步线程、接口编程、可观察的事件流
入门基础:
https://www.jianshu.com/p/cd3557b1a474
https://www.cnblogs.com/lyysz/p/6344507.html
https://www.cnblogs.com/liushilin/p/7058302.html
https://www.jb51.net/article/92309.htm https://zhuanlan.zhihu.com/p/31413825
观察者 被观察者 事件订阅
Rxjava
在Android的SDK中,给开发者提供的用于异步操作的原生内容有AsyncTask和Handler。对于简单的异步请求来说,使用Android原生的AsyncTask和Handler即可满足需求,但是对于复杂的业务逻辑而言,依然使用AsyncTask和Handler会导致代码结构混乱,代码的可读性非常差。
但是RxJava的异步操作是基于观察者模式实现的,在越来越复杂的业务逻辑中,RxJava依旧可以保持简洁
基本使用:
implementation 'io.reactivex.rxjava2:rxjava:2.1.0'
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1' // 操作功能防抖
我们需要先创建一个Observable
Observable.create(new ObservableOnSubscribe<Integer>() { //1. 创建被观察者 Observable 对象
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).subscribe(//3订阅事件
new Consumer<Integer>() {//2创建观察者 (Observer )
@Override
public void accept(Integer s) throws Exception {
}
});
1 一般我们是结合Rxjava与retrofit一起使用的
implementation "com.squareup.retrofit2:retrofit:$rootProject.retrofitVersion" implementation "com.squareup.retrofit2:retrofit-mock:$rootProject.retrofitVersion" implementation "com.squareup.retrofit2:converter-gson:$rootProject.retrofitVersion" implementation 'com.squareup.okhttp3:logging-interceptor:3.5.0' implementation "com.squareup.retrofit2:converter-scalars:$rootProject.retrofitVersion" implementation "com.squareup.retrofit2:adapter-rxjava2:$rootProject.retrofitVersion" implementation "com.squareup.retrofit2:converter-gson:$rootProject.retrofitVersion" implementation "com.google.code.gson:gson:$rootProject.gsonVersion" private WangAndroidApi api; api = HttpUtil.getOnlineCookieRetrofit().create(WangAndroidApi.class); api.getProjectItem(1, 294) .subscribeOn(Schedulers.io()) // 上面 异步 .observeOn(AndroidSchedulers.mainThread()) // 下面 主线程 .subscribe(new Consumer<ProjectItem>() { @Override public void accept(ProjectItem data) throws Exception { Log.d(TAG, "getProjectListAction: " + data); } });
2.功能防抖(防止view短时间内多次点击频繁响应)
implementation 'com.jakewharton.rxbinding2:rxbinding:2.1.1' // 操作功能防抖
View view=findViewById(R.id.bt_anti_shake);
RxView.clicks(view).throttleFirst(2000,TimeUnit.MILLISECONDS)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
}
});
3网络嵌套(解决方案,巧用flatMap)
我们知道map是将我们数据类型进行转换后传递下去
有些时候我们需要先异步线程请求数据,然后根据数据,拿到对应的数据,再次循环请求对应的详情数据,如果正常的嵌套两层还好,但是要是多次循环和嵌套? 可以使用flatMap,发送多次的事件
重点:Observable.fromIterable(projectBean.getData());以及线程切换
@SuppressLint("CheckResult") protected void flatMapTest() { Button view = findViewById(R.id.bt_anti_shake); RxView.clicks(view) .throttleFirst(1000, TimeUnit.MILLISECONDS) .observeOn(Schedulers.io()) // 我只给下面 切换 异步 .flatMap(new Function<Object, ObservableSource<ProjectBean>>() { @Override public ObservableSource<ProjectBean> apply(Object o) throws Exception { return api.getProject(); } }) .flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() { @Override public ObservableSource<ProjectBean.DataBean> apply(ProjectBean projectBean) throws Exception { return Observable.fromIterable(projectBean.getData()); } }) .flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItem>>() { @Override public ObservableSource<ProjectItem> apply(ProjectBean.DataBean dataBean) throws Exception { return api.getProjectItem(1, dataBean.getId()); } }) .observeOn(AndroidSchedulers.mainThread())// 我只给下面 切换 ui线程 .subscribe(new Consumer<ProjectItem>() { @Override public void accept(ProjectItem projectItem) throws Exception { // 如果我要更新UI Log.d(TAG, "accept: " + projectItem); } }); }
4 doOnNext运用 场景:例如银行的业务经常一个操作(登入之类的)需要查询多个接口
* * 1.请求服务器注册操作 * * 2.注册完成之后,更新注册UI * * 3.马上去登录服务器操作 * * 4.登录完成之后,更新登录的UI MyRetrofit.createRetrofit().create(IReqeustNetwor.class) .registerAction(new RegisterRequest()) // todo 2请求服务器注册操作 .subscribeOn(Schedulers.io()) // 给上面 异步 .observeOn(AndroidSchedulers.mainThread()) // 给下面分配主线程 .doOnNext(new Consumer<RegisterResponse>() { // todo 3 @Override public void accept(RegisterResponse registerResponse) throws Exception { // todo 2.注册完成之后,更新注册UI } }) // todo 3.马上去登录服务器操作 .observeOn(Schedulers.io()) // 给下面分配了异步线程 .flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() { // todo 4 @Override public ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception { Observable<LoginResponse> loginResponseObservable = MyRetrofit.createRetrofit().create(IReqeustNetwor.class) .loginAction(new LoginReqeust()); return loginResponseObservable; } }) .observeOn(AndroidSchedulers.mainThread()) // 给下面 执行主线程 .subscribe(new Observer<LoginResponse>() { // 一定是主线程,为什么,因为 subscribe 马上调用onSubscribe @Override public void onSubscribe(Disposable d) { // TODO 1 progressDialog = new ProgressDialog(RequestActivity.this); progressDialog.show(); // UI 操作 disposable = d; } @Override public void onNext(LoginResponse loginResponse) { // todo 5 // TODO 4.登录完成之后,更新登录的UI } @Override public void onError(Throwable e) { } // todo 6 @Override public void onComplete() { if (progressDialog != null) { progressDialog.dismiss(); } } }); // 记得销毁 @Override protected void onDestroy() { super.onDestroy(); // 必须这样写,最起码的标准 if (disposable != null) if (!disposable.isDisposed()) disposable.dispose(); }
5小技巧tips:封装分配线程代码
subscribeOn(Schedulers.io()) // 给上面代码分配异步线程 .observeOn(AndroidSchedulers.mainThread())// 给下面代码分配主线程
public final static <UD> ObservableTransformer<UD, UD> rxud() { return new ObservableTransformer<UD, UD>() { @Override public ObservableSource<UD> apply(Observable<UD> upstream) { return upstream.subscribeOn(Schedulers.io()) // 给上面代码分配异步线程 .observeOn(AndroidSchedulers.mainThread()) // 给下面代码分配主线程; /* .map(new Function<UD, UD>() { @Override public UD apply(UD ud) throws Exception { Log.d(TAG, "apply: 我监听到你了,居然再执行"); return ud; } });*/ } }; }
subscribeOn() 多个只有第一个生效
.observeOn()多个都可生效,对下面一个生效
RxJava模式与原理(标准观察者与RxJava观察者,map变换操作符原理)
在讲rxjava源码分析前先讲一个知识点:RxJava Hook
全局监听rxjava:使用代码如下:
public class SourceActivity1 extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); //监听全局rxjava 使用 RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() { @Override public Observable apply(Observable observable) throws Exception { //拦截做处理,类似钩子功能 return observable; } }); testHook(); } private void testHook() { Observable.create(new ObservableOnSubscribe<Object>() { @Override public void subscribe(ObservableEmitter<Object> e) throws Exception { e.onNext("A"); } }).map(new Function<Object, Object>() { @Override public Object apply(Object o) throws Exception { return o; } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } }
分析:在我们使用rxjava中如上代码中的testhook()方法, RxJavaPlugins.setOnObservableAssembly(Function funtion)设置拦截就可以全局监听rxjava使用,再此拦截的回调做我们自己所要的逻辑。这个场景有点类似动态插入我们要做的代码,然后执行对应的rxjava代码。
分析原理:我们在rxjava操作符方法里会看到如下代码:如上面的testhook方法中的create、map 最终都会调用对应的RxJavaPlugins.onAssembly方法,如下:
可以看到这个setOnObservableAssembly()方法就是全局赋值,如果onObservableAssembly没有设置赋值,那么上面的的RxJavaPlugins.onAssembly(new ObservableCreate(source))就执行原有代码,如果设置赋值,那么会执行我们设置的回调执行apply方法,再执行原有代码逻辑,这个就是它的原理。
RxJava的观察者模式源码分析:
TODO RxJava的观察者模式
1:Observer 源码看看
2:Observable创建过程 源码分析
3:subscribe订阅过程 源码分析
1 Observer源码很简单,就是自定义实现Observer接口,一般我们都是直接new Observer()实现对应的onSubscribe(),onNext(@NonNull T t); onError(@NonNull Throwable e), onComplete()方法
2 Observable创建过程:create方法返回ObservableCreate对象,同时将自定义source 传入
3subscribe订阅过程
调用subscribeActual(observer);此处observer是自定义实现Observer接口, subscribeActual实现方法为ObservableCreate.subscribeActual(observer) 做了事:
1 可以看到是构建发射器,将自定义的观察者observer传入持有
2 observer.onSubscribe(parent);这就是为什么一订阅马上会调用new Observer()中的 onSubscribe()方法
3 source.subscribe(parent); 此处source是自定义source,将发射器传入,具体实现就是自定义source的subscribe(CreateEmitter t)方法
然后,即如下自定义source的subscribe实现 emitter.onNext(“A”);, 执行的就是:ObservableCreate中的 observer.onNext(t);即调用自定义观察者observer的onNext方法
整块代码如下
/** * TODO RxJava的观察者模式 * 1:Observer 源码看看 * 2:Observable创建过程 源码分析 * 3:subscribe订阅过程 源码分析 */ public class SourceActivity2 extends AppCompatActivity { @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); // 结论: new ObservableCreate() {source == 自定义source} // 2:Observable创建过程 源码分析 Observable.create( // 自定义source new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Exception { // 发射器.onNext emitter.onNext("A"); } }) // 3:subscribe订阅过程 源码分析 // ObservableCreate. subscribe .subscribe( // 自定义观察者 // 1:Observer 源码看看 new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } }); } }
1 ObservableCreate(source)、自定义suroce
2 ObservableMap 创建:new ObservableMap<T, R>(ObservableCreate(source), mapper自定义)
3订阅ObservableMap.subscribe()
实际调用source.subscribe(new MapObserver<T, U>(observer自定义, mapper自定义));
实际调用ObservableCreate(source).subscribe(new MapObserver(observer自定义, mapper自定义));
具体实现ObservableCreate类中的subscribeActual()方法
source.subscribe(new CreateEmitter(new MapObserver(observer自定义, mapper自定义)));//
4、emitter.onNext();调用相当于调用MapObserver.onNext()
通过我们自定义mapper的apply()方法完成转换,然后传递出去。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。