diff --git a/modules/angular2/src/core/zone/ng_zone.dart b/modules/angular2/src/core/zone/ng_zone.dart index 974341fc90..5ffbe7985e 100644 --- a/modules/angular2/src/core/zone/ng_zone.dart +++ b/modules/angular2/src/core/zone/ng_zone.dart @@ -215,14 +215,15 @@ class NgZone { _inVmTurnDone = true; parent.run(_innerZone, _onTurnDone); - if (_pendingMicrotasks == 0 && _onEventDone != null) { - runOutsideAngular(_onEventDone); - } } finally { _inVmTurnDone = false; _hasExecutedCodeInInnerZone = false; } } + + if (_pendingMicrotasks == 0 && _onEventDone != null) { + runOutsideAngular(_onEventDone); + } } } } diff --git a/modules/angular2/src/core/zone/ng_zone.ts b/modules/angular2/src/core/zone/ng_zone.ts index 4b00ae2ae0..0e03295df3 100644 --- a/modules/angular2/src/core/zone/ng_zone.ts +++ b/modules/angular2/src/core/zone/ng_zone.ts @@ -100,7 +100,7 @@ export class NgZone { * * This hook is useful for validating application state (e.g. in a test). */ - overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean): void { + overrideOnEventDone(onEventDoneFn: Function, opt_waitForAsync: boolean = false): void { var normalizedOnEventDone = normalizeBlank(onEventDoneFn); if (opt_waitForAsync) { this._onEventDone = () => { @@ -212,14 +212,15 @@ export class NgZone { try { this._inVmTurnDone = true; parentRun.call(ngZone._innerZone, ngZone._onTurnDone); - if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) { - ngZone.runOutsideAngular(ngZone._onEventDone); - } } finally { this._inVmTurnDone = false; ngZone._hasExecutedCodeInInnerZone = false; } } + + if (ngZone._pendingMicrotasks === 0 && isPresent(ngZone._onEventDone)) { + ngZone.runOutsideAngular(ngZone._onEventDone); + } } } }; diff --git a/modules/angular2/src/mock/ng_zone_mock.ts b/modules/angular2/src/mock/ng_zone_mock.ts index 3a82e2f5e6..57a868e964 100644 --- a/modules/angular2/src/mock/ng_zone_mock.ts +++ b/modules/angular2/src/mock/ng_zone_mock.ts @@ -1,9 +1,17 @@ import {NgZone} from 'angular2/src/core/zone/ng_zone'; export class MockNgZone extends NgZone { + _onEventDone: () => void; + constructor() { super({enableLongStackTrace: false}); } run(fn: Function): any { return fn(); } runOutsideAngular(fn: Function): any { return fn(); } + + overrideOnEventDone(fn: () => void, opt_waitForAsync: boolean = false): void { + this._onEventDone = fn; + } + + simulateZoneExit(): void { this._onEventDone(); } } diff --git a/modules/angular2/src/web_workers/debug_tools/multi_client_server_message_bus.dart b/modules/angular2/src/web_workers/debug_tools/multi_client_server_message_bus.dart index 8303232bb5..3570723f69 100644 --- a/modules/angular2/src/web_workers/debug_tools/multi_client_server_message_bus.dart +++ b/modules/angular2/src/web_workers/debug_tools/multi_client_server_message_bus.dart @@ -1,25 +1,29 @@ library angular2.src.web_workers.debug_tools.multi_client_server_message_bus; -import "package:angular2/src/web_workers/shared/message_bus.dart" - show MessageBus, MessageBusSink, MessageBusSource; import 'dart:io'; import 'dart:convert' show JSON; import 'dart:async'; -import 'package:angular2/src/core/facade/async.dart' show EventEmitter; import 'package:angular2/src/web_workers/shared/messaging_api.dart'; +import 'package:angular2/src/web_workers/shared/generic_message_bus.dart'; // TODO(jteplitz602): Remove hard coded result type and // clear messageHistory once app is done with it #3859 -class MultiClientServerMessageBus implements MessageBus { - final MultiClientServerMessageBusSink sink; - MultiClientServerMessageBusSource source; +class MultiClientServerMessageBus extends GenericMessageBus { bool hasPrimary = false; - MultiClientServerMessageBus(this.sink, this.source); + @override + MultiClientServerMessageBusSink get sink => super.sink; + @override + MultiClientServerMessageBusSource get source => super.source; + + MultiClientServerMessageBus(MultiClientServerMessageBusSink sink, + MultiClientServerMessageBusSource source) + : super(sink, source); MultiClientServerMessageBus.fromHttpServer(HttpServer server) - : sink = new MultiClientServerMessageBusSink() { - source = new MultiClientServerMessageBusSource(resultReceived); + : super(new MultiClientServerMessageBusSink(), + new MultiClientServerMessageBusSource()) { + source.onResult.listen(_resultReceived); server.listen((HttpRequest request) { if (request.uri.path == "/ws") { WebSocketTransformer.upgrade(request).then((WebSocket socket) { @@ -38,18 +42,10 @@ class MultiClientServerMessageBus implements MessageBus { }); } - void resultReceived() { + void _resultReceived(_) { sink.resultReceived(); } - EventEmitter from(String channel) { - return source.from(channel); - } - - EventEmitter to(String channel) { - return sink.to(channel); - } - Function _handleDisconnect(WebSocketWrapper wrapper) { return () { sink.removeConnection(wrapper); @@ -72,12 +68,15 @@ class WebSocketWrapper { WebSocketWrapper(this._messageHistory, this._resultMarkers, this.socket) { stream = socket.asBroadcastStream(); stream.listen((encodedMessage) { - var message = JSON.decode(encodedMessage)['message']; - if (message is Map && message.containsKey("type")) { - if (message['type'] == 'result') { - resultReceived(); + var messages = JSON.decode(encodedMessage); + messages.forEach((data) { + var message = data['message']; + if (message is Map && message.containsKey("type")) { + if (message['type'] == 'result') { + resultReceived(); + } } - } + }); }); } @@ -121,10 +120,9 @@ class WebSocketWrapper { } } -class MultiClientServerMessageBusSink implements MessageBusSink { +class MultiClientServerMessageBusSink extends GenericMessageBusSink { final List messageHistory = new List(); final Set openConnections = new Set(); - final Map _channels = new Map(); final List resultMarkers = new List(); void resultReceived() { @@ -141,76 +139,77 @@ class MultiClientServerMessageBusSink implements MessageBusSink { openConnections.remove(webSocket); } - EventEmitter to(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - emitter.listen((message) { - _send({'channel': channel, 'message': message}); - }); - return emitter; - } - } - - void _send(dynamic message) { - String encodedMessage = JSON.encode(message); + @override + void sendMessages(List messages) { + String encodedMessages = JSON.encode(messages); openConnections.forEach((WebSocketWrapper webSocket) { if (webSocket.caughtUp) { - webSocket.socket.add(encodedMessage); + webSocket.socket.add(encodedMessages); } }); - messageHistory.add(encodedMessage); + messageHistory.add(encodedMessages); } } -class MultiClientServerMessageBusSource implements MessageBusSource { - final Map _channels = new Map(); +class MultiClientServerMessageBusSource extends GenericMessageBusSource { Function onResultReceived; + final StreamController mainController; + final StreamController resultController = new StreamController(); - MultiClientServerMessageBusSource(this.onResultReceived); + MultiClientServerMessageBusSource._(controller) + : mainController = controller, + super(controller.stream); - EventEmitter from(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - _channels[channel] = emitter; - return emitter; - } + factory MultiClientServerMessageBusSource() { + return new MultiClientServerMessageBusSource._( + new StreamController.broadcast()); } + Stream get onResult => resultController.stream; + void addConnection(WebSocketWrapper webSocket) { if (webSocket.isPrimary) { - webSocket.stream.listen((encodedMessage) { - var decodedMessage = decodeMessage(encodedMessage); - var channel = decodedMessage['channel']; - var message = decodedMessage['message']; - if (message is Map && message.containsKey("type")) { - if (message['type'] == 'result') { - // tell the bus that a result was received on the primary - onResultReceived(); + webSocket.stream.listen((encodedMessages) { + var decodedMessages = _decodeMessages(encodedMessages); + decodedMessages.forEach((decodedMessage) { + var message = decodedMessage['message']; + if (message is Map && message.containsKey("type")) { + if (message['type'] == 'result') { + // tell the bus that a result was received on the primary + resultController.add(message); + } } - } + }); - if (_channels.containsKey(channel)) { - _channels[channel].add(message); - } + mainController.add(decodedMessages); }); } else { - webSocket.stream.listen((encodedMessage) { - // handle events from non-primary browser - var decodedMessage = decodeMessage(encodedMessage); - var channel = decodedMessage['channel']; - var message = decodedMessage['message']; - if (_channels.containsKey(EVENT_CHANNEL) && channel == EVENT_CHANNEL) { - _channels[channel].add(message); + webSocket.stream.listen((encodedMessages) { + // handle events from non-primary connection. + var decodedMessages = _decodeMessages(encodedMessages); + var eventMessages = new List>(); + decodedMessages.forEach((decodedMessage) { + var channel = decodedMessage['channel']; + if (channel == EVENT_CHANNEL) { + eventMessages.add(decodedMessage); + } + }); + if (eventMessages.length > 0) { + mainController.add(eventMessages); } }); } } - Map decodeMessage(dynamic message) { - return JSON.decode(message); + List _decodeMessages(dynamic messages) { + return JSON.decode(messages); + } + + // This is a noop for the MultiClientBus because it has to decode the JSON messages before + // the generic bus receives them in order to check for results and forward events + // from the non-primary connection. + @override + List decodeMessages(dynamic messages) { + return messages; } } diff --git a/modules/angular2/src/web_workers/debug_tools/single_client_server_message_bus.dart b/modules/angular2/src/web_workers/debug_tools/single_client_server_message_bus.dart index ceee149203..f1cf41021c 100644 --- a/modules/angular2/src/web_workers/debug_tools/single_client_server_message_bus.dart +++ b/modules/angular2/src/web_workers/debug_tools/single_client_server_message_bus.dart @@ -1,22 +1,24 @@ library angular2.src.web_workers.debug_tools.single_client_server_message_bus; -import "package:angular2/src/web_workers/shared/message_bus.dart" - show MessageBus, MessageBusSink, MessageBusSource; import 'dart:io'; import 'dart:convert' show JSON; -import 'dart:async'; -import "package:angular2/src/core/facade/async.dart" show EventEmitter; +import 'package:angular2/src/web_workers/shared/generic_message_bus.dart'; -class SingleClientServerMessageBus implements MessageBus { - final SingleClientServerMessageBusSink sink; - SingleClientServerMessageBusSource source; +class SingleClientServerMessageBus extends GenericMessageBus { bool connected = false; - SingleClientServerMessageBus(this.sink, this.source); + @override + SingleClientServerMessageBusSink get sink => super.sink; + @override + SingleClientServerMessageBusSource get source => super.source; + + SingleClientServerMessageBus(SingleClientServerMessageBusSink sink, + SingleClientServerMessageBusSource source) + : super(sink, source); SingleClientServerMessageBus.fromHttpServer(HttpServer server) - : sink = new SingleClientServerMessageBusSink() { - source = new SingleClientServerMessageBusSource(); + : super(new SingleClientServerMessageBusSink(), + new SingleClientServerMessageBusSource()) { server.listen((HttpRequest request) { if (request.uri.path == "/ws") { if (!connected) { @@ -24,7 +26,7 @@ class SingleClientServerMessageBus implements MessageBus { sink.setConnection(socket); var stream = socket.asBroadcastStream(); - source.setConnectionFromStream(stream); + source.attachTo(stream); stream.listen(null, onDone: _handleDisconnect); }).catchError((error) { throw error; @@ -43,51 +45,30 @@ class SingleClientServerMessageBus implements MessageBus { void _handleDisconnect() { sink.removeConnection(); - source.removeConnection(); connected = false; } - - EventEmitter from(String channel) { - return source.from(channel); - } - - EventEmitter to(String channel) { - return sink.to(channel); - } } -class SingleClientServerMessageBusSink implements MessageBusSink { +class SingleClientServerMessageBusSink extends GenericMessageBusSink { final List _messageBuffer = new List(); WebSocket _socket = null; - final Map _channels = new Map(); void setConnection(WebSocket webSocket) { _socket = webSocket; _sendBufferedMessages(); } - EventEmitter to(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - emitter.listen((message) { - _send({'channel': channel, 'message': message}); - }); - return emitter; - } - } - void removeConnection() { _socket = null; } - void _send(dynamic message) { - String encodedMessage = JSON.encode(message); + @override + void sendMessages(List message) { + String encodedMessages = JSON.encode(message); if (_socket != null) { - _socket.add(encodedMessage); + _socket.add(encodedMessages); } else { - _messageBuffer.add(encodedMessage); + _messageBuffer.add(encodedMessages); } } @@ -97,44 +78,11 @@ class SingleClientServerMessageBusSink implements MessageBusSink { } } -class SingleClientServerMessageBusSource implements MessageBusSource { - final Map _channels = new Map(); - Stream _stream; +class SingleClientServerMessageBusSource extends GenericMessageBusSource { + SingleClientServerMessageBusSource() : super(null); - SingleClientServerMessageBusSource(); - - EventEmitter from(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - _channels[channel] = emitter; - return emitter; - } - } - - void setConnectionFromWebSocket(WebSocket socket) { - setConnectionFromStream(socket.asBroadcastStream()); - } - - void setConnectionFromStream(Stream stream) { - _stream = stream; - _stream.listen((encodedMessage) { - var decodedMessage = decodeMessage(encodedMessage); - var channel = decodedMessage['channel']; - var message = decodedMessage['message']; - - if (_channels.containsKey(channel)) { - _channels[channel].add(message); - } - }); - } - - void removeConnection() { - _stream = null; - } - - Map decodeMessage(dynamic message) { - return JSON.decode(message); + @override + List decodeMessages(dynamic messages) { + return JSON.decode(messages); } } diff --git a/modules/angular2/src/web_workers/debug_tools/web_socket_message_bus.dart b/modules/angular2/src/web_workers/debug_tools/web_socket_message_bus.dart index 6f47cc1bd0..4c50533a53 100644 --- a/modules/angular2/src/web_workers/debug_tools/web_socket_message_bus.dart +++ b/modules/angular2/src/web_workers/debug_tools/web_socket_message_bus.dart @@ -2,77 +2,33 @@ library angular2.src.web_workers.worker.web_socket_message_bus; import 'dart:html'; import 'dart:convert' show JSON; -import "package:angular2/src/web_workers/shared/message_bus.dart" - show MessageBus, MessageBusSink, MessageBusSource; -import 'package:angular2/src/core/facade/async.dart' show EventEmitter; +import 'package:angular2/src/web_workers/shared/generic_message_bus.dart'; -class WebSocketMessageBus implements MessageBus { - final WebSocketMessageBusSink sink; - final WebSocketMessageBusSource source; - - WebSocketMessageBus(this.sink, this.source); +class WebSocketMessageBus extends GenericMessageBus { + WebSocketMessageBus( + WebSocketMessageBusSink sink, WebSocketMessageBusSource source) + : super(sink, source); WebSocketMessageBus.fromWebSocket(WebSocket webSocket) - : sink = new WebSocketMessageBusSink(webSocket), - source = new WebSocketMessageBusSource(webSocket); - - EventEmitter from(String channel) { - return source.from(channel); - } - - EventEmitter to(String channel) { - return sink.to(channel); - } + : super(new WebSocketMessageBusSink(webSocket), + new WebSocketMessageBusSource(webSocket)); } -class WebSocketMessageBusSink implements MessageBusSink { +class WebSocketMessageBusSink extends GenericMessageBusSink { final WebSocket _webSocket; - final Map _channels = new Map(); WebSocketMessageBusSink(this._webSocket); - EventEmitter to(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - emitter.listen((message) { - _send({'channel': channel, 'message': message}); - }); - _channels[channel] = emitter; - return emitter; - } - } - - void _send(message) { - _webSocket.send(JSON.encode(message)); + void sendMessages(List messages) { + _webSocket.send(JSON.encode(messages)); } } -class WebSocketMessageBusSource implements MessageBusSource { - final Map _channels = new Map(); +class WebSocketMessageBusSource extends GenericMessageBusSource { + WebSocketMessageBusSource(WebSocket webSocket) : super(webSocket.onMessage); - WebSocketMessageBusSource(WebSocket webSocket) { - webSocket.onMessage.listen((MessageEvent encodedMessage) { - var message = decodeMessage(encodedMessage.data); - var channel = message['channel']; - if (_channels.containsKey(channel)) { - _channels[channel].add(message['message']); - } - }); - } - - EventEmitter from(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - _channels[channel] = emitter; - return emitter; - } - } - - Map decodeMessage(dynamic message) { - return JSON.decode(message); + List decodeMessages(MessageEvent event) { + var messages = event.data; + return JSON.decode(messages); } } diff --git a/modules/angular2/src/web_workers/shared/client_message_broker.ts b/modules/angular2/src/web_workers/shared/client_message_broker.ts index c8c38c8e4d..16c25c540b 100644 --- a/modules/angular2/src/web_workers/shared/client_message_broker.ts +++ b/modules/angular2/src/web_workers/shared/client_message_broker.ts @@ -18,7 +18,11 @@ export {Type} from "angular2/src/core/facade/lang"; export class ClientMessageBrokerFactory { constructor(private _messageBus: MessageBus, protected _serializer: Serializer) {} - createMessageBroker(channel: string): ClientMessageBroker { + /** + * Initializes the given channel and attaches a new {@link ClientMessageBroker} to it. + */ + createMessageBroker(channel: string, runInZone: boolean = true): ClientMessageBroker { + this._messageBus.initChannel(channel, runInZone); return new ClientMessageBroker(this._messageBus, this._serializer, channel); } } diff --git a/modules/angular2/src/web_workers/shared/generic_message_bus.dart b/modules/angular2/src/web_workers/shared/generic_message_bus.dart new file mode 100644 index 0000000000..7623b67f10 --- /dev/null +++ b/modules/angular2/src/web_workers/shared/generic_message_bus.dart @@ -0,0 +1,151 @@ +library angular2.src.web_workers.shared.generic_message_bus; + +import 'dart:async'; +import 'package:angular2/src/core/facade/async.dart' show EventEmitter; +import 'package:angular2/src/web_workers/shared/message_bus.dart' + show MessageBus, MessageBusSink, MessageBusSource; +import 'package:angular2/src/core/zone/ng_zone.dart'; +import 'package:angular2/src/core/facade/lang.dart'; + +class GenericMessageBus implements MessageBus { + final MessageBusSink _sink; + final MessageBusSource _source; + + MessageBusSink get sink => _sink; + MessageBusSource get source => _source; + + GenericMessageBus(MessageBusSink sink, MessageBusSource source) + : _sink = sink, + _source = source; + + void attachToZone(NgZone zone) { + sink.attachToZone(zone); + source.attachToZone(zone); + } + + void initChannel(String channel, [bool runInZone = true]) { + sink.initChannel(channel, runInZone); + source.initChannel(channel, runInZone); + } + + EventEmitter from(String channel) { + return source.from(channel); + } + + EventEmitter to(String channel) { + return sink.to(channel); + } +} + +abstract class GenericMessageBusSink implements MessageBusSink { + NgZone _zone; + final _channels = new Map(); + final _messageBuffer = new List(); + + void attachToZone(NgZone zone) { + _zone = zone; + _zone.overrideOnEventDone(() { + sendMessages(_messageBuffer); + _messageBuffer.clear(); + }, false); + } + + void initChannel(String channelName, [bool runInZone = true]) { + if (_channels.containsKey(channelName)) { + throw new BaseException("${channelName} has already been initialized."); + } + + var emitter = new EventEmitter(); + var channel = new _Channel(emitter, runInZone); + + emitter.listen((data) { + var message = {'channel': channelName, 'message': data}; + if (runInZone) { + _messageBuffer.add(message); + } else { + sendMessages([message]); + } + }); + + _channels[channelName] = channel; + } + + EventEmitter to(String channelName) { + if (_channels.containsKey(channelName)) { + return _channels[channelName].emitter; + } else { + throw new BaseException( + "${channelName} is not set up. Did you forget to call initChannel?"); + } + } + + void sendMessages(List messages); +} + +abstract class GenericMessageBusSource implements MessageBusSource { + Stream _stream; + final _channels = new Map(); + NgZone _zone; + + Stream get stream => _stream; + + GenericMessageBusSource(Stream stream) { + attachTo(stream); + } + + void attachTo(Stream stream) { + _stream = stream; + if (stream != null) { + stream.listen((messages) { + List decodedMessages = decodeMessages(messages); + if (decodedMessages != null) { + decodedMessages.forEach((message) => _handleMessage(message)); + } + }); + } + } + + void attachToZone(NgZone zone) { + _zone = zone; + } + + void initChannel(String channelName, [bool runInZone = true]) { + if (_channels.containsKey(channelName)) { + throw new BaseException("${channelName} has already been initialized."); + } + + var emitter = new EventEmitter(); + var channelInfo = new _Channel(emitter, runInZone); + _channels[channelName] = channelInfo; + } + + EventEmitter from(String channelName) { + if (_channels.containsKey(channelName)) { + return _channels[channelName].emitter; + } else { + throw new BaseException( + "${channelName} is not set up. Did you forget to call initChannel?"); + } + } + + void _handleMessage(dynamic data) { + var channelName = data['channel']; + if (_channels.containsKey(channelName)) { + var channelInfo = _channels[channelName]; + if (channelInfo.runInZone) { + _zone.run(() => channelInfo.emitter.add(data['message'])); + } else { + channelInfo.emitter.add(data['message']); + } + } + } + + List decodeMessages(dynamic message); +} + +class _Channel { + EventEmitter emitter; + bool runInZone; + + _Channel(this.emitter, this.runInZone); +} diff --git a/modules/angular2/src/web_workers/shared/isolate_message_bus.dart b/modules/angular2/src/web_workers/shared/isolate_message_bus.dart index 6132158743..6a3d5390a1 100644 --- a/modules/angular2/src/web_workers/shared/isolate_message_bus.dart +++ b/modules/angular2/src/web_workers/shared/isolate_message_bus.dart @@ -1,76 +1,33 @@ library angular2.src.web_workers.shared.isolate_message_bus; import 'dart:isolate'; -import 'dart:async'; -import 'dart:core'; -import 'package:angular2/src/web_workers/shared/message_bus.dart' - show MessageBus, MessageBusSink, MessageBusSource; -import 'package:angular2/src/core/facade/async.dart'; - -class IsolateMessageBus implements MessageBus { - final IsolateMessageBusSink sink; - final IsolateMessageBusSource source; +import 'package:angular2/src/web_workers/shared/generic_message_bus.dart'; +class IsolateMessageBus extends GenericMessageBus { IsolateMessageBus(IsolateMessageBusSink sink, IsolateMessageBusSource source) - : sink = sink, - source = source; - - EventEmitter from(String channel) { - return source.from(channel); - } - - EventEmitter to(String channel) { - return sink.to(channel); - } + : super(sink, source); } -class IsolateMessageBusSink implements MessageBusSink { +class IsolateMessageBusSink extends GenericMessageBusSink { final SendPort _port; - final Map _channels = new Map(); IsolateMessageBusSink(SendPort port) : _port = port; - EventEmitter to(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - emitter.listen((message) { - _port.send({'channel': channel, 'message': message}); - }); - _channels[channel] = emitter; - return emitter; - } + @override + void sendMessages(List messages) { + _port.send(messages); } } -class IsolateMessageBusSource extends MessageBusSource { - final Stream rawDataStream; - final Map _channels = new Map(); +class IsolateMessageBusSource extends GenericMessageBusSource { + IsolateMessageBusSource(ReceivePort port) : super(port.asBroadcastStream()); - IsolateMessageBusSource(ReceivePort port) - : rawDataStream = port.asBroadcastStream() { - rawDataStream.listen((message) { - if (message is SendPort) { - return; - } - - if (message.containsKey("channel")) { - var channel = message['channel']; - if (_channels.containsKey(channel)) { - _channels[channel].add(message['message']); - } - } - }); - } - - EventEmitter from(String channel) { - if (_channels.containsKey(channel)) { - return _channels[channel]; - } else { - var emitter = new EventEmitter(); - _channels[channel] = emitter; - return emitter; + @override + List decodeMessages(dynamic messages) { + if (messages is SendPort) { + return null; } + + return messages; } } diff --git a/modules/angular2/src/web_workers/shared/message_bus.ts b/modules/angular2/src/web_workers/shared/message_bus.ts index a4b5042b42..95e6f97885 100644 --- a/modules/angular2/src/web_workers/shared/message_bus.ts +++ b/modules/angular2/src/web_workers/shared/message_bus.ts @@ -1,5 +1,6 @@ import {EventEmitter} from 'angular2/src/core/facade/async'; import {BaseException} from 'angular2/src/core/facade/lang'; +import {NgZone} from 'angular2/src/core/zone/ng_zone'; export {EventEmitter, Observable} from 'angular2/src/core/facade/async'; function _abstract() { @@ -13,6 +14,23 @@ function _abstract() { * by the corresponding MessageBusSource. */ export /* abstract (with TS 1.6) */ class MessageBus implements MessageBusSource, MessageBusSink { + /** + * Sets up a new channel on the MessageBus. + * MUST be called before calling from or to on the channel. + * If runInZone is true then the source will emit events inside the angular zone + * and the sink will buffer messages and send only once the zone exits. + * if runInZone is false then the source will emit events inside the global zone + * and the sink will send messages immediatly. + */ + initChannel(channel: string, runInZone: boolean = true): void { throw _abstract(); } + + /** + * Assigns this bus to the given zone. + * Any callbacks attached to channels where runInZone was set to true on initialization + * will be executed in the given zone. + */ + attachToZone(zone: NgZone): void { throw _abstract(); } + /** * Returns an {@link EventEmitter} that emits every time a messsage * is received on the given channel. @@ -28,6 +46,21 @@ export /* abstract (with TS 1.6) */ class MessageBus implements MessageBusSource } export interface MessageBusSource { + /** + * Sets up a new channel on the MessageBusSource. + * MUST be called before calling from on the channel. + * If runInZone is true then the source will emit events inside the angular zone. + * if runInZone is false then the source will emit events inside the global zone. + */ + initChannel(channel: string, runInZone: boolean): void; + + /** + * Assigns this source to the given zone. + * Any channels which are initialized with runInZone set to true will emit events that will be + * executed within the given zone. + */ + attachToZone(zone: NgZone): void; + /** * Returns an {@link EventEmitter} that emits every time a messsage * is received on the given channel. @@ -36,6 +69,21 @@ export interface MessageBusSource { } export interface MessageBusSink { + /** + * Sets up a new channel on the MessageBusSink. + * MUST be called before calling to on the channel. + * If runInZone is true the sink will buffer messages and send only once the zone exits. + * if runInZone is false the sink will send messages immediatly. + */ + initChannel(channel: string, runInZone: boolean): void; + + /** + * Assigns this sink to the given zone. + * Any channels which are initilialized with runInZone set to true will wait for the given zone + * to exit before sending messages. + */ + attachToZone(zone: NgZone): void; + /** * Returns an {@link EventEmitter} for the given channel * To publish methods to that channel just call next (or add in dart) on the returned emitter diff --git a/modules/angular2/src/web_workers/shared/post_message_bus.ts b/modules/angular2/src/web_workers/shared/post_message_bus.ts index 623129f30d..4ca8c4137e 100644 --- a/modules/angular2/src/web_workers/shared/post_message_bus.ts +++ b/modules/angular2/src/web_workers/shared/post_message_bus.ts @@ -3,9 +3,11 @@ import { MessageBusSource, MessageBusSink } from "angular2/src/web_workers/shared/message_bus"; +import {BaseException} from 'angular2/src/core/facade/lang'; import {EventEmitter} from 'angular2/src/core/facade/async'; import {StringMap, StringMapWrapper} from 'angular2/src/core/facade/collection'; import {Injectable} from "angular2/src/core/di"; +import {NgZone} from 'angular2/src/core/zone/ng_zone'; /** * A TypeScript implementation of {@link MessageBus} for communicating via JavaScript's @@ -13,62 +15,131 @@ import {Injectable} from "angular2/src/core/di"; */ @Injectable() export class PostMessageBus implements MessageBus { - constructor(private _sink: PostMessageBusSink, private _source: PostMessageBusSource) {} + constructor(public sink: PostMessageBusSink, public source: PostMessageBusSource) {} - from(channel: string): EventEmitter { return this._source.from(channel); } + attachToZone(zone: NgZone): void { + this.source.attachToZone(zone); + this.sink.attachToZone(zone); + } - to(channel: string): EventEmitter { return this._sink.to(channel); } + initChannel(channel: string, runInZone: boolean = true): void { + this.source.initChannel(channel, runInZone); + this.sink.initChannel(channel, runInZone); + } + + from(channel: string): EventEmitter { return this.source.from(channel); } + + to(channel: string): EventEmitter { return this.sink.to(channel); } } export class PostMessageBusSink implements MessageBusSink { - private _channels: StringMap = StringMapWrapper.create(); + private _zone: NgZone; + private _channels: StringMap = StringMapWrapper.create(); + private _messageBuffer: Array = []; constructor(private _postMessageTarget: PostMessageTarget) {} - public to(channel: string): EventEmitter { + attachToZone(zone: NgZone): void { + this._zone = zone; + this._zone.overrideOnEventDone(() => this._handleOnEventDone(), false); + } + + initChannel(channel: string, runInZone: boolean = true): void { if (StringMapWrapper.contains(this._channels, channel)) { - return this._channels[channel]; - } else { - var emitter = new EventEmitter(); - emitter.observer({ - next: (message: Object) => { - this._postMessageTarget.postMessage({channel: channel, message: message}); + throw new BaseException(`${channel} has already been initialized`); + } + + var emitter = new EventEmitter(); + var channelInfo = new _Channel(emitter, runInZone); + this._channels[channel] = channelInfo; + emitter.observer({ + next: (data: Object) => { + var message = {channel: channel, message: data}; + if (runInZone) { + this._messageBuffer.push(message); + } else { + this._sendMessages([message]); } - }); - return emitter; + } + }); + } + + to(channel: string): EventEmitter { + if (StringMapWrapper.contains(this._channels, channel)) { + return this._channels[channel].emitter; + } else { + throw new BaseException(`${channel} is not set up. Did you forget to call initChannel?`); + } + } + + private _handleOnEventDone() { + // TODO: Send all buffered messages in one postMessage call + this._sendMessages(this._messageBuffer); + this._messageBuffer = []; + } + + private _sendMessages(messages: Array) { this._postMessageTarget.postMessage(messages); } +} + +export class PostMessageBusSource implements MessageBusSource { + private _zone: NgZone; + private _channels: StringMap = StringMapWrapper.create(); + + constructor(eventTarget?: EventTarget) { + if (eventTarget) { + eventTarget.addEventListener("message", (ev: MessageEvent) => this._handleMessages(ev)); + } else { + // if no eventTarget is given we assume we're in a WebWorker and listen on the global scope + addEventListener("message", (ev: MessageEvent) => this._handleMessages(ev)); + } + } + + attachToZone(zone: NgZone) { this._zone = zone; } + + initChannel(channel: string, runInZone: boolean = true) { + if (StringMapWrapper.contains(this._channels, channel)) { + throw new BaseException(`${channel} has already been initialized`); + } + + var emitter = new EventEmitter(); + var channelInfo = new _Channel(emitter, runInZone); + this._channels[channel] = channelInfo; + } + + from(channel: string): EventEmitter { + if (StringMapWrapper.contains(this._channels, channel)) { + return this._channels[channel].emitter; + } else { + throw new BaseException(`${channel} is not set up. Did you forget to call initChannel?`); + } + } + + private _handleMessages(ev: MessageEvent): void { + var messages = ev.data; + for (var i = 0; i < messages.length; i++) { + this._handleMessage(messages[i]); + } + } + + private _handleMessage(data: any): void { + var channel = data.channel; + if (StringMapWrapper.contains(this._channels, channel)) { + var channelInfo = this._channels[channel]; + if (channelInfo.runInZone) { + this._zone.run(() => { channelInfo.emitter.next(data.message); }); + } else { + channelInfo.emitter.next(data.message); + } } } } -export class PostMessageBusSource implements MessageBusSource { - private _channels: StringMap = StringMapWrapper.create(); - - constructor(eventTarget?: EventTarget) { - if (eventTarget) { - eventTarget.addEventListener("message", (ev: MessageEvent) => this._handleMessage(ev)); - } else { - // if no eventTarget is given we assume we're in a WebWorker and listen on the global scope - addEventListener("message", (ev: MessageEvent) => this._handleMessage(ev)); - } - } - - private _handleMessage(ev: MessageEvent) { - var data = ev.data; - var channel = data.channel; - if (StringMapWrapper.contains(this._channels, channel)) { - this._channels[channel].next(data.message); - } - } - - public from(channel: string): EventEmitter { - if (StringMapWrapper.contains(this._channels, channel)) { - return this._channels[channel]; - } else { - var emitter = new EventEmitter(); - this._channels[channel] = emitter; - return emitter; - } - } +/** + * Helper class that wraps a channel's {@link EventEmitter} and + * keeps track of if it should run in the zone. + */ +class _Channel { + constructor(public emitter: EventEmitter, public runInZone: boolean) {} } // TODO(jteplitz602) Replace this with the definition in lib.webworker.d.ts(#3492) diff --git a/modules/angular2/src/web_workers/shared/service_message_broker.ts b/modules/angular2/src/web_workers/shared/service_message_broker.ts index d4bec431a9..381e95b78e 100644 --- a/modules/angular2/src/web_workers/shared/service_message_broker.ts +++ b/modules/angular2/src/web_workers/shared/service_message_broker.ts @@ -14,7 +14,11 @@ import { export class ServiceMessageBrokerFactory { constructor(private _messageBus: MessageBus, protected _serializer: Serializer) {} - createMessageBroker(channel: string): ServiceMessageBroker { + /** + * Initializes the given channel and attaches a new {@link ServiceMessageBroker} to it. + */ + createMessageBroker(channel: string, runInZone: boolean = true): ServiceMessageBroker { + this._messageBus.initChannel(channel, runInZone); return new ServiceMessageBroker(this._messageBus, this._serializer, channel); } } diff --git a/modules/angular2/src/web_workers/ui/application.dart b/modules/angular2/src/web_workers/ui/application.dart index be70e2485e..c114ef2e45 100644 --- a/modules/angular2/src/web_workers/ui/application.dart +++ b/modules/angular2/src/web_workers/ui/application.dart @@ -5,7 +5,8 @@ import 'dart:async'; import 'dart:core'; import 'package:angular2/src/web_workers/shared/message_bus.dart' show MessageBus; -import 'package:angular2/src/web_workers/ui/impl.dart' show bootstrapUICommon, WebWorkerApplication; +import 'package:angular2/src/web_workers/ui/impl.dart' + show bootstrapUICommon, WebWorkerApplication; import 'package:angular2/src/web_workers/shared/isolate_message_bus.dart'; /** @@ -37,7 +38,7 @@ Future spawnWebWorker(Uri uri) async { class UIMessageBusSource extends IsolateMessageBusSource { UIMessageBusSource(ReceivePort port) : super(port); - Future get sink => rawDataStream.firstWhere((message) { + Future get sink => stream.firstWhere((message) { return message is SendPort; }); } diff --git a/modules/angular2/src/web_workers/ui/impl.ts b/modules/angular2/src/web_workers/ui/impl.ts index 02f9708350..af6fe2e21e 100644 --- a/modules/angular2/src/web_workers/ui/impl.ts +++ b/modules/angular2/src/web_workers/ui/impl.ts @@ -32,6 +32,7 @@ export function bootstrapUICommon(bus: MessageBus): WebWorkerApplication { BrowserDomAdapter.makeCurrent(); var zone = createNgZone(); wtfInit(); + bus.attachToZone(zone); return zone.run(() => { var injector = createInjector(zone, bus); injector.get(MessageBasedRenderCompiler).start(); @@ -47,11 +48,11 @@ export class WebWorkerApplication { constructor(private _clientMessageBrokerFactory: ClientMessageBrokerFactory, private _serviceMessageBrokerFactory: ServiceMessageBrokerFactory) {} - createClientMessageBroker(channel: string): ClientMessageBroker { - return this._clientMessageBrokerFactory.createMessageBroker(channel); + createClientMessageBroker(channel: string, runInZone: boolean = true): ClientMessageBroker { + return this._clientMessageBrokerFactory.createMessageBroker(channel, runInZone); } - createServiceMessageBroker(channel: string): ServiceMessageBroker { - return this._serviceMessageBrokerFactory.createMessageBroker(channel); + createServiceMessageBroker(channel: string, runInZone: boolean = true): ServiceMessageBroker { + return this._serviceMessageBrokerFactory.createMessageBroker(channel, runInZone); } } diff --git a/modules/angular2/src/web_workers/ui/render_compiler.ts b/modules/angular2/src/web_workers/ui/render_compiler.ts index 8ed6744d70..3a7b962a7b 100644 --- a/modules/angular2/src/web_workers/ui/render_compiler.ts +++ b/modules/angular2/src/web_workers/ui/render_compiler.ts @@ -17,7 +17,7 @@ export class MessageBasedRenderCompiler { private _renderCompiler: RenderCompiler) {} start(): void { - var broker = this._brokerFactory.createMessageBroker(RENDER_COMPILER_CHANNEL); + var broker = this._brokerFactory.createMessageBroker(RENDER_COMPILER_CHANNEL, false); broker.registerMethod("compileHost", [RenderDirectiveMetadata], bind(this._renderCompiler.compileHost, this._renderCompiler), ProtoViewDto); diff --git a/modules/angular2/src/web_workers/ui/renderer.ts b/modules/angular2/src/web_workers/ui/renderer.ts index 866a899269..5aa6c59650 100644 --- a/modules/angular2/src/web_workers/ui/renderer.ts +++ b/modules/angular2/src/web_workers/ui/renderer.ts @@ -26,6 +26,7 @@ export class MessageBasedRenderer { start(): void { var broker = this._brokerFactory.createMessageBroker(RENDERER_CHANNEL); + this._bus.initChannel(EVENT_CHANNEL); broker.registerMethod("createRootHostView", [RenderProtoViewRef, PRIMITIVE, PRIMITIVE, PRIMITIVE], bind(this._createRootHostView, this)); diff --git a/modules/angular2/src/web_workers/ui/setup.ts b/modules/angular2/src/web_workers/ui/setup.ts index cfcd017b58..355ba57e06 100644 --- a/modules/angular2/src/web_workers/ui/setup.ts +++ b/modules/angular2/src/web_workers/ui/setup.ts @@ -14,6 +14,7 @@ export class WebWorkerSetup { } start(): void { + this._bus.initChannel(SETUP_CHANNEL, false); var sink = this._bus.to(SETUP_CHANNEL); var source = this._bus.from(SETUP_CHANNEL); diff --git a/modules/angular2/src/web_workers/worker/application.ts b/modules/angular2/src/web_workers/worker/application.ts index 0749b2027a..fe2dd1d929 100644 --- a/modules/angular2/src/web_workers/worker/application.ts +++ b/modules/angular2/src/web_workers/worker/application.ts @@ -30,8 +30,10 @@ export function bootstrapWebWorker( appComponentType: Type, componentInjectableBindings: Array = null): Promise { var sink = new PostMessageBusSink({ - postMessage: - (message: any, transferrables?:[ArrayBuffer]) => { _postMessage(message, transferrables); } + postMessage: (message: any, transferrables?:[ArrayBuffer]) => { + console.log("Sending", message); + _postMessage(message, transferrables); + } }); var source = new PostMessageBusSource(); var bus = new PostMessageBus(sink, source); diff --git a/modules/angular2/src/web_workers/worker/application_common.ts b/modules/angular2/src/web_workers/worker/application_common.ts index d9247cc1fa..1521c64139 100644 --- a/modules/angular2/src/web_workers/worker/application_common.ts +++ b/modules/angular2/src/web_workers/worker/application_common.ts @@ -146,14 +146,13 @@ export function bootstrapWebWorkerCommon( var bootstrapProcess: PromiseCompleter = PromiseWrapper.completer(); var zone = new NgZone({enableLongStackTrace: assertionsEnabled()}); - zone.run(() => { - // TODO(rado): prepopulate template cache, so applications with only - // index.html and main.js are possible. - // + bus.attachToZone(zone); - var subscription: any; - var emitter = bus.from(SETUP_CHANNEL); - subscription = ObservableWrapper.subscribe(emitter, (message: StringMap) => { + var subscription: any; + bus.initChannel(SETUP_CHANNEL, false); + var emitter = bus.from(SETUP_CHANNEL); + subscription = ObservableWrapper.subscribe(emitter, (message: StringMap) => { + zone.run(() => { var exceptionHandler; try { var appInjector = @@ -167,7 +166,6 @@ export function bootstrapWebWorkerCommon( var lc = appInjector.get(LifeCycle); lc.registerWith(zone, appChangeDetector); lc.tick(); // the first tick that will bootstrap the app - bootstrapProcess.resolve(new ApplicationRef(componentRef, appComponentType, appInjector)); }; @@ -185,9 +183,8 @@ export function bootstrapWebWorkerCommon( bootstrapProcess.reject(e, e.stack); } }); - - ObservableWrapper.callNext(bus.to(SETUP_CHANNEL), "ready"); }); + ObservableWrapper.callNext(bus.to(SETUP_CHANNEL), "ready"); return bootstrapProcess.promise; } diff --git a/modules/angular2/src/web_workers/worker/event_dispatcher.ts b/modules/angular2/src/web_workers/worker/event_dispatcher.ts index b9d52489cb..e48a163a50 100644 --- a/modules/angular2/src/web_workers/worker/event_dispatcher.ts +++ b/modules/angular2/src/web_workers/worker/event_dispatcher.ts @@ -6,14 +6,14 @@ import {EVENT_CHANNEL} from 'angular2/src/web_workers/shared/messaging_api'; import {MessageBus} from 'angular2/src/web_workers/shared/message_bus'; import {EventEmitter, ObservableWrapper} from 'angular2/src/core/facade/async'; import {deserializeGenericEvent} from './event_deserializer'; -import {NgZone} from 'angular2/src/core/zone/ng_zone'; @Injectable() export class WebWorkerEventDispatcher { private _eventDispatchRegistry: Map = new Map(); - constructor(bus: MessageBus, private _serializer: Serializer, private _zone: NgZone) { + constructor(bus: MessageBus, private _serializer: Serializer) { + bus.initChannel(EVENT_CHANNEL); var source = bus.from(EVENT_CHANNEL); ObservableWrapper.subscribe( source, (message) => this._dispatchEvent(new RenderEventData(message, _serializer))); @@ -22,10 +22,8 @@ export class WebWorkerEventDispatcher { private _dispatchEvent(eventData: RenderEventData): void { var dispatcher = this._eventDispatchRegistry.get(eventData.viewRef); - this._zone.run(() => { - eventData.locals['$event'] = deserializeGenericEvent(eventData.locals['$event']); - dispatcher.dispatchRenderEvent(eventData.elementIndex, eventData.eventName, eventData.locals); - }); + eventData.locals['$event'] = deserializeGenericEvent(eventData.locals['$event']); + dispatcher.dispatchRenderEvent(eventData.elementIndex, eventData.eventName, eventData.locals); } registerEventDispatcher(viewRef: RenderViewRef, dispatcher: RenderEventDispatcher): void { diff --git a/modules/angular2/test/core/zone/ng_zone_spec.ts b/modules/angular2/test/core/zone/ng_zone_spec.ts index 51729dd811..48659bb6a5 100644 --- a/modules/angular2/test/core/zone/ng_zone_spec.ts +++ b/modules/angular2/test/core/zone/ng_zone_spec.ts @@ -186,6 +186,20 @@ function commonTests() { }, 80); })); + it('should call standalone onEventDone', inject([AsyncTestCompleter], (async) => { + _zone.overrideOnTurnStart(null); + _zone.overrideOnEventDone(() => { _log.add('onEventDone'); }); + + _zone.overrideOnTurnDone(null); + + macroTask(() => { _zone.run(_log.fn('run')); }); + + macroTask(() => { + expect(_log.result()).toEqual('run; onEventDone'); + async.done(); + }, 80); + })); + it('should not allow onEventDone to cause further digests', inject([AsyncTestCompleter], (async) => { _zone.overrideOnTurnStart(null); diff --git a/modules/angular2/test/web_workers/debug_tools/message_bus_common.dart b/modules/angular2/test/web_workers/debug_tools/message_bus_common.dart index 4c7c0cc994..dd4fdacf2d 100644 --- a/modules/angular2/test/web_workers/debug_tools/message_bus_common.dart +++ b/modules/angular2/test/web_workers/debug_tools/message_bus_common.dart @@ -19,5 +19,5 @@ void expectSinkSendsEncodedJson(SpyObject socket, MessageBusSink sink, void expectMessageEquality(String message, Map expectedData, String channel) { expect(JSON.decode(message)) - .toEqual({'channel': channel, 'message': expectedData}); + .toEqual([{'channel': channel, 'message': expectedData}]); } diff --git a/modules/angular2/test/web_workers/debug_tools/multi_client_server_message_bus.server.spec.dart b/modules/angular2/test/web_workers/debug_tools/multi_client_server_message_bus.server.spec.dart index 561efbfcbd..164e93fa58 100644 --- a/modules/angular2/test/web_workers/debug_tools/multi_client_server_message_bus.server.spec.dart +++ b/modules/angular2/test/web_workers/debug_tools/multi_client_server_message_bus.server.spec.dart @@ -38,6 +38,7 @@ main() { inject([AsyncTestCompleter], (async) { const NUM_CLIENTS = 5; var sink = new MultiClientServerMessageBusSink(); + sink.initChannel(CHANNEL, false); int numMessagesSent = 0; // initialize all the sockets var sockets = new List(NUM_CLIENTS); @@ -78,7 +79,7 @@ main() { for (var i = 0; i < numMessages; i++) { var message = {'value': random.nextInt(MAX)}; messageHistory - .add(JSON.encode({'channel': CHANNEL, 'message': message})); + .add(JSON.encode([{'channel': CHANNEL, 'message': message}])); } // copy the message history to ensure the test fails if the wrapper modifies the list return new List.from(messageHistory); @@ -136,7 +137,7 @@ main() { }); void sendMessage(StreamController controller, dynamic message) { - controller.add(JSON.encode(message)); + controller.add(JSON.encode([message])); } void testForwardingMessages(bool primary, bool events, Function done) { @@ -146,10 +147,11 @@ main() { new WebSocketWrapper(messageHistory, resultMarkers, result.socket); socket.setPrimary(primary); - var source = new MultiClientServerMessageBusSource(null); + var source = new MultiClientServerMessageBusSource(); source.addConnection(socket); var channel = events ? EVENT_CHANNEL : CHANNEL; + source.initChannel(channel, false); source.from(channel).listen((message) { expect(message).toEqual(MESSAGE); done(); @@ -187,7 +189,9 @@ main() { socket.setPrimary(true); var source = - new MultiClientServerMessageBusSource(() => async.done()); + new MultiClientServerMessageBusSource(); + source.onResult.listen((result) => async.done()); + source.initChannel(CHANNEL, false); source.addConnection(socket); var message = { diff --git a/modules/angular2/test/web_workers/debug_tools/single_client_server_message_bus.server.spec.dart b/modules/angular2/test/web_workers/debug_tools/single_client_server_message_bus.server.spec.dart index d53b7d97e9..1ec526deb0 100644 --- a/modules/angular2/test/web_workers/debug_tools/single_client_server_message_bus.server.spec.dart +++ b/modules/angular2/test/web_workers/debug_tools/single_client_server_message_bus.server.spec.dart @@ -28,6 +28,8 @@ main() { inject([AsyncTestCompleter], (async) { var socket = new SpyWebSocket(); var sink = new SingleClientServerMessageBusSink(); + sink.initChannel(CHANNEL, false); + sink.setConnection(socket); expectSinkSendsEncodedJson(socket, sink, "add", async); })); @@ -36,6 +38,7 @@ main() { "should buffer messages before connect", inject([AsyncTestCompleter], (async) { var sink = new SingleClientServerMessageBusSink(); + sink.initChannel(CHANNEL, false); sink.to(CHANNEL).add(MESSAGE); var socket = new SpyWebSocket(); @@ -51,6 +54,8 @@ main() { inject([AsyncTestCompleter], (async) { var SECOND_MESSAGE = const {'test': 12, 'second': 'hi'}; var sink = new SingleClientServerMessageBusSink(); + sink.initChannel(CHANNEL, false); + sink.to(CHANNEL).add(MESSAGE); var socket = new SpyWebSocket(); @@ -78,20 +83,19 @@ main() { it( "should decode JSON messages and emit them", inject([AsyncTestCompleter], (async) { - var socket = new SpyWebSocket(); StreamController controller = new StreamController.broadcast(); - socket.spy("asBroadcastStream").andCallFake(() => controller.stream); var source = new SingleClientServerMessageBusSource(); - source.setConnectionFromWebSocket(socket); + source.initChannel(CHANNEL, false); + source.attachTo(controller.stream); source.from(CHANNEL).listen((message) { expect(message).toEqual(MESSAGE); async.done(); }); controller - .add(JSON.encode({'channel': CHANNEL, 'message': MESSAGE})); + .add(JSON.encode([{'channel': CHANNEL, 'message': MESSAGE}])); })); }); } diff --git a/modules/angular2/test/web_workers/debug_tools/web_socket_message_bus_spec.dart b/modules/angular2/test/web_workers/debug_tools/web_socket_message_bus_spec.dart index 040ab05d17..ce22b9a501 100644 --- a/modules/angular2/test/web_workers/debug_tools/web_socket_message_bus_spec.dart +++ b/modules/angular2/test/web_workers/debug_tools/web_socket_message_bus_spec.dart @@ -28,6 +28,7 @@ main() { inject([AsyncTestCompleter], (async) { var socket = new SpyWebSocket(); var sink = new WebSocketMessageBusSink(socket); + sink.initChannel(CHANNEL, false); expectSinkSendsEncodedJson(socket, sink, "send", async); })); }); @@ -41,6 +42,7 @@ main() { new StreamController.broadcast(); socket.spy("get:onMessage").andCallFake(() => controller.stream); var source = new WebSocketMessageBusSource(socket); + source.initChannel(CHANNEL, false); source.from(CHANNEL).listen((message) { expect(message).toEqual(MESSAGE); @@ -49,7 +51,7 @@ main() { var event = new SpyMessageEvent(); event.spy("get:data").andCallFake( - () => JSON.encode({'channel': CHANNEL, 'message': MESSAGE})); + () => JSON.encode([{'channel': CHANNEL, 'message': MESSAGE}])); controller.add(event); })); }); diff --git a/modules/angular2/test/web_workers/shared/message_bus_spec.ts b/modules/angular2/test/web_workers/shared/message_bus_spec.ts index efcb1ff219..4ffc44553a 100644 --- a/modules/angular2/test/web_workers/shared/message_bus_spec.ts +++ b/modules/angular2/test/web_workers/shared/message_bus_spec.ts @@ -10,9 +10,11 @@ import { SpyObject, proxy } from 'angular2/test_lib'; -import {ObservableWrapper} from 'angular2/src/core/facade/async'; +import {ObservableWrapper, TimerWrapper} from 'angular2/src/core/facade/async'; import {MessageBus} from 'angular2/src/web_workers/shared/message_bus'; import {createConnectedMessageBus} from './message_bus_util'; +import {MockNgZone} from 'angular2/src/mock/ng_zone_mock'; +import {NgZone} from 'angular2/src/core/zone/ng_zone'; export function main() { /** @@ -27,6 +29,7 @@ export function main() { inject([AsyncTestCompleter], (async) => { const CHANNEL = "CHANNEL 1"; const MESSAGE = "Test message"; + bus.initChannel(CHANNEL, false); var fromEmitter = bus.from(CHANNEL); ObservableWrapper.subscribe(fromEmitter, (message: any) => { @@ -41,6 +44,7 @@ export function main() { const CHANNEL = "CHANNEL 1"; const MESSAGE = "TESTING"; const NUM_LISTENERS = 2; + bus.initChannel(CHANNEL, false); var callCount = 0; var emitHandler = (message: any) => { @@ -66,6 +70,8 @@ export function main() { const MESSAGE_ONE = "This is a message on CHANNEL 1"; const MESSAGE_TWO = "This is a message on CHANNEL 2"; var callCount = 0; + bus.initChannel(CHANNEL_ONE, false); + bus.initChannel(CHANNEL_TWO, false); var firstFromEmitter = bus.from(CHANNEL_ONE); ObservableWrapper.subscribe(firstFromEmitter, (message) => { @@ -91,4 +97,55 @@ export function main() { ObservableWrapper.callNext(secondToEmitter, MESSAGE_TWO); })); }); + + describe("PostMessageBusSink", () => { + var bus: MessageBus; + const CHANNEL = "Test Channel"; + + function setup(runInZone: boolean, zone: NgZone) { + bus.attachToZone(zone); + bus.initChannel(CHANNEL, runInZone); + } + + /** + * Flushes pending messages and then runs the given function. + */ + function flushMessages(fn: () => void) { TimerWrapper.setTimeout(fn, 10); } + + beforeEach(() => { bus = createConnectedMessageBus(); }); + + it("should buffer messages and wait for the zone to exit before sending", + inject([AsyncTestCompleter, NgZone], (async, zone: MockNgZone) => { + setup(true, zone); + + var wasCalled = false; + ObservableWrapper.subscribe(bus.from(CHANNEL), (message) => { wasCalled = true; }); + ObservableWrapper.callNext(bus.to(CHANNEL), "hi"); + + + flushMessages(() => { + expect(wasCalled).toBeFalsy(); + + zone.simulateZoneExit(); + flushMessages(() => { + expect(wasCalled).toBeTruthy(); + async.done(); + }); + }); + })); + + it("should send messages immediatly when run outside the zone", + inject([AsyncTestCompleter, NgZone], (async, zone: MockNgZone) => { + setup(false, zone); + + var wasCalled = false; + ObservableWrapper.subscribe(bus.from(CHANNEL), (message) => { wasCalled = true; }); + ObservableWrapper.callNext(bus.to(CHANNEL), "hi"); + + flushMessages(() => { + expect(wasCalled).toBeTruthy(); + async.done(); + }); + })); + }); } diff --git a/modules/angular2/test/web_workers/shared/service_message_broker_spec.ts b/modules/angular2/test/web_workers/shared/service_message_broker_spec.ts index 422bdb40ab..38000a9748 100644 --- a/modules/angular2/test/web_workers/shared/service_message_broker_spec.ts +++ b/modules/angular2/test/web_workers/shared/service_message_broker_spec.ts @@ -39,7 +39,11 @@ export function main() { describe("UIMessageBroker", () => { var messageBuses; - beforeEach(() => { messageBuses = createPairedMessageBuses(); }); + beforeEach(() => { + messageBuses = createPairedMessageBuses(); + messageBuses.ui.initChannel(CHANNEL); + messageBuses.worker.initChannel(CHANNEL); + }); it("should call registered method with correct arguments", inject([Serializer], (serializer) => { var broker = new ServiceMessageBroker(messageBuses.ui, serializer, CHANNEL); diff --git a/modules/angular2/test/web_workers/shared/web_worker_test_util.ts b/modules/angular2/test/web_workers/shared/web_worker_test_util.ts index d6a32f94e1..dbfac75fbe 100644 --- a/modules/angular2/test/web_workers/shared/web_worker_test_util.ts +++ b/modules/angular2/test/web_workers/shared/web_worker_test_util.ts @@ -5,6 +5,8 @@ import { MessageBus } from 'angular2/src/web_workers/shared/message_bus'; import {MockEventEmitter} from './mock_event_emitter'; +import {BaseException} from 'angular2/src/core/facade/lang'; +import {NgZone} from 'angular2/src/core/zone/ng_zone'; /** * Returns two MessageBus instances that are attached to each other. @@ -30,30 +32,56 @@ export class PairedMessageBuses { export class MockMessageBusSource implements MessageBusSource { constructor(private _channels: StringMap) {} - from(channel: string): MockEventEmitter { + initChannel(channel: string, runInZone = true) { if (!StringMapWrapper.contains(this._channels, channel)) { this._channels[channel] = new MockEventEmitter(); } + } + + from(channel: string): MockEventEmitter { + if (!StringMapWrapper.contains(this._channels, channel)) { + throw new BaseException(`${channel} is not set up. Did you forget to call initChannel?`); + } return this._channels[channel]; } + + attachToZone(zone: NgZone) {} } export class MockMessageBusSink implements MessageBusSink { constructor(private _channels: StringMap) {} + initChannel(channel: string, runInZone = true) { + if (!StringMapWrapper.contains(this._channels, channel)) { + this._channels[channel] = new MockEventEmitter(); + } + } + to(channel: string): MockEventEmitter { if (!StringMapWrapper.contains(this._channels, channel)) { this._channels[channel] = new MockEventEmitter(); } return this._channels[channel]; } + + attachToZone(zone: NgZone) {} } +/** + * Mock implementation of the {@link MessageBus} for tests. + * Runs syncronously, and does not support running within the zone. + */ export class MockMessageBus extends MessageBus { constructor(public sink: MockMessageBusSink, public source: MockMessageBusSource) { super(); } + initChannel(channel: string, runInZone = true) { + this.sink.initChannel(channel, runInZone); + this.source.initChannel(channel, runInZone); + } to(channel: string): MockEventEmitter { return this.sink.to(channel); } from(channel: string): MockEventEmitter { return this.source.from(channel); } + + attachToZone(zone: NgZone) {} } diff --git a/modules/angular2/test/web_workers/worker/event_dispatcher_spec.ts b/modules/angular2/test/web_workers/worker/event_dispatcher_spec.ts index 44ecbca7d2..1bcd344022 100644 --- a/modules/angular2/test/web_workers/worker/event_dispatcher_spec.ts +++ b/modules/angular2/test/web_workers/worker/event_dispatcher_spec.ts @@ -11,7 +11,6 @@ import { proxy } from 'angular2/test_lib'; import {Serializer} from 'angular2/src/web_workers/shared/serializer'; -import {NgZone} from 'angular2/src/core/zone/ng_zone'; import {ON_WEB_WORKER} from 'angular2/src/web_workers/shared/api'; import {bind} from 'angular2/core'; import {RenderProtoViewRefStore} from 'angular2/src/web_workers/shared/render_proto_view_ref_store'; @@ -34,11 +33,10 @@ export function main() { RenderViewWithFragmentsStore ]); - it("should dispatch events", - inject([Serializer, NgZone, AsyncTestCompleter], (serializer, zone, async) => { + it("should dispatch events", inject([Serializer, AsyncTestCompleter], (serializer, async) => { var messageBuses = createPairedMessageBuses(); var webWorkerEventDispatcher = - new WebWorkerEventDispatcher(messageBuses.worker, serializer, zone); + new WebWorkerEventDispatcher(messageBuses.worker, serializer); var elementIndex = 15; var eventName = 'click'; diff --git a/modules/angular2/test/web_workers/worker/xhr_impl_spec.ts b/modules/angular2/test/web_workers/worker/xhr_impl_spec.ts index cfcbf47812..d343893129 100644 --- a/modules/angular2/test/web_workers/worker/xhr_impl_spec.ts +++ b/modules/angular2/test/web_workers/worker/xhr_impl_spec.ts @@ -45,5 +45,5 @@ export function main() { class MockMessageBrokerFactory extends ClientMessageBrokerFactory { constructor(private _messageBroker: ClientMessageBroker) { super(null, null); } - createMessageBroker(channel: string) { return this._messageBroker; } + createMessageBroker(channel: string, runInZone = true) { return this._messageBroker; } } diff --git a/modules/examples/src/web_workers/message_broker/index.dart b/modules/examples/src/web_workers/message_broker/index.dart index 4207b4364e..f6f3cf6202 100644 --- a/modules/examples/src/web_workers/message_broker/index.dart +++ b/modules/examples/src/web_workers/message_broker/index.dart @@ -9,7 +9,7 @@ main() { reflector.reflectionCapabilities = new ReflectionCapabilities(); const ECHO_CHANNEL = "ECHO"; bootstrap("background_index.dart").then((instance) { - var broker = instance.app.createClientMessageBroker(ECHO_CHANNEL); + var broker = instance.app.createClientMessageBroker(ECHO_CHANNEL, false); querySelector("#send_echo").addEventListener("click", (e) { var val = (querySelector("#echo_input") as InputElement).value; var args = new UiArguments("echo", [new FnArg(val, PRIMITIVE)]); diff --git a/modules/examples/src/web_workers/message_broker/index.ts b/modules/examples/src/web_workers/message_broker/index.ts index 5b9b4541a2..e4bb79f8e0 100644 --- a/modules/examples/src/web_workers/message_broker/index.ts +++ b/modules/examples/src/web_workers/message_broker/index.ts @@ -3,7 +3,7 @@ import {bootstrap, UiArguments, FnArg, PRIMITIVE} from "angular2/web_worker/ui"; const ECHO_CHANNEL = "ECHO"; var instance = bootstrap("loader.js"); -var broker = instance.app.createClientMessageBroker(ECHO_CHANNEL); +var broker = instance.app.createClientMessageBroker(ECHO_CHANNEL, false); document.getElementById("send_echo") .addEventListener("click", (e) => { diff --git a/modules/examples/src/web_workers/message_broker/index_common.ts b/modules/examples/src/web_workers/message_broker/index_common.ts index c2fc51ee42..193a323a94 100644 --- a/modules/examples/src/web_workers/message_broker/index_common.ts +++ b/modules/examples/src/web_workers/message_broker/index_common.ts @@ -7,7 +7,7 @@ const ECHO_CHANNEL = "ECHO"; @View({template: "

