当前位置:   article > 正文

RxJava详解(基于2.X版本的背压详解)(下)_io.reactivex.android.schedulers.androidschedulers

io.reactivex.android.schedulers.androidschedulers

RxJava背压基础讲解

https://blog.csdn.net/weixin_37730482/article/details/78085190

 

 

本章节继续讲述RxJava的背压问题

 

 

 

 

 

一.控制 观察者 接收事件的速度

 

<1> 前言

 

被观察者发送消息的速度和观察者接收消息的速度不匹配时,可以控制观察者接收消息的速度


原理是 不管被观察者发送了多少条数据,观察者仅根据自身情况选择接收几条消息。哪怕被观察者发送量100条数据,若观察者仅需要两条消息,则只接收两条消息。忽略另外98条消息。

 

 

 

<2> 代码

  1. package com.example.mydemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import androidx.appcompat.app.AppCompatActivity;
  5. import com.example.mydemo.R;
  6. import org.reactivestreams.Subscriber;
  7. import org.reactivestreams.Subscription;
  8. import io.reactivex.BackpressureStrategy;
  9. import io.reactivex.Flowable;
  10. import io.reactivex.FlowableEmitter;
  11. import io.reactivex.FlowableOnSubscribe;
  12. import io.reactivex.android.schedulers.AndroidSchedulers;
  13. import io.reactivex.schedulers.Schedulers;
  14. public class RxJavaActivity extends AppCompatActivity {
  15. private Subscription mSubscription;
  16. @Override
  17. protected void onCreate(Bundle savedInstanceState) {
  18. super.onCreate(savedInstanceState);
  19. setContentView(R.layout.activity_rxjava);
  20. method();
  21. }
  22. /**
  23. * RxJava Flowable创建被观察者 背压
  24. */
  25. public void method() {
  26. Flowable.create(new FlowableOnSubscribe<Object>() {
  27. @Override
  28. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  29. if (null == e) {
  30. return;
  31. }
  32. /**
  33. * 测试 被观察者发送五条消息
  34. * */
  35. if (!e.isCancelled()) {
  36. for (int i = 0; i < 5; i++) {
  37. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  38. }
  39. e.onComplete();
  40. }
  41. }
  42. }, BackpressureStrategy.LATEST)
  43. .subscribeOn(Schedulers.io())//被观察者在io线程中进行
  44. .observeOn(AndroidSchedulers.mainThread())//观察者在主线程中进行
  45. .subscribe(new Subscriber<Object>() {
  46. @Override
  47. public void onSubscribe(Subscription s) {
  48. if (null == s) {
  49. return;
  50. }
  51. mSubscription = s;
  52. mSubscription.request(4);//测试 被观察者发送五条消息 观察者只接收四条
  53. }
  54. @Override
  55. public void onNext(Object o) {
  56. if (null == o) {
  57. return;
  58. }
  59. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 o.toString()----:" + o.toString());
  60. }
  61. @Override
  62. public void onError(Throwable t) {
  63. if (null == t) {
  64. return;
  65. }
  66. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  67. }
  68. @Override
  69. public void onComplete() {
  70. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  71. }
  72. });
  73. }
  74. /**
  75. * onDestroy方法 取消订阅
  76. */
  77. @Override
  78. protected void onDestroy() {
  79. super.onDestroy();
  80. if (null != mSubscription) {
  81. mSubscription.cancel();
  82. Log.d("TAG", "退出页面 取消订阅");
  83. }
  84. }
  85. }

 

 

 

<3> 结果

  1. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 0 波数据
  2. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 1 波数据
  3. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 2 波数据
  4. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 3 波数据

 

 

<4> 说明

被观察者发送五条消息。观察者通过onSubscribe方法中参数Subscription的request(number)方法设置只接收number即四条消息。

 

 

 

<5> 重要说明&异步订阅

Flowable抽象类创建被观察者方式  如果被观察者和观察者是异步订阅的 如果不设置

mSubscription.request(N);//设置 观察者只接收N条数据

则说明 观察者不接收消息。此时 被观察者仍然可以发送消息。并将消息存在缓存区等待观察者需要时再取出。即调用

mSubscription.request(N);//设置 观察者只接收N条数据

方法。

 

