// Build an Observable that pushes three names to whichever observer
// subscribes and then signals that the sequence is finished.
var observable = Rx.Observable.create(function(observer) {
  observer.onNext('Simon');
  observer.onNext('Jen');
  observer.onNext('Sergi');
  observer.onCompleted(); // Completed successfully
});
// Observer assembled from three callbacks: one logs each emitted value,
// one logs an error, and one logs completion of the sequence.
var observer = Rx.Observer.create(
  function onNext(value) {
    console.log('Next: ' + value);
  },
  function onError(error) {
    console.log('Error: ' + error);
  },
  function onCompleted() {
    console.log('Completed');
  }
);
/**
 * Wraps an XMLHttpRequest GET in an Observable.
 *
 * The request is built and sent inside the function handed to
 * Rx.Observable.create.
 *
 * @param {string} url - Address to request.
 * @returns {Rx.Observable} Emits `req.response` once and then completes when
 *   the status is 200; signals `Error(req.statusText)` for any other status,
 *   and `Error("Unknown Error")` when the request's `onerror` handler fires.
 */
function get(url) {
  return Rx.Observable.create(function(observer) {
    // Make a traditional Ajax request
    var req = new XMLHttpRequest();
    req.open('GET', url);

    req.onload = function() {
      if (req.status === 200) {
        // If the status is 200, meaning there have been no problems,
        // yield the result to listeners and complete the sequence
        observer.onNext(req.response);
        observer.onCompleted();
      } else {
        // Otherwise, signal to listeners that there has been an error
        observer.onError(new Error(req.statusText));
      }
    };

    req.onerror = function() {
      observer.onError(new Error("Unknown Error"));
    };

    req.send();
  });
}

// Create an Ajax Observable
var test = get('/api/contents.json');
在上面的代碼中,get 函數使用 create 封裝了 XMLHttpRequest。如果 HTTP GET 請求成功了,我們將發送它的內容並完成那個序列(我們的 observable 將僅僅發送一個結果),否則我們將發出一個錯誤。在最後一行,我們使用一個特定的 URL 去調用這個函數。這將會產生一個 observable,但是它不會發出任何請求。這一點很重要:observable 不會做任何事,直到至少有一個 observer 訂閱它。所以讓我們接著訂閱它,如下所示:
// Subscribe an Observer to it: log the result, any error, and completion.
test.subscribe(
  function onNext(x) {
    console.log('Result: ' + x);
  },
  function onError(err) {
    console.log('Error: ' + err);
  },
  function onCompleted() {
    console.log('Completed');
  }
);
// Request the JSON file through Rx.DOM.get; log the `response` property of
// the emitted value, and send any error to console.error.
Rx.DOM.get('/api/contents.json').subscribe(
  function onNext(result) {
    console.log(result.response);
  },
  function onError(error) {
    console.error(error);
  }
);
// Turn an array into an Observable and subscribe with three callbacks:
// per-value, error, and completion.
Rx.Observable
  .from(['Adrià', 'Jen', 'Sergi'])
  .subscribe(
    function(x) {
      console.log('Next: ' + x);
    },
    function(err) {
      console.log('Error:', err);
    },
    function() {
      console.log('Completed');
    }
  );
// Observable of every `mousemove` event fired on the document.
var allMoves = Rx.Observable.fromEvent(document, 'mousemove');

// Log the pointer coordinates of each move.
allMoves.subscribe(function(moveEvent) {
  console.log(moveEvent.clientX, moveEvent.clientY);
});
// Moves whose x coordinate is strictly greater than half the window width.
var movesOnTheRight = allMoves.filter(function(moveEvent) {
  var midpoint = window.innerWidth / 2;
  return moveEvent.clientX > midpoint;
});

// Moves whose x coordinate is strictly less than half the window width.
var movesOnTheLeft = allMoves.filter(function(moveEvent) {
  var midpoint = window.innerWidth / 2;
  return moveEvent.clientX < midpoint;
});

movesOnTheRight.subscribe(function(moveEvent) {
  console.log('Mouse is on the right:', moveEvent.clientX);
});

movesOnTheLeft.subscribe(function(moveEvent) {
  console.log('Mouse is on the left:', moveEvent.clientX);
});
var Rx = require('rx'); // Load RxJS
var fs = require('fs'); // Load Node.js Filesystem module

// Create an Observable-returning function from the callback-style readdir method
var readdir = Rx.Observable.fromNodeCallback(fs.readdir);

// Build an Observable for the contents of the given directory
var source = readdir('/Users/sergi');

// Log the listing, any error, and completion.
var subscription = source.subscribe(
  function(res) {
    console.log('List of directories: ' + res);
  },
  function(err) {
    console.log('Error: ' + err);
  },
  function() {
    console.log('Done!');
  }
);