Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/attach.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export const attachSockets = async <NS extends Namespaces>({
const emitCfg: EmitterConfig<NSEmissions> = { emission, timeout };
const nsCtx: IndependentContext<NSEmissions, NSMeta> = {
logger: rootLogger,
withRooms: makeRoomService({ subject: io, metadata, ...emitCfg }),
withRooms: makeRoomService({ subject: ns, metadata, ...emitCfg }),
all: {
getClients: async () =>
makeRemoteClients({
Expand All @@ -68,7 +68,7 @@ export const attachSockets = async <NS extends Namespaces>({
...emitCfg,
}),
getRooms: () => Array.from(ns.adapter.rooms.keys()),
broadcast: makeEmitter({ subject: io, ...emitCfg }),
broadcast: makeEmitter({ subject: ns, ...emitCfg }),
},
};
ns.on("connection", async (socket) => {
Expand Down
6 changes: 4 additions & 2 deletions src/emission.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,16 @@ export function makeEmitter<E extends EmissionMap>(
} & EmitterConfig<E>,
): Emitter<E>;
export function makeEmitter<E extends EmissionMap>(
props: { subject: Socket["broadcast"] | Server } & EmitterConfig<E>,
props: {
subject: Socket["broadcast"] | Server | Namespace;
} & EmitterConfig<E>,
): Broadcaster<E>;
export function makeEmitter({
subject,
emission,
timeout,
}: {
subject: Socket | SomeRemoteSocket | Socket["broadcast"] | Server;
subject: Socket | SomeRemoteSocket | Namespace | Socket["broadcast"] | Server;
} & EmitterConfig<EmissionMap>) {
/**
* @throws z.ZodError on validation
Expand Down
120 changes: 119 additions & 1 deletion tests/system/issue590.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import http from "node:http";
import { Server } from "socket.io";
import { io as ioClient } from "socket.io-client";
import { z } from "zod";
import { attachSockets, Config, ActionsFactory } from "../../src";
import { ActionsFactory, attachSockets, Config } from "../../src";
import { promisify } from "node:util";
import assert from "node:assert/strict";

Expand All @@ -17,6 +17,11 @@ const port = 8999;
*/
describe("Issue #590", () => {
describe("attachSockets() with real Socket.IO", () => {
let intervalRef: NodeJS.Timeout | undefined;
afterEach(() => {
if (intervalRef) clearInterval(intervalRef);
});

test("should query and broadcast to rooms joined in onConnection", async () => {
const httpServer = http.createServer();
const io = new Server();
Expand Down Expand Up @@ -81,6 +86,119 @@ describe("Issue #590", () => {
// withRooms().broadcast() should reach client:
expect(receivedBroadcast).toBe("hello");

clientSocket.disconnect();
await promisify(io.close.bind(io))();
if (httpServer.listening)
await promisify(httpServer.close.bind(httpServer))();
});
test("should broadcast to all from startup hook using all.broadcast", async () => {
const httpServer = http.createServer();
const io = new Server();

const config = new Config().addNamespace({
path: "/chat",
emission: {
testBroadcast: { schema: z.tuple([z.string()]) },
},
hooks: {
onConnection: async ({ client }) => {
await client.join("testRoom");
},
onStartup: async ({ all }) => {
intervalRef = setInterval(() => {
all.broadcast("testBroadcast", "from onStartup hook");
}, 100);
},
},
metadata: z.object({}),
});

await attachSockets({
io,
config,
actions: [],
target: httpServer,
});

await promisify(httpServer.listen.bind(httpServer, port))();

// connect client:
const clientSocket = ioClient(`http://localhost:${port}/chat`, {
transports: ["websocket"],
});

await vi.waitFor(() => assert(clientSocket.connected));

// listen for broadcast:
const broadcastReceived = new Promise<string>((resolve) => {
clientSocket.on("testBroadcast", resolve);
});

const receivedBroadcast = await vi.waitFor(() => broadcastReceived, {
timeout: 1000,
});

// all.broadcast() should reach client:
expect(receivedBroadcast).toBe("from onStartup hook");

clientSocket.disconnect();
await promisify(io.close.bind(io))();
if (httpServer.listening)
await promisify(httpServer.close.bind(httpServer))();
});
test("should broadcast to rooms from startup hook using withRooms", async () => {
const httpServer = http.createServer();
const io = new Server();

const config = new Config().addNamespace({
path: "/chat",
emission: {
testBroadcast: { schema: z.tuple([z.string()]) },
},
hooks: {
onConnection: async ({ client }) => {
await client.join("testRoom");
},
onStartup: async ({ withRooms }) => {
intervalRef = setInterval(() => {
withRooms("testRoom").broadcast(
"testBroadcast",
"from onStartup hook",
);
}, 100);
},
},
metadata: z.object({}),
});

await attachSockets({
io,
config,
actions: [],
target: httpServer,
});

await promisify(httpServer.listen.bind(httpServer, port))();

// connect client:
const clientSocket = ioClient(`http://localhost:${port}/chat`, {
transports: ["websocket"],
});

await vi.waitFor(() => assert(clientSocket.connected));

// listen for broadcast:
const broadcastReceived = new Promise<string>((resolve) => {
clientSocket.on("testBroadcast", resolve);
});

const receivedBroadcast = await vi.waitFor(() => broadcastReceived, {
timeout: 1000,
});

// withRooms().broadcast() should reach client:
expect(receivedBroadcast).toBe("from onStartup hook");

clientSocket.disconnect();
await promisify(io.close.bind(io))();
if (httpServer.listening)
Expand Down