refactor(async): refactor EventEmitter

Refactor EventEmitter and Async Facade to match ES7 Observable semantics, properly use RxJS typedefs, make EventEmitter inherit from RxJS Subject. Closes #4149.

BREAKING CHANGE:
- consumers of EventEmitter no longer need to call .toRx()
- EventEmitter is now generic and requires a type - e.g. `EventEmitter<string>`
- EventEmitter and Observable now use the `.subscribe(generatorOrNext, error, complete)` method instead of `.observer(generator)`
- ObservableWrapper uses `callNext/callError/callComplete` instead of `callNext/callThrow/callReturn`
This commit is contained in:
Rob Wormald
2015-10-24 18:48:43 -07:00
parent 72e65d6797
commit ca3986f31d
35 changed files with 341 additions and 113 deletions

View File

@ -1,5 +1,5 @@
// Public API for Facade
export {ConcreteType, Type} from './facade/lang';
export {Observable, EventEmitter} from './facade/async';
export {Observable, EventEmitter, Subject} from './facade/async';
export {Predicate} from './facade/collection';
export {WrappedException} from './facade/exceptions';

View File

@ -50,16 +50,16 @@ class ObservableWrapper {
emitter.add(value);
}
static void callThrow(EventEmitter emitter, error) {
static void callError(EventEmitter emitter, error) {
emitter.addError(error);
}
static void callReturn(EventEmitter emitter) {
static void callComplete(EventEmitter emitter) {
emitter.close();
}
}
class EventEmitter extends Stream {
class EventEmitter<T> extends Stream<T> {
StreamController<dynamic> _controller;
/// Creates an instance of [EventEmitter], which depending on [isAsync],
@ -86,3 +86,30 @@ class EventEmitter extends Stream {
_controller.close();
}
}
//todo(robwormald): maybe fix in ts2dart?
class Subject<T> extends Stream<T> {
StreamController<dynamic> _controller;
Subject([bool isAsync = true]) {
_controller = new StreamController.broadcast(sync: !isAsync);
}
StreamSubscription listen(void onData(dynamic line),
{void onError(Error error), void onDone(), bool cancelOnError}) {
return _controller.stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
void add(value) {
_controller.add(value);
}
void addError(error) {
_controller.addError(error);
}
void close() {
_controller.close();
}
}

View File

@ -3,8 +3,8 @@ import {global, isPresent} from 'angular2/src/core/facade/lang';
// without depending on rxjs.
import {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/core/facade/promise';
export {PromiseWrapper, Promise, PromiseCompleter} from 'angular2/src/core/facade/promise';
// TODO(jeffbcross): use ES6 import once typings are available
var Subject = require('@reactivex/rxjs/dist/cjs/Subject');
import {Subject, Subscription, Observable as RxObservable} from '@reactivex/rxjs/dist/cjs/Rx';
import Operator from '@reactivex/rxjs/dist/cjs/Operator';
export namespace NodeJS {
export interface Timer {}
@ -24,31 +24,26 @@ export class TimerWrapper {
export class ObservableWrapper {
// TODO(vsavkin): when we use rxnext, try inferring the generic type from the first arg
static subscribe<T>(emitter: Observable, onNext: (value: T) => void,
onThrow: (exception: any) => void = null,
onReturn: () => void = null): Object {
return emitter.observer({next: onNext, throw: onThrow, return: onReturn});
static subscribe<T>(emitter: any, onNext: (value: T) => void,
onError: (exception: any) => void = null,
onComplete: () => void = null): Object {
return emitter.subscribe({next: onNext, error: onError, complete: onComplete});
}
static isObservable(obs: any): boolean { return obs instanceof Observable; }
static isObservable(obs: any): boolean { return obs instanceof EventEmitter; }
/**
* Returns whether `obs` has any subscribers listening to events.
*/
static hasSubscribers(obs: EventEmitter): boolean { return obs._subject.observers.length > 0; }
static hasSubscribers(obs: EventEmitter<any>): boolean { return obs.observers.length > 0; }
static dispose(subscription: any) { subscription.unsubscribe(); }
static callNext(emitter: EventEmitter, value: any) { emitter.next(value); }
static callNext(emitter: EventEmitter<any>, value: any) { emitter.next(value); }
static callThrow(emitter: EventEmitter, error: any) { emitter.throw(error); }
static callError(emitter: EventEmitter<any>, error: any) { emitter.error(error); }
static callReturn(emitter: EventEmitter) { emitter.return (null); }
}
// TODO: vsavkin change to interface
export class Observable {
observer(generator: any): Object { return null; }
static callComplete(emitter: EventEmitter<any>) { emitter.complete(); }
}
/**
@ -90,9 +85,7 @@ export class Observable {
*
* Once a reference implementation of the spec is available, switch to it.
*/
export class EventEmitter extends Observable {
/** @internal */
_subject = new Subject();
export class EventEmitter<T> extends Subject<T> {
/** @internal */
_isAsync: boolean;
@ -105,19 +98,32 @@ export class EventEmitter extends Observable {
this._isAsync = isAsync;
}
observer(generator: any): any {
var schedulerFn = this._isAsync ? (value) => { setTimeout(() => generator.next(value)); } :
(value) => { generator.next(value); };
return this._subject.subscribe(schedulerFn,
(error) => generator.throw ? generator.throw(error) : null,
() => generator.return ? generator.return () : null);
subscribe(generatorOrNext?: any, error?: any, complete?: any): any {
if (generatorOrNext && typeof generatorOrNext === 'object') {
let schedulerFn = this._isAsync ?
(value) => { setTimeout(() => generatorOrNext.next(value)); } :
(value) => { generatorOrNext.next(value); };
return super.subscribe(schedulerFn,
(err) => generatorOrNext.error ? generatorOrNext.error(err) : null,
() => generatorOrNext.complete ? generatorOrNext.complete() : null);
} else {
let schedulerFn = this._isAsync ? (value) => { setTimeout(() => generatorOrNext(value)); } :
(value) => { generatorOrNext(value); };
return super.subscribe(schedulerFn, (err) => error ? error(err) : null,
() => complete ? complete() : null);
}
}
toRx(): any { return this._subject; }
next(value: any) { this._subject.next(value); }
throw(error: any) { this._subject.error(error); }
return (value?: any) { this._subject.complete(); }
}
// todo(robwormald): ts2dart should handle this properly
export class Observable<T> extends RxObservable<T> {
lift<T, R>(operator: Operator<T, R>): Observable<T> {
const observable = new Observable();
observable.source = this;
observable.operator = operator;
return observable;
}
}
export {Subject}

View File

@ -51,7 +51,7 @@ export abstract class AbstractControl {
_value: any;
/** @internal */
_valueChanges: EventEmitter;
_valueChanges: EventEmitter<any>;
private _status: string;
private _errors: {[key: string]: any};
@ -86,7 +86,8 @@ export abstract class AbstractControl {
get untouched(): boolean { return !this._touched; }
get valueChanges(): Observable { return this._valueChanges; }
get valueChanges(): Observable<any> { return this._valueChanges; }
get pending(): boolean { return this._status == PENDING; }
markAsTouched(): void { this._touched = true; }

View File

@ -31,7 +31,7 @@ export class QueryList<T> {
private _results: Array<T> = [];
private _emitter = new EventEmitter();
get changes(): Observable { return this._emitter; }
get changes(): Observable<any> { return this._emitter; }
get length(): number { return this._results.length; }
get first(): T { return ListWrapper.first(this._results); }
get last(): T { return ListWrapper.last(this._results); }

View File

@ -1,5 +1,5 @@
import {isBlank, isPresent, isPromise, CONST} from 'angular2/src/core/facade/lang';
import {Promise, ObservableWrapper, Observable} from 'angular2/src/core/facade/async';
import {Promise, ObservableWrapper, Observable, EventEmitter} from 'angular2/src/core/facade/async';
import {Pipe} from 'angular2/src/core/metadata';
import {Injectable} from 'angular2/src/core/di';
import {
@ -69,7 +69,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
/** @internal */
_subscription: Object = null;
/** @internal */
_obj: Observable | Promise<any> = null;
_obj: Observable<any>| Promise<any>| EventEmitter<any> = null;
private _strategy: any = null;
/** @internal */
public _ref: ChangeDetectorRef;
@ -81,7 +81,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
}
}
transform(obj: Observable | Promise<any>, args?: any[]): any {
transform(obj: Observable<any>| Promise<any>| EventEmitter<any>, args?: any[]): any {
if (isBlank(this._obj)) {
if (isPresent(obj)) {
this._subscribe(obj);
@ -103,7 +103,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
}
/** @internal */
_subscribe(obj: Observable | Promise<any>): void {
_subscribe(obj: Observable<any>| Promise<any>| EventEmitter<any>): void {
this._obj = obj;
this._strategy = this._selectStrategy(obj);
this._subscription =
@ -111,7 +111,7 @@ export class AsyncPipe implements PipeTransform, PipeOnDestroy {
}
/** @internal */
_selectStrategy(obj: Observable | Promise<any>): any {
_selectStrategy(obj: Observable<any>| Promise<any>| EventEmitter<any>): any {
if (isPromise(obj)) {
return _promiseStrategy;
} else if (ObservableWrapper.isObservable(obj)) {

View File

@ -113,13 +113,13 @@ export class NgZone {
_onErrorHandler: ErrorHandlingFn;
/** @internal */
_onTurnStartEvents: EventEmitter;
_onTurnStartEvents: EventEmitter<any>;
/** @internal */
_onTurnDoneEvents: EventEmitter;
_onTurnDoneEvents: EventEmitter<any>;
/** @internal */
_onEventDoneEvents: EventEmitter;
_onEventDoneEvents: EventEmitter<any>;
/** @internal */
_onErrorEvents: EventEmitter;
_onErrorEvents: EventEmitter<any>;
// Number of microtasks pending from _innerZone (& descendants)
/** @internal */

View File

@ -9,7 +9,7 @@ export class SpyLocation implements Location {
/** @internal */
_query: string = '';
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
/** @internal */
_baseHref: string = '';

View File

@ -8,7 +8,7 @@ export class MockLocationStrategy extends LocationStrategy {
internalTitle: string = '';
urlChanges: string[] = [];
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
constructor() { super(); }
simulatePopState(url: string): void {

View File

@ -79,7 +79,7 @@ export const APP_BASE_HREF: OpaqueToken = CONST_EXPR(new OpaqueToken('appBaseHre
@Injectable()
export class Location {
/** @internal */
_subject: EventEmitter = new EventEmitter();
_subject: EventEmitter<any> = new EventEmitter();
/** @internal */
_baseHref: string;

View File

@ -59,7 +59,7 @@ export class Router {
private _auxRouters = new Map<string, Router>();
private _childRouter: Router;
private _subject: EventEmitter = new EventEmitter();
private _subject: EventEmitter<any> = new EventEmitter();
constructor(public registry: RouteRegistry, public parent: Router, public hostComponent: any) {}

View File

@ -44,7 +44,7 @@ export abstract class ClientMessageBroker {
export class ClientMessageBroker_ extends ClientMessageBroker {
private _pending: Map<string, PromiseCompleter<any>> = new Map<string, PromiseCompleter<any>>();
private _sink: EventEmitter;
private _sink: EventEmitter<any>;
/** @internal */
public _serializer: Serializer;

View File

@ -30,14 +30,14 @@ export abstract class MessageBus implements MessageBusSource, MessageBusSink {
* Returns an {@link EventEmitter} that emits every time a message
* is received on the given channel.
*/
abstract from(channel: string): EventEmitter;
abstract from(channel: string): EventEmitter<any>;
/**
* Returns an {@link EventEmitter} for the given channel
* To publish methods to that channel just call next (or add in dart) on the returned emitter
*/
abstract to(channel: string): EventEmitter;
abstract to(channel: string): EventEmitter<any>;
}
export interface MessageBusSource {
@ -60,7 +60,7 @@ export interface MessageBusSource {
* Returns an {@link EventEmitter} that emits every time a message
* is received on the given channel.
*/
from(channel: string): EventEmitter;
from(channel: string): EventEmitter<any>;
}
export interface MessageBusSink {
@ -83,5 +83,5 @@ export interface MessageBusSink {
* Returns an {@link EventEmitter} for the given channel
* To publish methods to that channel just call next (or add in dart) on the returned emitter
*/
to(channel: string): EventEmitter;
to(channel: string): EventEmitter<any>;
}

View File

@ -27,9 +27,9 @@ export class PostMessageBus implements MessageBus {
this.sink.initChannel(channel, runInZone);
}
from(channel: string): EventEmitter { return this.source.from(channel); }
from(channel: string): EventEmitter<any> { return this.source.from(channel); }
to(channel: string): EventEmitter { return this.sink.to(channel); }
to(channel: string): EventEmitter<any> { return this.sink.to(channel); }
}
export class PostMessageBusSink implements MessageBusSink {
@ -52,19 +52,17 @@ export class PostMessageBusSink implements MessageBusSink {
var emitter = new EventEmitter();
var channelInfo = new _Channel(emitter, runInZone);
this._channels[channel] = channelInfo;
emitter.observer({
next: (data: Object) => {
var message = {channel: channel, message: data};
if (runInZone) {
this._messageBuffer.push(message);
} else {
this._sendMessages([message]);
}
emitter.subscribe((data: Object) => {
var message = {channel: channel, message: data};
if (runInZone) {
this._messageBuffer.push(message);
} else {
this._sendMessages([message]);
}
});
}
to(channel: string): EventEmitter {
to(channel: string): EventEmitter<any> {
if (StringMapWrapper.contains(this._channels, channel)) {
return this._channels[channel].emitter;
} else {
@ -107,7 +105,7 @@ export class PostMessageBusSource implements MessageBusSource {
this._channels[channel] = channelInfo;
}
from(channel: string): EventEmitter {
from(channel: string): EventEmitter<any> {
if (StringMapWrapper.contains(this._channels, channel)) {
return this._channels[channel].emitter;
} else {
@ -140,7 +138,7 @@ export class PostMessageBusSource implements MessageBusSource {
* keeps track of if it should run in the zone.
*/
class _Channel {
constructor(public emitter: EventEmitter, public runInZone: boolean) {}
constructor(public emitter: EventEmitter<any>, public runInZone: boolean) {}
}
// TODO(jteplitz602) Replace this with the definition in lib.webworker.d.ts(#3492)

View File

@ -45,7 +45,7 @@ export abstract class ServiceMessageBroker {
* If that method returns a promise, the UIMessageBroker returns the result to the worker.
*/
export class ServiceMessageBroker_ extends ServiceMessageBroker {
private _sink: EventEmitter;
private _sink: EventEmitter<any>;
private _methods: Map<string, Function> = new Map<string, Function>();
constructor(messageBus: MessageBus, private _serializer: Serializer, public channel) {

View File

@ -14,7 +14,7 @@ import {StringMapWrapper} from 'angular2/src/core/facade/collection';
import {EventEmitter, ObservableWrapper} from 'angular2/src/core/facade/async';
export class EventDispatcher implements RenderEventDispatcher {
constructor(private _viewRef: RenderViewRef, private _sink: EventEmitter,
constructor(private _viewRef: RenderViewRef, private _sink: EventEmitter<any>,
private _serializer: Serializer) {}
dispatchRenderEvent(elementIndex: number, eventName: string, locals: Map<string, any>): boolean {