diff --git a/packages/nx/src/daemon/client/client.ts b/packages/nx/src/daemon/client/client.ts index c915ee7a69b1b..092cd26ef03e2 100644 --- a/packages/nx/src/daemon/client/client.ts +++ b/packages/nx/src/daemon/client/client.ts @@ -99,6 +99,10 @@ import { POST_TASKS_EXECUTION, PRE_TASKS_EXECUTION, } from '../message-types/run-tasks-execution-hooks'; +import { + isEmitLogMessage, + isUpdateProgressMessage, +} from '../message-types/streaming-messages'; import { GET_ESTIMATED_TASK_TIMINGS, GET_FLAKY_TASKS, @@ -159,6 +163,11 @@ export class DaemonClient { private currentMessage; private currentResolve; private currentReject; + // Tracks the spinner owned by the in-flight request so streamed + // progress updates are routed to the caller's spinner instead of + // mutating the process-wide globalSpinner (which may belong to an + // unrelated command). + private currentSpinner: DelayedSpinner | null = null; private _enabled: boolean | undefined; private _daemonStatus: DaemonStatus = DaemonStatus.DISCONNECTED; @@ -307,6 +316,7 @@ export class DaemonClient { 'Calculating the project graph on the Nx Daemon is taking longer than expected. Re-run with NX_DAEMON=false to see more details.', { ciDelay: 60_000, delay: 30_000 } ); + this.currentSpinner = spinner; try { const response = await this.sendToDaemonViaQueue({ type: 'REQUEST_PROJECT_GRAPH', @@ -323,6 +333,7 @@ export class DaemonClient { } } finally { spinner?.cleanup(); + this.currentSpinner = null; } } @@ -760,9 +771,9 @@ export class DaemonClient { type: 'PROCESS_IN_BACKGROUND', requirePath, data, - // This method is sometimes passed data that cannot be serialized with v8 - // so we force JSON serialization here }, + // This method is sometimes passed data that cannot be serialized with v8 + // so we force JSON serialization here 'json' ); } @@ -1032,11 +1043,15 @@ export class DaemonClient { private async sendToDaemonViaQueue( messageToDaemon: T, - force?: 'v8' | 'json' + parser?: 'v8' | 'json' ): Promise { - return this.queue.sendToQueue(() => - this.sendMessageToDaemon(messageToDaemon, force) - ); + return this.queue.sendToQueue(async () => { + // Set currentSpinner inside the queued function so it's only + // active while this specific message is in flight — preventing + // concurrent callers from overwriting each other's spinner + // reference before their turn arrives. + return await this.sendMessageToDaemon(messageToDaemon, parser); + }); } private setUpConnection() { @@ -1259,6 +1274,19 @@ export class DaemonClient { 'result-parse-start-' + this.currentMessage.type, 'result-parse-end-' + this.currentMessage.type ); + // Streaming messages fire side-effects on the client but do not + // resolve the pending request promise — the daemon can push several + // of these before finally sending the real response. Progress + // updates route through the in-flight request's own spinner so + // we don't stomp on unrelated commands' spinner text. + if (isUpdateProgressMessage(parsedResult)) { + this.currentSpinner?.setMessage(parsedResult.message); + return; + } + if (isEmitLogMessage(parsedResult)) { + console[parsedResult.level](parsedResult.message); + return; + } if (parsedResult.error) { this.currentReject(parsedResult.error); } else { diff --git a/packages/nx/src/daemon/logger.ts b/packages/nx/src/daemon/logger.ts index afc12b7fc43a9..9b7b379b51fa0 100644 --- a/packages/nx/src/daemon/logger.ts +++ b/packages/nx/src/daemon/logger.ts @@ -11,11 +11,14 @@ */ import { appendFileSync, existsSync, mkdirSync } from 'fs'; +import { ProgressTopic } from '../utils/progress-topics'; +import { nxVersion } from '../utils/versions'; +import { EmitLogLevel } from './message-types/streaming-messages'; +import { sendEmitLogMessageToTopic } from './server/client-socket-context'; import { DAEMON_DIR_FOR_CURRENT_WORKSPACE, DAEMON_OUTPUT_LOG_FILE, } from './tmp-dir'; -import { nxVersion } from '../utils/versions'; type LogSource = 'Server' | 'Client'; @@ -51,6 +54,25 @@ class DaemonLogger { this.log(`[WATCHER]: ${s.join(' ')}`); } + /** + * Broadcasts a log line to every client currently subscribed to the + * given topic. Useful for warnings raised inside daemon-executed code + * that we want the user to see in their terminal rather than lose to + * the daemon log file. + * + * Falls back to writing into the daemon log when no clients are + * subscribed to the topic. + * + * Must only be invoked from inside the Nx daemon process. + */ + logToClient( + topic: ProgressTopic, + message: string, + level: EmitLogLevel = 'log' + ) { + sendEmitLogMessageToTopic(topic, message, level); + } + private writeToFile(message: string) { try { if (!existsSync(DAEMON_DIR_FOR_CURRENT_WORKSPACE)) { diff --git a/packages/nx/src/daemon/message-types/streaming-messages.ts b/packages/nx/src/daemon/message-types/streaming-messages.ts new file mode 100644 index 0000000000000..dc0529ed9fd11 --- /dev/null +++ b/packages/nx/src/daemon/message-types/streaming-messages.ts @@ -0,0 +1,36 @@ +export const UPDATE_PROGRESS_MESSAGE = 'UPDATE_PROGRESS_MESSAGE' as const; + +export type UpdateProgressMessage = { + type: typeof UPDATE_PROGRESS_MESSAGE; + message: string; +}; + +export function isUpdateProgressMessage( + message: unknown +): message is UpdateProgressMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === UPDATE_PROGRESS_MESSAGE + ); +} + +export const EMIT_LOG = 'EMIT_LOG' as const; + +export type EmitLogLevel = 'log' | 'warn' | 'error'; + +export type EmitLogMessage = { + type: typeof EMIT_LOG; + level: EmitLogLevel; + message: string; +}; + +export function isEmitLogMessage(message: unknown): message is EmitLogMessage { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + message['type'] === EMIT_LOG + ); +} diff --git a/packages/nx/src/daemon/server/client-socket-context.ts b/packages/nx/src/daemon/server/client-socket-context.ts new file mode 100644 index 0000000000000..7b597d9ea12f5 --- /dev/null +++ b/packages/nx/src/daemon/server/client-socket-context.ts @@ -0,0 +1,120 @@ +import type { Socket } from 'net'; +import { MESSAGE_END_SEQ } from '../../utils/consume-messages-from-socket'; +import { ProgressTopic } from '../../utils/progress-topics'; +import { isOnDaemon } from '../is-on-daemon'; +import { serverLogger } from '../logger'; +import { + EMIT_LOG, + EmitLogLevel, + EmitLogMessage, + UPDATE_PROGRESS_MESSAGE, +} from '../message-types/streaming-messages'; +import { serialize } from '../socket-utils'; + +const topicSubscribers = new Map>(); + +export function subscribeClientToTopic( + socket: Socket, + topic: ProgressTopic +): void { + let subscribers = getTopicSubscribers(topic); + if (!subscribers) { + subscribers = new Set(); + topicSubscribers.set(topic, subscribers); + } + subscribers.add(socket); +} + +export function unsubscribeClientFromTopic( + socket: Socket, + topic: ProgressTopic +): void { + const subscribers = getTopicSubscribers(topic); + if (!subscribers) return; + subscribers.delete(socket); +} + +export function getTopicSubscribers(topic: ProgressTopic): Set { + const subscribers = topicSubscribers.get(topic); + if (!subscribers) { + const set = new Set(); + topicSubscribers.set(topic, set); + return set; + } + return subscribers; +} + +export function assertOnDaemon(helperName: string) { + if (!isOnDaemon()) { + throw new Error( + `${helperName} can only be called from the Nx daemon process.` + ); + } +} + +/** + * Writes a streaming message over the given socket using the daemon's + * configured serialization format and terminated with MESSAGE_END_SEQ. + * Errors are logged to the daemon's stdout (redirected to the daemon + * log) rather than propagated — a disconnected client shouldn't tear + * down the current request handler or other subscribers. + */ +export function writeStreamingMessage( + socket: Socket, + payload: unknown, + description: string +) { + try { + serverLogger.log('Streaming message to client:', description); + socket.write(serialize(payload) + MESSAGE_END_SEQ, (err) => { + if (err) { + console.log( + `Streaming message write error (client likely disconnected): ${err.message}` + ); + } + }); + } catch (e) { + console.log( + `Failed to send streaming message to client: ${ + e instanceof Error ? e.message : String(e) + }` + ); + } +} + +/** + * Broadcasts a progress message to every client currently subscribed to + * the given topic. No-op when there are no subscribers. + * + * Must only be invoked from inside the Nx daemon process. + */ +export function sendProgressMessageToTopic( + topic: ProgressTopic, + message: string +): void { + assertOnDaemon('sendProgressMessageToTopic'); + const subscribers = getTopicSubscribers(topic); + if (!subscribers?.size) return; + const payload = { type: UPDATE_PROGRESS_MESSAGE, message }; + for (const socket of subscribers) { + writeStreamingMessage( + socket, + payload, + 'progress update for topic ' + topic + ); + } +} + +export function sendEmitLogMessageToTopic( + topic: ProgressTopic, + message: string, + level: EmitLogLevel +): void { + assertOnDaemon('sendEmitLogMessageToTopic'); + const subscribers = getTopicSubscribers(topic); + if (!subscribers?.size) return; + const payload: EmitLogMessage = { type: EMIT_LOG, message, level }; + for (const socket of subscribers) { + writeStreamingMessage(socket, payload, 'emit log message to ' + topic); + } +} diff --git a/packages/nx/src/daemon/server/handle-request-project-graph.ts b/packages/nx/src/daemon/server/handle-request-project-graph.ts index 19cca0a04c167..9287266d53786 100644 --- a/packages/nx/src/daemon/server/handle-request-project-graph.ts +++ b/packages/nx/src/daemon/server/handle-request-project-graph.ts @@ -1,15 +1,18 @@ +import { Socket } from 'net'; import { performance } from 'perf_hooks'; import { serializeResult } from '../socket-utils'; import { serverLogger } from '../logger'; import { getCachedSerializedProjectGraphPromise } from './project-graph-incremental-recomputation'; import { HandlerResult } from './server'; -export async function handleRequestProjectGraph(): Promise { +export async function handleRequestProjectGraph( + socket: Socket +): Promise { try { performance.mark('server-connection'); serverLogger.requestLog('Client Request for Project Graph Received'); - const result = await getCachedSerializedProjectGraphPromise(); + const result = await getCachedSerializedProjectGraphPromise(socket); if (result.error) { return { description: `Error when preparing serialized project graph.`, diff --git a/packages/nx/src/daemon/server/project-graph-incremental-recomputation.ts b/packages/nx/src/daemon/server/project-graph-incremental-recomputation.ts index 77e399e9940b8..1b99c957c0781 100644 --- a/packages/nx/src/daemon/server/project-graph-incremental-recomputation.ts +++ b/packages/nx/src/daemon/server/project-graph-incremental-recomputation.ts @@ -1,3 +1,4 @@ +import { Socket } from 'net'; import { performance } from 'perf_hooks'; import { readNxJson } from '../../config/nx-json'; import { @@ -38,6 +39,11 @@ import { } from '../../utils/workspace-context'; import { workspaceRoot } from '../../utils/workspace-root'; import { serverLogger } from '../logger'; +import { ProgressTopics } from '../../utils/progress-topics'; +import { + subscribeClientToTopic, + unsubscribeClientFromTopic, +} from './client-socket-context'; import { notifyFileChangeListeners } from './file-watching/file-change-events'; import { notifyFileWatcherSockets } from './file-watching/file-watcher-sockets'; import { notifyProjectGraphListenerSockets } from './project-graph-listener-sockets'; @@ -84,7 +90,16 @@ let knownExternalNodes: Record = {}; let fileChangeCounter = 0; let recomputationGeneration = 0; -export async function getCachedSerializedProjectGraphPromise(): Promise { +export async function getCachedSerializedProjectGraphPromise( + socket?: Socket +): Promise { + // Subscribe the requesting client to the graph-construction topic + // for the duration of the await, so in-flight progress/log messages + // — including those produced by a recomputation that was already + // started before this caller arrived — are broadcast to them. + if (socket) { + subscribeClientToTopic(socket, ProgressTopics.GraphConstruction); + } try { let wasScheduled = false; // recomputing it now on demand. we can ignore the scheduled timeout @@ -180,6 +195,10 @@ export async function getCachedSerializedProjectGraphPromise(): Promise handleRequestProjectGraph(), + () => handleRequestProjectGraph(socket), mode ); } else if (payload.type === 'HASH_TASKS') { diff --git a/packages/nx/src/project-graph/build-project-graph.ts b/packages/nx/src/project-graph/build-project-graph.ts index 80bc305e11fda..2352749ba84c1 100644 --- a/packages/nx/src/project-graph/build-project-graph.ts +++ b/packages/nx/src/project-graph/build-project-graph.ts @@ -1,37 +1,23 @@ -import { workspaceRoot } from '../utils/workspace-root'; +import { existsSync } from 'fs'; import { join } from 'path'; import { performance } from 'perf_hooks'; -import { assertWorkspaceValidity } from '../utils/assert-workspace-validity'; -import { FileData } from './file-utils'; -import { - CachedFileData, - createProjectFileMapCache, - extractCachedFileData, - FileMapCache, - shouldRecomputeWholeGraph, -} from './nx-deps-cache'; -import { applyImplicitDependencies } from './utils/implicit-project-dependencies'; -import { normalizeProjectNodes } from './utils/normalize-project-nodes'; -import type { LoadedNxPlugin } from './plugins/loaded-nx-plugin'; -import { - CreateDependenciesContext, - CreateMetadataContext, - ProjectsMetadata, -} from './plugins'; -import { getRootTsConfigPath } from '../plugins/js/utils/typescript'; +import { readNxJson } from '../config/configuration'; +import { NxJsonConfiguration } from '../config/nx-json'; import { FileMap, ProjectGraph, ProjectGraphExternalNode, } from '../config/project-graph'; -import { readJsonFile } from '../utils/fileutils'; -import { NxJsonConfiguration } from '../config/nx-json'; -import { ProjectGraphBuilder } from './project-graph-builder'; import { ProjectConfiguration } from '../config/workspace-json-project-json'; -import { readNxJson } from '../config/configuration'; -import { existsSync } from 'fs'; -import { PackageJson } from '../utils/package-json'; +import { hashObject } from '../hasher/file-hasher'; import { NxWorkspaceFilesExternals } from '../native'; +import { getRootTsConfigPath } from '../plugins/js/utils/typescript'; +import { assertWorkspaceValidity } from '../utils/assert-workspace-validity'; +import { DelayedSpinner } from '../utils/delayed-spinner'; +import { readJsonFile } from '../utils/fileutils'; +import { PackageJson } from '../utils/package-json'; +import { ProgressTopics } from '../utils/progress-topics'; +import { workspaceRoot } from '../utils/workspace-root'; import { AggregateProjectGraphError, CreateMetadataError, @@ -40,10 +26,25 @@ import { ProcessDependenciesError, WorkspaceValidityError, } from './error-types'; -import { mergeMetadata } from './utils/project-configuration/target-merging'; +import { FileData } from './file-utils'; +import { + CachedFileData, + createProjectFileMapCache, + extractCachedFileData, + FileMapCache, + shouldRecomputeWholeGraph, +} from './nx-deps-cache'; +import { + CreateDependenciesContext, + CreateMetadataContext, + ProjectsMetadata, +} from './plugins'; +import type { LoadedNxPlugin } from './plugins/loaded-nx-plugin'; +import { ProjectGraphBuilder } from './project-graph-builder'; +import { applyImplicitDependencies } from './utils/implicit-project-dependencies'; +import { normalizeProjectNodes } from './utils/normalize-project-nodes'; import type { ConfigurationSourceMaps } from './utils/project-configuration/source-maps'; -import { DelayedSpinner } from '../utils/delayed-spinner'; -import { hashObject } from '../hasher/file-hasher'; +import { mergeMetadata } from './utils/project-configuration/target-merging'; let storedFileMap: FileMap | null = null; let storedAllWorkspaceFiles: FileData[] | null = null; @@ -318,41 +319,34 @@ async function updateProjectGraphWithPlugins( performance.mark('createDependencies:start'); let spinner: DelayedSpinner; - const inProgressPlugins = new Set(); + const inProgressPlugins = new Set( + ...createDependencyPlugins.map((plugin) => plugin.name) + ); - function updateSpinner() { + function getSpinnerText() { if (!spinner || inProgressPlugins.size === 0) { - return; + return ''; } if (inProgressPlugins.size === 1) { - spinner.setMessage( - `Creating project graph dependencies with ${ - inProgressPlugins.values().next().value - }` - ); - } else if (process.env.NX_VERBOSE_LOGGING === 'true') { - spinner.setMessage( - [ - `Creating project graph dependencies with ${inProgressPlugins.size} plugins`, - ...Array.from(inProgressPlugins).map((p) => ` - ${p}`), - ].join('\n') - ); + return `Creating project graph dependencies with ${ + inProgressPlugins.values().next().value + }`; } else { - spinner.setMessage( - `Creating project graph dependencies with ${inProgressPlugins.size} plugins` - ); + return [ + `Creating project graph dependencies with ${inProgressPlugins.size} plugins`, + ...Array.from(inProgressPlugins).map((p) => ` - ${p}`), + ].join('\n'); } } - spinner = new DelayedSpinner( - `Creating project graph dependencies with ${createDependencyPlugins.length} plugins` - ); + spinner = new DelayedSpinner(getSpinnerText(), { + progressTopic: ProgressTopics.GraphConstruction, + }); await Promise.all( createDependencyPlugins.map(async (plugin) => { performance.mark(`${plugin.name}:createDependencies - start`); - inProgressPlugins.add(plugin.name); try { const dependencies = await plugin .createDependencies({ @@ -360,7 +354,7 @@ async function updateProjectGraphWithPlugins( }) .finally(() => { inProgressPlugins.delete(plugin.name); - updateSpinner(); + spinner.setMessage(getSpinnerText()); }); for (const dep of dependencies) { @@ -439,43 +433,39 @@ export async function applyProjectMetadata( performance.mark('createMetadata:start'); let spinner: DelayedSpinner; - const inProgressPlugins = new Set(); + const createMetadataPlugins = plugins.filter( + (plugin) => plugin.createMetadata + ); - function updateSpinner() { + const inProgressPlugins = new Set( + ...createMetadataPlugins.map((p) => p.name) + ); + + function getSpinnerText() { if (!spinner || inProgressPlugins.size === 0) { - return; + return ''; } if (inProgressPlugins.size === 1) { - spinner.setMessage( - `Creating project metadata with ${ - inProgressPlugins.values().next().value - }` - ); - } else if (process.env.NX_VERBOSE_LOGGING === 'true') { - spinner.setMessage( - [ - `Creating project metadata with ${inProgressPlugins.size} plugins`, - ...Array.from(inProgressPlugins).map((p) => ` - ${p}`), - ].join('\n') - ); + return `Creating project metadata with ${ + inProgressPlugins.values().next().value + }`; } else { - spinner.setMessage( - `Creating project metadata with ${inProgressPlugins.size} plugins` - ); + return [ + `Creating project metadata with ${inProgressPlugins.size} plugins`, + ...Array.from(inProgressPlugins).map((p) => ` - ${p}`), + ].join('\n'); } } - const createMetadataPlugins = plugins.filter( - (plugin) => plugin.createMetadata - ); - spinner = new DelayedSpinner( - `Creating project metadata with ${createMetadataPlugins.length} plugins` - ); + spinner = createMetadataPlugins.length + ? new DelayedSpinner(getSpinnerText(), { + progressTopic: ProgressTopics.GraphConstruction, + }) + : undefined; const promises = createMetadataPlugins.map(async (plugin) => { performance.mark(`${plugin.name}:createMetadata - start`); - inProgressPlugins.add(plugin.name); try { const metadata = await plugin.createMetadata(graph, context); results.push({ metadata, pluginName: plugin.name }); @@ -483,7 +473,7 @@ export async function applyProjectMetadata( errors.push(new CreateMetadataError(e, plugin.name)); } finally { inProgressPlugins.delete(plugin.name); - updateSpinner(); + spinner.setMessage(getSpinnerText()); performance.mark(`${plugin.name}:createMetadata - end`); performance.measure( `${plugin.name}:createMetadata`, diff --git a/packages/nx/src/project-graph/plugins/isolation/isolated-plugin.ts b/packages/nx/src/project-graph/plugins/isolation/isolated-plugin.ts index 81af8f6cfa2dc..5276b5be4282d 100644 --- a/packages/nx/src/project-graph/plugins/isolation/isolated-plugin.ts +++ b/packages/nx/src/project-graph/plugins/isolation/isolated-plugin.ts @@ -5,6 +5,7 @@ import path = require('path'); import type { PluginConfiguration } from '../../../config/nx-json'; import type { ProjectGraph } from '../../../config/project-graph'; +import { serverLogger } from '../../../daemon/logger'; import { getPluginOsSocketPath } from '../../../daemon/socket-utils'; import { consumeMessagesFromSocket, @@ -12,6 +13,7 @@ import { } from '../../../utils/consume-messages-from-socket'; import { getNxRequirePaths } from '../../../utils/installation-directory'; import { logger } from '../../../utils/logger'; +import { ProgressTopics } from '../../../utils/progress-topics'; import { waitForSocketConnection } from '../../../utils/wait-for-socket-connection'; import type { RawProjectGraphDependency } from '../../project-graph-builder'; import { LoadedNxPlugin } from '../loaded-nx-plugin'; @@ -29,9 +31,14 @@ import type { MessageResult, PluginWorkerLoadResult, PluginWorkerMessage, + PluginWorkerNotification, PluginWorkerResult, } from './messaging'; -import { isPluginWorkerResult, sendMessageOverSocket } from './messaging'; +import { + isPluginWorkerNotification, + isPluginWorkerResult, + sendMessageOverSocket, +} from './messaging'; import { Hook, Phase, @@ -213,6 +220,10 @@ export class IsolatedPlugin implements LoadedNxPlugin { private handleSocketData = (raw: string) => { const message = parseMessage(raw); + if (isPluginWorkerNotification(message)) { + handlePluginWorkerNotification(message); + return; + } if (!isPluginWorkerResult(message)) { return; } @@ -696,3 +707,22 @@ type Falsy = false | 0 | '' | null | undefined | 0n; function hooks(...array: Array): Array { return array.filter((v): v is Hook => !!v); } + +// When the host process is the daemon, broadcast the log notification +// to every client subscribed to the graph-construction topic so the +// line surfaces in their terminal. When the host is the direct CLI +// there is no client to notify, so the log line goes straight to +// stdout/stderr. +function handlePluginWorkerNotification( + notification: PluginWorkerNotification +): void { + if ((global as any).NX_DAEMON) { + serverLogger.logToClient( + ProgressTopics.GraphConstruction, + notification.message, + notification.level + ); + return; + } + console[notification.level](notification.message); +} diff --git a/packages/nx/src/project-graph/plugins/isolation/messaging.ts b/packages/nx/src/project-graph/plugins/isolation/messaging.ts index 5c2e95253e676..9a340a78f8850 100644 --- a/packages/nx/src/project-graph/plugins/isolation/messaging.ts +++ b/packages/nx/src/project-graph/plugins/isolation/messaging.ts @@ -173,6 +173,34 @@ export type MessageResult = ResultOf< T & WithResult >; +// ============================================================================= +// NOTIFICATIONS (worker -> host, unsolicited, no response expected) +// ============================================================================= + +export type PluginWorkerEmitLogNotification = { + type: 'emitLog'; + level: 'log' | 'warn' | 'error'; + message: string; +}; + +export type PluginWorkerNotification = PluginWorkerEmitLogNotification; + +const NOTIFICATION_TYPES: ReadonlyArray = [ + 'emitLog', +]; + +export function isPluginWorkerNotification( + message: Serializable +): message is PluginWorkerNotification { + return ( + typeof message === 'object' && + message !== null && + 'type' in message && + typeof message.type === 'string' && + (NOTIFICATION_TYPES as readonly string[]).includes(message.type) + ); +} + // ============================================================================= // TYPE GUARDS // ============================================================================= @@ -264,7 +292,7 @@ export async function consumeMessage( */ export function sendMessageOverSocket( socket: Socket, - message: PluginWorkerMessage | PluginWorkerResult + message: PluginWorkerMessage | PluginWorkerResult | PluginWorkerNotification ): void { socket.write(serialize(message)); socket.write(MESSAGE_END_SEQ); diff --git a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts index ceeceb4b9082a..775ea88940cd6 100644 --- a/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts +++ b/packages/nx/src/project-graph/plugins/isolation/plugin-worker.ts @@ -10,6 +10,7 @@ import { logger } from '../../../utils/logger'; import { createSerializableError } from '../../../utils/serializable-error'; import type { LoadedNxPlugin } from '../loaded-nx-plugin'; import { consumeMessage, isPluginWorkerMessage } from './messaging'; +import { setPluginWorkerHostSocket } from './worker-streaming'; import { unlinkSync } from 'fs'; import { createServer } from 'net'; @@ -51,6 +52,11 @@ let connectErrorTimeout = setErrorTimeout( const server = createServer((socket) => { connectErrorTimeout?.clear(); + // Make the host-facing socket available to plugin code running in this + // worker so it can emit log / progress notifications without having + // the socket threaded through every call site. + setPluginWorkerHostSocket(socket); + logger.verbose( `[plugin-worker] "${expectedPluginName}" (pid: ${process.pid}) connected` ); diff --git a/packages/nx/src/project-graph/plugins/isolation/worker-streaming.ts b/packages/nx/src/project-graph/plugins/isolation/worker-streaming.ts new file mode 100644 index 0000000000000..481fb1a4b93ca --- /dev/null +++ b/packages/nx/src/project-graph/plugins/isolation/worker-streaming.ts @@ -0,0 +1,44 @@ +import type { Socket } from 'net'; +import { + PluginWorkerEmitLogNotification, + sendMessageOverSocket, +} from './messaging'; + +// Plugin workers talk to their host process over a single socket that +// is established when the host connects. Plugin code running anywhere +// in the worker process needs a way to emit log lines without having +// that socket threaded through every call frame, so we stash a +// module-level reference here when the host connects. +let hostSocket: Socket | null = null; + +export function setPluginWorkerHostSocket(socket: Socket): void { + hostSocket = socket; + socket.once('close', () => { + if (hostSocket === socket) hostSocket = null; + }); +} + +/** + * Emits a log line from the plugin worker up to its host. The host + * decides where it ends up (direct stdout/stderr when running under the + * CLI, forwarded to the active daemon client when running under the + * daemon). + * + * When plugin isolation is turned off, or this is otherwise called + * outside of a connected plugin worker, the message is written + * directly to stdout/stderr so the log line isn't silently dropped. + */ +export function emitPluginWorkerLog( + level: PluginWorkerEmitLogNotification['level'], + message: string +): void { + if (!hostSocket) { + console[level](message); + return; + } + sendMessageOverSocket(hostSocket, { + type: 'emitLog', + level, + message, + }); +} diff --git a/packages/nx/src/project-graph/utils/project-configuration-utils.ts b/packages/nx/src/project-graph/utils/project-configuration-utils.ts index e25dbd854e701..57d4d64c799d4 100644 --- a/packages/nx/src/project-graph/utils/project-configuration-utils.ts +++ b/packages/nx/src/project-graph/utils/project-configuration-utils.ts @@ -12,6 +12,7 @@ import { minimatch } from 'minimatch'; import { performance } from 'perf_hooks'; import { DelayedSpinner } from '../../utils/delayed-spinner'; +import { ProgressTopics } from '../../utils/progress-topics'; import { AggregateCreateNodesError, formatAggregateCreateNodesError, @@ -83,37 +84,29 @@ export async function createProjectConfigurationsWithPlugins( let spinner: DelayedSpinner; const inProgressPlugins = new Set(); - function updateSpinner() { + function getSpinnerText() { if (!spinner || inProgressPlugins.size === 0) { - return; + return ''; } if (inProgressPlugins.size === 1) { - spinner.setMessage( - `Creating project graph nodes with ${ - inProgressPlugins.values().next().value - }` - ); - } else if (process.env.NX_VERBOSE_LOGGING === 'true') { - spinner.setMessage( - [ - `Creating project graph nodes with ${inProgressPlugins.size} plugins`, - ...Array.from(inProgressPlugins).map((p) => ` - ${p}`), - ].join('\n') - ); + return `Creating project graph nodes with ${ + inProgressPlugins.values().next().value + }`; } else { - spinner.setMessage( - `Creating project graph nodes with ${inProgressPlugins.size} plugins` - ); + return [ + `Creating project graph nodes with ${inProgressPlugins.size} plugins`, + ...Array.from(inProgressPlugins).map((p) => ` - ${p}`), + ].join('\n'); } } const createNodesPlugins = plugins.filter( (plugin) => plugin.createNodes?.[0] ); - spinner = new DelayedSpinner( - `Creating project graph nodes with ${createNodesPlugins.length} plugins` - ); + spinner = new DelayedSpinner(getSpinnerText(), { + progressTopic: ProgressTopics.GraphConstruction, + }); const results: Promise< (readonly [ @@ -175,7 +168,7 @@ export async function createProjectConfigurationsWithPlugins( }) .finally(() => { inProgressPlugins.delete(pluginName); - updateSpinner(); + spinner.setMessage(getSpinnerText()); }); results.push(r); diff --git a/packages/nx/src/utils/delayed-spinner.ts b/packages/nx/src/utils/delayed-spinner.ts index f70ed0aec1b25..1e641e7572fb7 100644 --- a/packages/nx/src/utils/delayed-spinner.ts +++ b/packages/nx/src/utils/delayed-spinner.ts @@ -1,9 +1,18 @@ +import { isOnDaemon } from '../daemon/is-on-daemon'; +import { sendProgressMessageToTopic } from '../daemon/server/client-socket-context'; import { isCI } from './is-ci'; +import { ProgressTopic } from './progress-topics'; import { globalSpinner, SHOULD_SHOW_SPINNERS } from './spinner'; export type DelayedSpinnerOptions = { delay?: number; ciDelay?: number; + /** + * When set and running inside the Nx daemon, spinner messages are + * broadcast to every client currently subscribed to this topic so + * their own spinners stay in sync with daemon-side progress. + */ + progressTopic?: ProgressTopic; }; /** @@ -18,6 +27,7 @@ export class DelayedSpinner { private lastMessage: string; private ready: boolean; + private readonly progressTopic: ProgressTopic | undefined; /** * Constructs a new {@link DelayedSpinner} instance. @@ -26,8 +36,11 @@ export class DelayedSpinner { */ constructor(message: string, opts?: DelayedSpinnerOptions) { opts = normalizeDelayedSpinnerOpts(opts); + this.progressTopic = opts.progressTopic; const delay = SHOULD_SHOW_SPINNERS ? opts.delay : opts.ciDelay; + this.broadcastProgress(message); + this.timeouts.push( setTimeout(() => { this.ready = true; @@ -55,6 +68,7 @@ export class DelayedSpinner { } else if (this.ready && this.lastMessage && this.lastMessage !== message) { console.warn(message); } + this.broadcastProgress(message); this.lastMessage = message; return this; } @@ -86,6 +100,12 @@ export class DelayedSpinner { this.spinner?.stop(); this.timeouts.forEach((t) => clearTimeout(t)); } + + private broadcastProgress(message: string) { + if (this.progressTopic && isOnDaemon()) { + sendProgressMessageToTopic(this.progressTopic, message); + } + } } function normalizeDelayedSpinnerOpts( diff --git a/packages/nx/src/utils/progress-topics.ts b/packages/nx/src/utils/progress-topics.ts new file mode 100644 index 0000000000000..82018d6528013 --- /dev/null +++ b/packages/nx/src/utils/progress-topics.ts @@ -0,0 +1,21 @@ +/** + * Named channels that daemon clients can subscribe to in order to + * receive streaming progress/log output produced by a long-running + * daemon operation. A handler subscribes the requesting socket to the + * topics it will produce output for, and broadcast helpers fan out to + * every currently-subscribed socket for that topic. + * + * Topics are the contract between the code producing progress (usually + * through a {@link DelayedSpinner}) and the daemon subscriber registry + * that routes those messages to connected clients, so they live next + * to the spinner rather than inside the daemon implementation. + * + * Add new topics here as other long-running daemon operations grow + * their own streaming surfaces. + */ +export const ProgressTopics = { + GraphConstruction: 'graph-construction', +} as const; + +export type ProgressTopic = + (typeof ProgressTopics)[keyof typeof ProgressTopics];