(1) 代码

  1. package com.example.mydemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import android.view.View;
  5. import android.widget.TextView;
  6. import androidx.appcompat.app.AppCompatActivity;
  7. import com.example.mydemo.R;
  8. import org.reactivestreams.Subscriber;
  9. import org.reactivestreams.Subscription;
  10. import io.reactivex.BackpressureStrategy;
  11. import io.reactivex.Flowable;
  12. import io.reactivex.FlowableEmitter;
  13. import io.reactivex.FlowableOnSubscribe;
  14. import io.reactivex.android.schedulers.AndroidSchedulers;
  15. import io.reactivex.schedulers.Schedulers;
  16. public class RxJavaActivity extends AppCompatActivity {
  17. private Subscription mSubscription;
  18. @Override
  19. protected void onCreate(Bundle savedInstanceState) {
  20. super.onCreate(savedInstanceState);
  21. setContentView(R.layout.activity_rxjava);
  22. findView();
  23. method();
  24. }
  25. /**
  26. * 初始化各种View
  27. */
  28. private void findView() {
  29. TextView textView = findViewById(R.id.activity_rxjava_textview);
  30. textView.setOnClickListener(new View.OnClickListener() {
  31. @Override
  32. public void onClick(View v) {
  33. if (null != mSubscription) {
  34. mSubscription.request(4);//观察者只接收四条
  35. }
  36. }
  37. });
  38. }
  39. /**
  40. * RxJava Flowable创建被观察者 背压
  41. */
  42. public void method() {
  43. Flowable.create(new FlowableOnSubscribe<Object>() {
  44. @Override
  45. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  46. if (null == e) {
  47. return;
  48. }
  49. /**
  50. * 测试 被观察者发送五条消息
  51. * */
  52. if (!e.isCancelled()) {
  53. for (int i = 0; i < 5; i++) {
  54. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  55. }
  56. e.onComplete();
  57. }
  58. }
  59. }, BackpressureStrategy.LATEST)
  60. .subscribeOn(Schedulers.io())//被观察者在io线程中进行
  61. .observeOn(AndroidSchedulers.mainThread())//观察者在主线程中进行
  62. .subscribe(new Subscriber<Object>() {
  63. @Override
  64. public void onSubscribe(Subscription s) {
  65. if (null == s) {
  66. return;
  67. }
  68. mSubscription = s;
  69. }
  70. @Override
  71. public void onNext(Object o) {
  72. if (null == o) {
  73. return;
  74. }
  75. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 o.toString()----:" + o.toString());
  76. }
  77. @Override
  78. public void onError(Throwable t) {
  79. if (null == t) {
  80. return;
  81. }
  82. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  83. }
  84. @Override
  85. public void onComplete() {
  86. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  87. }
  88. });
  89. }
  90. /**
  91. * onDestroy方法 取消订阅
  92. */
  93. @Override
  94. protected void onDestroy() {
  95. super.onDestroy();
  96. if (null != mSubscription) {
  97. mSubscription.cancel();
  98. Log.d("TAG", "退出页面 取消订阅");
  99. }
  100. }
  101. }

 

(2) 结果

默认

无打印 即无消息接收

 

点击按钮 点击第一次 取出四条消息

  1. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 0 波数据
  2. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 1 波数据
  3. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 2 波数据
  4. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 3 波数据

 

点击按钮 点击第二次 取出剩余消息并执行onComplete方法

  1. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 4 波数据
  2. D/TAG: Flowable创建被观察者 观察者 onComplete方法

 

点击按钮 点击第三四五...次 

无打印 即无消息接收。

 

 

 

 

<6> 重要说明&同步订阅

 

同步订阅,即被观察者和观察者处在同一线程中同步订阅关系中没有缓存区

 

同步订阅&异步订阅区别

订阅关系说明
同步订阅观察者和被观察者处在同一线程。被观察者每发送一条消息。必须等到观察者接收后才会发送下一条消息。
异步订阅观察者和被观察者处在不同的线程。被观察者不需要等待观察者处理消息后再发送下一条消息,而是不断发送直到发送消息完毕。

 

 

 

 

 

 

 

 代码

 

[1] 被观察者发送的消息数和观察者设置接收的消息数一致时

 