WebWorker MessageBroker Test

"}) export class App { constructor(private _serviceBrokerFactory: ServiceMessageBrokerFactory) { - var broker = _serviceBrokerFactory.createMessageBroker(ECHO_CHANNEL); + var broker = _serviceBrokerFactory.createMessageBroker(ECHO_CHANNEL, false); broker.registerMethod("echo", [PRIMITIVE], this._echo, PRIMITIVE); } diff --git a/modules/examples/src/web_workers/todo/index_web_socket.dart b/modules/examples/src/web_workers/todo/index_web_socket.dart index ad8548258a..a1faf9471e 100644 --- a/modules/examples/src/web_workers/todo/index_web_socket.dart +++ b/modules/examples/src/web_workers/todo/index_web_socket.dart @@ -10,7 +10,9 @@ import 'dart:html' main() { reflector.reflectionCapabilities = new ReflectionCapabilities(); var webSocket = new WebSocket("ws://127.0.0.1:1337/ws"); - var bus = new WebSocketMessageBus.fromWebSocket(webSocket); + webSocket.onOpen.listen((e) { + var bus = new WebSocketMessageBus.fromWebSocket(webSocket); - bootstrapUICommon(bus); + bootstrapUICommon(bus); + }); } diff --git a/modules/examples/src/web_workers/todo/server_index.dart b/modules/examples/src/web_workers/todo/server_index.dart index 16150aaa5a..918a63d724 100644 --- a/modules/examples/src/web_workers/todo/server_index.dart +++ b/modules/examples/src/web_workers/todo/server_index.dart @@ -12,5 +12,6 @@ void main() { HttpServer.bind('127.0.0.1', 1337).then((HttpServer server) { var bus = new MultiClientServerMessageBus.fromHttpServer(server); bootstrapWebWorkerCommon(TodoApp, bus).catchError((error) => throw error); + print ("Server Listening for requests on 127.0.0.1:1337"); }); }