mirror of https://github.com/openclaw/openclaw.git

feat(openai): add websocket warm-up with configurable toggle

parent bc9f357ad7 · commit d1615eb35f
@@ -86,6 +86,7 @@ Docs: https://docs.openclaw.ai

 - ACP/ACPX streaming: pin ACPX plugin support to `0.1.15`, add configurable ACPX command/version probing, and streamline ACP stream delivery (`final_only` default + reduced tool-event noise) with matching runtime and test updates. (#30036) Thanks @osolmaz.
 - OpenAI/Streaming transport: make `openai` Responses WebSocket-first by default (`transport: "auto"` with SSE fallback), add shared OpenAI WS stream/connection runtime wiring with per-session cleanup, and preserve server-side compaction payload mutation (`store` + `context_management`) on the WS path.
+- OpenAI/WebSocket warm-up: add optional OpenAI Responses WebSocket warm-up (`response.create` with `generate: false`), enable it by default for `openai/*`, and expose `params.openaiWsWarmup` for per-model enable/disable control.

 ### Fixes
@@ -45,6 +45,7 @@ OpenClaw ships with the pi‑ai catalog. These providers require **no**
 - CLI: `openclaw onboard --auth-choice openai-api-key`
 - Default transport is `auto` (WebSocket-first, SSE fallback)
 - Override per model via `agents.defaults.models["openai/<model>"].params.transport` (`"sse"`, `"websocket"`, or `"auto"`)
+- OpenAI Responses WebSocket warm-up defaults to enabled via `params.openaiWsWarmup` (`true`/`false`)

 ```json5
 {
@@ -68,6 +68,9 @@ You can set `agents.defaults.models.<provider/model>.params.transport`:
 - `"websocket"`: force WebSocket
 - `"auto"`: try WebSocket, then fall back to SSE

+For `openai/*` (Responses API), OpenClaw also enables WebSocket warm-up by
+default (`openaiWsWarmup: true`) when WebSocket transport is used.
+
 ```json5
 {
   agents: {
@@ -85,6 +88,47 @@ You can set `agents.defaults.models.<provider/model>.params.transport`:
 }
 ```

+### OpenAI WebSocket warm-up
+
+OpenAI docs describe warm-up as optional. OpenClaw enables it by default for
+`openai/*` to reduce first-turn latency when using WebSocket transport.
+
+### Disable warm-up
+
+```json5
+{
+  agents: {
+    defaults: {
+      models: {
+        "openai/gpt-5": {
+          params: {
+            openaiWsWarmup: false,
+          },
+        },
+      },
+    },
+  },
+}
+```
+
+### Enable warm-up explicitly
+
+```json5
+{
+  agents: {
+    defaults: {
+      models: {
+        "openai/gpt-5": {
+          params: {
+            openaiWsWarmup: true,
+          },
+        },
+      },
+    },
+  },
+}
+```

 ### OpenAI Responses server-side compaction

 For direct OpenAI Responses models (`openai/*` using `api: "openai-responses"` with
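For reference, the warm-up frame itself is an ordinary `response.create` event sent with `generate: false`; a minimal sketch of the payload, based on the `warmUp()` helper added in this commit (the model id is illustrative):

```json5
{
  type: "response.create",
  generate: false, // prepare the session without producing output
  model: "gpt-5", // illustrative; tools and instructions are attached only when present
}
```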
@@ -70,6 +70,27 @@ const { MockManager } = vi.hoisted(() => {
         throw new Error("Mock send failure");
       }
       this.sentEvents.push(event);
+      const maybeEvent = event as { type?: string; generate?: boolean; model?: string } | null;
+      // Auto-complete warm-up events so warm-up-enabled tests don't hang waiting
+      // for the warm-up terminal event.
+      if (maybeEvent?.type === "response.create" && maybeEvent.generate === false) {
+        queueMicrotask(() => {
+          this.simulateEvent({
+            type: "response.completed",
+            response: makeResponseObject(`warmup-${Date.now()}`),
+          });
+        });
+      }
     }

+    warmUp(params: { model: string; tools?: unknown[]; instructions?: string }): void {
+      this.send({
+        type: "response.create",
+        generate: false,
+        model: params.model,
+        ...(params.tools ? { tools: params.tools } : {}),
+        ...(params.instructions ? { instructions: params.instructions } : {}),
+      });
+    }
+
     onMessage(handler: (event: unknown) => void): () => void {
@@ -967,6 +988,67 @@ describe("createOpenAIWebSocketStreamFn", () => {
       });
     });
   });

+  it("sends warm-up event before first request when openaiWsWarmup=true", async () => {
+    const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-warmup-enabled");
+    const stream = streamFn(
+      modelStub as Parameters<typeof streamFn>[0],
+      contextStub as Parameters<typeof streamFn>[1],
+      { openaiWsWarmup: true } as unknown as Parameters<typeof streamFn>[2],
+    );
+    await new Promise<void>((resolve, reject) => {
+      queueMicrotask(async () => {
+        try {
+          await new Promise((r) => setImmediate(r));
+          MockManager.lastInstance!.simulateEvent({
+            type: "response.completed",
+            response: makeResponseObject("resp-warm", "Done"),
+          });
+          for await (const _ of await resolveStream(stream)) {
+            // consume
+          }
+          resolve();
+        } catch (e) {
+          reject(e);
+        }
+      });
+    });
+    const sent = MockManager.lastInstance!.sentEvents as Array<Record<string, unknown>>;
+    expect(sent).toHaveLength(2);
+    expect(sent[0]?.type).toBe("response.create");
+    expect(sent[0]?.generate).toBe(false);
+    expect(sent[1]?.type).toBe("response.create");
+  });
+
+  it("skips warm-up when openaiWsWarmup=false", async () => {
+    const streamFn = createOpenAIWebSocketStreamFn("sk-test", "sess-warmup-disabled");
+    const stream = streamFn(
+      modelStub as Parameters<typeof streamFn>[0],
+      contextStub as Parameters<typeof streamFn>[1],
+      { openaiWsWarmup: false } as unknown as Parameters<typeof streamFn>[2],
+    );
+    await new Promise<void>((resolve, reject) => {
+      queueMicrotask(async () => {
+        try {
+          await new Promise((r) => setImmediate(r));
+          MockManager.lastInstance!.simulateEvent({
+            type: "response.completed",
+            response: makeResponseObject("resp-nowarm", "Done"),
+          });
+          for await (const _ of await resolveStream(stream)) {
+            // consume
+          }
+          resolve();
+        } catch (e) {
+          reject(e);
+        }
+      });
+    });
+    const sent = MockManager.lastInstance!.sentEvents as Array<Record<string, unknown>>;
+    expect(sent).toHaveLength(1);
+    expect(sent[0]?.type).toBe("response.create");
+    expect(sent[0]?.generate).toBeUndefined();
+  });
 });

 // ─────────────────────────────────────────────────────────────────────────────
@@ -53,6 +53,8 @@ interface WsSession {
   lastContextLength: number;
   /** True if the connection has been established at least once. */
   everConnected: boolean;
+  /** True once a best-effort warm-up attempt has run for this session. */
+  warmUpAttempted: boolean;
   /** True if the session is permanently broken (no more reconnect). */
   broken: boolean;
 }
@@ -325,6 +327,7 @@ export interface OpenAIWebSocketStreamOptions {
 }

 type WsTransport = "sse" | "websocket" | "auto";
+const WARM_UP_TIMEOUT_MS = 8_000;

 function resolveWsTransport(options: Parameters<StreamFn>[2]): WsTransport {
   const transport = (options as { transport?: unknown } | undefined)?.transport;
@@ -333,6 +336,68 @@ function resolveWsTransport(options: Parameters<StreamFn>[2]): WsTransport {
     : "auto";
 }

+type WsOptions = Parameters<StreamFn>[2] & { openaiWsWarmup?: unknown; signal?: AbortSignal };
+
+function resolveWsWarmup(options: Parameters<StreamFn>[2]): boolean {
+  const warmup = (options as WsOptions | undefined)?.openaiWsWarmup;
+  return warmup === true;
+}
+
+async function runWarmUp(params: {
+  manager: OpenAIWebSocketManager;
+  modelId: string;
+  tools: FunctionToolDefinition[];
+  instructions?: string;
+  signal?: AbortSignal;
+}): Promise<void> {
+  if (params.signal?.aborted) {
+    throw new Error("aborted");
+  }
+  await new Promise<void>((resolve, reject) => {
+    const timeout = setTimeout(() => {
+      cleanup();
+      reject(new Error(`warm-up timed out after ${WARM_UP_TIMEOUT_MS}ms`));
+    }, WARM_UP_TIMEOUT_MS);
+
+    const abortHandler = () => {
+      cleanup();
+      reject(new Error("aborted"));
+    };
+    const closeHandler = (code: number, reason: string) => {
+      cleanup();
+      reject(new Error(`warm-up closed (code=${code}, reason=${reason || "unknown"})`));
+    };
+    const unsubscribe = params.manager.onMessage((event) => {
+      if (event.type === "response.completed") {
+        cleanup();
+        resolve();
+      } else if (event.type === "response.failed") {
+        cleanup();
+        const errMsg = event.response?.error?.message ?? "Response failed";
+        reject(new Error(`warm-up failed: ${errMsg}`));
+      } else if (event.type === "error") {
+        cleanup();
+        reject(new Error(`warm-up error: ${event.message} (code=${event.code})`));
+      }
+    });
+
+    const cleanup = () => {
+      clearTimeout(timeout);
+      params.signal?.removeEventListener("abort", abortHandler);
+      params.manager.off("close", closeHandler);
+      unsubscribe();
+    };
+
+    params.signal?.addEventListener("abort", abortHandler, { once: true });
+    params.manager.on("close", closeHandler);
+    params.manager.warmUp({
+      model: params.modelId,
+      tools: params.tools.length > 0 ? params.tools : undefined,
+      instructions: params.instructions,
+    });
+  });
+}
+
 /**
  * Creates a `StreamFn` backed by a persistent WebSocket connection to the
  * OpenAI Responses API. The first call for a given `sessionId` opens the
@@ -369,6 +434,7 @@ export function createOpenAIWebSocketStreamFn(
       manager,
       lastContextLength: 0,
       everConnected: false,
+      warmUpAttempted: false,
       broken: false,
     };
     wsRegistry.set(sessionId, session);
@@ -416,6 +482,29 @@
       return fallbackToHttp(model, context, options, eventStream, opts.signal);
     }

+    const signal = opts.signal ?? (options as WsOptions | undefined)?.signal;
+
+    if (resolveWsWarmup(options) && !session.warmUpAttempted) {
+      session.warmUpAttempted = true;
+      try {
+        await runWarmUp({
+          manager: session.manager,
+          modelId: model.id,
+          tools: convertTools(context.tools),
+          instructions: context.systemPrompt ?? undefined,
+          signal,
+        });
+        log.debug(`[ws-stream] warm-up completed for session=${sessionId}`);
+      } catch (warmErr) {
+        if (signal?.aborted) {
+          throw warmErr instanceof Error ? warmErr : new Error(String(warmErr));
+        }
+        log.warn(
+          `[ws-stream] warm-up failed for session=${sessionId}; continuing without warm-up. error=${String(warmErr)}`,
+        );
+      }
+    }
+
     // ── 3. Compute incremental vs full input ─────────────────────────────
     const prevResponseId = session.manager.previousResponseId;
     let inputItems: InputItem[];
@@ -544,7 +633,6 @@
       cleanup();
       reject(new Error("aborted"));
     };
-    const signal = opts.signal ?? (options as { signal?: AbortSignal } | undefined)?.signal;
     if (signal?.aborted) {
       reject(new Error("aborted"));
       return;
@@ -138,9 +138,9 @@ describe("resolveExtraParams", () => {

 describe("applyExtraParamsToAgent", () => {
   function createOptionsCaptureAgent() {
-    const calls: Array<SimpleStreamOptions | undefined> = [];
+    const calls: Array<(SimpleStreamOptions & { openaiWsWarmup?: boolean }) | undefined> = [];
     const baseStreamFn: StreamFn = (_model, _context, options) => {
-      calls.push(options);
+      calls.push(options as (SimpleStreamOptions & { openaiWsWarmup?: boolean }) | undefined);
       return {} as ReturnType<StreamFn>;
     };
     return {
@@ -559,6 +559,7 @@

     expect(calls).toHaveLength(1);
     expect(calls[0]?.transport).toBe("auto");
+    expect(calls[0]?.openaiWsWarmup).toBe(true);
   });

   it("lets runtime options override OpenAI default transport", () => {
@@ -578,6 +579,68 @@
     expect(calls[0]?.transport).toBe("sse");
   });

+  it("allows disabling OpenAI websocket warm-up via model params", () => {
+    const { calls, agent } = createOptionsCaptureAgent();
+    const cfg = {
+      agents: {
+        defaults: {
+          models: {
+            "openai/gpt-5": {
+              params: {
+                openaiWsWarmup: false,
+              },
+            },
+          },
+        },
+      },
+    };
+
+    applyExtraParamsToAgent(agent, cfg, "openai", "gpt-5");
+
+    const model = {
+      api: "openai-responses",
+      provider: "openai",
+      id: "gpt-5",
+    } as Model<"openai-responses">;
+    const context: Context = { messages: [] };
+    void agent.streamFn?.(model, context, {});
+
+    expect(calls).toHaveLength(1);
+    expect(calls[0]?.openaiWsWarmup).toBe(false);
+  });
+
+  it("lets runtime options override configured OpenAI websocket warm-up", () => {
+    const { calls, agent } = createOptionsCaptureAgent();
+    const cfg = {
+      agents: {
+        defaults: {
+          models: {
+            "openai/gpt-5": {
+              params: {
+                openaiWsWarmup: false,
+              },
+            },
+          },
+        },
+      },
+    };
+
+    applyExtraParamsToAgent(agent, cfg, "openai", "gpt-5");
+
+    const model = {
+      api: "openai-responses",
+      provider: "openai",
+      id: "gpt-5",
+    } as Model<"openai-responses">;
+    const context: Context = { messages: [] };
+    void agent.streamFn?.(model, context, {
+      openaiWsWarmup: true,
+    } as unknown as SimpleStreamOptions);
+
+    expect(calls).toHaveLength(1);
+    expect(calls[0]?.openaiWsWarmup).toBe(true);
+  });
+
   it("allows forcing Codex transport to SSE", () => {
     const { calls, agent } = createOptionsCaptureAgent();
     const cfg = {
@@ -46,6 +46,7 @@ export function resolveExtraParams(params: {
 type CacheRetention = "none" | "short" | "long";
 type CacheRetentionStreamOptions = Partial<SimpleStreamOptions> & {
   cacheRetention?: CacheRetention;
+  openaiWsWarmup?: boolean;
 };

 /**
@@ -124,6 +125,9 @@ function createStreamFnWithExtraParams(
     const transportSummary = typeof transport === "string" ? transport : typeof transport;
     log.warn(`ignoring invalid transport param: ${transportSummary}`);
   }
+  if (typeof extraParams.openaiWsWarmup === "boolean") {
+    streamParams.openaiWsWarmup = extraParams.openaiWsWarmup;
+  }
   const cacheRetention = resolveCacheRetention(extraParams, provider);
   if (cacheRetention) {
     streamParams.cacheRetention = cacheRetention;
@@ -321,11 +325,19 @@ function createCodexDefaultTransportWrapper(baseStreamFn: StreamFn | undefined):

 function createOpenAIDefaultTransportWrapper(baseStreamFn: StreamFn | undefined): StreamFn {
   const underlying = baseStreamFn ?? streamSimple;
-  return (model, context, options) =>
-    underlying(model, context, {
+  return (model, context, options) => {
+    const typedOptions = options as
+      | (SimpleStreamOptions & { openaiWsWarmup?: boolean })
+      | undefined;
+    return underlying(model, context, {
       ...options,
       transport: options?.transport ?? "auto",
+      // Warm-up is optional in OpenAI docs; enabled by default here for lower
+      // first-turn latency on WebSocket sessions. Set params.openaiWsWarmup=false
+      // to disable per model.
+      openaiWsWarmup: typedOptions?.openaiWsWarmup ?? true,
     });
+  };
 }

 function isAnthropic1MModel(modelId: string): boolean {
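Taken together with the params plumbing above, the effective warm-up setting resolves in a fixed order: runtime stream options first, then model params from config, then the wrapper's default of `true`. A sketch mirroring the tests (config values illustrative):

```json5
// Precedence: runtime options > config params > wrapper default (true).
{
  agents: {
    defaults: {
      models: {
        "openai/gpt-5": {
          params: {
            openaiWsWarmup: false, // config disables warm-up for this model
          },
        },
      },
    },
  },
}
// A caller passing { openaiWsWarmup: true } in the stream options still
// re-enables warm-up for that call; with nothing set anywhere, the
// OpenAI wrapper defaults the flag to true.
```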