Rxjs-Observable

Observable

建立一個observable

var observable = Rx.Observable.create(function(observer) {
    observer.onNext('Simon');
    observer.onNext('Jen');
    observer.onNext('Sergi');
    observer.onCompleted(); // 成功完成
});

建立一個observer

var observer = Rx.Observer.create(
    function onNext(x) {
        console.log('Next: ' + x);
    },
    function onError(err) {
        console.log('Error: ' + err);
    },
    function onCompleted() {
        console.log('Completed');
    }
);

observable的方式的ajax請求

function get(url) {
    return Rx.Observable.create(function(observer) {
        // Make a traditional Ajax request
        var req = new XMLHttpRequest();
        req.open('GET', url);
        req.onload = function() {
            if (req.status == 200) {
                // If the status is 200, meaning there have been no problems,
                // Yield the result to listeners and complete the sequence
                observer.onNext(req.response);
                observer.onCompleted();
            } else {
                // Otherwise, signal to listeners that there has been an error
                observer.onError(new Error(req.statusText));
            }
        };
        req.onerror = function() {
            observer.onError(new Error("Unknown Error"));
        };
        req.send();
    });
}
// Create an Ajax Observable
var test = get('/api/contents.json');

 在上面的代碼中,使用create封裝XMLHttpRequest的get函數,若是HTTP GET請求成功了,咱們將發送它的內容並完成那個序列(咱們的observable講僅僅發送一個結果),不然咱們將發射一個錯誤。在最後一行,我是使用一個特定的url去調用這個函數。這將會產生一個observable,可是它不會發出任何請求,這個很重要:observable不會作任何事,直到最少有一個observer訂閱它,因此讓咱們接着以下:ajax

// Subscribe an Observer to it
    test.subscribe(
        function onNext(x) {
            console.log('Result: ' + x);
        },
        function onError(err) {
            console.log('Error: ' + err);
        },
        function onCompleted() {
            console.log('Completed');
        }
    );

操做符(operator)

  • 在rxjs中,改變或者查詢序列(sequence)的方法叫作操做符。
  • Rxjs DOM library提供了好多方法根據dom相關的資源去建立Observable。因此咱們就可使用Rx.DOM.Request.get來處理一個get請求了。
Rx.DOM.get('/api/contents.json').subscribe(
        function onNext(data) {
            console.log(data.response);
        },
        function onError(err) {
            console.error(err);
        }
    );

根據Arrays建立Observable

  • 可使用任何array相似或者Iterable對象,經過多功能的from操做符轉化爲Observable。from操做符使用一個array做爲參數,而且返回一個Observable(發射array的每個元素)。
  • from操做符伴隨着fromEvent,在RxJS中這是最方便和使用頻率最高的操做符。
Rx.Observable
    .from(['Adrià', 'Jen', 'Sergi'])
    .subscribe(
        function(x) {
            console.log('Next: ' + x);
        },
        function(err) {
            console.log('Error:', err);
        }

        function() {
            console.log('Completed');
        }
    );

根據js的Event建立Observable

  • 把一個event轉化爲了一個Observable:
var allMoves = Rx.Observable.fromEvent(document, 'mousemove');
    allMoves.subscribe(function(e) {
        console.log(e.clientX, e.clientY);
    });
  • 把event轉化爲Observable解開了event自己的強制約束。更重要的是,咱們能夠基於原始的Observable建立一個新的Observable,而且這些新的Observable是獨立的,能夠別用於其餘的任務:
var movesOnTheRight = allMoves.filter(function(e) {
        return e.clientX > window.innerWidth / 2;
    });
    var movesOnTheLeft = allMoves.filter(function(e) {
        return e.clientX < window.innerWidth / 2;
    });
    movesOnTheRight.subscribe(function(e) {
        console.log('Mouse is on the right:', e.clientX);
    });
    movesOnTheLeft.subscribe(function(e) {
        console.log('Mouse is on the left:', e.clientX);
    });
  • Observable是不可改變的,每一涉及到它的操做符都是建立的一個新的Observable

根據Callback函數建立Observable

  • 若是使用第三方的js庫基於callback編寫代碼進行交互總有好多意外。使用fromCallback和fromNodeCallback兩個函數咱們能夠把咱們的Callback轉換爲Observable。Node.js老是遵循着調用回調函數首先使用一個error的參數告訴回調函數,發生了錯誤。當咱們使用fromNodeCallback去建立指定的Node.js風格的回調函數:
var Rx = require('rx'); // Load RxJS
    var fs = require('fs'); // Load Node.js Filesystem module
    // Create an Observable from the readdir method
    var readdir = Rx.Observable.fromNodeCallback(fs.readdir);
    // Send a delayed message
    var source = readdir('/Users/sergi');
    var subscription = source.subscribe(
        function(res) {
            console.log('List of directories: ' + res);
        },
        function(err) {
            console.log('Error: ' + err);
        },
        function() {
            console.log('Done!');
        });
  • 在上面的代碼中,咱們建立一個Observable readdir 使用了Node.js的 fs.readdir方法。fs.readdir接受一個目錄路徑和一個回調函數delayMsg,一旦這個目錄內容重置回調函數就會被調用。
  • 咱們使用readdir在一樣的參數時咱們傳給原始的fs.readdir,減去了那個回調函數。它返會一個Observable,能夠合適的使用onNext,onError,onCompleted,當咱們訂閱一個Observer到它的時候。
本站公眾號
   歡迎關注本站公眾號,獲取更多信息