feat: stream gpt-image generation via SSE with keepalive

- /api/generate now responds with text/event-stream end-to-end
- forwards upstream image_generation.* / image_edit.* partial+completed events
- 20s keepalive comments survive Cloudflare's 120s proxy-read timeout
- falls back to non-streaming when upstream rejects stream/partial_images
- drops @ai-sdk/openai-compatible, @ai-sdk/react, ai (unused)
- frontend consumes SSE via fetch+ReadableStream, shows progressive preview
This commit is contained in:
2026-05-18 22:44:31 +08:00
parent 54f13c1097
commit 5af05b2141
5 changed files with 327 additions and 170 deletions
+226 -90
View File
@@ -1,60 +1,174 @@
import { createOpenAICompatible } from "@ai-sdk/openai-compatible";
import { generateImage } from "ai";
import index from "./index.html";
/** Image size string: two numbers joined by "x"; the request handler falls back to "1024x1024". */
type Size = `${number}x${number}`;
async function generateWithReference({
baseURL,
apiKey,
model,
prompt,
size,
referenceImages,
}: {
/**
 * JSON body read by the POST /api/generate handler.
 *
 * Every field is optional at the type level because the value is cast from
 * request JSON; the handler answers 400 unless baseURL, apiKey, model and
 * prompt are all truthy.
 */
type GenerateRequest = {
// Upstream API root; trailing slashes are stripped before "/images/..." is appended.
baseURL?: string;
// Sent upstream as "Authorization: Bearer <apiKey>".
apiKey?: string;
// Forwarded upstream as the "model" field.
model?: string;
// Forwarded upstream as the "prompt" field.
prompt?: string;
// Forwarded upstream as the "size" field; the handler substitutes "1024x1024" when absent.
size?: Size;
// Base64 data URLs. A non-array value is treated as an empty list; a non-empty
// list routes the request to the "/images/edits" endpoint instead of "/images/generations".
referenceImages?: string[];
};
/** Controller of the byte stream that carries the SSE response body. */
type SSEController = ReadableStreamDefaultController<Uint8Array>;

/** Single UTF-8 encoder shared by every frame written to the response. */
const encoder = new TextEncoder();

/**
 * Writes one named SSE event to the stream.
 *
 * @param controller - Stream controller to enqueue the encoded frame on.
 * @param event - Value for the `event:` field.
 * @param data - Payload; serialized with `JSON.stringify` into the `data:` field.
 */
function sseEvent(controller: SSEController, event: string, data: unknown): void {
  const payload = JSON.stringify(data);
  const frame = `event: ${event}\ndata: ${payload}\n\n`;
  controller.enqueue(encoder.encode(frame));
}

/**
 * Writes an SSE comment frame (a line starting with ":"), which carries no
 * event and is used by the handler as a keepalive.
 *
 * @param controller - Stream controller to enqueue the encoded frame on.
 * @param text - Comment text placed after the leading ": ".
 */
function sseComment(controller: SSEController, text: string): void {
  const frame = encoder.encode(`: ${text}\n\n`);
  controller.enqueue(frame);
}
/**
 * Splits a base64 `data:` URL into its MIME type and decoded bytes.
 *
 * Only the exact shape `data:<mime>;base64,<payload>` is accepted: the MIME
 * part may not contain ";" and the payload must be non-empty.
 *
 * @param dataUrl - Candidate data URL.
 * @returns The decoded bytes and MIME type, or `null` when the input does not match.
 */
function decodeDataUrl(dataUrl: string): { bytes: Buffer; mime: string } | null {
  const parts = /^data:([^;]+);base64,(.+)$/.exec(dataUrl);
  if (parts === null) {
    return null;
  }
  // Both capture groups are mandatory in the pattern, so they are set on any match.
  const mime = parts[1]!;
  const payload = parts[2]!;
  const bytes = Buffer.from(payload, "base64");
  return { bytes, mime };
}
/**
 * Sends one image request to the upstream OpenAI-compatible API and returns
 * the raw `Response` without inspecting its status.
 *
 * With at least one reference image the request goes to `/images/edits` as
 * multipart form data; otherwise it goes to `/images/generations` as JSON.
 * When `stream` is true the request additionally asks for streaming with two
 * partial images.
 *
 * @param args.baseURL - Upstream API root; trailing slashes are stripped.
 * @param args.apiKey - Sent as a Bearer token.
 * @param args.model - Forwarded as `model`.
 * @param args.prompt - Forwarded as `prompt`.
 * @param args.size - Forwarded as `size`.
 * @param args.referenceImages - Base64 data URLs; entries that are empty or do
 *   not parse as a data URL are skipped rather than rejected.
 * @param args.stream - Whether to request `stream` / `partial_images`.
 * @returns The upstream response; callers are responsible for checking `ok`.
 */
async function callUpstream(args: {
  baseURL: string;
  apiKey: string;
  model: string;
  prompt: string;
  size: Size;
  referenceImages: string[];
  stream: boolean;
}): Promise<Response> {
  const { baseURL, apiKey, model, prompt, size, referenceImages, stream } = args;
  const isEdit = referenceImages.length > 0;
  const url = `${baseURL.replace(/\/+$/, "")}/images/${isEdit ? "edits" : "generations"}`;

  if (isEdit) {
    const form = new FormData();
    form.append("model", model);
    form.append("prompt", prompt);
    form.append("size", size);
    if (stream) {
      // Multipart fields are strings, so the flags are sent in text form.
      form.append("stream", "true");
      form.append("partial_images", "2");
    }
    for (let i = 0; i < referenceImages.length; i++) {
      const dataUrl = referenceImages[i];
      if (!dataUrl) continue;
      const decoded = decodeDataUrl(dataUrl);
      if (!decoded) continue;
      // File extension is taken from the MIME subtype, defaulting to "png".
      const ext = decoded.mime.split("/")[1] ?? "png";
      form.append(
        "image",
        new Blob([decoded.bytes], { type: decoded.mime }),
        `ref-${i}.${ext}`,
      );
    }
    // No explicit Content-Type: fetch derives the multipart boundary from the FormData body.
    return fetch(url, {
      method: "POST",
      headers: { Authorization: `Bearer ${apiKey}` },
      body: form,
    });
  }

  const body: Record<string, unknown> = { model, prompt, size };
  if (stream) {
    body.stream = true;
    body.partial_images = 2;
  }
  return fetch(url, {
    method: "POST",
    headers: {
      Authorization: `Bearer ${apiKey}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify(body),
  });
}
if (!res.ok) {
const text = await res.text().catch(() => "");
throw new Error(`Upstream ${res.status}: ${text || res.statusText}`);
/**
 * Parses the text of one SSE block (the lines between two blank-line
 * separators) into its event name and data payload.
 *
 * Comment lines (starting with ":") and unrecognized fields are ignored. When
 * several `event:` lines are present the last one wins; multiple `data:` lines
 * are trimmed individually and joined with "\n". The event name defaults to
 * "message".
 *
 * @param raw - Block text with lines separated by "\n".
 * @returns The event name and joined data, or `null` when the block has no `data:` line.
 */
function parseSSEBlock(raw: string): { event: string; data: string } | null {
  let name = "message";
  const payload: string[] = [];

  raw.split("\n").forEach((line) => {
    if (line.startsWith(":")) {
      return;
    }
    if (line.startsWith("event:")) {
      name = line.slice(6).trim();
      return;
    }
    if (line.startsWith("data:")) {
      payload.push(line.slice(5).trim());
    }
  });

  return payload.length > 0 ? { event: name, data: payload.join("\n") } : null;
}
const data = (await res.json()) as {
data?: Array<{ b64_json?: string }>;
/**
 * Relays an upstream SSE image stream to the client.
 *
 * Each upstream block whose JSON payload carries `b64_json` is re-emitted as
 * a `partial` event (type ending in ".partial_image") or a `final` event
 * (type ending in ".completed"). The `[DONE]` sentinel, non-JSON payloads and
 * payloads without image data are ignored.
 *
 * Line endings are normalized ("\r\n" and bare "\r" become "\n") before the
 * stream is split into blocks, so upstreams that delimit events with CRLF are
 * framed correctly.
 *
 * @param upstream - Upstream response whose body is an event stream.
 * @param controller - Client stream controller that receives the re-emitted events.
 * @throws Error when the upstream response has no body; read errors and
 *   enqueue errors propagate to the caller.
 */
async function forwardUpstreamSSE(
  upstream: Response,
  controller: SSEController,
): Promise<void> {
  if (!upstream.body) throw new Error("Upstream returned no body");
  const reader = upstream.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  // True when the previous chunk ended in "\r": it may be the first half of a
  // "\r\n" pair, so it is withheld until the next chunk shows what follows.
  let heldCarriageReturn = false;

  const handle = (raw: string): void => {
    const block = parseSSEBlock(raw);
    if (!block) return;
    if (block.data === "[DONE]") return;
    let parsed: unknown;
    try {
      parsed = JSON.parse(block.data);
    } catch {
      return;
    }
    // Valid JSON can still be null or a primitive; only objects carry image fields.
    if (typeof parsed !== "object" || parsed === null) return;
    const payload = parsed as {
      type?: unknown;
      b64_json?: unknown;
      partial_image_index?: number;
    };
    // Prefer the type inside the JSON payload, falling back to the SSE event name.
    const type = typeof payload.type === "string" ? payload.type : block.event;
    const b64 = payload.b64_json;
    if (typeof b64 !== "string" || b64.length === 0) return;
    if (type.endsWith(".partial_image")) {
      sseEvent(controller, "partial", {
        image: `data:image/png;base64,${b64}`,
        index: payload.partial_image_index ?? 0,
      });
    } else if (type.endsWith(".completed")) {
      sseEvent(controller, "final", {
        image: `data:image/png;base64,${b64}`,
      });
    }
  };

  // Appends decoded text to the buffer with normalized line endings and
  // dispatches every complete block (terminated by a blank line).
  const ingest = (text: string, final: boolean): void => {
    let chunk = heldCarriageReturn ? `\r${text}` : text;
    heldCarriageReturn = !final && chunk.endsWith("\r");
    if (heldCarriageReturn) chunk = chunk.slice(0, -1);
    buffer += chunk.replace(/\r\n?/g, "\n");
    let idx: number;
    while ((idx = buffer.indexOf("\n\n")) !== -1) {
      handle(buffer.slice(0, idx));
      buffer = buffer.slice(idx + 2);
    }
  };

  try {
    while (true) {
      const { value, done } = await reader.read();
      if (done) break;
      ingest(decoder.decode(value, { stream: true }), false);
    }
    // Flush bytes the decoder was holding for an incomplete multi-byte sequence.
    ingest(decoder.decode(), true);
    // A last block may arrive without its trailing blank line.
    if (buffer.trim().length > 0) handle(buffer);
  } finally {
    // Resolves immediately after a full read; on an early exit it stops the
    // upstream download instead of leaving the connection open.
    await reader.cancel().catch(() => undefined);
  }
}
/**
 * Converts a non-streaming upstream JSON response into SSE `final` events.
 *
 * Reads the body as JSON and emits one `final` event per entry of its `data`
 * array that has a non-empty `b64_json`, wrapped as a PNG data URL. Entries
 * without image data are skipped; a missing `data` array emits nothing.
 *
 * @param upstream - Upstream response with a JSON body.
 * @param controller - Client stream controller that receives the events.
 */
async function forwardUpstreamJSON(
  upstream: Response,
  controller: SSEController,
): Promise<void> {
  const payload = (await upstream.json()) as {
    data?: Array<{ b64_json?: string }>;
  };
  const items = payload.data ?? [];
  for (const entry of items) {
    const b64 = entry.b64_json;
    if (b64) {
      const image = `data:image/png;base64,${b64}`;
      sseEvent(controller, "final", { image });
    }
  }
}
/**
 * Heuristic check for whether an upstream error body is complaining about the
 * streaming parameters.
 *
 * @param errText - Error body text returned by the upstream API.
 * @returns `true` when the text contains `stream` or `partial_images` as a
 *   whole word, case-insensitively.
 */
function isStreamingUnsupportedError(errText: string): boolean {
  const streamingParamPattern = /\b(stream|partial_images)\b/i;
  return streamingParamPattern.exec(errText) !== null;
}
const server = Bun.serve({
@@ -63,60 +177,82 @@ const server = Bun.serve({
"/": index,
"/api/generate": {
POST: async (req) => {
try {
const { baseURL, apiKey, model, prompt, size, referenceImages } =
(await req.json()) as {
baseURL?: string;
apiKey?: string;
model?: string;
prompt?: string;
size?: Size;
referenceImages?: string[];
};
if (!baseURL || !apiKey || !model || !prompt) {
return Response.json(
{ error: "baseURL, apiKey, model, prompt are required" },
{ status: 400 },
);
}
if (Array.isArray(referenceImages) && referenceImages.length > 0) {
const images = await generateWithReference({
baseURL,
apiKey,
model,
prompt,
size: size ?? "1024x1024",
referenceImages,
});
return Response.json({ images });
}
const provider = createOpenAICompatible({
name: "custom",
apiKey,
baseURL,
});
const { images } = await generateImage({
model: provider.imageModel(model),
prompt,
size: size || "1024x1024",
});
const out = images.map((img) => {
const mediaType = (img as { mediaType?: string }).mediaType ?? "image/png";
const base64 = (img as { base64?: string }).base64;
return base64 ? `data:${mediaType};base64,${base64}` : null;
}).filter(Boolean);
return Response.json({ images: out });
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
console.error("[generate] error:", err);
return Response.json({ error: message }, { status: 500 });
const body = (await req.json()) as GenerateRequest;
const { baseURL, apiKey, model, prompt, size, referenceImages } = body;
if (!baseURL || !apiKey || !model || !prompt) {
return Response.json(
{ error: "baseURL, apiKey, model, prompt are required" },
{ status: 400 },
);
}
const refs = Array.isArray(referenceImages) ? referenceImages : [];
const args = {
baseURL,
apiKey,
model,
prompt,
size: size ?? ("1024x1024" as Size),
referenceImages: refs,
};
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const keepalive = setInterval(() => {
try {
sseComment(controller, "keepalive");
} catch {}
}, 20_000);
try {
let upstream = await callUpstream({ ...args, stream: true });
if (!upstream.ok && upstream.status === 400) {
const errText = await upstream.text().catch(() => "");
if (isStreamingUnsupportedError(errText)) {
upstream = await callUpstream({ ...args, stream: false });
} else {
throw new Error(`Upstream 400: ${errText || upstream.statusText}`);
}
}
if (!upstream.ok) {
const errText = await upstream.text().catch(() => "");
throw new Error(
`Upstream ${upstream.status}: ${errText || upstream.statusText}`,
);
}
const contentType = upstream.headers.get("content-type") ?? "";
if (contentType.includes("event-stream")) {
await forwardUpstreamSSE(upstream, controller);
} else {
await forwardUpstreamJSON(upstream, controller);
}
sseEvent(controller, "done", {});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
console.error("[generate] error:", err);
try {
sseEvent(controller, "error", { message });
} catch {}
} finally {
clearInterval(keepalive);
try {
controller.close();
} catch {}
}
},
});
return new Response(stream, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
Connection: "keep-alive",
},
});
},
},
},