(1) 代码

  1. package com.example.mydemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import androidx.appcompat.app.AppCompatActivity;
  5. import com.example.mydemo.R;
  6. import org.reactivestreams.Subscriber;
  7. import org.reactivestreams.Subscription;
  8. import io.reactivex.BackpressureStrategy;
  9. import io.reactivex.Flowable;
  10. import io.reactivex.FlowableEmitter;
  11. import io.reactivex.FlowableOnSubscribe;
  12. public class RxJavaActivity extends AppCompatActivity {
  13. private Subscription mSubscription;
  14. @Override
  15. protected void onCreate(Bundle savedInstanceState) {
  16. super.onCreate(savedInstanceState);
  17. setContentView(R.layout.activity_rxjava);
  18. method();
  19. }
  20. /**
  21. * RxJava Flowable创建被观察者 背压
  22. */
  23. public void method() {
  24. Flowable.create(new FlowableOnSubscribe<Object>() {
  25. @Override
  26. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  27. if (null == e) {
  28. return;
  29. }
  30. /**
  31. * 测试 被观察者发送五条消息
  32. * */
  33. if (!e.isCancelled()) {
  34. for (int i = 0; i < 5; i++) {
  35. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  36. Log.d("TAG", "Flowable创建被观察者 被观察者发送数据 i----:" + i);
  37. }
  38. e.onComplete();
  39. }
  40. }
  41. }, BackpressureStrategy.LATEST)
  42. .subscribe(new Subscriber<Object>() {
  43. @Override
  44. public void onSubscribe(Subscription s) {
  45. if (null == s) {
  46. return;
  47. }
  48. mSubscription = s;
  49. mSubscription.request(5);//观察者接收五条
  50. }
  51. @Override
  52. public void onNext(Object o) {
  53. if (null == o) {
  54. return;
  55. }
  56. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 o.toString()----:" + o.toString());
  57. }
  58. @Override
  59. public void onError(Throwable t) {
  60. if (null == t) {
  61. return;
  62. }
  63. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  64. }
  65. @Override
  66. public void onComplete() {
  67. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  68. }
  69. });
  70. }
  71. /**
  72. * onDestroy方法 取消订阅
  73. */
  74. @Override
  75. protected void onDestroy() {
  76. super.onDestroy();
  77. if (null != mSubscription) {
  78. mSubscription.cancel();
  79. Log.d("TAG", "退出页面 取消订阅");
  80. }
  81. }
  82. }

 

 

(2) 结果

  1. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 0 波数据
  2. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:0
  3. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 1 波数据
  4. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:1
  5. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 2 波数据
  6. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:2
  7. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 3 波数据
  8. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:3
  9. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 4 波数据
  10. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:4
  11. D/TAG: Flowable创建被观察者 观察者 onComplete方法

 

(3) 综上所述,同步订阅情况下不会出现被观察者发送事件速度 > 观察者接收事件速度的情况。可是,却会出现被观察者发送事件数量 > 观察者接收事件数量的问题。

 

 

[2] 被观察者发送的消息数和观察者设置接收的消息数不一致时

 

(1) 代码

  1. package com.example.mydemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import androidx.appcompat.app.AppCompatActivity;
  5. import com.example.mydemo.R;
  6. import org.reactivestreams.Subscriber;
  7. import org.reactivestreams.Subscription;
  8. import io.reactivex.BackpressureStrategy;
  9. import io.reactivex.Flowable;
  10. import io.reactivex.FlowableEmitter;
  11. import io.reactivex.FlowableOnSubscribe;
  12. public class RxJavaActivity extends AppCompatActivity {
  13. private Subscription mSubscription;
  14. @Override
  15. protected void onCreate(Bundle savedInstanceState) {
  16. super.onCreate(savedInstanceState);
  17. setContentView(R.layout.activity_rxjava);
  18. method();
  19. }
  20. /**
  21. * RxJava Flowable创建被观察者 背压
  22. */
  23. public void method() {
  24. Flowable.create(new FlowableOnSubscribe<Object>() {
  25. @Override
  26. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  27. if (null == e) {
  28. return;
  29. }
  30. /**
  31. * 测试 被观察者发送五条消息
  32. * */
  33. if (!e.isCancelled()) {
  34. for (int i = 0; i < 5; i++) {
  35. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  36. Log.d("TAG", "Flowable创建被观察者 被观察者发送数据 i----:" + i);
  37. }
  38. e.onComplete();
  39. }
  40. }
  41. }, BackpressureStrategy.LATEST)
  42. .subscribe(new Subscriber<Object>() {
  43. @Override
  44. public void onSubscribe(Subscription s) {
  45. if (null == s) {
  46. return;
  47. }
  48. mSubscription = s;
  49. mSubscription.request(3);//观察者接收三条
  50. }
  51. @Override
  52. public void onNext(Object o) {
  53. if (null == o) {
  54. return;
  55. }
  56. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 o.toString()----:" + o.toString());
  57. }
  58. @Override
  59. public void onError(Throwable t) {
  60. if (null == t) {
  61. return;
  62. }
  63. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  64. }
  65. @Override
  66. public void onComplete() {
  67. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  68. }
  69. });
  70. }
  71. /**
  72. * onDestroy方法 取消订阅
  73. */
  74. @Override
  75. protected void onDestroy() {
  76. super.onDestroy();
  77. if (null != mSubscription) {
  78. mSubscription.cancel();
  79. Log.d("TAG", "退出页面 取消订阅");
  80. }
  81. }
  82. }

