diff --git a/examples/pubsub/package.json b/examples/pubsub/package.json new file mode 100644 index 0000000..1e97c06 --- /dev/null +++ b/examples/pubsub/package.json @@ -0,0 +1,17 @@ +{ + "name": "@cloudflare/actors-example-pubsub", + "private": true, + "type": "module", + "scripts": { + "dev": "vite", + "build": "vite build", + "deploy": "wrangler deploy", + "cf-typegen": "wrangler types" + }, + "keywords": [], + "author": "", + "dependencies": { + "@cloudflare/vite-plugin": "^1.3.1", + "vite": "^6.3.5" + } +} \ No newline at end of file diff --git a/examples/pubsub/src/index.ts b/examples/pubsub/src/index.ts new file mode 100644 index 0000000..dd18bcc --- /dev/null +++ b/examples/pubsub/src/index.ts @@ -0,0 +1,92 @@ +import { Actor, Entrypoint, handler } from "../../../packages/core/src"; + +export class PubSubService extends Actor { + private subscribers = new Set>(); + + async subscribe(): Promise> { + let controller: ReadableStreamDefaultController; + + return new ReadableStream({ + start: (ctrl) => { + controller = ctrl; + this.subscribers.add(controller); + }, + cancel: () => { + this.subscribers.delete(controller); + }, + }); + } + + async publish(message: string): Promise { + const messageData = JSON.stringify({ + message, + timestamp: Date.now(), + service: this.identifier, + }); + + const encodedMessage = new TextEncoder().encode(messageData + "\n"); + + const subscribersToRemove: ReadableStreamDefaultController[] = + []; + + this.subscribers.forEach((controller) => { + try { + controller.enqueue(encodedMessage); + } catch (e) { + subscribersToRemove.push(controller); + } + }); + + subscribersToRemove.forEach((controller) => { + this.subscribers.delete(controller); + }); + } +} + +export class PubSubClient extends Entrypoint { + async fetch(request: Request): Promise { + const url = new URL(request.url); + const pathParts = url.pathname.split("/").filter((part) => part.length > 0); + + const [action, serviceName] = pathParts; + + if (!serviceName) { + return new Response("Service name is required", { status: 400 }); + } + + const service = PubSubService.get(serviceName); + + if (request.method === "GET" && action === "subscribe") { + const stream = await service.subscribe(); + + return new Response(stream, { + headers: { + "Content-Type": "text/plain; charset=utf-8", + "Cache-Control": "no-cache", + Connection: "keep-alive", + }, + }); + } + + if (request.method === "POST" && action === "publish") { + const message = await request.text(); + + if (!message.trim()) { + return new Response("Message body is required", { status: 400 }); + } + + await service.publish(message); + + return new Response("Message published successfully", { status: 200 }); + } + + return new Response( + "Not found. Use GET /subscribe/{service} or POST /publish/{service}", + { + status: 404, + }, + ); + } +} + +export default handler(PubSubClient); diff --git a/examples/pubsub/vite.config.ts b/examples/pubsub/vite.config.ts new file mode 100644 index 0000000..064a3a6 --- /dev/null +++ b/examples/pubsub/vite.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from "vite"; +import { cloudflare } from "@cloudflare/vite-plugin"; + +export default defineConfig({ + plugins: [cloudflare()], +}); + diff --git a/examples/pubsub/wrangler.jsonc b/examples/pubsub/wrangler.jsonc new file mode 100644 index 0000000..864560e --- /dev/null +++ b/examples/pubsub/wrangler.jsonc @@ -0,0 +1,26 @@ +{ + "$schema": "../../node_modules/wrangler/config-schema.json", + "name": "actor-pubsub", + "main": "src/index.ts", + "compatibility_date": "2025-05-20", + "compatibility_flags": [], + "migrations": [ + { + "new_sqlite_classes": [ + "PubSubService" + ], + "tag": "v1" + } + ], + "durable_objects": { + "bindings": [ + { + "class_name": "PubSubService", + "name": "PubSubService" + } + ] + }, + "observability": { + "enabled": true + } +} \ No newline at end of file