今天,咱們繼續跟着 RxJava-Android-Samples 的腳步,一塊兒看一下RxJava2
在實戰當中的應用,在這個項目中,第二個的例子的描述以下: java
2s
內連續點擊了一個按鈕五次,那麼咱們只會收到一個「你點擊了該按鈕五次」的時間,而不是五個"你點擊了該按鈕"的事件。這個示例的目的是讓咱們學會如何應用
buffer
操做符。
仔細思考了一下,在平時的項目中,咱們彷佛不會遇到須要統計一段時間內用戶點擊了多少次按鈕這種需求。git
可是,咱們有時候會須要計算一段時間內的平均數據,例如統計一段時間內的平均溫度,或者統計一段時間內的平均位置。在接觸RxJava
以前,咱們通常會將這段時間內統計到的數據都暫時存起來,等到須要更新的時間點到了以後,再把這些數據結合起來,計算這些數據的平均值。github
如今,咱們就來看一下,用RxJava2
如何去實現這個需求。bash
這裏,咱們經過一個Handler
循環地發送消息,實現間隔必定時間進行溫度的測量,可是在測量以後,咱們並不實時地更新界面的溫度顯示,而是每隔3s
統計一次過去這段時間內的平均溫度。dom
public class BufferActivity extends AppCompatActivity {
private PublishSubject<Double> mPublishSubject;
private CompositeDisposable mCompositeDisposable;
private TextView mTv;
private SourceHandler mSourceHandler;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_buffer);
mTv = (TextView) findViewById(R.id.tv_buffer);
mPublishSubject = PublishSubject.create();
DisposableObserver<List<Double>> disposableObserver = new DisposableObserver<List<Double>>() {
@Override
public void onNext(List<Double> o) {
double result = 0;
if (o.size() > 0) {
for (Double d : o) {
result += d;
}
result = result / o.size();
}
Log.d("BufferActivity", "更新平均溫度:" + result);
mTv.setText("過去3秒收到了" + o.size() + "個數據, 平均溫度爲:" + result);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
mPublishSubject.buffer(3000, TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
mCompositeDisposable = new CompositeDisposable();
mCompositeDisposable.add(disposableObserver);
//開始測量溫度。
mSourceHandler = new SourceHandler();
mSourceHandler.sendEmptyMessage(0);
}
public void updateTemperature(double temperature) {
Log.d("BufferActivity", "溫度測量結果:" + temperature);
mPublishSubject.onNext(temperature);
}
private class SourceHandler extends Handler {
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
double temperature = Math.random() * 25 + 5;
updateTemperature(temperature);
//循環地發送。
sendEmptyMessageDelayed(0, 250 + (long) (250 * Math.random()));
}
}
@Override
protected void onDestroy() {
super.onDestroy();
mSourceHandler.removeCallbacksAndMessages(null);
mCompositeDisposable.clear();
}
}
複製代碼
實際的運行結果爲: ide
在上面的例子中,咱們使用了buffer(int time, Unit timeUnit)
,其原理圖以下所示: 函數
mPublishSubject.onNext(temperature);
複製代碼
事件並不會直接傳遞到Observer
的onNext
方法中,而是放在緩衝區中,直到事件到以後,再將全部在這段緩衝事件內放入緩衝區中的值,放在一個List
中一塊兒發送到下游。學習
關於Buffer
的其它用法,這篇文章寫得很全,我這裏就不詳細贅述了,你們能夠參考:RxJava 的學習之變換操做符 - Buffer。spa