即 被观察者发送五条数据 观察者只接收三条消息。

 

 

(2) 结果

  1. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 0 波数据
  2. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:0
  3. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 1 波数据
  4. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:1
  5. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 2 波数据
  6. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:2
  7. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:3
  8. D/TAG: Flowable创建被观察者 被观察者发送数据 i----:4

 

(3) 综上所述 对于没有缓存区概念的同步订阅关系来说,单纯采用控制观察者的接收事件数量(响应式拉取)还是会出现问题。

 

 

 

[3] 点击时再执行观察者的取方法

 

(1) 代码

  1. package com.example.mydemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import android.view.View;
  5. import android.widget.TextView;
  6. import androidx.appcompat.app.AppCompatActivity;
  7. import com.example.mydemo.R;
  8. import org.reactivestreams.Subscriber;
  9. import org.reactivestreams.Subscription;
  10. import io.reactivex.BackpressureStrategy;
  11. import io.reactivex.Flowable;
  12. import io.reactivex.FlowableEmitter;
  13. import io.reactivex.FlowableOnSubscribe;
  14. public class RxJavaActivity extends AppCompatActivity {
  15. private Subscription mSubscription;
  16. @Override
  17. protected void onCreate(Bundle savedInstanceState) {
  18. super.onCreate(savedInstanceState);
  19. setContentView(R.layout.activity_rxjava);
  20. findView();
  21. method();
  22. }
  23. /**
  24. * 初始化各种View
  25. */
  26. private void findView() {
  27. TextView textView = findViewById(R.id.activity_rxjava_textview);
  28. textView.setOnClickListener(new View.OnClickListener() {
  29. @Override
  30. public void onClick(View v) {
  31. if (null != mSubscription) {
  32. mSubscription.request(4);//观察者只接收四条
  33. }
  34. }
  35. });
  36. }
  37. /**
  38. * RxJava Flowable创建被观察者 背压
  39. */
  40. public void method() {
  41. Flowable.create(new FlowableOnSubscribe<Object>() {
  42. @Override
  43. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  44. if (null == e) {
  45. return;
  46. }
  47. /**
  48. * 测试 被观察者发送五条消息
  49. * */
  50. if (!e.isCancelled()) {
  51. for (int i = 0; i < 5; i++) {
  52. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  53. }
  54. e.onComplete();
  55. }
  56. }
  57. }, BackpressureStrategy.LATEST)
  58. .subscribe(new Subscriber<Object>() {
  59. @Override
  60. public void onSubscribe(Subscription s) {
  61. if (null == s) {
  62. return;
  63. }
  64. mSubscription = s;
  65. }
  66. @Override
  67. public void onNext(Object o) {
  68. if (null == o) {
  69. return;
  70. }
  71. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 o.toString()----:" + o.toString());
  72. }
  73. @Override
  74. public void onError(Throwable t) {
  75. if (null == t) {
  76. return;
  77. }
  78. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  79. }
  80. @Override
  81. public void onComplete() {
  82. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  83. }
  84. });
  85. }
  86. /**
  87. * onDestroy方法 取消订阅
  88. */
  89. @Override
  90. protected void onDestroy() {
  91. super.onDestroy();
  92. if (null != mSubscription) {
  93. mSubscription.cancel();
  94. Log.d("TAG", "退出页面 取消订阅");
  95. }
  96. }
  97. }

 

 

