Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions packages/php-wasm/universal/src/lib/object-pool-proxy.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,4 +210,114 @@ describe('createPoolProxy', () => {

expect([first, second].sort()).toEqual(['a', 'b']);
});

describe('__withInstance', () => {
it('releases the instance only after the callback promise resolves', async () => {
// One instance, one long-running __withInstance call.
// A concurrent proxy call must wait until the callback
// finishes — not until any intermediate promise inside
// the callback resolves.
const accessLog: string[] = [];
const instance = {
async step(label: string) {
accessLog.push(`step-${label}`);
await new Promise((r) => setTimeout(r, 10));
return label;
},
};

const proxy = createObjectPoolProxy([instance]);

let callbackDone = false;
const withInstanceCall = proxy.__withInstance(async (inst) => {
// Multiple proxied calls inside a single held instance.
// Simulates streaming where the response starts before
// the body is drained.
await inst.step('first');
accessLog.push('mid');
await new Promise((r) => setTimeout(r, 30));
await inst.step('second');
callbackDone = true;
return 'withInstance-done';
});

// Start a concurrent proxied call after a short delay.
// It must queue behind the held instance and only run
// after the callback has fully resolved.
const concurrentCall = (async () => {
await new Promise((r) => setTimeout(r, 5));
accessLog.push('concurrent-enqueued');
const result = await proxy.step('concurrent');
// Assert the callback had already completed by the
// time we acquired the instance.
expect(callbackDone).toBe(true);
return result;
})();

const [withResult, concResult] = await Promise.all([
withInstanceCall,
concurrentCall,
]);

expect(withResult).toBe('withInstance-done');
expect(concResult).toBe('concurrent');

// The concurrent call was enqueued mid-callback but ran
// only after the callback fully finished.
expect(accessLog).toEqual([
'step-first',
'concurrent-enqueued',
'mid',
'step-second',
'step-concurrent',
]);
});

it('releases the instance when the callback throws async', async () => {
const instance = {
async ok() {
return 'ok';
},
};

const proxy = createObjectPoolProxy([instance]);

await expect(
proxy.__withInstance(async () => {
await new Promise((r) => setTimeout(r, 10));
throw new Error('callback boom');
})
).rejects.toThrow('callback boom');

// Instance must be back in the free pool — subsequent
// calls proceed immediately.
expect(await proxy.ok()).toBe('ok');
});

it('releases the instance when the callback throws sync', async () => {
const instance = {
async ok() {
return 'ok';
},
};

const proxy = createObjectPoolProxy([instance]);

await expect(
proxy.__withInstance(() => {
throw new Error('sync boom');
})
).rejects.toThrow('sync boom');

expect(await proxy.ok()).toBe('ok');
});

it('passes the raw instance, not a proxy', async () => {
const instance = { marker: Symbol('raw-instance'), value: 1 };
const proxy = createObjectPoolProxy([instance]);

const received = await proxy.__withInstance(async (inst) => inst);
expect(received).toBe(instance);
});
});
});
26 changes: 24 additions & 2 deletions packages/php-wasm/universal/src/lib/object-pool-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,24 @@ type Promisified<T extends object> = {
export type Pooled<T extends object> = Omit<
Promisified<T>,
typeof Symbol.dispose | typeof Symbol.asyncDispose
>;
> & {
/**
* Acquire an instance, invoke `fn` with it, and release the
* instance only after `fn`'s returned promise resolves (or
* throws). Use this when a single logical unit of work spans
* multiple proxied calls or outlives a single method invocation
* — e.g. a streaming HTTP response where PHP execution must
* remain bound to one worker until the stream fully drains.
*
* The default per-call acquire/release semantics of the proxy
* release the instance as soon as the proxied method's promise
* resolves, which is too early for streamed responses: the
* response object is returned before the body finishes
* streaming, so the worker would be handed to a concurrent
* request mid-stream.
*/
__withInstance<R>(fn: (instance: T) => R | Promise<R>): Promise<R>;
};

/**
* Creates a proxy that distributes method calls and property accesses
Expand Down Expand Up @@ -88,7 +105,12 @@ export function createObjectPoolProxy<T extends object>(
});
}

return new Proxy({} as Pooled<T>, {
const proxyTarget = {} as Pooled<T>;
// Expose __withInstance directly on the target so it's found by
// the `prop in _target` check before hitting the proxy trap.
(proxyTarget as any).__withInstance = withInstance;

return new Proxy(proxyTarget, {
Comment on lines +108 to +113
Copy link

Copilot AI Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a stringly-named control method (__removeInstance) onto a generic Pooled<T> can mask a real __removeInstance property/method on T (if one exists) since the proxy will return the pool control function instead of forwarding to the underlying instance. To avoid potential name collisions, prefer a Symbol-keyed control API (or a separate returned control object) so proxy consumers can’t accidentally shadow/override underlying instance members.

Copilot uses AI. Check for mistakes.
get(_target, prop: string | symbol) {
// Support returning assigned target properties.
// The main reason for this is to allow us to override methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class SinglePHPInstanceManager implements PHPInstanceManager {
async acquirePHPInstance(): Promise<AcquiredPHP> {
if (this.isAcquired) {
throw new Error(
'The PHP instance already acquired. SinglePHPInstanceManager cannot spawn another PHP instance since, by definition, it only manages a single PHP instance.'
'The PHP instance is already acquired. SinglePHPInstanceManager cannot spawn another PHP instance since, by definition, it only manages a single PHP instance.'
);
}
const php = await this.getPrimaryPhp();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
consumeAPI,
isLegacyPHPVersion,
type Pooled,
type RemoteAPI,
type UniversalPHP,
} from '@php-wasm/universal';
import type { BlueprintV1Declaration } from '@wp-playground/blueprints';
Expand Down Expand Up @@ -61,7 +62,7 @@ export class BlueprintsV1Handler {
}

async bootWordPress(
playground: Pooled<PlaygroundCliWorker>,
playground: Pooled<RemoteAPI<PlaygroundCliWorker>>,
workerPostInstallMountsPort: NodeMessagePort
) {
let wpDetails: any = undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ export class BlueprintsV2Handler {
}

async bootWordPress(
playground: Pooled<PlaygroundCliWorker>,
playground: Pooled<RemoteAPI<PlaygroundCliWorker>>,
workerPostInstallMountsPort: NodeMessagePort
) {
const workerBootArgs = {
Expand Down
90 changes: 65 additions & 25 deletions packages/playground/cli/src/run-cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
type PathAlias,
type RemoteAPI,
type AllPHPVersion,
type UniversalPHP,
} from '@php-wasm/universal';
import {
PHPResponse,
Expand Down Expand Up @@ -944,7 +945,7 @@ export type PlaygroundCliWorker =
export const internalsKeyForTesting = Symbol('playground-cli-testing');

export interface RunCLIServer extends AsyncDisposable {
playground: Pooled<PlaygroundCliWorker>;
playground: Pooled<RemoteAPI<PlaygroundCliWorker>>;
server: Server;
serverUrl: string;

Expand Down Expand Up @@ -989,7 +990,7 @@ export async function runCLI(
): Promise<RunCLIServer>;
export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void>;
export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {
let playgroundPool: Pooled<PlaygroundCliWorker>;
let playgroundPool: Pooled<RemoteAPI<PlaygroundCliWorker>>;
const cookieStore = args.internalCookieStore
? new HttpCookieStore()
: undefined;
Expand Down Expand Up @@ -1606,6 +1607,19 @@ export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {

wordPressReady = true;

// Casts to UniversalPHP at the blueprint/bridge handoff
// points: UniversalPHP is defined as
// `Pooled<LimitedPHPApi>` (among other arms), but the
// runtime pool holds `RemoteAPI<PlaygroundCliWorker>`
// instances. The two shapes expose the same methods
// through the proxy (both produce promises), but
// `Pooled<T>` now reveals T via `__withInstance`'s
// callback — making the structural difference
// observable. Widening `UniversalPHP` to allow
// `Pooled<RemoteAPI<LimitedPHPApi>>` is a cross-package
// public-type change worth a dedicated PR; these
// localized casts preserve the honest local typing
// without churning the blueprints API.
if (!args['experimental-blueprints-v2-runner']) {
const compiledBlueprint = await (
handler as BlueprintsV1Handler
Expand All @@ -1616,7 +1630,7 @@ export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {
if (compiledBlueprint) {
await runBlueprintV1Steps(
compiledBlueprint,
playgroundPool
playgroundPool as unknown as UniversalPHP
);
}
}
Expand All @@ -1630,7 +1644,10 @@ export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {
) {
const steps = await getPhpMyAdminInstallSteps();
const compiled = await compileBlueprintV1({ steps });
await runBlueprintV1Steps(compiled, playgroundPool);
await runBlueprintV1Steps(
compiled,
playgroundPool as unknown as UniversalPHP
);
}

if (args.command === 'build-snapshot') {
Expand Down Expand Up @@ -1690,7 +1707,8 @@ export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {

if (args.xdebug && args.experimentalDevtools) {
const bridge = await startBridge({
phpInstance: playgroundPool,
// See the UniversalPHP cast rationale above.
phpInstance: playgroundPool as unknown as UniversalPHP,
phpRoot: '/wordpress',
});

Expand Down Expand Up @@ -1718,12 +1736,18 @@ export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {
throw new Error(phpLogs, { cause: error });
}
},
async handleRequest(request: PHPRequest): Promise<StreamedPHPResponse> {
async handleRequest(
request: PHPRequest,
streamResponse: (response: StreamedPHPResponse) => Promise<void>
): Promise<void> {
if (!wordPressReady) {
return StreamedPHPResponse.forHttpCode(
502,
'WordPress is not ready yet'
await streamResponse(
StreamedPHPResponse.forHttpCode(
502,
'WordPress is not ready yet'
)
);
return;
}
// Clear the playground_auto_login_already_happened cookie on the first request.
// Otherwise the first Playground CLI server started on the machine will set it,
Expand All @@ -1745,9 +1769,12 @@ export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {
'playground_auto_login_already_happened=1; Max-Age=0; Expires=Thu, 01 Jan 1970 00:00:00 GMT; Path=/',
];
}
return StreamedPHPResponse.fromPHPResponse(
new PHPResponse(302, headers, new Uint8Array())
await streamResponse(
StreamedPHPResponse.fromPHPResponse(
new PHPResponse(302, headers, new Uint8Array())
)
);
return;
}
if (cookieStore) {
request = {
Expand All @@ -1764,20 +1791,33 @@ export async function runCLI(args: RunCLIArgs): Promise<RunCLIServer | void> {
};
}

// TODO: Explore switching to a worker thread method to adopt an entire HTTP connection
// It might be more efficient to let the worker respond directly
const response = await playgroundPool.requestStreamed(request);

if (cookieStore) {
const headers = await response.headers;
cookieStore.rememberCookiesFromResponseHeaders(headers);
// While we have an internal cookie store, we filter out the
// Set-Cookie headers from responses so the browser does not
// attempt to manage cookies at the same time as the server.
delete headers['set-cookie'];
}
// Scope the acquire/release around the full stream drain via
// __withInstance. The pool proxy's default per-call semantics
// release the worker when requestStreamed()'s promise resolves
// — which is when the response *starts* streaming, before PHP
// finishes writing the body. That caused "PHP instance already
// acquired" errors under concurrency. Holding the pool instance
// until streamResponse() fully drains ties the worker lifecycle
// to the HTTP response lifecycle, which restores true streaming
// for large responses and SSE without regressing concurrency.
await playgroundPool.__withInstance(async (instance) => {
const response = await instance.requestStreamed(request);

if (cookieStore) {
const headers = await response.headers;
cookieStore.rememberCookiesFromResponseHeaders(headers);
// While we have an internal cookie store, we filter out
// the Set-Cookie headers from responses so the browser
// does not attempt to manage cookies at the same time
// as the server.
delete headers['set-cookie'];
}

return response;
// The callback must not resolve until streamResponse has
// fully drained — that is what keeps the worker bound to
// this request for the entire response lifecycle.
await streamResponse(response);
});
},
}).catch((error) => {
cliOutput.printError(describeError(error));
Expand Down Expand Up @@ -2038,7 +2078,7 @@ function openInBrowser(url: string): void {
}

async function zipSite(
playground: Pooled<PlaygroundCliWorker>,
playground: Pooled<RemoteAPI<PlaygroundCliWorker>>,
outfile: string
) {
await playground.run({
Expand Down
16 changes: 12 additions & 4 deletions packages/playground/cli/src/start-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ export interface ServerOptions {
port: number;
onBind: (server: Server, port: number) => Promise<RunCLIServer | void>;
/**
* Handler for requests. Always returns StreamedPHPResponse.
* Handler for requests. Receives the PHP request and a callback that
* streams the response to the HTTP client. The handler must not resolve
* until the response is fully streamed — this is critical for pool-based
* concurrency control where the worker must be held for the entire
* request lifecycle.
*/
handleRequest: (request: PHPRequest) => Promise<StreamedPHPResponse>;
handleRequest: (
request: PHPRequest,
streamResponse: (response: StreamedPHPResponse) => Promise<void>
) => Promise<void>;
}

export function isPortInUse(port: number): Promise<boolean> {
Expand Down Expand Up @@ -63,8 +70,9 @@ export async function startServer(
body: await bufferRequestBody(req),
};

const response = await options.handleRequest(phpRequest);
await handleStreamedResponse(response, res);
await options.handleRequest(phpRequest, (response) =>
handleStreamedResponse(response, res)
);
} catch (error) {
logger.error(error);
if (!res.headersSent) {
Expand Down
Loading
Loading