feat(facade): added support for observables
This commit is contained in:
@ -1,7 +1,7 @@
|
||||
library angular.core.facade.async;
|
||||
|
||||
import 'dart:async';
|
||||
export 'dart:async' show Future;
|
||||
export 'dart:async' show Future, Stream, StreamController, StreamSubscription;
|
||||
|
||||
class PromiseWrapper {
|
||||
static Future resolve(obj) => new Future.value(obj);
|
||||
@ -32,6 +32,32 @@ class PromiseWrapper {
|
||||
}
|
||||
}
|
||||
|
||||
class ObservableWrapper {
|
||||
static StreamSubscription subscribe(Stream s, Function onNext, [onError, onComplete]) {
|
||||
return s.listen(onNext, onError: onError, onDone: onComplete, cancelOnError: true);
|
||||
}
|
||||
|
||||
static StreamController createController() {
|
||||
return new StreamController.broadcast();
|
||||
}
|
||||
|
||||
static Stream createObservable(StreamController controller) {
|
||||
return controller.stream;
|
||||
}
|
||||
|
||||
static void callNext(StreamController controller, value) {
|
||||
controller.add(value);
|
||||
}
|
||||
|
||||
static void callThrow(StreamController controller, error) {
|
||||
controller.addError(error);
|
||||
}
|
||||
|
||||
static void callReturn(StreamController controller) {
|
||||
controller.close();
|
||||
}
|
||||
}
|
||||
|
||||
class _Completer {
|
||||
final Completer c;
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
import {int, global} from 'angular2/src/facade/lang';
|
||||
import {int, global, isPresent} from 'angular2/src/facade/lang';
|
||||
import {List} from 'angular2/src/facade/collection';
|
||||
import Rx from 'rx/dist/rx.all';
|
||||
|
||||
export var Promise = global.Promise;
|
||||
|
||||
@ -51,3 +52,47 @@ export class PromiseWrapper {
|
||||
return maybePromise instanceof Promise;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Use Rx.Observable but provides an adapter to make it work as specified here:
|
||||
* https://github.com/jhusain/observable-spec
|
||||
*
|
||||
* Once a reference implementation of the spec is available, switch to it.
|
||||
*/
|
||||
export var Observable = Rx.Observable;
|
||||
export var ObservableController = Rx.Subject;
|
||||
|
||||
export class ObservableWrapper {
|
||||
static createController():Rx.Subject {
|
||||
return new Rx.Subject();
|
||||
}
|
||||
|
||||
static createObservable(subject:Rx.Subject):Observable {
|
||||
return subject;
|
||||
}
|
||||
|
||||
static subscribe(observable:Observable, generatorOrOnNext, onThrow = null, onReturn = null) {
|
||||
if (isPresent(generatorOrOnNext.next)) {
|
||||
return observable.observeOn(Rx.Scheduler.timeout).subscribe(
|
||||
(value) => generatorOrOnNext.next(value),
|
||||
(error) => generatorOrOnNext.throw(error),
|
||||
() => generatorOrOnNext.return()
|
||||
);
|
||||
} else {
|
||||
return observable.observeOn(Rx.Scheduler.timeout).subscribe(generatorOrOnNext, onThrow, onReturn);
|
||||
}
|
||||
}
|
||||
|
||||
static callNext(subject:Rx.Subject, value:any) {
|
||||
subject.onNext(value);
|
||||
}
|
||||
|
||||
static callThrow(subject:Rx.Subject, error:any) {
|
||||
subject.onError(error);
|
||||
}
|
||||
|
||||
static callReturn(subject:Rx.Subject) {
|
||||
subject.onCompleted();
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user