(2) 结果

 

默认

无打印 即无消息接收

 

点击按钮 第一次点击

  1. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 4 波数据
  2. D/TAG: Flowable创建被观察者 观察者 onComplete方法

 

点击按钮第二三四五...次点击

无打印 即无消息接收

 

 

 

 

 

 

 

二.控制 被观察者 发送事件的速度

 

<1> 前言

上述可知可以通过观察者的onSubscribe方法中参数Subscription的request(N)方法控制观察者可接收的消息数,继而控制观察者接收事件的速度。那么被观察者的发送事件的速度怎么控制呢?

 

源码分析

 

create操作符创建被观察者 重写 subscribe方法 该方法参数FlowableEmitter<Object> e 点击

  1. public interface FlowableEmitter<T> extends Emitter<T> {
  2. /**
  3. * Sets a Disposable on this emitter; any previous Disposable
  4. * or Cancellation will be unsubscribed/cancelled.
  5. * @param s the disposable, null is allowed
  6. */
  7. void setDisposable(Disposable s);
  8. /**
  9. * Sets a Cancellable on this emitter; any previous Disposable
  10. * or Cancellation will be unsubscribed/cancelled.
  11. * @param c the cancellable resource, null is allowed
  12. */
  13. void setCancellable(Cancellable c);
  14. /**
  15. * The current outstanding request amount.
  16. * <p>This method is thread-safe.
  17. * @return the current outstanding request amount
  18. */
  19. long requested();
  20. /**
  21. * Returns true if the downstream cancelled the sequence.
  22. * <p>This method is thread-safe.
  23. * @return true if the downstream cancelled the sequence
  24. */
  25. boolean isCancelled();
  26. /**
  27. * Ensures that calls to onNext, onError and onComplete are properly serialized.
  28. * @return the serialized FlowableEmitter
  29. */
  30. FlowableEmitter<T> serialize();
  31. }

 

可以看到 此参数可以判断是否取消订阅  isCancelled()方法。而request()方法可以获取被观察者中的消息数量。

 

 

<2> 同步订阅 控制被观察者发送消息速度

 

(1) 简介

同步订阅中,被观察者和观察者处在同一线程。被观察者中request方法得到的数等于观察者中Subscription.request()方法中设置的数

 

 

(2) 代码

  1. package com.example.mydemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import androidx.appcompat.app.AppCompatActivity;
  5. import com.example.mydemo.R;
  6. import org.reactivestreams.Subscriber;
  7. import org.reactivestreams.Subscription;
  8. import io.reactivex.BackpressureStrategy;
  9. import io.reactivex.Flowable;
  10. import io.reactivex.FlowableEmitter;
  11. import io.reactivex.FlowableOnSubscribe;
  12. public class RxJavaActivity extends AppCompatActivity {
  13. private Subscription mSubscription;
  14. @Override
  15. protected void onCreate(Bundle savedInstanceState) {
  16. super.onCreate(savedInstanceState);
  17. setContentView(R.layout.activity_rxjava);
  18. method();
  19. }
  20. /**
  21. * RxJava Flowable创建被观察者 背压
  22. */
  23. public void method() {
  24. Flowable.create(new FlowableOnSubscribe<Object>() {
  25. @Override
  26. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  27. if (null == e) {
  28. return;
  29. }
  30. long msgNumber = e.requested();
  31. Log.d("TAG", "Flowable创建被观察者 被观察者 msgNumber----:" + msgNumber);
  32. if (!e.isCancelled()) {
  33. for (int i = 0; i < msgNumber; i++) {
  34. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  35. }
  36. e.onComplete();
  37. }
  38. }
  39. }, BackpressureStrategy.LATEST)
  40. .subscribe(new Subscriber<Object>() {
  41. @Override
  42. public void onSubscribe(Subscription s) {
  43. if (null == s) {
  44. return;
  45. }
  46. mSubscription = s;
  47. mSubscription.request(5);//观察者设置可接收五条消息
  48. }
  49. @Override
  50. public void onNext(Object o) {
  51. if (null == o) {
  52. return;
  53. }
  54. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 o.toString()----:" + o.toString());
  55. }
  56. @Override
  57. public void onError(Throwable t) {
  58. if (null == t) {
  59. return;
  60. }
  61. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  62. }
  63. @Override
  64. public void onComplete() {
  65. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  66. }
  67. });
  68. }
  69. /**
  70. * onDestroy方法 取消订阅
  71. */
  72. @Override
  73. protected void onDestroy() {
  74. super.onDestroy();
  75. if (null != mSubscription) {
  76. mSubscription.cancel();
  77. Log.d("TAG", "退出页面 取消订阅");
  78. }
  79. }
  80. }

 

 

