refactor: replace hand-rolled SSE with Hono + fetch-event-source
Server (index.ts): - migrate to Hono streamSSE, mounted under Bun.serve fetch handler - idleTimeout: 255 fixes the silent Bun 10s timeout that killed SSE responses before the first keepalive could fire (root cause of the empty EventStream tab) - stream.onAbort wires an AbortController into upstream fetch signal - 15s : keepalive raw SSE comments for Cloudflare 120s headroom - decodeDataUrl returns Uint8Array<ArrayBuffer> for DOM Blob types - chromeDevToolsAutomaticWorkspaceFolders: false silences the 'Unable to add filesystem' warning in sandboxed browsers Client (client.ts new): - extracted from inline <script> — Bun only bundles external script src, not inline module imports, so node_modules bare specifiers must live in their own file - @microsoft/fetch-event-source replaces hand-rolled fetch + ReadableStream parsing; supports POST + body + signal natively - client aborts the loop on event:done so fetchEventSource doesn't retry Build: - drop unused react/react-dom/@types/react* deps (KISS) - add 'DOM', 'DOM.Iterable' to tsconfig lib for client.ts
This commit is contained in:
@@ -1,3 +1,6 @@
|
||||
import { Hono } from "hono";
|
||||
import { streamSSE } from "hono/streaming";
|
||||
import type { SSEStreamingApi } from "hono/streaming";
|
||||
import index from "./index.html";
|
||||
|
||||
type Size = `${number}x${number}`;
|
||||
@@ -11,26 +14,16 @@ type GenerateRequest = {
|
||||
referenceImages?: string[];
|
||||
};
|
||||
|
||||
type SSEController = ReadableStreamDefaultController<Uint8Array>;
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
function sseEvent(controller: SSEController, event: string, data: unknown): void {
|
||||
controller.enqueue(
|
||||
encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`),
|
||||
);
|
||||
}
|
||||
|
||||
function sseComment(controller: SSEController, text: string): void {
|
||||
controller.enqueue(encoder.encode(`: ${text}\n\n`));
|
||||
}
|
||||
|
||||
function decodeDataUrl(dataUrl: string): { bytes: Buffer; mime: string } | null {
|
||||
function decodeDataUrl(
|
||||
dataUrl: string,
|
||||
): { bytes: Uint8Array<ArrayBuffer>; mime: string } | null {
|
||||
const match = dataUrl.match(/^data:([^;]+);base64,(.+)$/);
|
||||
if (!match) return null;
|
||||
const mime = match[1]!;
|
||||
const b64 = match[2]!;
|
||||
return { bytes: Buffer.from(b64, "base64"), mime };
|
||||
const binary = atob(match[2]!);
|
||||
const bytes = new Uint8Array(new ArrayBuffer(binary.length));
|
||||
for (let i = 0; i < binary.length; i++) bytes[i] = binary.charCodeAt(i);
|
||||
return { bytes, mime };
|
||||
}
|
||||
|
||||
async function callUpstream(args: {
|
||||
@@ -41,8 +34,9 @@ async function callUpstream(args: {
|
||||
size: Size;
|
||||
referenceImages: string[];
|
||||
stream: boolean;
|
||||
signal?: AbortSignal;
|
||||
}): Promise<Response> {
|
||||
const { baseURL, apiKey, model, prompt, size, referenceImages, stream } = args;
|
||||
const { baseURL, apiKey, model, prompt, size, referenceImages, stream, signal } = args;
|
||||
const isEdit = referenceImages.length > 0;
|
||||
const url = `${baseURL.replace(/\/+$/, "")}/images/${isEdit ? "edits" : "generations"}`;
|
||||
|
||||
@@ -71,6 +65,7 @@ async function callUpstream(args: {
|
||||
method: "POST",
|
||||
headers: { Authorization: `Bearer ${apiKey}` },
|
||||
body: form,
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -86,6 +81,7 @@ async function callUpstream(args: {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
body: JSON.stringify(body),
|
||||
signal,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -101,68 +97,76 @@ function parseSSEBlock(raw: string): { event: string; data: string } | null {
|
||||
return { event: eventName, data: dataLines.join("\n") };
|
||||
}
|
||||
|
||||
async function emitUpstreamBlock(
|
||||
raw: string,
|
||||
stream: SSEStreamingApi,
|
||||
): Promise<void> {
|
||||
const block = parseSSEBlock(raw);
|
||||
if (!block || block.data === "[DONE]") return;
|
||||
let parsed: {
|
||||
type?: string;
|
||||
b64_json?: string;
|
||||
partial_image_index?: number;
|
||||
};
|
||||
try {
|
||||
parsed = JSON.parse(block.data);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
const type = parsed.type ?? block.event;
|
||||
const b64 = parsed.b64_json;
|
||||
if (!b64) return;
|
||||
if (type.endsWith(".partial_image")) {
|
||||
await stream.writeSSE({
|
||||
event: "partial",
|
||||
data: JSON.stringify({
|
||||
image: `data:image/png;base64,${b64}`,
|
||||
index: parsed.partial_image_index ?? 0,
|
||||
}),
|
||||
});
|
||||
} else if (type.endsWith(".completed")) {
|
||||
await stream.writeSSE({
|
||||
event: "final",
|
||||
data: JSON.stringify({ image: `data:image/png;base64,${b64}` }),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async function forwardUpstreamSSE(
|
||||
upstream: Response,
|
||||
controller: SSEController,
|
||||
stream: SSEStreamingApi,
|
||||
): Promise<void> {
|
||||
if (!upstream.body) throw new Error("Upstream returned no body");
|
||||
const reader = upstream.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
|
||||
const handle = (raw: string) => {
|
||||
const block = parseSSEBlock(raw);
|
||||
if (!block) return;
|
||||
if (block.data === "[DONE]") return;
|
||||
let parsed: {
|
||||
type?: string;
|
||||
b64_json?: string;
|
||||
partial_image_index?: number;
|
||||
};
|
||||
try {
|
||||
parsed = JSON.parse(block.data);
|
||||
} catch {
|
||||
return;
|
||||
}
|
||||
const type = parsed.type ?? block.event;
|
||||
const b64 = parsed.b64_json;
|
||||
if (!b64) return;
|
||||
if (type.endsWith(".partial_image")) {
|
||||
sseEvent(controller, "partial", {
|
||||
image: `data:image/png;base64,${b64}`,
|
||||
index: parsed.partial_image_index ?? 0,
|
||||
});
|
||||
} else if (type.endsWith(".completed")) {
|
||||
sseEvent(controller, "final", {
|
||||
image: `data:image/png;base64,${b64}`,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) break;
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
let idx: number;
|
||||
while ((idx = buffer.indexOf("\n\n")) !== -1) {
|
||||
handle(buffer.slice(0, idx));
|
||||
await emitUpstreamBlock(buffer.slice(0, idx), stream);
|
||||
buffer = buffer.slice(idx + 2);
|
||||
}
|
||||
}
|
||||
if (buffer.trim().length > 0) handle(buffer);
|
||||
if (buffer.trim().length > 0) await emitUpstreamBlock(buffer, stream);
|
||||
}
|
||||
|
||||
async function forwardUpstreamJSON(
|
||||
upstream: Response,
|
||||
controller: SSEController,
|
||||
stream: SSEStreamingApi,
|
||||
): Promise<void> {
|
||||
const data = (await upstream.json()) as {
|
||||
data?: Array<{ b64_json?: string }>;
|
||||
};
|
||||
for (const item of data.data ?? []) {
|
||||
if (!item.b64_json) continue;
|
||||
sseEvent(controller, "final", {
|
||||
image: `data:image/png;base64,${item.b64_json}`,
|
||||
await stream.writeSSE({
|
||||
event: "final",
|
||||
data: JSON.stringify({
|
||||
image: `data:image/png;base64,${item.b64_json}`,
|
||||
}),
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -171,94 +175,96 @@ function isStreamingUnsupportedError(errText: string): boolean {
|
||||
return /\b(stream|partial_images)\b/i.test(errText);
|
||||
}
|
||||
|
||||
const app = new Hono();
|
||||
|
||||
app.post("/api/generate", async (c) => {
|
||||
const body = (await c.req.json()) as GenerateRequest;
|
||||
const { baseURL, apiKey, model, prompt, size, referenceImages } = body;
|
||||
if (!baseURL || !apiKey || !model || !prompt) {
|
||||
return c.json(
|
||||
{ error: "baseURL, apiKey, model, prompt are required" },
|
||||
400,
|
||||
);
|
||||
}
|
||||
const refs = Array.isArray(referenceImages) ? referenceImages : [];
|
||||
const args = {
|
||||
baseURL,
|
||||
apiKey,
|
||||
model,
|
||||
prompt,
|
||||
size: size ?? ("1024x1024" as Size),
|
||||
referenceImages: refs,
|
||||
};
|
||||
|
||||
return streamSSE(c, async (stream) => {
|
||||
const abort = new AbortController();
|
||||
stream.onAbort(() => abort.abort());
|
||||
|
||||
await stream.write(": connected\n\n");
|
||||
const keepalive = setInterval(() => {
|
||||
stream.write(": keepalive\n\n").catch(() => {});
|
||||
}, 15_000);
|
||||
|
||||
try {
|
||||
let upstream = await callUpstream({
|
||||
...args,
|
||||
stream: true,
|
||||
signal: abort.signal,
|
||||
});
|
||||
|
||||
if (!upstream.ok && upstream.status === 400) {
|
||||
const errText = await upstream.text().catch(() => "");
|
||||
if (isStreamingUnsupportedError(errText)) {
|
||||
upstream = await callUpstream({
|
||||
...args,
|
||||
stream: false,
|
||||
signal: abort.signal,
|
||||
});
|
||||
} else {
|
||||
throw new Error(`Upstream 400: ${errText || upstream.statusText}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (!upstream.ok) {
|
||||
const errText = await upstream.text().catch(() => "");
|
||||
throw new Error(
|
||||
`Upstream ${upstream.status}: ${errText || upstream.statusText}`,
|
||||
);
|
||||
}
|
||||
|
||||
const contentType = upstream.headers.get("content-type") ?? "";
|
||||
if (contentType.includes("event-stream")) {
|
||||
await forwardUpstreamSSE(upstream, stream);
|
||||
} else {
|
||||
await forwardUpstreamJSON(upstream, stream);
|
||||
}
|
||||
|
||||
await stream.writeSSE({ event: "done", data: "" });
|
||||
} catch (err) {
|
||||
if (abort.signal.aborted) return;
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
console.error("[generate] error:", err);
|
||||
await stream.writeSSE({
|
||||
event: "error",
|
||||
data: JSON.stringify({ message }),
|
||||
});
|
||||
} finally {
|
||||
clearInterval(keepalive);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
const server = Bun.serve({
|
||||
hostname: "0.0.0.0",
|
||||
idleTimeout: 255,
|
||||
routes: {
|
||||
"/": index,
|
||||
"/api/generate": {
|
||||
POST: async (req) => {
|
||||
const body = (await req.json()) as GenerateRequest;
|
||||
const { baseURL, apiKey, model, prompt, size, referenceImages } = body;
|
||||
if (!baseURL || !apiKey || !model || !prompt) {
|
||||
return Response.json(
|
||||
{ error: "baseURL, apiKey, model, prompt are required" },
|
||||
{ status: 400 },
|
||||
);
|
||||
}
|
||||
const refs = Array.isArray(referenceImages) ? referenceImages : [];
|
||||
const args = {
|
||||
baseURL,
|
||||
apiKey,
|
||||
model,
|
||||
prompt,
|
||||
size: size ?? ("1024x1024" as Size),
|
||||
referenceImages: refs,
|
||||
};
|
||||
|
||||
const stream = new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
const keepalive = setInterval(() => {
|
||||
try {
|
||||
sseComment(controller, "keepalive");
|
||||
} catch {}
|
||||
}, 20_000);
|
||||
|
||||
try {
|
||||
let upstream = await callUpstream({ ...args, stream: true });
|
||||
|
||||
if (!upstream.ok && upstream.status === 400) {
|
||||
const errText = await upstream.text().catch(() => "");
|
||||
if (isStreamingUnsupportedError(errText)) {
|
||||
upstream = await callUpstream({ ...args, stream: false });
|
||||
} else {
|
||||
throw new Error(`Upstream 400: ${errText || upstream.statusText}`);
|
||||
}
|
||||
}
|
||||
|
||||
if (!upstream.ok) {
|
||||
const errText = await upstream.text().catch(() => "");
|
||||
throw new Error(
|
||||
`Upstream ${upstream.status}: ${errText || upstream.statusText}`,
|
||||
);
|
||||
}
|
||||
|
||||
const contentType = upstream.headers.get("content-type") ?? "";
|
||||
if (contentType.includes("event-stream")) {
|
||||
await forwardUpstreamSSE(upstream, controller);
|
||||
} else {
|
||||
await forwardUpstreamJSON(upstream, controller);
|
||||
}
|
||||
|
||||
sseEvent(controller, "done", {});
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : String(err);
|
||||
console.error("[generate] error:", err);
|
||||
try {
|
||||
sseEvent(controller, "error", { message });
|
||||
} catch {}
|
||||
} finally {
|
||||
clearInterval(keepalive);
|
||||
try {
|
||||
controller.close();
|
||||
} catch {}
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
return new Response(stream, {
|
||||
headers: {
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache, no-transform",
|
||||
"X-Accel-Buffering": "no",
|
||||
Connection: "keep-alive",
|
||||
},
|
||||
});
|
||||
},
|
||||
},
|
||||
},
|
||||
fetch: app.fetch,
|
||||
development: {
|
||||
hmr: true,
|
||||
console: true,
|
||||
chromeDevToolsAutomaticWorkspaceFolders: false,
|
||||
},
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user