diff --git a/src/attach.ts b/src/attach.ts index caae1264..6ed3f549 100644 --- a/src/attach.ts +++ b/src/attach.ts @@ -59,7 +59,7 @@ export const attachSockets = async ({ const emitCfg: EmitterConfig = { emission, timeout }; const nsCtx: IndependentContext = { logger: rootLogger, - withRooms: makeRoomService({ subject: io, metadata, ...emitCfg }), + withRooms: makeRoomService({ subject: ns, metadata, ...emitCfg }), all: { getClients: async () => makeRemoteClients({ @@ -68,7 +68,7 @@ export const attachSockets = async ({ ...emitCfg, }), getRooms: () => Array.from(ns.adapter.rooms.keys()), - broadcast: makeEmitter({ subject: io, ...emitCfg }), + broadcast: makeEmitter({ subject: ns, ...emitCfg }), }, }; ns.on("connection", async (socket) => { diff --git a/src/emission.ts b/src/emission.ts index 24c342c6..d9718327 100644 --- a/src/emission.ts +++ b/src/emission.ts @@ -58,14 +58,16 @@ export function makeEmitter( } & EmitterConfig, ): Emitter; export function makeEmitter( - props: { subject: Socket["broadcast"] | Server } & EmitterConfig, + props: { + subject: Socket["broadcast"] | Server | Namespace; + } & EmitterConfig, ): Broadcaster; export function makeEmitter({ subject, emission, timeout, }: { - subject: Socket | SomeRemoteSocket | Socket["broadcast"] | Server; + subject: Socket | SomeRemoteSocket | Namespace | Socket["broadcast"] | Server; } & EmitterConfig) { /** * @throws z.ZodError on validation diff --git a/tests/system/issue590.spec.ts b/tests/system/issue590.spec.ts index ee5103da..7b510b87 100644 --- a/tests/system/issue590.spec.ts +++ b/tests/system/issue590.spec.ts @@ -2,7 +2,7 @@ import http from "node:http"; import { Server } from "socket.io"; import { io as ioClient } from "socket.io-client"; import { z } from "zod"; -import { attachSockets, Config, ActionsFactory } from "../../src"; +import { ActionsFactory, attachSockets, Config } from "../../src"; import { promisify } from "node:util"; import assert from "node:assert/strict"; @@ -17,6 +17,11 @@ const port = 8999; */ describe("Issue #590", () => { describe("attachSockets() with real Socket.IO", () => { + let intervalRef: NodeJS.Timeout | undefined; + afterEach(() => { + if (intervalRef) clearInterval(intervalRef); + }); + test("should query and broadcast to rooms joined in onConnection", async () => { const httpServer = http.createServer(); const io = new Server(); @@ -81,6 +86,119 @@ describe("Issue #590", () => { // withRooms().broadcast() should reach client: expect(receivedBroadcast).toBe("hello"); + clientSocket.disconnect(); + await promisify(io.close.bind(io))(); + if (httpServer.listening) + await promisify(httpServer.close.bind(httpServer))(); + }); + test("should broadcast to all from startup hook using all.broadcast", async () => { + const httpServer = http.createServer(); + const io = new Server(); + + const config = new Config().addNamespace({ + path: "/chat", + emission: { + testBroadcast: { schema: z.tuple([z.string()]) }, + }, + hooks: { + onConnection: async ({ client }) => { + await client.join("testRoom"); + }, + onStartup: async ({ all }) => { + intervalRef = setInterval(() => { + all.broadcast("testBroadcast", "from onStartup hook"); + }, 100); + }, + }, + metadata: z.object({}), + }); + + await attachSockets({ + io, + config, + actions: [], + target: httpServer, + }); + + await promisify(httpServer.listen.bind(httpServer, port))(); + + // connect client: + const clientSocket = ioClient(`http://localhost:${port}/chat`, { + transports: ["websocket"], + }); + + await vi.waitFor(() => assert(clientSocket.connected)); + + // listen for broadcast: + const broadcastReceived = new Promise((resolve) => { + clientSocket.on("testBroadcast", resolve); + }); + + const receivedBroadcast = await vi.waitFor(() => broadcastReceived, { + timeout: 1000, + }); + + // all.broadcast() should reach client: + expect(receivedBroadcast).toBe("from onStartup hook"); + + clientSocket.disconnect(); + await promisify(io.close.bind(io))(); + if (httpServer.listening) + await promisify(httpServer.close.bind(httpServer))(); + }); + test("should broadcast to rooms from startup hook using withRooms", async () => { + const httpServer = http.createServer(); + const io = new Server(); + + const config = new Config().addNamespace({ + path: "/chat", + emission: { + testBroadcast: { schema: z.tuple([z.string()]) }, + }, + hooks: { + onConnection: async ({ client }) => { + await client.join("testRoom"); + }, + onStartup: async ({ withRooms }) => { + intervalRef = setInterval(() => { + withRooms("testRoom").broadcast( + "testBroadcast", + "from onStartup hook", + ); + }, 100); + }, + }, + metadata: z.object({}), + }); + + await attachSockets({ + io, + config, + actions: [], + target: httpServer, + }); + + await promisify(httpServer.listen.bind(httpServer, port))(); + + // connect client: + const clientSocket = ioClient(`http://localhost:${port}/chat`, { + transports: ["websocket"], + }); + + await vi.waitFor(() => assert(clientSocket.connected)); + + // listen for broadcast: + const broadcastReceived = new Promise((resolve) => { + clientSocket.on("testBroadcast", resolve); + }); + + const receivedBroadcast = await vi.waitFor(() => broadcastReceived, { + timeout: 1000, + }); + + // withRooms().broadcast() should reach client: + expect(receivedBroadcast).toBe("from onStartup hook"); + clientSocket.disconnect(); await promisify(io.close.bind(io))(); if (httpServer.listening)