(3) 结果

  1. D/TAG: Flowable创建被观察者 被观察者 msgNumber----:5
  2. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 0 波数据
  3. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 1 波数据
  4. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 2 波数据
  5. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 3 波数据
  6. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 4 波数据
  7. D/TAG: Flowable创建被观察者 观察者 onComplete方法

 

 

(4) 说明

{1} 被观察者的 e.requested();方法具有实时性。即被观察者发送了一条数据 e.requested();方法就会减一。

 

代码 在上基础上改动

  1. long msgNumber = e.requested();
  2. Log.d("TAG", "Flowable创建被观察者 被观察者 msgNumber----:" + msgNumber);
  3. if (!e.isCancelled()) {
  4. for (int i = 0; i < msgNumber; i++) {
  5. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  6. Log.d("TAG", "Flowable创建被观察者 被观察者 发送了i后e.requested()----:" + e.requested());
  7. }
  8. e.onComplete();
  9. }

 

结果

  1. D/TAG: Flowable创建被观察者 被观察者 msgNumber----:5
  2. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 0 波数据
  3. D/TAG: Flowable创建被观察者 被观察者 发送了i后e.requested()----:4
  4. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 1 波数据
  5. D/TAG: Flowable创建被观察者 被观察者 发送了i后e.requested()----:3
  6. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 2 波数据
  7. D/TAG: Flowable创建被观察者 被观察者 发送了i后e.requested()----:2
  8. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 3 波数据
  9. D/TAG: Flowable创建被观察者 被观察者 发送了i后e.requested()----:1
  10. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 4 波数据
  11. D/TAG: Flowable创建被观察者 被观察者 发送了i后e.requested()----:0
  12. D/TAG: Flowable创建被观察者 观察者 onComplete方法

 

{2} 观察者没有设置可接收事件数量,即无调用Subscription.request()。那么被观察者默认观察者可接收事件数量 = 0,即FlowableEmitter.requested()的返回值 = 0。

 

{3} 可叠加性

 

代码

  1. mSubscription.request(5);//第一次设置 观察者设置可接收五条消息
  2. mSubscription.request(10);//第二条设置 观察者设置可接收十条消息

 

结果 5+10=15

D/TAG: Flowable创建被观察者 被观察者 msgNumber----:15

 

 

 

 

<3> 异步订阅 控制被观察者发送消息速度

 

(1) 说明

异步订阅中,被观察者和观察者处在不同的线程。所以被观察者 无法通过 FlowableEmitter.requested()知道观察者自身接收事件能力。即 被观察者不能根据 观察者自身接收事件的能力 控制发送事件的速度。

被观察者中request方法得到的数==128,96,0。 和观察者中Subscription.request()方法中设置的数不相等

 

 

