refactor(ngcc): create new entry-point for cluster workers (#36637)
PR Close #36637
This commit is contained in:

committed by
Matias Niemelä

parent
7e5e60b757
commit
443f5eee85
@ -5,11 +5,7 @@
|
||||
* Use of this source code is governed by an MIT-style license that can be
|
||||
* found in the LICENSE file at https://angular.io/license
|
||||
*/
|
||||
|
||||
/// <reference types="node" />
|
||||
|
||||
import * as cluster from 'cluster';
|
||||
|
||||
import {FileSystem} from '../../../../src/ngtsc/file_system';
|
||||
import {AsyncLocker} from '../../locking/async_locker';
|
||||
import {Logger} from '../../logging/logger';
|
||||
import {PackageJsonUpdater} from '../../writing/package_json_updater';
|
||||
@ -17,8 +13,6 @@ import {AnalyzeEntryPointsFn, CreateCompileFn, Executor} from '../api';
|
||||
import {CreateTaskCompletedCallback} from '../tasks/api';
|
||||
|
||||
import {ClusterMaster} from './master';
|
||||
import {ClusterWorker} from './worker';
|
||||
|
||||
|
||||
/**
|
||||
* An `Executor` that processes tasks in parallel (on multiple processes) and completes
|
||||
@ -26,26 +20,19 @@ import {ClusterWorker} from './worker';
|
||||
*/
|
||||
export class ClusterExecutor implements Executor {
|
||||
constructor(
|
||||
private workerCount: number, private logger: Logger,
|
||||
private workerCount: number, private fileSystem: FileSystem, private logger: Logger,
|
||||
private pkgJsonUpdater: PackageJsonUpdater, private lockFile: AsyncLocker,
|
||||
private createTaskCompletedCallback: CreateTaskCompletedCallback) {}
|
||||
|
||||
async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, createCompileFn: CreateCompileFn):
|
||||
async execute(analyzeEntryPoints: AnalyzeEntryPointsFn, _createCompileFn: CreateCompileFn):
|
||||
Promise<void> {
|
||||
if (cluster.isMaster) {
|
||||
// This process is the cluster master.
|
||||
return this.lockFile.lock(() => {
|
||||
this.logger.debug(`Running ngcc on ${this.constructor.name} (using ${
|
||||
this.workerCount} worker processes).`);
|
||||
const master = new ClusterMaster(
|
||||
this.workerCount, this.logger, this.pkgJsonUpdater, analyzeEntryPoints,
|
||||
this.createTaskCompletedCallback);
|
||||
return master.run();
|
||||
});
|
||||
} else {
|
||||
// This process is a cluster worker.
|
||||
const worker = new ClusterWorker(this.logger, createCompileFn);
|
||||
return worker.run();
|
||||
}
|
||||
return this.lockFile.lock(() => {
|
||||
this.logger.debug(
|
||||
`Running ngcc on ${this.constructor.name} (using ${this.workerCount} worker processes).`);
|
||||
const master = new ClusterMaster(
|
||||
this.workerCount, this.fileSystem, this.logger, this.pkgJsonUpdater, analyzeEntryPoints,
|
||||
this.createTaskCompletedCallback);
|
||||
return master.run();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,7 @@
|
||||
|
||||
import * as cluster from 'cluster';
|
||||
|
||||
import {resolve} from '../../../../src/ngtsc/file_system';
|
||||
import {FileSystem} from '../../../../src/ngtsc/file_system';
|
||||
import {Logger} from '../../logging/logger';
|
||||
import {PackageJsonUpdater} from '../../writing/package_json_updater';
|
||||
import {AnalyzeEntryPointsFn} from '../api';
|
||||
@ -33,13 +33,16 @@ export class ClusterMaster {
|
||||
private onTaskCompleted: TaskCompletedCallback;
|
||||
|
||||
constructor(
|
||||
private maxWorkerCount: number, private logger: Logger,
|
||||
private maxWorkerCount: number, private fileSystem: FileSystem, private logger: Logger,
|
||||
private pkgJsonUpdater: PackageJsonUpdater, analyzeEntryPoints: AnalyzeEntryPointsFn,
|
||||
createTaskCompletedCallback: CreateTaskCompletedCallback) {
|
||||
if (!cluster.isMaster) {
|
||||
throw new Error('Tried to instantiate `ClusterMaster` on a worker process.');
|
||||
}
|
||||
|
||||
// Set the worker entry-point
|
||||
cluster.setupMaster({exec: this.fileSystem.resolve(__dirname, 'worker.js')});
|
||||
|
||||
this.taskQueue = analyzeEntryPoints();
|
||||
this.onTaskCompleted = createTaskCompletedCallback(this.taskQueue);
|
||||
}
|
||||
@ -227,7 +230,7 @@ export class ClusterMaster {
|
||||
JSON.stringify(msg));
|
||||
}
|
||||
|
||||
const expectedPackageJsonPath = resolve(task.entryPoint.path, 'package.json');
|
||||
const expectedPackageJsonPath = this.fileSystem.resolve(task.entryPoint.path, 'package.json');
|
||||
const parsedPackageJson = task.entryPoint.packageJson;
|
||||
|
||||
if (expectedPackageJsonPath !== msg.packageJsonPath) {
|
||||
|
@ -5,58 +5,102 @@
|
||||
* Use of this source code is governed by an MIT-style license that can be
|
||||
* found in the LICENSE file at https://angular.io/license
|
||||
*/
|
||||
|
||||
/// <reference types="node" />
|
||||
|
||||
import * as cluster from 'cluster';
|
||||
|
||||
import {Logger} from '../../logging/logger';
|
||||
import {CompileFn, CreateCompileFn} from '../api';
|
||||
import {absoluteFrom, CachedFileSystem, getFileSystem, NodeJSFileSystem, setFileSystem} from '../../../../src/ngtsc/file_system';
|
||||
import {readConfiguration} from '../../../../src/perform_compile';
|
||||
import {parseCommandLineOptions} from '../../command_line_options';
|
||||
import {ConsoleLogger} from '../../logging/console_logger';
|
||||
import {Logger, LogLevel} from '../../logging/logger';
|
||||
import {getPathMappingsFromTsConfig} from '../../utils';
|
||||
import {DirectPackageJsonUpdater} from '../../writing/package_json_updater';
|
||||
import {CreateCompileFn} from '../api';
|
||||
import {getCreateCompileFn} from '../create_compile_function';
|
||||
import {stringifyTask} from '../tasks/utils';
|
||||
|
||||
import {MessageToWorker} from './api';
|
||||
import {ClusterPackageJsonUpdater} from './package_json_updater';
|
||||
import {sendMessageToMaster} from './utils';
|
||||
|
||||
|
||||
/**
|
||||
* A cluster worker is responsible for processing one task (i.e. one format property for a specific
|
||||
* entry-point) at a time and reporting results back to the cluster master.
|
||||
*/
|
||||
export class ClusterWorker {
|
||||
private compile: CompileFn;
|
||||
|
||||
constructor(private logger: Logger, createCompileFn: CreateCompileFn) {
|
||||
if (cluster.isMaster) {
|
||||
throw new Error('Tried to instantiate `ClusterWorker` on the master process.');
|
||||
}
|
||||
|
||||
this.compile = createCompileFn(
|
||||
(_task, outcome, message) =>
|
||||
sendMessageToMaster({type: 'task-completed', outcome, message}));
|
||||
}
|
||||
|
||||
run(): Promise<void> {
|
||||
// Listen for `ProcessTaskMessage`s and process tasks.
|
||||
cluster.worker.on('message', (msg: MessageToWorker) => {
|
||||
try {
|
||||
switch (msg.type) {
|
||||
case 'process-task':
|
||||
this.logger.debug(
|
||||
`[Worker #${cluster.worker.id}] Processing task: ${stringifyTask(msg.task)}`);
|
||||
return this.compile(msg.task);
|
||||
default:
|
||||
throw new Error(
|
||||
`[Worker #${cluster.worker.id}] Invalid message received: ${JSON.stringify(msg)}`);
|
||||
}
|
||||
} catch (err) {
|
||||
sendMessageToMaster({
|
||||
type: 'error',
|
||||
error: (err instanceof Error) ? (err.stack || err.message) : err,
|
||||
});
|
||||
// Cluster worker entry point
|
||||
if (require.main === module) {
|
||||
process.title = 'ngcc (worker)';
|
||||
setFileSystem(new CachedFileSystem(new NodeJSFileSystem()));
|
||||
let {
|
||||
basePath,
|
||||
targetEntryPointPath,
|
||||
createNewEntryPointFormats = false,
|
||||
logger = new ConsoleLogger(LogLevel.info),
|
||||
pathMappings,
|
||||
errorOnFailedEntryPoint = false,
|
||||
enableI18nLegacyMessageIdFormat = true,
|
||||
tsConfigPath
|
||||
} = parseCommandLineOptions(process.argv.slice(2));
|
||||
(async () => {
|
||||
try {
|
||||
if (!!targetEntryPointPath) {
|
||||
// targetEntryPointPath forces us to error if an entry-point fails.
|
||||
errorOnFailedEntryPoint = true;
|
||||
}
|
||||
});
|
||||
|
||||
// Return a promise that is never resolved.
|
||||
return new Promise(() => undefined);
|
||||
}
|
||||
const fileSystem = getFileSystem();
|
||||
const absBasePath = absoluteFrom(basePath);
|
||||
const projectPath = fileSystem.dirname(absBasePath);
|
||||
const tsConfig =
|
||||
tsConfigPath !== null ? readConfiguration(tsConfigPath || projectPath) : null;
|
||||
|
||||
if (pathMappings === undefined) {
|
||||
pathMappings = getPathMappingsFromTsConfig(tsConfig, projectPath);
|
||||
}
|
||||
|
||||
const pkgJsonUpdater =
|
||||
new ClusterPackageJsonUpdater(new DirectPackageJsonUpdater(fileSystem));
|
||||
|
||||
// The function for creating the `compile()` function.
|
||||
const createCompileFn = getCreateCompileFn(
|
||||
fileSystem, logger, pkgJsonUpdater, createNewEntryPointFormats, errorOnFailedEntryPoint,
|
||||
enableI18nLegacyMessageIdFormat, tsConfig, pathMappings);
|
||||
|
||||
await startWorker(logger, createCompileFn);
|
||||
process.exitCode = 0;
|
||||
} catch (e) {
|
||||
console.error(e.stack || e.message);
|
||||
process.exitCode = 1;
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
export async function startWorker(logger: Logger, createCompileFn: CreateCompileFn): Promise<void> {
|
||||
if (cluster.isMaster) {
|
||||
throw new Error('Tried to run cluster worker on the master process.');
|
||||
}
|
||||
|
||||
const compile = createCompileFn(
|
||||
(_task, outcome, message) => sendMessageToMaster({type: 'task-completed', outcome, message}));
|
||||
|
||||
|
||||
// Listen for `ProcessTaskMessage`s and process tasks.
|
||||
cluster.worker.on('message', (msg: MessageToWorker) => {
|
||||
try {
|
||||
switch (msg.type) {
|
||||
case 'process-task':
|
||||
logger.debug(
|
||||
`[Worker #${cluster.worker.id}] Processing task: ${stringifyTask(msg.task)}`);
|
||||
return compile(msg.task);
|
||||
default:
|
||||
throw new Error(
|
||||
`[Worker #${cluster.worker.id}] Invalid message received: ${JSON.stringify(msg)}`);
|
||||
}
|
||||
} catch (err) {
|
||||
sendMessageToMaster({
|
||||
type: 'error',
|
||||
error: (err instanceof Error) ? (err.stack || err.message) : err,
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
// Return a promise that is never resolved.
|
||||
return new Promise(() => undefined);
|
||||
}
|
@ -178,7 +178,7 @@ function getExecutor(
|
||||
// Execute in parallel. Use up to 8 CPU cores for workers, always reserving one for master.
|
||||
const workerCount = Math.min(8, os.cpus().length - 1);
|
||||
return new ClusterExecutor(
|
||||
workerCount, logger, pkgJsonUpdater, locker, createTaskCompletedCallback);
|
||||
workerCount, fileSystem, logger, pkgJsonUpdater, locker, createTaskCompletedCallback);
|
||||
} else {
|
||||
// Execute serially, on a single thread (async).
|
||||
return new SingleProcessExecutorAsync(logger, locker, createTaskCompletedCallback);
|
||||
|
Reference in New Issue
Block a user