refactor(async): replace RxJS with RxNext

Closes #3110
Closes #4201
This commit is contained in:
vsavkin
2015-09-11 15:38:09 -07:00
committed by Jeff Cross
parent 7b3161a229
commit 16bf335a4a
25 changed files with 61 additions and 96 deletions

View File

@ -1,6 +1,7 @@
///<reference path="../../../typings/tsd.d.ts" />
import {global, isPresent} from 'angular2/src/core/facade/lang';
import * as Rx from 'rx';
// TODO(jeffbcross): use ES6 import once typings are available
var Subject = require('@reactivex/rxjs/dist/cjs/Subject');
export {Promise};
@ -75,7 +76,7 @@ export class ObservableWrapper {
static isObservable(obs: any): boolean { return obs instanceof Observable; }
static dispose(subscription: any) { subscription.dispose(); }
static dispose(subscription: any) { subscription.unsubscribe(); }
static callNext(emitter: EventEmitter, value: any) { emitter.next(value); }
@ -128,30 +129,19 @@ export class Observable {
* Once a reference implementation of the spec is available, switch to it.
*/
export class EventEmitter extends Observable {
_subject: Rx.Subject<any> = new Rx.Subject<any>();
_immediateScheduler = (<any>Rx.Scheduler).immediate;
_subject = new Subject();
observer(generator: any): Rx.IDisposable {
return this._subject.observeOn(this._immediateScheduler)
.subscribe((value) => { setTimeout(() => generator.next(value)); },
(error) => generator.throw ? generator.throw(error) : null,
() => generator.return ? generator.return () : null);
observer(generator: any): any {
return this._subject.subscribe((value) => { setTimeout(() => generator.next(value)); },
(error) => generator.throw ? generator.throw(error) : null,
() => generator.return ? generator.return () : null);
}
toRx(): Rx.Observable<any> { return this._subject; }
toRx(): any { return this; }
/**
* Emits a `value`.
*/
next(value: any) { this._subject.onNext(value); }
next(value: any) { this._subject.next(value); }
/**
* Emits an `error`.
*/
throw(error: any) { this._subject.onError(error); }
throw(error: any) { this._subject.error(error); }
/**
* Closes the stream.
*/
return (value?: any) { this._subject.onCompleted(); }
return (value?: any) { this._subject.complete(); }
}

View File

@ -1,5 +1,3 @@
/// <reference path="../../typings/rx/rx.d.ts" />
import {
ReadyStates,
RequestModesOpts,

View File

@ -9,7 +9,7 @@
"repository": <%= JSON.stringify(packageJson.repository) %>,
"dependencies": {
"angular2": "<%= packageJson.version %>",
"rx": "<%= packageJson.dependencies['rx'] %>",
"@reactivex/rxjs": "<%= packageJson.dependencies['@reactivex/rxjs'] %>",
"reflect-metadata": "<%= packageJson.dependencies['reflect-metadata'] %>",
"traceur": "<%= packageJson.dependencies['traceur'] %>"
},