(2) 代码

  1. package com.example.mydemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import androidx.appcompat.app.AppCompatActivity;
  5. import com.example.mydemo.R;
  6. import org.reactivestreams.Subscriber;
  7. import org.reactivestreams.Subscription;
  8. import io.reactivex.BackpressureStrategy;
  9. import io.reactivex.Flowable;
  10. import io.reactivex.FlowableEmitter;
  11. import io.reactivex.FlowableOnSubscribe;
  12. import io.reactivex.android.schedulers.AndroidSchedulers;
  13. import io.reactivex.schedulers.Schedulers;
  14. public class RxJavaActivity extends AppCompatActivity {
  15. private Subscription mSubscription;
  16. @Override
  17. protected void onCreate(Bundle savedInstanceState) {
  18. super.onCreate(savedInstanceState);
  19. setContentView(R.layout.activity_rxjava);
  20. method();
  21. }
  22. /**
  23. * RxJava Flowable创建被观察者 背压
  24. */
  25. public void method() {
  26. Flowable.create(new FlowableOnSubscribe<Object>() {
  27. @Override
  28. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  29. if (null == e) {
  30. return;
  31. }
  32. long msgNumber = e.requested();
  33. Log.d("TAG", "Flowable创建被观察者 被观察者 msgNumber----:" + msgNumber);
  34. if (!e.isCancelled()) {
  35. for (int i = 0; i < msgNumber; i++) {
  36. e.onNext("Flowable创建被观察者 被观察者发送第 " + i + " 波数据");
  37. }
  38. e.onComplete();
  39. }
  40. }
  41. }, BackpressureStrategy.LATEST)
  42. .subscribeOn(Schedulers.io())//被观察者 IO线程
  43. .observeOn(AndroidSchedulers.mainThread())//观察者 UI线程
  44. .subscribe(new Subscriber<Object>() {
  45. @Override
  46. public void onSubscribe(Subscription s) {
  47. if (null == s) {
  48. return;
  49. }
  50. mSubscription = s;
  51. mSubscription.request(5);//观察者设置可接收五条消息
  52. }
  53. @Override
  54. public void onNext(Object o) {
  55. if (null == o) {
  56. return;
  57. }
  58. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 o.toString()----:" + o.toString());
  59. }
  60. @Override
  61. public void onError(Throwable t) {
  62. if (null == t) {
  63. return;
  64. }
  65. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  66. }
  67. @Override
  68. public void onComplete() {
  69. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  70. }
  71. });
  72. }
  73. /**
  74. * onDestroy方法 取消订阅
  75. */
  76. @Override
  77. protected void onDestroy() {
  78. super.onDestroy();
  79. if (null != mSubscription) {
  80. mSubscription.cancel();
  81. Log.d("TAG", "退出页面 取消订阅");
  82. }
  83. }
  84. }

 

 

(3) 结果

  1. D/TAG: Flowable创建被观察者 被观察者 msgNumber----:128
  2. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 0 波数据
  3. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 1 波数据
  4. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 2 波数据
  5. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 3 波数据
  6. D/TAG: Flowable创建被观察者 观察者 onNext方法 o.toString()----:Flowable创建被观察者 被观察者发送第 4 波数据

即 观察者设置可以接收的消息数是五条,但是被观察者中请求的消息数是128

 

 

 

 

 

 

 

三.Flowable其他操作符使用背压策略

 

<1> 前言

上述的讲解都是使用的Flowable的create操作符创建的被观察者,那么背压策略可以在第二个参数传入。如下。

  1. Flowable.create(new FlowableOnSubscribe<Object>() {
  2. @Override
  3. public void subscribe(FlowableEmitter<Object> e) throws Exception {
  4. if (null == e) {
  5. return;
  6. }
  7. if (!e.isCancelled()) {
  8. e.onNext("Flowable创建被观察者 被观察者发送第1波数据");
  9. e.onNext("Flowable创建被观察者 被观察者发送第2波数据");
  10. e.onNext("Flowable创建被观察者 被观察者发送第3波数据");
  11. e.onComplete();
  12. }
  13. Log.d("TAG", "Flowable创建被观察者 被观察者 subscribe方法 线程----:" + Thread.currentThread().getName());
  14. }
  15. }, BackpressureStrategy.LATEST);

 

那么使用其他的操作符呢,比如前节讲的interval操作符。下面讲解。

 

 

 

