Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions packages/nx/src/daemon/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ import { preventRecursionInGraphConstruction } from '../../project-graph/project
import { ConfigurationSourceMaps } from '../../project-graph/utils/project-configuration/source-maps';
import { parseMessage } from '../../utils/consume-messages-from-socket';
import { DelayedSpinner } from '../../utils/delayed-spinner';
import { globalSpinner } from '../../utils/spinner';
import {
isEmitLogMessage,
isUpdateProgressMessage,
} from '../message-types/streaming-messages';
import { handleImport } from '../../utils/handle-import';
import { isCI } from '../../utils/is-ci';
import { isSandbox } from '../../utils/is-sandbox';
Expand Down Expand Up @@ -1259,6 +1264,20 @@ export class DaemonClient {
'result-parse-start-' + this.currentMessage.type,
'result-parse-end-' + this.currentMessage.type
);
// Streaming messages fire side-effects on the client but do not
// resolve the pending request promise — the daemon can push several
// of these before finally sending the real response.
if (isUpdateProgressMessage(parsedResult)) {
if (globalSpinner.isSpinning()) {
globalSpinner.updateText(parsedResult.message);
}
return;
}
if (isEmitLogMessage(parsedResult)) {
// eslint-disable-next-line no-console
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is disallowed

console[parsedResult.level](parsedResult.message);
return;
}
if (parsedResult.error) {
this.currentReject(parsedResult.error);
} else {
Expand Down
36 changes: 36 additions & 0 deletions packages/nx/src/daemon/message-types/streaming-messages.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
export const UPDATE_PROGRESS_MESSAGE = 'UPDATE_PROGRESS_MESSAGE' as const;

export type UpdateProgressMessage = {
type: typeof UPDATE_PROGRESS_MESSAGE;
message: string;
};

export function isUpdateProgressMessage(
message: unknown
): message is UpdateProgressMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === UPDATE_PROGRESS_MESSAGE
);
}

export const EMIT_LOG = 'EMIT_LOG' as const;

export type EmitLogLevel = 'log' | 'warn' | 'error';

export type EmitLogMessage = {
type: typeof EMIT_LOG;
level: EmitLogLevel;
message: string;
};

export function isEmitLogMessage(message: unknown): message is EmitLogMessage {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
message['type'] === EMIT_LOG
);
}
93 changes: 93 additions & 0 deletions packages/nx/src/daemon/server/client-socket-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import { AsyncLocalStorage } from 'async_hooks';
import type { Socket } from 'net';
import { MESSAGE_END_SEQ } from '../../utils/consume-messages-from-socket';
import {
EMIT_LOG,
EmitLogLevel,
UPDATE_PROGRESS_MESSAGE,
} from '../message-types/streaming-messages';
import { isOnDaemon } from '../is-on-daemon';
import { serverLogger } from '../logger';

// Messages that stream back to the client while a request is in flight
// (progress updates, log forwarding) need to know which socket belongs to
// the currently-handled request. Threading a socket through every layer
// of the handler call tree would be invasive, so we stash it in
// AsyncLocalStorage and surface it via the helpers below.
const clientSocketStorage = new AsyncLocalStorage<Socket>();

export function runWithClientSocket<T>(
socket: Socket,
fn: () => Promise<T> | T
): Promise<T> | T {
return clientSocketStorage.run(socket, fn);
}

