feat(WebWorkers): Add WebSocket MessageBuses for debugging apps

Closes #3858
This commit is contained in:
Jason Teplitz
2015-08-26 10:41:41 -07:00
parent 9f576b0233
commit 4ba4427510
24 changed files with 959 additions and 46 deletions

View File

@ -47,7 +47,7 @@ import {
import {QueryList} from './query_list';
import {reflector} from 'angular2/src/core/reflection/reflection';
import {RenderDirectiveMetadata} from 'angular2/src/core/render/api';
import {EventConfig} from 'angular2/src/core/render/dom/util';
import {EventConfig} from 'angular2/src/core/render/event_config';
import {PipeBinding} from '../pipes/pipe_binding';
var _staticKeys;

View File

@ -1,7 +1,13 @@
import {Injectable} from 'angular2/di';
import {List, ListWrapper, MapWrapper} from 'angular2/src/core/facade/collection';
import {isPresent, isBlank, BaseException, assertionsEnabled} from 'angular2/src/core/facade/lang';
import {
StringWrapper,
isPresent,
isBlank,
BaseException,
assertionsEnabled
} from 'angular2/src/core/facade/lang';
import {reflector} from 'angular2/src/core/reflection/reflection';
import {
@ -505,7 +511,7 @@ function _findDirectiveIndexByExportAs(renderElementBinder, directiveBindings, e
}
}
if (isBlank(matchedDirective) && exportAs !== "$implicit") {
if (isBlank(matchedDirective) && !StringWrapper.equals(exportAs, "$implicit")) {
throw new BaseException(`Cannot find directive with exportAs = '${exportAs}'`);
}

View File

@ -10,7 +10,8 @@ import {CompileElement} from './compile_element';
import {CompileControl} from './compile_control';
import {RenderDirectiveMetadata} from '../../api';
import {EventConfig, dashCaseToCamelCase, camelCaseToDashCase} from '../util';
import {dashCaseToCamelCase, camelCaseToDashCase} from '../util';
import {EventConfig} from '../../event_config';
import {DirectiveBuilder, ElementBinderBuilder} from '../view/proto_view_builder';
/**

View File

@ -8,8 +8,6 @@ import {TemplateCloner} from './template_cloner';
export const NG_BINDING_CLASS_SELECTOR = '.ng-binding';
export const NG_BINDING_CLASS = 'ng-binding';
export const EVENT_TARGET_SEPARATOR = ':';
export const NG_CONTENT_ELEMENT_NAME = 'ng-content';
export const NG_SHADOW_ROOT_ELEMENT_NAME = 'shadow-root';
@ -29,27 +27,6 @@ export function dashCaseToCamelCase(input: string): string {
(m) => { return m[1].toUpperCase(); });
}
export class EventConfig {
constructor(public fieldName: string, public eventName: string, public isLongForm: boolean) {}
static parse(eventConfig: string): EventConfig {
var fieldName = eventConfig, eventName = eventConfig, isLongForm = false;
var separatorIdx = eventConfig.indexOf(EVENT_TARGET_SEPARATOR);
if (separatorIdx > -1) {
// long format: 'fieldName: eventName'
fieldName = StringWrapper.substring(eventConfig, 0, separatorIdx).trim();
eventName = StringWrapper.substring(eventConfig, separatorIdx + 1).trim();
isLongForm = true;
}
return new EventConfig(fieldName, eventName, isLongForm);
}
getFullName(): string {
return this.isLongForm ? `${this.fieldName}${EVENT_TARGET_SEPARATOR}${this.eventName}` :
this.eventName;
}
}
// Attention: This is on the hot path, so don't use closures or default values!
export function queryBoundElements(templateContent: Node, isSingleElementChild: boolean):
Element[] {

View File

@ -34,12 +34,8 @@ import {
PropertyBindingType
} from '../../api';
import {
NG_BINDING_CLASS,
EVENT_TARGET_SEPARATOR,
queryBoundTextNodeIndices,
camelCaseToDashCase
} from '../util';
import {NG_BINDING_CLASS, queryBoundTextNodeIndices, camelCaseToDashCase} from '../util';
import {EVENT_TARGET_SEPARATOR} from "../../event_config";
export class ProtoViewBuilder {
variableBindings: Map<string, string> = new Map();

View File

@ -0,0 +1,23 @@
import {StringWrapper} from 'angular2/src/core/facade/lang';
export const EVENT_TARGET_SEPARATOR = ':';
export class EventConfig {
constructor(public fieldName: string, public eventName: string, public isLongForm: boolean) {}
static parse(eventConfig: string): EventConfig {
var fieldName = eventConfig, eventName = eventConfig, isLongForm = false;
var separatorIdx = eventConfig.indexOf(EVENT_TARGET_SEPARATOR);
if (separatorIdx > -1) {
// long format: 'fieldName: eventName'
fieldName = StringWrapper.substring(eventConfig, 0, separatorIdx).trim();
eventName = StringWrapper.substring(eventConfig, separatorIdx + 1).trim();
isLongForm = true;
}
return new EventConfig(fieldName, eventName, isLongForm);
}
getFullName(): string {
return this.isLongForm ? `${this.fieldName}${EVENT_TARGET_SEPARATOR}${this.eventName}` :
this.eventName;
}
}

View File

@ -0,0 +1,216 @@
library angular2.src.web_workers.debug_tools.multi_client_server_message_bus;
import "package:angular2/src/web_workers/shared/message_bus.dart"
show MessageBus, MessageBusSink, MessageBusSource;
import 'dart:io';
import 'dart:convert' show JSON;
import 'dart:async';
import 'package:angular2/src/core/facade/async.dart' show EventEmitter;
import 'package:angular2/src/web_workers/shared/messaging_api.dart';
// TODO(jteplitz602): Remove hard coded result type and
// clear messageHistory once app is done with it #3859
class MultiClientServerMessageBus implements MessageBus {
final MultiClientServerMessageBusSink sink;
MultiClientServerMessageBusSource source;
bool hasPrimary = false;
MultiClientServerMessageBus(this.sink, this.source);
MultiClientServerMessageBus.fromHttpServer(HttpServer server)
: sink = new MultiClientServerMessageBusSink() {
source = new MultiClientServerMessageBusSource(resultReceived);
server.listen((HttpRequest request) {
if (request.uri.path == "/ws") {
WebSocketTransformer.upgrade(request).then((WebSocket socket) {
var wrapper = new WebSocketWrapper(
sink.messageHistory, sink.resultMarkers, socket);
if (!hasPrimary) {
wrapper.setPrimary(true);
hasPrimary = true;
}
sink.addConnection(wrapper);
source.addConnection(wrapper);
wrapper.stream.listen(null, onDone: _handleDisconnect(wrapper));
});
}
});
}
void resultReceived() {
sink.resultReceived();
}
EventEmitter from(String channel) {
return source.from(channel);
}
EventEmitter to(String channel) {
return sink.to(channel);
}
Function _handleDisconnect(WebSocketWrapper wrapper) {
return () {
sink.removeConnection(wrapper);
if (wrapper.isPrimary) {
hasPrimary = false;
}
};
}
}
class WebSocketWrapper {
WebSocket socket;
Stream stream;
int _numResultsReceived = 0;
bool _isPrimary = false;
bool caughtUp = false;
List<String> _messageHistory;
List<int> _resultMarkers;
WebSocketWrapper(this._messageHistory, this._resultMarkers, this.socket) {
stream = socket.asBroadcastStream();
stream.listen((encodedMessage) {
var message = JSON.decode(encodedMessage)['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
resultReceived();
}
}
});
}
bool get isPrimary => _isPrimary;
void resultReceived() {
if (!isPrimary && !caughtUp) {
_numResultsReceived++;
sendToMarker(_numResultsReceived);
}
}
void setPrimary(bool primary) {
_isPrimary = primary;
if (primary) {
caughtUp = true;
}
}
// Sends up to the given result marker
void sendToMarker(int markerIndex) {
int numMessages;
int curr;
if (markerIndex >= _resultMarkers.length) {
// we're past the final result marker so send all messages in history
curr = (_resultMarkers.length > 0)
? _resultMarkers[_resultMarkers.length - 1]
: 0;
numMessages = _messageHistory.length - curr;
caughtUp = true;
} else {
curr = (markerIndex == 0) ? 0 : _resultMarkers[markerIndex - 1];
var end = _resultMarkers[markerIndex];
numMessages = end - curr;
}
while (numMessages > 0) {
socket.add(_messageHistory[curr]);
curr++;
numMessages--;
}
}
}
class MultiClientServerMessageBusSink implements MessageBusSink {
final List<String> messageHistory = new List<String>();
final Set<WebSocketWrapper> openConnections = new Set<WebSocketWrapper>();
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
final List<int> resultMarkers = new List<int>();
void resultReceived() {
resultMarkers.add(messageHistory.length);
}
void addConnection(WebSocketWrapper webSocket) {
openConnections.add(webSocket);
// send messages up to the first result marker to this socket
webSocket.sendToMarker(0);
}
void removeConnection(WebSocketWrapper webSocket) {
openConnections.remove(webSocket);
}
EventEmitter to(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
emitter.listen((message) {
_send({'channel': channel, 'message': message});
});
return emitter;
}
}
void _send(dynamic message) {
String encodedMessage = JSON.encode(message);
openConnections.forEach((WebSocketWrapper webSocket) {
if (webSocket.caughtUp) {
webSocket.socket.add(encodedMessage);
}
});
messageHistory.add(encodedMessage);
}
}
class MultiClientServerMessageBusSource implements MessageBusSource {
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
Function onResultReceived;
MultiClientServerMessageBusSource(this.onResultReceived);
EventEmitter from(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
_channels[channel] = emitter;
return emitter;
}
}
void addConnection(WebSocketWrapper webSocket) {
if (webSocket.isPrimary) {
webSocket.stream.listen((encodedMessage) {
var decodedMessage = decodeMessage(encodedMessage);
var channel = decodedMessage['channel'];
var message = decodedMessage['message'];
if (message is Map && message.containsKey("type")) {
if (message['type'] == 'result') {
// tell the bus that a result was received on the primary
onResultReceived();
}
}
if (_channels.containsKey(channel)) {
_channels[channel].add(message);
}
});
} else {
webSocket.stream.listen((encodedMessage) {
// handle events from non-primary browser
var decodedMessage = decodeMessage(encodedMessage);
var channel = decodedMessage['channel'];
var message = decodedMessage['message'];
if (_channels.containsKey(EVENT_CHANNEL) && channel == EVENT_CHANNEL) {
_channels[channel].add(message);
}
});
}
}
Map<String, dynamic> decodeMessage(dynamic message) {
return JSON.decode(message);
}
}

View File

@ -0,0 +1,140 @@
library angular2.src.web_workers.debug_tools.single_client_server_message_bus;
import "package:angular2/src/web_workers/shared/message_bus.dart"
show MessageBus, MessageBusSink, MessageBusSource;
import 'dart:io';
import 'dart:convert' show JSON;
import 'dart:async';
import "package:angular2/src/core/facade/async.dart" show EventEmitter;
class SingleClientServerMessageBus implements MessageBus {
final SingleClientServerMessageBusSink sink;
SingleClientServerMessageBusSource source;
bool connected = false;
SingleClientServerMessageBus(this.sink, this.source);
SingleClientServerMessageBus.fromHttpServer(HttpServer server)
: sink = new SingleClientServerMessageBusSink() {
source = new SingleClientServerMessageBusSource();
server.listen((HttpRequest request) {
if (request.uri.path == "/ws") {
if (!connected) {
WebSocketTransformer.upgrade(request).then((WebSocket socket) {
sink.setConnection(socket);
var stream = socket.asBroadcastStream();
source.setConnectionFromStream(stream);
stream.listen(null, onDone: _handleDisconnect);
}).catchError((error) {
throw error;
connected = false;
});
connected = true;
} else {
// refuse additional clients
request.response.statusCode = HttpStatus.SERVICE_UNAVAILABLE;
request.response.write("Maximum number of clients connected.");
request.response.close();
}
}
});
}
void _handleDisconnect() {
sink.removeConnection();
source.removeConnection();
connected = false;
}
EventEmitter from(String channel) {
return source.from(channel);
}
EventEmitter to(String channel) {
return sink.to(channel);
}
}
class SingleClientServerMessageBusSink implements MessageBusSink {
final List<String> _messageBuffer = new List<String>();
WebSocket _socket = null;
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
void setConnection(WebSocket webSocket) {
_socket = webSocket;
_sendBufferedMessages();
}
EventEmitter to(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
emitter.listen((message) {
_send({'channel': channel, 'message': message});
});
return emitter;
}
}
void removeConnection() {
_socket = null;
}
void _send(dynamic message) {
String encodedMessage = JSON.encode(message);
if (_socket != null) {
_socket.add(encodedMessage);
} else {
_messageBuffer.add(encodedMessage);
}
}
void _sendBufferedMessages() {
_messageBuffer.forEach((message) => _socket.add(message));
_messageBuffer.clear();
}
}
class SingleClientServerMessageBusSource implements MessageBusSource {
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
Stream _stream;
SingleClientServerMessageBusSource();
EventEmitter from(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
_channels[channel] = emitter;
return emitter;
}
}
void setConnectionFromWebSocket(WebSocket socket) {
setConnectionFromStream(socket.asBroadcastStream());
}
void setConnectionFromStream(Stream stream) {
_stream = stream;
_stream.listen((encodedMessage) {
var decodedMessage = decodeMessage(encodedMessage);
var channel = decodedMessage['channel'];
var message = decodedMessage['message'];
if (_channels.containsKey(channel)) {
_channels[channel].add(message);
}
});
}
void removeConnection() {
_stream = null;
}
Map<String, dynamic> decodeMessage(dynamic message) {
return JSON.decode(message);
}
}

View File

@ -0,0 +1,78 @@
library angular2.src.web_workers.worker.web_socket_message_bus;
import 'dart:html';
import 'dart:convert' show JSON;
import "package:angular2/src/web_workers/shared/message_bus.dart"
show MessageBus, MessageBusSink, MessageBusSource;
import 'package:angular2/src/core/facade/async.dart' show EventEmitter;
class WebSocketMessageBus implements MessageBus {
final WebSocketMessageBusSink sink;
final WebSocketMessageBusSource source;
WebSocketMessageBus(this.sink, this.source);
WebSocketMessageBus.fromWebSocket(WebSocket webSocket)
: sink = new WebSocketMessageBusSink(webSocket),
source = new WebSocketMessageBusSource(webSocket);
EventEmitter from(String channel) {
return source.from(channel);
}
EventEmitter to(String channel) {
return sink.to(channel);
}
}
class WebSocketMessageBusSink implements MessageBusSink {
final WebSocket _webSocket;
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
WebSocketMessageBusSink(this._webSocket);
EventEmitter to(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
emitter.listen((message) {
_send({'channel': channel, 'message': message});
});
_channels[channel] = emitter;
return emitter;
}
}
void _send(message) {
_webSocket.send(JSON.encode(message));
}
}
class WebSocketMessageBusSource implements MessageBusSource {
final Map<String, EventEmitter> _channels = new Map<String, EventEmitter>();
WebSocketMessageBusSource(WebSocket webSocket) {
webSocket.onMessage.listen((MessageEvent encodedMessage) {
var message = decodeMessage(encodedMessage.data);
var channel = message['channel'];
if (_channels.containsKey(channel)) {
_channels[channel].add(message['message']);
}
});
}
EventEmitter from(String channel) {
if (_channels.containsKey(channel)) {
return _channels[channel];
} else {
var emitter = new EventEmitter();
_channels[channel] = emitter;
return emitter;
}
}
Map<String, dynamic> decodeMessage(dynamic message) {
return JSON.decode(message);
}
}

View File

@ -11,7 +11,7 @@ import {
import {ListWrapper, StringMapWrapper, MapWrapper} from "angular2/src/core/facade/collection";
import {Serializer} from "angular2/src/web_workers/shared/serializer";
import {Injectable} from "angular2/di";
import {Type} from "angular2/src/core/facade/lang";
import {Type, StringWrapper} from "angular2/src/core/facade/lang";
@Injectable()
export class ClientMessageBrokerFactory {
@ -90,10 +90,10 @@ export class ClientMessageBroker {
private _handleMessage(message: StringMap<string, any>): void {
var data = new MessageData(message);
// TODO(jteplitz602): replace these strings with messaging constants #3685
if (data.type === "result" || data.type === "error") {
if (StringWrapper.equals(data.type, "result") || StringWrapper.equals(data.type, "error")) {
var id = data.id;
if (this._pending.has(id)) {
if (data.type === "result") {
if (StringWrapper.equals(data.type, "result")) {
this._pending.get(id).resolve(data.value);
} else {
this._pending.get(id).reject(data.value, null);

View File

@ -51,7 +51,7 @@ class IsolateMessageBusSource extends MessageBusSource {
IsolateMessageBusSource(ReceivePort port)
: rawDataStream = port.asBroadcastStream() {
rawDataStream.listen((message) {
if (message is SendPort){
if (message is SendPort) {
return;
}

View File

@ -3,6 +3,7 @@ import {EventEmitter, ObservableWrapper} from 'angular2/src/core/facade/async';
import {MessageBus} from 'angular2/src/web_workers/shared/message_bus';
import {AnchorBasedAppRootUrl} from 'angular2/src/core/services/anchor_based_app_root_url';
import {Injectable} from 'angular2/di';
import {StringWrapper} from 'angular2/src/core/facade/lang';
@Injectable()
export class WebWorkerSetup {
@ -12,7 +13,7 @@ export class WebWorkerSetup {
var source = bus.from(SETUP_CHANNEL);
ObservableWrapper.subscribe(source, (message: string) => {
if (message === "ready") {
if (StringWrapper.equals(message, "ready")) {
ObservableWrapper.callNext(sink, {"rootUrl": rootUrl});
}
});

View File

@ -24,9 +24,9 @@ import {
defaultKeyValueDiffers
} from 'angular2/src/core/change_detection/change_detection';
import {DEFAULT_PIPES} from 'angular2/pipes';
import {StyleUrlResolver} from 'angular2/src/core/render/dom/compiler/style_url_resolver';
import {ExceptionHandler} from 'angular2/src/core/exception_handler';
import {DirectiveResolver} from 'angular2/src/core/compiler/directive_resolver';
import {StyleUrlResolver} from 'angular2/src/core/render/dom/compiler/style_url_resolver';
import {PipeResolver} from 'angular2/src/core/compiler/pipe_resolver';
import {ViewResolver} from 'angular2/src/core/compiler/view_resolver';
import {List, ListWrapper} from 'angular2/src/core/facade/collection';
@ -42,7 +42,6 @@ import {
ComponentRef,
DynamicComponentLoader
} from 'angular2/src/core/compiler/dynamic_component_loader';
import {Testability} from 'angular2/src/core/testability/testability';
import {AppViewPool, APP_VIEW_POOL_CAPACITY} from 'angular2/src/core/compiler/view_pool';
import {AppViewManager} from 'angular2/src/core/compiler/view_manager';
import {AppViewManagerUtils} from 'angular2/src/core/compiler/view_manager_utils';
@ -51,7 +50,6 @@ import {ProtoViewFactory} from 'angular2/src/core/compiler/proto_view_factory';
import {WebWorkerRenderer, WebWorkerCompiler} from './renderer';
import {Renderer, RenderCompiler} from 'angular2/src/core/render/api';
import {internalView} from 'angular2/src/core/compiler/view_ref';
import {ClientMessageBrokerFactory} from 'angular2/src/web_workers/shared/client_message_broker';
import {MessageBus} from 'angular2/src/web_workers/shared/message_bus';
import {APP_COMPONENT_REF_PROMISE, APP_COMPONENT} from 'angular2/src/core/application_tokens';
@ -125,6 +123,8 @@ function _injectorBindings(appComponentType, bus: MessageBus, initData: StringMa
bind(KeyValueDiffers).toValue(defaultKeyValueDiffers),
bind(ChangeDetection).toValue(bestChangeDetection),
DirectiveResolver,
UrlResolver,
StyleUrlResolver,
PipeResolver,
Parser,
Lexer,
@ -132,10 +132,7 @@ function _injectorBindings(appComponentType, bus: MessageBus, initData: StringMa
WebWorkerXHRImpl,
bind(XHR).toAlias(WebWorkerXHRImpl),
ComponentUrlMapper,
UrlResolver,
StyleUrlResolver,
DynamicComponentLoader,
Testability,
bind(AppRootUrl).toValue(new AppRootUrl(initData['rootUrl'])),
WebWorkerEventDispatcher
];