<2> 代码

  1. package com.wjn.rxdemo.rxjava;
  2. import android.os.Bundle;
  3. import android.util.Log;
  4. import androidx.appcompat.app.AppCompatActivity;
  5. import com.wjn.rxdemo.R;
  6. import org.reactivestreams.Subscriber;
  7. import org.reactivestreams.Subscription;
  8. import java.util.concurrent.TimeUnit;
  9. import io.reactivex.Flowable;
  10. import io.reactivex.schedulers.Schedulers;
  11. public class RxJavaActivity extends AppCompatActivity {
  12. private Subscription mSubscription;
  13. @Override
  14. protected void onCreate(Bundle savedInstanceState) {
  15. super.onCreate(savedInstanceState);
  16. setContentView(R.layout.activity_rxjava);
  17. method();
  18. }
  19. /**
  20. * RxJava Flowable创建被观察者 背压
  21. */
  22. private void method() {
  23. Flowable.interval(1, TimeUnit.MILLISECONDS)
  24. .onBackpressureBuffer() // 添加背压策略封装好的方法,此处选择Buffer模式,即缓存区大小无限制
  25. .observeOn(Schedulers.newThread())
  26. .subscribe(new Subscriber<Long>() {
  27. @Override
  28. public void onSubscribe(Subscription s) {
  29. if (null == s) {
  30. return;
  31. }
  32. mSubscription = s;
  33. s.request(Long.MAX_VALUE);//观察者设置可接收最多条数的消息
  34. }
  35. @Override
  36. public void onNext(Long aLong) {
  37. if (null == aLong) {
  38. return;
  39. }
  40. Log.d("TAG", "Flowable创建被观察者 观察者 onNext方法 aLong----:" + aLong);
  41. //模拟1秒接收
  42. try {
  43. Thread.sleep(1000);
  44. } catch (InterruptedException e) {
  45. e.printStackTrace();
  46. }
  47. }
  48. @Override
  49. public void onError(Throwable t) {
  50. if (null == t) {
  51. return;
  52. }
  53. Log.d("TAG", "Flowable创建被观察者 观察者 onError方法 t.toString()----:" + t.toString());
  54. }
  55. @Override
  56. public void onComplete() {
  57. Log.d("TAG", "Flowable创建被观察者 观察者 onComplete方法 ");
  58. }
  59. });
  60. }
  61. /**
  62. * onDestroy方法 取消订阅
  63. */
  64. @Override
  65. protected void onDestroy() {
  66. super.onDestroy();
  67. if (null != mSubscription) {
  68. mSubscription.cancel();
  69. Log.d("TAG", "退出页面 取消订阅");
  70. }
  71. }
  72. }

 

 

<3> 结果

  1. D/TAG: Flowable创建被观察者 观察者 onNext方法 aLong----:0
  2. D/TAG: Flowable创建被观察者 观察者 onNext方法 aLong----:1
  3. D/TAG: Flowable创建被观察者 观察者 onNext方法 aLong----:2
  4. D/TAG: Flowable创建被观察者 观察者 onNext方法 aLong----:3
  5. D/TAG: Flowable创建被观察者 观察者 onNext方法 aLong----:4
  6. D/TAG: Flowable创建被观察者 观察者 onNext方法 aLong----:5

 

 

<4> 关闭页面

D/TAG: 退出页面 取消订阅

 

 

<5> 说明

上述代码中有这样的设置

.onBackpressureBuffer() // 添加背压策略封装好的方法,此处选择Buffer模式,即缓存区大小无限制

那么还有没有其他的设置呢,以及每个设置对应的含义是什么呢。

 

举例含义对create操作符
.onBackpressureBuffer()缓存区满时 将缓存区设置无限大 注意内存溢出BUFFER
.onBackpressureDrop()缓存区满时 事件丢失 保留[0,127]条数据 即 前 128条数据DROP
onBackpressureLatest()缓存区满时 事件丢失 保留[0,127]条数据+超出的最后一条数据

LATEST

 

 

 

 

 

 

 

 

 

 

 

四.总结

<1> RxJava的背压可以使用Flowable创建被观察者来实现。背压的使用大前提是 异步订阅中,观察者和被观察者发送消息的速度或者数量不匹配,一般是被观察者发送的速度太快或者消息太多,观察者来不及处理。

 

<2> 背压一般应用在被观察者和观察者异步订阅时,因为同步订阅正式项目很少使用。(RxJava核心就是异步嘛)。

 

<3> 其实一般使用RxJava实现异步,只要注意在适当的时机取消订阅,防止内存泄漏即可。没必要使用背压。

 

<4> 其实处理被观察者和观察者匹配问题也可以有其他的操作办法,比如使用delay()操作符创建一个延时的被观察者。再比如使用interval操作符创建指定时间的被观察者等等。

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/300792
推荐阅读
相关标签
  

闽ICP备14008679号