export function getActiveClientSocket(): Socket | undefined {
return clientSocketStorage.getStore();
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inline this


function assertOnDaemon(helperName: string) {
if (!isOnDaemon()) {
throw new Error(
`${helperName} can only be called from the Nx daemon process.`
);
}
}

function writeStreamingMessage(socket: Socket, payload: unknown) {
try {
socket.write(JSON.stringify(payload) + MESSAGE_END_SEQ, (err) => {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the JSON serialize abstraction with v8 handling.

if (err) {
serverLogger.log(
`Streaming message write error (client likely disconnected): ${err.message}`
);
}
});
} catch (e) {
serverLogger.log(
`Failed to send streaming message to client: ${
e instanceof Error ? e.message : String(e)
}`
);
}
Comment on lines +67 to +82
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look at respondToClient from shutdown-utils and borrow some ideas like logging this into the server logs etc.

}

/**
* Sends a progress message to the currently-connected client, which will
* update the in-flight spinner on the client side. No-op when called
* outside of a request-handling async context (no active client socket).
*
* Must only be invoked from inside the Nx daemon process.
*/
export function sendProgressMessageToClient(message: string): void {
assertOnDaemon('sendProgressMessageToClient');
const socket = getActiveClientSocket();
if (!socket) return;
writeStreamingMessage(socket, {
type: UPDATE_PROGRESS_MESSAGE,
message,
});
}

/**
* Emits a log line to the currently-connected client. If invoked outside
* of a request-handling async context (e.g. background daemon work with
* no subscriber), the message is written to the daemon logger instead so
* it is not silently dropped.
*
* Must only be invoked from inside the Nx daemon process.
*/
export function emitLogToClient(level: EmitLogLevel, message: string): void {
Comment thread
FrozenPandaz marked this conversation as resolved.
Outdated
Copy link
Copy Markdown
Contributor

@FrozenPandaz FrozenPandaz Apr 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proxy this function to the daemonLogger

assertOnDaemon('emitLogToClient');
const socket = getActiveClientSocket();
if (!socket) {
serverLogger.log(`[emit-log:${level}] ${message}`);
return;
}
writeStreamingMessage(socket, {
type: EMIT_LOG,
level,
message,
});
}
7 changes: 6 additions & 1 deletion packages/nx/src/daemon/server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ import {
isWindows,
killSocketOrPath,
} from '../socket-utils';
import { runWithClientSocket } from './client-socket-context';
import { registerFileChangeListener } from './file-watching/file-change-events';
import {
hasRegisteredFileWatcherSockets,
Expand Down Expand Up @@ -494,7 +495,11 @@ export async function handleResult(
let hr: HandlerResult;
const startMark = new Date();
try {
hr = await hrFn();
// Run the handler inside an AsyncLocalStorage context keyed on the
// requesting client socket. Any code path reached from here can call
// sendProgressMessageToClient / emitLogToClient to stream messages
// back to this specific client while the request is in flight.
hr = await runWithClientSocket(socket, () => hrFn());
} catch (error) {
hr = { description: `[${type}]`, error };
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,18 @@ import type {
MessageResult,
PluginWorkerLoadResult,
PluginWorkerMessage,
PluginWorkerNotification,
PluginWorkerResult,
} from './messaging';
import { isPluginWorkerResult, sendMessageOverSocket } from './messaging';
import {
isPluginWorkerNotification,
isPluginWorkerResult,
sendMessageOverSocket,
} from './messaging';
import {
emitLogToClient,
sendProgressMessageToClient,
} from '../../../daemon/server/client-socket-context';
import {
Hook,
Phase,
Expand Down Expand Up @@ -213,6 +222,10 @@ export class IsolatedPlugin implements LoadedNxPlugin {

private handleSocketData = (raw: string) => {
const message = parseMessage<any>(raw);
if (isPluginWorkerNotification(message)) {
handlePluginWorkerNotification(message);
return;
}
if (!isPluginWorkerResult(message)) {
return;
}
Expand Down Expand Up @@ -696,3 +709,28 @@ type Falsy = false | 0 | '' | null | undefined | 0n;
function hooks(...array: Array<Hook | Falsy>): Array<Hook> {
return array.filter((v): v is Hook => !!v);
}

// When the host process is the daemon, forward notifications to the
// currently-connected client so log lines surface in the user's
// terminal and progress updates reach the client-side spinner. When the
// host is the direct CLI there is no client socket, so log lines go
// straight to stdout/stderr; progress updates from workers have no
// destination in that case and are dropped (the CLI's own in-process
// plugin-loading loop is responsible for spinner updates there).
function handlePluginWorkerNotification(
notification: PluginWorkerNotification
): void {
if ((global as any).NX_DAEMON) {
if (notification.type === 'emitLog') {
emitLogToClient(notification.level, notification.message);
} else if (notification.type === 'updateProgress') {
sendProgressMessageToClient(notification.message);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't currently have a use case for progress messages from plugin workers... so let's remove it for now.

}
return;
}
if (notification.type === 'emitLog') {
const stream =
notification.level === 'error' ? process.stderr : process.stdout;
stream.write(notification.message + '\n');
}
}
38 changes: 37 additions & 1 deletion packages/nx/src/project-graph/plugins/isolation/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,42 @@ export type MessageResult<T extends PluginWorkerMessage['type']> = ResultOf<
T & WithResult<PluginMessageDefs>
>;

// =============================================================================
// NOTIFICATIONS (worker -> host, unsolicited, no response expected)
// =============================================================================

export type PluginWorkerEmitLogNotification = {
type: 'emitLog';
level: 'log' | 'warn' | 'error';
message: string;
};

export type PluginWorkerUpdateProgressNotification = {
type: 'updateProgress';
message: string;
};

export type PluginWorkerNotification =
| PluginWorkerEmitLogNotification
| PluginWorkerUpdateProgressNotification;

const NOTIFICATION_TYPES: ReadonlyArray<PluginWorkerNotification['type']> = [
'emitLog',
'updateProgress',
];

export function isPluginWorkerNotification(
message: Serializable
): message is PluginWorkerNotification {
return (
typeof message === 'object' &&
message !== null &&
'type' in message &&
typeof message.type === 'string' &&
(NOTIFICATION_TYPES as readonly string[]).includes(message.type)
);
}

// =============================================================================
// TYPE GUARDS
// =============================================================================
Expand Down Expand Up @@ -264,7 +300,7 @@ export async function consumeMessage(
*/
export function sendMessageOverSocket(
socket: Socket,
message: PluginWorkerMessage | PluginWorkerResult
message: PluginWorkerMessage | PluginWorkerResult | PluginWorkerNotification
): void {
socket.write(serialize(message));
socket.write(MESSAGE_END_SEQ);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { logger } from '../../../utils/logger';
import { createSerializableError } from '../../../utils/serializable-error';
import type { LoadedNxPlugin } from '../loaded-nx-plugin';
import { consumeMessage, isPluginWorkerMessage } from './messaging';
import { setPluginWorkerHostSocket } from './worker-streaming';

import { unlinkSync } from 'fs';
import { createServer } from 'net';
Expand Down Expand Up @@ -51,6 +52,11 @@ let connectErrorTimeout = setErrorTimeout(
const server = createServer((socket) => {
connectErrorTimeout?.clear();

// Make the host-facing socket available to plugin code running in this
// worker so it can emit log / progress notifications without having
// the socket threaded through every call site.
setPluginWorkerHostSocket(socket);

logger.verbose(
`[plugin-worker] "${expectedPluginName}" (pid: ${process.pid}) connected`
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import type { Socket } from 'net';
import {
PluginWorkerEmitLogNotification,
PluginWorkerUpdateProgressNotification,
sendMessageOverSocket,
} from './messaging';

// Plugin workers talk to their host process over a single socket that is
// established when the host connects. Plugin code running anywhere in the
// worker process needs a way to emit log lines / progress updates without
// having that socket threaded through every call frame, so we stash a
// module-level reference here when the host connects.
let hostSocket: Socket | null = null;

export function setPluginWorkerHostSocket(socket: Socket): void {
hostSocket = socket;
socket.once('close', () => {
if (hostSocket === socket) hostSocket = null;
});
}

/**
* Emits a log line from the plugin worker up to its host. The host
* decides where it ends up (direct stdout/stderr when running under the
* CLI, forwarded to the active daemon client when running under the
* daemon).
*
* No-op when called outside of a plugin worker (i.e. no host connected).
*/
export function emitPluginWorkerLog(
level: PluginWorkerEmitLogNotification['level'],
message: string
): void {
if (!hostSocket) return;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If plugin isolation is turned off this should probably log it?

sendMessageOverSocket(hostSocket, {
type: 'emitLog',
level,
message,
});
}

/**
* Emits a progress message from the plugin worker up to its host. The
* host forwards it to the active daemon client (if running under the
* daemon) so an in-flight spinner can reflect the update. No-op when
* called outside of a plugin worker.
*/
export function emitPluginWorkerProgress(message: string): void {
if (!hostSocket) return;
sendMessageOverSocket(hostSocket, {
type: 'updateProgress',
message,
} satisfies PluginWorkerUpdateProgressNotification);
}
Loading
Loading