Commit 9ae4391b authored by AI-甘富林's avatar AI-甘富林

Update skill sync and chat UI

Co-Authored-By: 's avatarClaude Sonnet 4.6 <noreply@anthropic.com>
parent 29bb5e3a
import path from "node:path";
import { readFile, writeFile } from "node:fs/promises";
import { appendFile, readFile, writeFile } from "node:fs/promises";
import { BrowserWindow, app } from "electron";
import { GatewayClient } from "@qjclaw/gateway-client";
import { RuntimeManager } from "@qjclaw/runtime-manager";
......@@ -74,6 +74,25 @@ interface RendererSmokeState {
messages: Array<{ id: string; role: string; content: string }>;
logs: Array<{ ts: string; level: string; message: string }>;
activeSessionId: string;
streamSmoke: {
phase?: string;
prompt?: string;
selectedSkillId?: string;
requestId?: string;
sessionId?: string;
runId?: string;
assistantMessageId?: string;
startedEventCount?: number;
deltaEventCount?: number;
completedEventCount?: number;
errorEventCount?: number;
fallbackUsed?: boolean;
renderedContent?: string;
finalContent?: string;
executionPolicySource?: string;
executionPolicyModel?: string;
lastError?: string;
} | null;
}
const forcedUserDataPath = process.env.QJCLAW_USER_DATA_PATH?.trim();
......@@ -142,18 +161,82 @@ async function waitForRendererSmokeState(window: BrowserWindow, timeoutMs = 2000
return null;
}
async function waitForRendererStreamSmoke(window: BrowserWindow, timeoutMs = 40000): Promise<RendererSmokeState | null> {
const started = Date.now();
while (Date.now() - started < timeoutMs) {
const state = await waitForRendererSmokeState(window, 2000);
const streamSmoke = state?.streamSmoke;
if (streamSmoke && ["completed", "fallback", "error"].includes(String(streamSmoke.phase ?? ""))) {
return state;
}
await delay(250);
}
return null;
}
async function runSmokeTest(window: BrowserWindow, outputPath: string): Promise<void> {
const result: Record<string, unknown> = {
startedAt: new Date().toISOString()
};
const tracePath = outputPath + ".trace.log";
const trace = async (message: string) => {
const line = "[" + new Date().toISOString() + "] " + message + "\n";
await appendFile(tracePath, line, "utf8").catch(() => undefined);
};
try {
await trace("runSmokeTest:start");
if (window.webContents.isLoadingMainFrame()) {
await new Promise<void>((resolve) => {
window.webContents.once("did-finish-load", () => resolve());
await trace("runSmokeTest:waiting-for-load");
await new Promise<void>((resolve, reject) => {
let settled = false;
let timer: ReturnType<typeof setTimeout> | undefined;
const cleanup = () => {
if (timer) {
clearTimeout(timer);
}
window.webContents.removeListener("did-fail-load", onFailLoad);
window.webContents.removeListener("render-process-gone", onRenderProcessGone);
};
const finish = () => {
if (settled) {
return;
}
settled = true;
cleanup();
resolve();
};
const fail = (message: string) => {
if (settled) {
return;
}
settled = true;
cleanup();
reject(new Error(message));
};
const onFailLoad = (_event: Electron.Event, errorCode: number, errorDescription: string, validatedURL: string, isMainFrame: boolean) => {
if (!isMainFrame) {
return;
}
fail("Renderer main frame failed to load: " + errorDescription + " (" + errorCode + ") " + validatedURL);
};
const onRenderProcessGone = (_event: Electron.Event, details: Electron.RenderProcessGoneDetails) => {
fail("Renderer process exited during smoke load: " + details.reason);
};
timer = setTimeout(() => {
fail("Renderer main frame did not finish loading in time.");
}, 15000);
window.webContents.once("did-finish-load", finish);
window.webContents.on("did-fail-load", onFailLoad);
window.webContents.on("render-process-gone", onRenderProcessGone);
});
await trace("runSmokeTest:load-finished");
}
await trace("runSmokeTest:loading-renderer-state");
let initialState = await waitForRendererSmokeState(window);
if (!initialState) {
throw new Error("Renderer smoke state was not published.");
......@@ -177,14 +260,20 @@ async function runSmokeTest(window: BrowserWindow, outputPath: string): Promise<
}
result.initialState = initialState;
await trace("runSmokeTest:initial-state-ready");
const prompt = `qjc smoke ${new Date().toISOString()}`;
const prompt = `qjc smoke stream ${new Date().toISOString()}`;
await trace("runSmokeTest:before-send-script");
const sendResult = await window.webContents.executeJavaScript(`(async () => {
const api = window.qjcDesktop;
const state = window.__QJC_SMOKE__;
const actions = window.__QJC_SMOKE_ACTIONS__;
if (!api) {
throw new Error("Renderer is using mock desktop API.");
}
if (!actions) {
throw new Error("Renderer smoke actions were not published.");
}
const smokeBaseUrl = ${JSON.stringify(process.env.QJCLAW_SMOKE_CLOUD_API_BASE_URL ?? "")};
const smokeToken = ${JSON.stringify(process.env.QJCLAW_SMOKE_AUTH_TOKEN ?? "")};
const smokeRuntimeApiKey = ${JSON.stringify(process.env.QJCLAW_SMOKE_RUNTIME_CLOUD_API_KEY ?? "smoke-runtime-api-key")};
......@@ -242,19 +331,12 @@ async function runSmokeTest(window: BrowserWindow, outputPath: string): Promise<
const profile = session.state === "authenticated" ? await api.profile.getSummary() : null;
const credits = session.state === "authenticated" ? await api.credits.getSummary() : null;
const skills = session.state === "authenticated" ? await api.skills.list() : [];
const modelConfig = session.state === "authenticated" ? await api.modelConfig.getSummary() : null;
const selectedSkillId = skills[0]?.id;
const workspace = await api.workspace.getSummary();
const selectedSkillId = workspace.skills[0]?.id ?? skills[0]?.id;
const sessions = await api.chat.listSessions();
const sessionId = state?.activeSessionId || sessions[0]?.id || "desktop-main";
const system = await api.system.getSummary();
const reply = await api.chat.sendPrompt(sessionId, ${JSON.stringify(prompt)}, selectedSkillId);
await sleep(2500);
const runtimeTelemetryAfterWait = await api.runtimeTelemetry.getStatus();
const messages = await api.chat.listMessages(reply.sessionId);
const logs = await api.gateway.tailLogs(20);
const health = gatewayProbe.health;
const status = gatewayProbe.status;
const diagnostics = await api.diagnostics.exportSnapshot();
await actions.sendChatPrompt(${JSON.stringify(prompt)}, selectedSkillId);
return {
prompt: ${JSON.stringify(prompt)},
runtimeCloudStatus,
......@@ -266,48 +348,81 @@ async function runSmokeTest(window: BrowserWindow, outputPath: string): Promise<
runtimeHealthAfterProbe,
runtimeLogCount: runtimeLogs.length,
runtimeTelemetryBeforeWait,
runtimeTelemetryAfterWait,
session,
profile,
credits,
skills,
selectedSkillId,
modelConfig: modelConfig ? {
defaultChatModelLabel: modelConfig.defaultChatModelLabel,
routingMode: modelConfig.routingMode,
itemCount: modelConfig.items.length
} : null,
initialSessionId: sessionId,
system,
reply,
health: gatewayProbe.health,
status: gatewayProbe.status
};
})()`);
await trace("runSmokeTest:send-script-finished");
const streamState = await waitForRendererStreamSmoke(window, 40000);
if (!streamState?.streamSmoke) {
throw new Error("Renderer stream smoke did not reach a terminal state.");
}
await trace("runSmokeTest:stream-terminal:" + String(streamState.streamSmoke.phase ?? "unknown"));
await delay(1500);
const finalState = await waitForRendererSmokeState(window, 5000);
const streamSmoke = finalState?.streamSmoke ?? streamState.streamSmoke;
await trace("runSmokeTest:before-post-stream-script");
const postStreamResult = await window.webContents.executeJavaScript(`(async () => {
const api = window.qjcDesktop;
const state = window.__QJC_SMOKE__;
if (!api) {
throw new Error("Renderer is using mock desktop API.");
}
const sessionId = state?.streamSmoke?.sessionId || state?.activeSessionId || "desktop-main";
const runtimeTelemetryAfterWait = await api.runtimeTelemetry.getStatus();
const messages = await api.chat.listMessages(sessionId);
const logs = await api.gateway.tailLogs(20);
const diagnostics = await api.diagnostics.exportSnapshot();
const health = await api.gateway.health();
const status = await api.gateway.status();
return {
runtimeTelemetryAfterWait,
sessionId,
messageCount: messages.length,
lastMessage: messages.at(-1) ?? null,
logCount: logs.length,
diagnostics,
health,
status,
diagnostics
status
};
})()`);
await delay(1000);
const finalState = await waitForRendererSmokeState(window, 5000);
const diagnosticsPath = typeof (sendResult as { diagnostics?: { filePath?: string } }).diagnostics?.filePath === "string"
? (sendResult as { diagnostics: { filePath: string } }).diagnostics.filePath
await trace("runSmokeTest:post-stream-script-finished");
const combinedSendResult = {
...sendResult,
...postStreamResult,
streamSmoke
};
const diagnosticsPath = typeof (combinedSendResult as { diagnostics?: { filePath?: string } }).diagnostics?.filePath === "string"
? (combinedSendResult as { diagnostics: { filePath: string } }).diagnostics.filePath
: undefined;
const diagnosticsSnapshot = diagnosticsPath
? JSON.parse(await readFile(diagnosticsPath, "utf8")) as Record<string, unknown>
: null;
result.sendResult = sendResult;
result.sendResult = combinedSendResult;
result.finalState = finalState;
result.diagnosticsSnapshot = diagnosticsSnapshot;
result.ok = true;
await trace("runSmokeTest:success");
} catch (error) {
result.ok = false;
result.error = error instanceof Error ? error.message : String(error);
await trace("runSmokeTest:error:" + String(result.error));
}
result.finishedAt = new Date().toISOString();
await trace("runSmokeTest:writing-output");
await writeFile(outputPath, JSON.stringify(result, null, 2), "utf8");
await trace("runSmokeTest:output-written");
app.quit();
}
......
import { randomUUID } from "node:crypto";
import { ipcMain, shell } from "electron";
import {
IPC_CHANNELS,
type AppConfig,
type ChatStreamEvent,
type DesktopApi,
type GatewayStatus,
type PluginSummary,
......@@ -372,6 +374,10 @@ export function registerDesktopIpc(services: MainServices): DesktopApi {
};
};
const emitChatStreamEvent = (sender: Electron.WebContents, payload: ChatStreamEvent) => {
sender.send(IPC_CHANNELS.chatStreamEvent, payload);
};
ipcMain.handle(IPC_CHANNELS.workspaceGetSummary, async () => buildWorkspaceSummary());
ipcMain.handle(IPC_CHANNELS.gatewayStatus, async () => gatewayClient.status());
ipcMain.handle(IPC_CHANNELS.gatewayConnect, async () => gatewayClient.connect());
......@@ -470,7 +476,101 @@ export function registerDesktopIpc(services: MainServices): DesktopApi {
throw error;
}
});
ipcMain.handle(IPC_CHANNELS.chatStreamPrompt, async (event, sessionId: string, prompt: string, skillId?: string) => {
const executionPolicy = await resolveExecutionPolicy(skillId);
const requestId = randomUUID();
let settled = false;
let ready = false;
let startedEvent: ChatStreamEvent | null = null;
const queuedEvents: ChatStreamEvent[] = [];
const queueOrSend = (payload: ChatStreamEvent) => {
if (!ready) {
if (payload.type === "started") {
startedEvent = payload;
} else {
queuedEvents.push(payload);
}
return;
}
emitChatStreamEvent(event.sender, payload);
};
runtimeCloudSupervisor.noteMessageReceived(sessionId, prompt, skillId);
try {
const stream = await gatewayClient.streamPrompt(sessionId, prompt, {
onStarted: ({ sessionId: nextSessionId, runId }) => {
queueOrSend({
type: "started",
requestId,
sessionId: nextSessionId,
runId,
executionPolicy
});
},
onDelta: ({ sessionId: nextSessionId, runId, textDelta, fullText }) => {
queueOrSend({
type: "delta",
requestId,
sessionId: nextSessionId,
runId,
textDelta,
fullText
});
},
onCompleted: ({ sessionId: nextSessionId, runId, reply }) => {
settled = true;
runtimeCloudSupervisor.noteMessageSent(nextSessionId, reply.content, executionPolicy.modelId, skillId);
queueOrSend({
type: "completed",
requestId,
sessionId: nextSessionId,
runId,
reply,
executionPolicy
});
},
onError: ({ sessionId: nextSessionId, runId, error }) => {
settled = true;
runtimeCloudSupervisor.noteError("chat_stream_failed", error.message, {
modelId: executionPolicy.modelId,
sessionId: nextSessionId
});
queueOrSend({
type: "error",
requestId,
sessionId: nextSessionId,
runId,
message: error.message
});
}
});
ready = true;
setTimeout(() => {
emitChatStreamEvent(event.sender, startedEvent ?? {
type: "started",
requestId,
sessionId: stream.sessionId,
runId: stream.runId,
executionPolicy
});
for (const queuedEvent of queuedEvents) {
emitChatStreamEvent(event.sender, queuedEvent);
}
}, 0);
return { requestId, sessionId: stream.sessionId, runId: stream.runId, executionPolicy };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!settled) {
runtimeCloudSupervisor.noteError("chat_stream_failed", message, {
modelId: executionPolicy.modelId,
sessionId
});
}
throw error;
}
});
ipcMain.handle(IPC_CHANNELS.diagnosticsOpenControlUi, async () => {
const config = await getEffectiveConfig();
await shell.openExternal(toControlUiUrl(config.gatewayUrl));
......@@ -588,7 +688,18 @@ export function registerDesktopIpc(services: MainServices): DesktopApi {
});
throw error;
}
}
},
streamPrompt: async (sessionId: string, prompt: string, skillId?: string) => {
const executionPolicy = await resolveExecutionPolicy(skillId);
const stream = await gatewayClient.streamPrompt(sessionId, prompt);
return {
requestId: randomUUID(),
sessionId: stream.sessionId,
runId: stream.runId,
executionPolicy
};
},
onStreamEvent: () => () => undefined
},
diagnostics: {
openControlUi: async () => {
......
import http from "node:http";
function extractPromptText(value: unknown): string {
if (typeof value === "string") {
return value;
}
if (Array.isArray(value)) {
return value
.map((item) => extractPromptText(item))
.filter(Boolean)
.join("\n");
}
if (!value || typeof value !== "object") {
return "";
}
const record = value as Record<string, unknown>;
if (typeof record.text === "string") {
return record.text;
}
if (typeof record.content === "string") {
return record.content;
}
if (record.content) {
return extractPromptText(record.content);
}
if (record.input) {
return extractPromptText(record.input);
}
return "";
}
function buildSmokeReply(body: Record<string, unknown>): string {
const messages = Array.isArray(body.messages) ? body.messages : [];
const prompt = [...messages]
.reverse()
.map((message) => extractPromptText(message))
.find((value) => value.trim().length > 0) || extractPromptText(body.input) || "smoke";
return `Smoke stream ok: ${prompt.trim()}`;
}
export async function startSmokeCloudApiServer(baseUrl: string, token: string, runtimeApiKey = "smoke-runtime-api-key"): Promise<() => Promise<void>> {
const url = new URL(baseUrl);
const hostname = url.hostname;
const port = Number(url.port || (url.protocol === "https:" ? 443 : 80));
const providerToken = "runtime-provider-token";
const providerBaseUrl = `${baseUrl}/openai/v1`;
const server = http.createServer((req, res) => {
const requestUrl = new URL(req.url || "/", `${url.protocol}//${url.host}`);
......@@ -26,6 +69,70 @@ export async function startSmokeCloudApiServer(baseUrl: string, token: string, r
return JSON.parse(Buffer.concat(chunks).toString("utf8")) as Record<string, unknown>;
};
const sendChatCompletion = async (body: Record<string, unknown>) => {
const replyText = buildSmokeReply(body);
const created = Math.floor(Date.now() / 1000);
const model = typeof body.model === "string" ? body.model : "gpt-5.4-mini";
const completionId = `chatcmpl-smoke-${Date.now()}`;
if (body.stream === true) {
res.writeHead(200, {
"Content-Type": "text/event-stream; charset=utf-8",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive"
});
const parts = replyText.match(/.{1,10}/g) ?? [replyText];
const writeChunk = (payload: unknown) => {
res.write(`data: ${JSON.stringify(payload)}\n\n`);
};
writeChunk({
id: completionId,
object: "chat.completion.chunk",
created,
model,
choices: [{ index: 0, delta: { role: "assistant" }, finish_reason: null }]
});
for (const part of parts) {
writeChunk({
id: completionId,
object: "chat.completion.chunk",
created,
model,
choices: [{ index: 0, delta: { content: part }, finish_reason: null }]
});
await new Promise((resolve) => setTimeout(resolve, 25));
}
writeChunk({
id: completionId,
object: "chat.completion.chunk",
created,
model,
choices: [{ index: 0, delta: {}, finish_reason: "stop" }]
});
res.write("data: [DONE]\n\n");
res.end();
return;
}
sendJson(200, {
id: completionId,
object: "chat.completion",
created,
model,
choices: [
{
index: 0,
message: { role: "assistant", content: replyText },
finish_reason: "stop"
}
]
});
};
const handleRequest = async () => {
if (req.method === "POST" && requestUrl.pathname === "/openclaw-employee-config") {
const body = await readJsonBody();
......@@ -71,8 +178,8 @@ export async function startSmokeCloudApiServer(baseUrl: string, token: string, r
max_context_length: 200000,
provider: {
name: "Smoke OpenAI Compatible",
base_url: "http://127.0.0.1:11434/v1",
api_key: "runtime-provider-token",
base_url: providerBaseUrl,
api_key: providerToken,
provider_type: "openai_compatible"
}
},
......@@ -148,6 +255,31 @@ export async function startSmokeCloudApiServer(baseUrl: string, token: string, r
return;
}
if (req.method === "GET" && requestUrl.pathname === "/openai/v1/models") {
if (bearerToken !== providerToken) {
sendJson(401, { message: "Invalid provider token." });
return;
}
sendJson(200, {
object: "list",
data: [
{ id: "gpt-5.4-mini", object: "model", created: 0, owned_by: "smoke" },
{ id: "gpt-5.4", object: "model", created: 0, owned_by: "smoke" }
]
});
return;
}
if (req.method === "POST" && requestUrl.pathname === "/openai/v1/chat/completions") {
if (bearerToken !== providerToken) {
sendJson(401, { message: "Invalid provider token." });
return;
}
const body = await readJsonBody();
await sendChatCompletion(body);
return;
}
if (bearerToken !== token) {
sendJson(401, { message: "Invalid cloud access token." });
return;
......
import { contextBridge, ipcRenderer } from "electron";
import { IPC_CHANNELS, type DesktopApi, type RuntimeCloudFetchAction, type SaveConfigInput, type SignInInput } from "@qjclaw/shared-types";
import { contextBridge, ipcRenderer } from "electron";
import {
IPC_CHANNELS,
type ChatStreamListener,
type DesktopApi,
type RuntimeCloudFetchAction,
type SaveConfigInput,
type SignInInput
} from "@qjclaw/shared-types";
const desktopApi: DesktopApi = {
workspace: {
......@@ -55,7 +62,17 @@ const desktopApi: DesktopApi = {
chat: {
listSessions: () => ipcRenderer.invoke(IPC_CHANNELS.chatListSessions),
listMessages: (sessionId: string) => ipcRenderer.invoke(IPC_CHANNELS.chatListMessages, sessionId),
sendPrompt: (sessionId: string, prompt: string, skillId?: string) => ipcRenderer.invoke(IPC_CHANNELS.chatSendPrompt, sessionId, prompt, skillId)
sendPrompt: (sessionId: string, prompt: string, skillId?: string) => ipcRenderer.invoke(IPC_CHANNELS.chatSendPrompt, sessionId, prompt, skillId),
streamPrompt: (sessionId: string, prompt: string, skillId?: string) => ipcRenderer.invoke(IPC_CHANNELS.chatStreamPrompt, sessionId, prompt, skillId),
onStreamEvent: (listener: ChatStreamListener) => {
const wrapped = (_event: Electron.IpcRendererEvent, payload: Parameters<ChatStreamListener>[0]) => {
listener(payload);
};
ipcRenderer.on(IPC_CHANNELS.chatStreamEvent, wrapped);
return () => {
ipcRenderer.removeListener(IPC_CHANNELS.chatStreamEvent, wrapped);
};
}
},
diagnostics: {
openControlUi: () => ipcRenderer.invoke(IPC_CHANNELS.diagnosticsOpenControlUi),
......
import { useEffect, useMemo, useState } from "react";
import { useEffect, useMemo, useRef, useState } from "react";
import type {
AppConfig,
ChatLaunchState,
ChatMessage,
ChatStreamEvent,
ChatStreamListener,
DesktopApi,
DiagnosticsExportResult,
GatewayHealth,
......@@ -19,9 +21,79 @@ import type {
type ViewMode = "chat" | "skills" | "plugins" | "settings";
type Tone = "positive" | "warning";
type MessageStreamState = "streaming" | "error";
type UiChatMessage = ChatMessage & {
streamState?: MessageStreamState;
};
interface ActiveStreamState {
requestId: string;
assistantMessageId: string;
sessionId: string;
targetText: string;
renderedText: string;
finalReply?: ChatMessage;
frameId?: number;
}
type SmokeStreamPhase = "idle" | "requested" | "started" | "streaming" | "completed" | "fallback" | "error";
interface SmokeStreamSnapshot {
phase: SmokeStreamPhase;
prompt: string;
selectedSkillId?: string;
requestId?: string;
sessionId?: string;
runId?: string;
assistantMessageId?: string;
startedEventCount: number;
deltaEventCount: number;
completedEventCount: number;
errorEventCount: number;
fallbackUsed: boolean;
renderedContent: string;
finalContent: string;
executionPolicySource?: string;
executionPolicyModel?: string;
lastError?: string;
}
const DEFAULT_SESSION_ID = "desktop-main";
const SUCCESS_NOTICE_TIMEOUT_MS = 2400;
const TYPEWRITER_CHARS_PER_FRAME = 3;
function createClientMessageId(prefix: string): string {
return globalThis.crypto?.randomUUID?.() ?? `${prefix}-${Date.now()}-${Math.random().toString(16).slice(2)}`;
}
function toUiChatMessage(message: ChatMessage, streamState?: MessageStreamState): UiChatMessage {
return streamState ? { ...message, streamState } : { ...message };
}
function toPlainMessages(items: UiChatMessage[]): ChatMessage[] {
return items.map(({ streamState, ...message }) => message);
}
function buildUserMessage(content: string): UiChatMessage {
return {
id: createClientMessageId("user"),
role: "user",
content,
createdAt: new Date().toISOString()
};
}
function buildAssistantPlaceholder(): UiChatMessage {
return {
id: createClientMessageId("assistant"),
role: "assistant",
content: "",
createdAt: new Date().toISOString(),
streamState: "streaming"
};
}
const DEFAULT_SKILL = {
id: "default-chat",
name: "默认对话",
......@@ -50,6 +122,7 @@ const ui = {
binding: "绑定中...",
changeApiKey: "更换员工密钥",
skillChoice: "选择技能",
clearSkill: "清空技能",
noMessages: "当前没有消息,请先发送一条消息。",
taskPlaceholder: "输入消息后回车或点击发送",
taskDisabledPlaceholder: "请先绑定员工密钥后开始对话。",
......@@ -75,6 +148,14 @@ const ui = {
none: "无"
} as const;
const mockChatStreamListeners = new Set<ChatStreamListener>();
function emitMockChatStreamEvent(event: ChatStreamEvent) {
for (const listener of mockChatStreamListeners) {
listener(event);
}
}
const pluginDisplayMap: Record<string, { name: string; description: string }> = {
"spreadsheet-tools": { name: "表格工具", description: "读取、统计和处理 Excel、CSV 等常见表格文件。" },
"sheet-plugin": { name: "表格工具", description: "读取、统计和处理 Excel、CSV 等常见表格文件。" },
......@@ -155,7 +236,41 @@ const mockDesktopApi = {
chat: {
listSessions: async () => [{ id: DEFAULT_SESSION_ID, title: ui.defaultChat, updatedAt: new Date().toISOString() }],
listMessages: async () => [{ id: "message-1", role: "assistant", content: "Mock UI active.", createdAt: new Date().toISOString() }],
sendPrompt: async (_sessionId: string, prompt: string, skillId?: string) => ({ sessionId: DEFAULT_SESSION_ID, reply: { id: "reply-1", role: "assistant", content: "Mock: " + prompt, createdAt: new Date().toISOString() }, executionPolicy: { source: skillId ? "cloud-skill-binding" : "cloud-default", modelId: "gpt-5.4-mini", modelLabel: "GPT-5.4 Mini", routingMode: "platform-managed", skillId, skillName: skillId, message: "mock" } })
sendPrompt: async (_sessionId: string, prompt: string, skillId?: string) => ({ sessionId: DEFAULT_SESSION_ID, reply: { id: "reply-1", role: "assistant", content: "Mock: " + prompt, createdAt: new Date().toISOString() }, executionPolicy: { source: skillId ? "cloud-skill-binding" : "cloud-default", modelId: "gpt-5.4-mini", modelLabel: "GPT-5.4 Mini", routingMode: "platform-managed", skillId, skillName: skillId, message: "mock" } }),
streamPrompt: async (_sessionId: string, prompt: string, skillId?: string) => {
const requestId = createClientMessageId("mock-request");
const runId = createClientMessageId("mock-run");
const executionPolicy = { source: skillId ? "cloud-skill-binding" as const : "cloud-default" as const, modelId: "gpt-5.4-mini", modelLabel: "GPT-5.4 Mini", routingMode: "platform-managed" as const, skillId, skillName: skillId, message: "mock" };
const replyText = "Mock: " + prompt;
const chunks = replyText.match(/.{1,6}/g) ?? [replyText];
let fullText = "";
window.setTimeout(() => {
emitMockChatStreamEvent({ type: "started", requestId, sessionId: DEFAULT_SESSION_ID, runId, executionPolicy });
}, 0);
chunks.forEach((chunk, index) => {
window.setTimeout(() => {
fullText += chunk;
emitMockChatStreamEvent({ type: "delta", requestId, sessionId: DEFAULT_SESSION_ID, runId, textDelta: chunk, fullText });
}, 90 * (index + 1));
});
window.setTimeout(() => {
emitMockChatStreamEvent({
type: "completed",
requestId,
sessionId: DEFAULT_SESSION_ID,
runId,
reply: { id: createClientMessageId("mock-reply"), role: "assistant", content: replyText, createdAt: new Date().toISOString() },
executionPolicy
});
}, 90 * (chunks.length + 1));
return { requestId, sessionId: DEFAULT_SESSION_ID, runId, executionPolicy };
},
onStreamEvent: (listener: ChatStreamListener) => {
mockChatStreamListeners.add(listener);
return () => {
mockChatStreamListeners.delete(listener);
};
}
},
diagnostics: { openControlUi: async () => undefined, exportSnapshot: async (): Promise<DiagnosticsExportResult> => ({ filePath: "D:/qjclaw/.tmp/mock-diagnostics.json", createdAt: new Date().toISOString() }) }
} as unknown as DesktopApi;
......@@ -187,6 +302,10 @@ declare global {
logs: LogEntry[];
activeSessionId: string;
workspaceSummary: WorkspaceSummary | null;
streamSmoke: SmokeStreamSnapshot | null;
};
__QJC_SMOKE_ACTIONS__?: {
sendChatPrompt(prompt: string, skillId?: string): Promise<void>;
};
}
}
......@@ -218,7 +337,7 @@ export default function App() {
const [systemSummary, setSystemSummary] = useState<SystemSummary | null>(null);
const [gatewayStatus, setGatewayStatus] = useState<GatewayStatus | null>(null);
const [gatewayHealth, setGatewayHealth] = useState<GatewayHealth | null>(null);
const [messages, setMessages] = useState<ChatMessage[]>([]);
const [messages, setMessages] = useState<UiChatMessage[]>([]);
const [activeSessionId, setActiveSessionId] = useState(DEFAULT_SESSION_ID);
const [selectedSkillId, setSelectedSkillId] = useState(DEFAULT_SKILL.id);
const [prompt, setPrompt] = useState("");
......@@ -229,8 +348,10 @@ export default function App() {
const [sending, setSending] = useState(false);
const [errorText, setErrorText] = useState("");
const [infoText, setInfoText] = useState("");
const activeStreamRef = useRef<ActiveStreamState | null>(null);
const [streamSmoke, setStreamSmoke] = useState<SmokeStreamSnapshot | null>(null);
const effectiveSkills = useMemo(() => (workspace?.skills?.length ? workspace.skills : [DEFAULT_SKILL]), [workspace]);
const effectiveSkills = useMemo(() => (workspace?.skills?.length ? [DEFAULT_SKILL, ...workspace.skills] : [DEFAULT_SKILL]), [workspace]);
const selectedSkill = useMemo(() => effectiveSkills.find((skill) => skill.id === selectedSkillId) ?? effectiveSkills[0] ?? DEFAULT_SKILL, [effectiveSkills, selectedSkillId]);
const chatLaunchState: ChatLaunchState = workspace?.chatLaunchState ?? (workspace?.apiKeyConfigured ? "starting" : "unbound");
const chatStatusMessage = workspace?.chatStatusMessage ?? (chatLaunchState === "starting" ? ui.startingHint : chatLaunchState === "error" ? ui.chatNotReadyError : "");
......@@ -262,7 +383,7 @@ export default function App() {
}
try {
setMessages(await desktopApi.chat.listMessages(sessionId));
setMessages((await desktopApi.chat.listMessages(sessionId)).map((message) => toUiChatMessage(message)));
} catch (error) {
setMessages([]);
if (showError) {
......@@ -305,7 +426,7 @@ export default function App() {
setWorkspacePathDraft((current) => current || nextConfig.workspacePath);
setGatewayStatus(statusResult);
const nextSkills = nextWorkspace.skills.length ? nextWorkspace.skills : [DEFAULT_SKILL];
const nextSkills = nextWorkspace.skills.length ? [DEFAULT_SKILL, ...nextWorkspace.skills] : [DEFAULT_SKILL];
if (!nextSkills.some((skill) => skill.id === selectedSkillId)) {
setSelectedSkillId(nextSkills[0].id);
}
......@@ -368,13 +489,285 @@ export default function App() {
modelConfig: null,
systemSummary,
sessions,
messages,
messages: toPlainMessages(messages),
logs: [],
activeSessionId,
workspaceSummary: workspace
workspaceSummary: workspace,
streamSmoke
};
}, [activeSessionId, config, gatewayHealth, gatewayStatus, messages, runtimeCloudStatus, runtimeStatus, runtimeTelemetry, sessions, systemSummary, workspace]);
}, [activeSessionId, config, gatewayHealth, gatewayStatus, messages, runtimeCloudStatus, runtimeStatus, runtimeTelemetry, sessions, streamSmoke, systemSummary, workspace]);
useEffect(() => {
if (!smokeEnabled) {
delete window.__QJC_SMOKE_ACTIONS__;
return;
}
window.__QJC_SMOKE_ACTIONS__ = {
sendChatPrompt: async (nextPrompt: string, skillId?: string) => {
setViewMode("chat");
if (skillId) {
setSelectedSkillId(skillId);
}
setPrompt(nextPrompt);
window.setTimeout(() => {
void submitPrompt(nextPrompt, skillId);
}, 0);
}
};
return () => {
delete window.__QJC_SMOKE_ACTIONS__;
};
});
function updateMessageById(messageId: string, updater: (message: UiChatMessage) => UiChatMessage) {
setMessages((current) => current.map((message) => (message.id === messageId ? updater(message) : message)));
}
function updateStreamSmoke(updater: (current: SmokeStreamSnapshot | null) => SmokeStreamSnapshot | null) {
if (!smokeEnabled) {
return;
}
setStreamSmoke((current) => updater(current));
}
useEffect(() => {
updateStreamSmoke((current) => {
if (!current?.assistantMessageId) {
return current;
}
const assistantMessage = messages.find((message) => message.id === current.assistantMessageId);
if (!assistantMessage) {
return current;
}
const nextRenderedContent = assistantMessage.content;
const nextFinalContent = assistantMessage.streamState ? current.finalContent : assistantMessage.content;
if (nextRenderedContent === current.renderedContent && nextFinalContent === current.finalContent) {
return current;
}
return {
...current,
renderedContent: nextRenderedContent,
finalContent: nextFinalContent
};
});
}, [messages]);
function cancelTypewriter() {
const activeStream = activeStreamRef.current;
if (activeStream?.frameId) {
window.cancelAnimationFrame(activeStream.frameId);
activeStream.frameId = undefined;
}
}
async function syncChatAfterSend(sessionId: string) {
setActiveSessionId(sessionId);
const [telemetry, nextWorkspace, nextGateway] = await Promise.all([
desktopApi.runtimeTelemetry.getStatus().catch(() => null),
desktopApi.workspace.getSummary().catch(() => null),
desktopApi.gateway.status().catch(() => null)
]);
if (telemetry) {
setRuntimeTelemetry(telemetry);
}
if (nextWorkspace) {
setWorkspace(nextWorkspace);
}
setGatewayStatus(nextGateway);
}
function finalizeActiveStream() {
const activeStream = activeStreamRef.current;
if (!activeStream || !activeStream.finalReply) {
return;
}
cancelTypewriter();
updateMessageById(activeStream.assistantMessageId, (message) => ({
...message,
content: activeStream.finalReply?.content ?? activeStream.targetText,
createdAt: activeStream.finalReply?.createdAt ?? message.createdAt,
streamState: undefined
}));
updateStreamSmoke((current) => current ? {
...current,
phase: "completed",
renderedContent: activeStream.finalReply?.content ?? activeStream.targetText,
finalContent: activeStream.finalReply?.content ?? activeStream.targetText
} : current);
const sessionId = activeStream.sessionId;
activeStreamRef.current = null;
setSending(false);
void syncChatAfterSend(sessionId);
}
function scheduleTypewriter() {
const activeStream = activeStreamRef.current;
if (!activeStream || activeStream.frameId) {
return;
}
activeStream.frameId = window.requestAnimationFrame(() => {
const currentStream = activeStreamRef.current;
if (!currentStream) {
return;
}
currentStream.frameId = undefined;
if (currentStream.renderedText.length < currentStream.targetText.length) {
const nextChunk = currentStream.targetText.slice(
currentStream.renderedText.length,
currentStream.renderedText.length + TYPEWRITER_CHARS_PER_FRAME
);
currentStream.renderedText += nextChunk;
updateMessageById(currentStream.assistantMessageId, (message) => ({
...message,
content: currentStream.renderedText,
streamState: "streaming"
}));
}
if (currentStream.renderedText.length < currentStream.targetText.length) {
scheduleTypewriter();
return;
}
finalizeActiveStream();
});
}
function failActiveStream(message: string) {
const activeStream = activeStreamRef.current;
if (activeStream) {
cancelTypewriter();
updateMessageById(activeStream.assistantMessageId, (current) => ({
...current,
content: activeStream.renderedText || activeStream.targetText || current.content,
streamState: "error"
}));
updateStreamSmoke((current) => current ? {
...current,
phase: "error",
renderedContent: activeStream.renderedText || activeStream.targetText || current.renderedContent,
finalContent: activeStream.renderedText || activeStream.targetText || current.finalContent,
lastError: message,
errorEventCount: current.errorEventCount + 1
} : current);
activeStreamRef.current = null;
}
setSending(false);
setErrorText(message);
}
async function completeWithFallback(sessionId: string, promptText: string, skillId: string | undefined, assistantMessageId: string) {
const result = await desktopApi.chat.sendPrompt(sessionId, promptText, skillId);
cancelTypewriter();
activeStreamRef.current = null;
updateMessageById(assistantMessageId, (message) => ({
...message,
content: result.reply.content,
createdAt: result.reply.createdAt,
streamState: undefined
}));
updateStreamSmoke((current) => current ? {
...current,
phase: "fallback",
fallbackUsed: true,
finalContent: result.reply.content,
renderedContent: result.reply.content,
executionPolicySource: result.executionPolicy?.source ?? current.executionPolicySource,
executionPolicyModel: result.executionPolicy?.modelLabel ?? current.executionPolicyModel,
lastError: current.lastError
} : current);
await syncChatAfterSend(result.sessionId);
setSending(false);
}
useEffect(() => {
const unsubscribe = desktopApi.chat.onStreamEvent((event) => {
const activeStream = activeStreamRef.current;
if (!activeStream || event.requestId !== activeStream.requestId) {
return;
}
if (event.type === "started") {
activeStream.sessionId = event.sessionId;
setActiveSessionId(event.sessionId);
updateStreamSmoke((current) => current ? {
...current,
phase: "started",
requestId: event.requestId,
sessionId: event.sessionId,
runId: event.runId,
startedEventCount: current.startedEventCount + 1,
executionPolicySource: event.executionPolicy?.source ?? current.executionPolicySource,
executionPolicyModel: event.executionPolicy?.modelLabel ?? current.executionPolicyModel
} : current);
return;
}
if (event.type === "delta") {
activeStream.sessionId = event.sessionId;
activeStream.targetText = event.fullText && event.fullText.length >= activeStream.targetText.length
? event.fullText
: activeStream.targetText + event.textDelta;
updateStreamSmoke((current) => current ? {
...current,
phase: "streaming",
sessionId: event.sessionId,
runId: event.runId,
deltaEventCount: current.deltaEventCount + 1
} : current);
scheduleTypewriter();
return;
}
if (event.type === "completed") {
activeStream.sessionId = event.sessionId;
activeStream.finalReply = event.reply;
if (event.reply.content.length >= activeStream.targetText.length) {
activeStream.targetText = event.reply.content;
}
updateStreamSmoke((current) => current ? {
...current,
sessionId: event.sessionId,
runId: event.runId,
completedEventCount: current.completedEventCount + 1,
finalContent: event.reply.content,
executionPolicySource: event.executionPolicy?.source ?? current.executionPolicySource,
executionPolicyModel: event.executionPolicy?.modelLabel ?? current.executionPolicyModel
} : current);
scheduleTypewriter();
return;
}
if (event.type === "error") {
updateStreamSmoke((current) => current ? {
...current,
phase: "error",
sessionId: event.sessionId,
runId: event.runId,
errorEventCount: current.errorEventCount + 1,
lastError: event.message
} : current);
failActiveStream(event.message);
}
});
return () => {
unsubscribe();
cancelTypewriter();
activeStreamRef.current = null;
};
}, []);
async function saveConfig(nextApiKey?: string) {
if (!config) {
return;
......@@ -465,8 +858,9 @@ export default function App() {
throw new Error(confirmedWorkspace.chatStatusMessage ?? ui.chatNotReadyError);
}
async function sendPrompt() {
if (!canSend) {
async function submitPrompt(promptText: string, requestedSkillId?: string) {
const trimmedPrompt = promptText.trim();
if (!trimmedPrompt || sending || saving) {
return;
}
......@@ -475,30 +869,71 @@ export default function App() {
try {
await ensureChatAvailable();
const skillId = selectedSkill.id === DEFAULT_SKILL.id ? undefined : selectedSkill.id;
const result = await desktopApi.chat.sendPrompt(DEFAULT_SESSION_ID, prompt.trim(), skillId);
const skillId = requestedSkillId === DEFAULT_SKILL.id ? undefined : requestedSkillId;
const userMessage = buildUserMessage(trimmedPrompt);
const assistantMessage = buildAssistantPlaceholder();
updateStreamSmoke(() => ({
phase: "requested",
prompt: trimmedPrompt,
selectedSkillId: skillId,
requestId: undefined,
sessionId: DEFAULT_SESSION_ID,
runId: undefined,
assistantMessageId: assistantMessage.id,
startedEventCount: 0,
deltaEventCount: 0,
completedEventCount: 0,
errorEventCount: 0,
fallbackUsed: false,
renderedContent: "",
finalContent: ""
}));
setPrompt("");
setActiveSessionId(result.sessionId);
await loadMessages(result.sessionId, true, true);
setMessages((current) => [...current, userMessage, assistantMessage]);
setActiveSessionId(DEFAULT_SESSION_ID);
const [telemetry, nextWorkspace, nextGateway] = await Promise.all([
desktopApi.runtimeTelemetry.getStatus().catch(() => null),
desktopApi.workspace.getSummary().catch(() => null),
desktopApi.gateway.status().catch(() => null)
]);
if (telemetry) {
setRuntimeTelemetry(telemetry);
}
if (nextWorkspace) {
setWorkspace(nextWorkspace);
try {
const stream = await desktopApi.chat.streamPrompt(DEFAULT_SESSION_ID, trimmedPrompt, skillId);
activeStreamRef.current = {
requestId: stream.requestId,
assistantMessageId: assistantMessage.id,
sessionId: stream.sessionId,
targetText: "",
renderedText: ""
};
updateStreamSmoke((current) => current ? {
...current,
requestId: stream.requestId,
sessionId: stream.sessionId,
runId: stream.runId,
executionPolicySource: stream.executionPolicy?.source ?? current.executionPolicySource,
executionPolicyModel: stream.executionPolicy?.modelLabel ?? current.executionPolicyModel
} : current);
setActiveSessionId(stream.sessionId);
} catch {
await completeWithFallback(DEFAULT_SESSION_ID, trimmedPrompt, skillId, assistantMessage.id);
}
setGatewayStatus(nextGateway);
} catch (error) {
setErrorText(err(error));
} finally {
setSending(false);
const message = err(error);
updateStreamSmoke((current) => current ? {
...current,
phase: "error",
lastError: message
} : current);
setErrorText(message);
}
}
async function sendPrompt() {
if (!canSend) {
return;
}
const skillId = selectedSkill.id === DEFAULT_SKILL.id ? undefined : selectedSkill.id;
await submitPrompt(prompt, skillId);
}
async function exportDiagnostics() {
......@@ -562,9 +997,12 @@ export default function App() {
{showChatStatusHint ? <div className={"inline-hint" + (chatLaunchState === "error" ? " error" : "")}>{chatStatusMessage}</div> : null}
<div className="message-list">
{messages.map((message) => (
<article key={message.id} className={"message-card " + message.role}>
<header><strong>{message.role === "assistant" ? ui.app : message.role === "user" ? "用户" : "系统"}</strong></header>
<p>{message.content}</p>
<article key={message.id} className={"message-card " + message.role + (message.streamState ? " " + message.streamState : "")}>
<header><strong>{message.role === "assistant" ? ui.app : message.role === "user" ? "\u7528\u6237" : "\u7cfb\u7edf"}</strong></header>
<p>
{message.content}
{message.streamState === "streaming" ? <span className="message-cursor" aria-hidden="true" /> : null}
</p>
</article>
))}
{!messages.length ? <div className="empty-state">{ui.noMessages}</div> : null}
......@@ -573,9 +1011,14 @@ export default function App() {
<div className="composer-meta">
<label className="skill-select">
<span className="field-label">{ui.skillChoice}</span>
<div className="skill-select-row">
<select value={selectedSkillId} disabled={!isBound} onChange={(event) => setSelectedSkillId(event.target.value)}>
{effectiveSkills.map((skill) => <option key={skill.id} value={skill.id}>{skill.name}</option>)}
</select>
{selectedSkillId !== DEFAULT_SKILL.id ? (
<button type="button" className="secondary skill-clear-button" disabled={!isBound} onClick={() => setSelectedSkillId(DEFAULT_SKILL.id)}>{ui.clearSkill}</button>
) : null}
</div>
</label>
<p className="composer-hint">{isBound ? selectedSkill.description : ui.taskDisabledPlaceholder}</p>
</div>
......
......@@ -277,12 +277,25 @@ strong { font-weight: 600; }
.message-card { padding: 16px; }
.message-card.user { background: #eef5ff; }
.message-card.assistant { background: #eefbf7; }
.message-card.streaming { border-color: #b7e4d5; }
.message-card.error { border-color: rgba(239, 68, 68, 0.24); }
.message-card p {
white-space: pre-wrap;
line-height: 1.7;
margin-top: 6px;
}
.message-cursor {
display: inline-block;
width: 8px;
height: 1.05em;
margin-left: 3px;
border-radius: 999px;
background: #0f7bff;
vertical-align: text-bottom;
animation: cursor-blink 1s steps(1, end) infinite;
}
.composer-shell {
gap: 10px;
padding: 14px;
......@@ -355,6 +368,15 @@ strong { font-weight: 600; }
}
}
@keyframes cursor-blink {
0%, 50% {
opacity: 1;
}
50.1%, 100% {
opacity: 0;
}
}
@media (max-width: 1100px) {
.hero-line { font-size: 21px; }
.composer-meta {
......
......@@ -108,21 +108,39 @@ if (!result.ok) {
throw new Error('Electron smoke failed: ' + message);
}
const sendResult = result.sendResult || {};
const replyPolicy = sendResult.reply && sendResult.reply.executionPolicy;
if (!replyPolicy) {
throw new Error('Execution policy was not returned from chat.sendPrompt.');
const streamSmoke = sendResult.streamSmoke || {};
if (!sendResult.selectedSkillId) {
throw new Error('Smoke did not select a Skill before streaming.');
}
if (replyPolicy.source !== 'cloud-skill-binding') {
throw new Error('Unexpected execution policy source: ' + replyPolicy.source);
if (streamSmoke.phase !== 'completed') {
throw new Error('Renderer stream smoke did not complete successfully: ' + streamSmoke.phase);
}
if (!sendResult.selectedSkillId) {
throw new Error('Smoke did not select a Skill before sendPrompt.');
if (streamSmoke.fallbackUsed) {
throw new Error('Renderer stream smoke fell back to non-streaming sendPrompt.');
}
if (streamSmoke.executionPolicySource !== 'cloud-skill-binding') {
throw new Error('Unexpected stream execution policy source: ' + streamSmoke.executionPolicySource);
}
if (streamSmoke.selectedSkillId !== sendResult.selectedSkillId) {
throw new Error('Renderer stream selectedSkillId does not match smoke selection.');
}
if (Number(streamSmoke.startedEventCount || 0) < 1) {
throw new Error('Renderer stream smoke did not observe a started event.');
}
if (Number(streamSmoke.deltaEventCount || 0) < 1) {
throw new Error('Renderer stream smoke did not observe a delta event.');
}
if (Number(streamSmoke.completedEventCount || 0) < 1) {
throw new Error('Renderer stream smoke did not observe a completed event.');
}
if (Number(streamSmoke.errorEventCount || 0) !== 0) {
throw new Error('Renderer stream smoke observed unexpected error events: ' + streamSmoke.errorEventCount);
}
if (replyPolicy.skillId !== sendResult.selectedSkillId) {
throw new Error('Execution policy skillId does not match selectedSkillId.');
if (!String(streamSmoke.renderedContent || '')) {
throw new Error('Renderer stream smoke did not render assistant content.');
}
if (!sendResult.modelConfig || sendResult.modelConfig.routingMode !== 'skill-bound') {
throw new Error('Unexpected model routing mode: ' + (sendResult.modelConfig && sendResult.modelConfig.routingMode));
if (String(streamSmoke.finalContent || '') !== String(sendResult.lastMessage && sendResult.lastMessage.content || '')) {
throw new Error('Renderer final stream content does not match persisted last message.');
}
if (String(sendResult.system && sendResult.system.userDataPath) !== expectedUserData) {
throw new Error('Smoke ran against an unexpected userData path: ' + (sendResult.system && sendResult.system.userDataPath));
......@@ -180,9 +198,12 @@ const summary = {
userDataPath: expectedUserData,
logsPath: expectedLogs,
selectedSkillId: String(sendResult.selectedSkillId),
executionPolicySource: String(replyPolicy.source),
executionPolicyModel: String(replyPolicy.modelLabel),
executionPolicyRouting: String(replyPolicy.routingMode),
executionPolicySource: String(streamSmoke.executionPolicySource || ''),
executionPolicyModel: String(streamSmoke.executionPolicyModel || ''),
streamPhase: String(streamSmoke.phase || ''),
streamStartedEventCount: Number(streamSmoke.startedEventCount || 0),
streamDeltaEventCount: Number(streamSmoke.deltaEventCount || 0),
streamCompletedEventCount: Number(streamSmoke.completedEventCount || 0),
runtimeActiveMode: String(sendResult.runtimeStatusAfterProbe && sendResult.runtimeStatusAfterProbe.activeMode || ''),
runtimeProcessState: String(sendResult.runtimeStatusAfterProbe && sendResult.runtimeStatusAfterProbe.processState || ''),
runtimeGatewayUrl: String(sendResult.runtimeStatusAfterProbe && sendResult.runtimeStatusAfterProbe.gatewayUrl || ''),
......
......@@ -182,6 +182,7 @@ if (!result.ok) {
throw new Error('Installed smoke failed: ' + (result.error || 'Unknown smoke failure.'));
}
const sendResult = result.sendResult || {};
const streamSmoke = sendResult.streamSmoke || {};
if (!sendResult.system || !sendResult.system.isPackaged) {
throw new Error('Installed smoke did not report packaged mode.');
}
......@@ -203,6 +204,24 @@ if (String(sendResult.system.userDataPath) !== expectedUserData) {
if (String(sendResult.system.logsPath) !== expectedLogs) {
throw new Error('Installed smoke ran against an unexpected logs path: ' + sendResult.system.logsPath);
}
if (streamSmoke.phase !== 'completed') {
throw new Error('Installed renderer stream smoke did not complete successfully: ' + streamSmoke.phase);
}
if (streamSmoke.fallbackUsed) {
throw new Error('Installed renderer stream smoke fell back to non-streaming sendPrompt.');
}
if (Number(streamSmoke.startedEventCount || 0) < 1 || Number(streamSmoke.deltaEventCount || 0) < 1 || Number(streamSmoke.completedEventCount || 0) < 1) {
throw new Error('Installed renderer stream smoke did not observe the expected started/delta/completed events.');
}
if (Number(streamSmoke.errorEventCount || 0) !== 0) {
throw new Error('Installed renderer stream smoke observed unexpected error events: ' + streamSmoke.errorEventCount);
}
if (!String(streamSmoke.renderedContent || '')) {
throw new Error('Installed renderer stream smoke did not render assistant content.');
}
if (String(streamSmoke.finalContent || '') !== String(sendResult.lastMessage && sendResult.lastMessage.content || '')) {
throw new Error('Installed renderer final stream content does not match persisted last message.');
}
if (expectBundled === 'true') {
const runtimeStatus = sendResult.runtimeStatusAfterProbe || {};
const runtimeHealth = sendResult.runtimeHealthAfterProbe || {};
......@@ -248,6 +267,10 @@ const summary = {
runtimePythonPackages: sendResult.runtimeStatusAfterProbe && sendResult.runtimeStatusAfterProbe.installedPythonPackages || [],
authState: String(sendResult.session && sendResult.session.state || ''),
skillCount: Array.isArray(sendResult.skills) ? sendResult.skills.length : 0,
streamPhase: String(streamSmoke.phase || ''),
streamStartedEventCount: Number(streamSmoke.startedEventCount || 0),
streamDeltaEventCount: Number(streamSmoke.deltaEventCount || 0),
streamCompletedEventCount: Number(streamSmoke.completedEventCount || 0),
diagnosticsPath,
runtimeResourceDir,
bundledPythonExecutable: packagedPythonExe,
......
......@@ -46,6 +46,10 @@ interface PendingChatRun {
reject: (reason?: unknown) => void;
timer: NodeJS.Timeout;
sessionKey: string;
accumulatedText: string;
onDelta?: (value: GatewayPromptStreamDelta) => void;
onCompleted?: (value: { sessionId: string; runId: string; reply: ChatMessage }) => void;
onError?: (value: { sessionId: string; runId?: string; error: Error }) => void;
}
interface GatewayClientOptions {
......@@ -102,6 +106,26 @@ interface ChatHistoryResult {
}>;
}
export interface GatewayPromptStreamStart {
sessionId: string;
runId: string;
completion: Promise<ChatMessage>;
}
export interface GatewayPromptStreamDelta {
sessionId: string;
runId: string;
textDelta: string;
fullText?: string;
}
export interface GatewayPromptStreamHandlers {
onStarted?: (value: GatewayPromptStreamStart) => void;
onDelta?: (value: GatewayPromptStreamDelta) => void;
onCompleted?: (value: { sessionId: string; runId: string; reply: ChatMessage }) => void;
onError?: (value: { sessionId: string; runId?: string; error: Error }) => void;
}
const CLIENT_ID = "gateway-client";
const CLIENT_MODE = "backend";
const ROLE = "operator";
......@@ -342,21 +366,44 @@ export class GatewayClient {
async listMessages(sessionId: string): Promise<ChatMessage[]> {
await this.ensureConnected();
const result = (await this.request("chat.history", { sessionKey: sessionId, limit: 100 })) as ChatHistoryResult;
return (result.messages ?? []).map((message, index) => ({
id: `${sessionId}:${message.timestamp ?? index}:${index}`,
const sessionKeys = [sessionId];
if (!sessionId.startsWith("agent:")) {
sessionKeys.push(`agent:main:${sessionId}`);
}
for (let index = 0; index < sessionKeys.length; index += 1) {
const sessionKey = sessionKeys[index];
const result = (await this.request("chat.history", { sessionKey, limit: 100 })) as ChatHistoryResult;
const messages = (result.messages ?? []).map((message, messageIndex) => ({
id: `${sessionKey}:${message.timestamp ?? messageIndex}:${messageIndex}`,
role: message.role ?? "assistant",
content: this.flattenContent(message.content),
createdAt: new Date(message.timestamp ?? Date.now()).toISOString()
}));
if (messages.length > 0 || index === sessionKeys.length - 1) {
return messages;
}
}
return [];
}
async sendPrompt(sessionId: string, prompt: string): Promise<PromptResult> {
const stream = await this.streamPrompt(sessionId, prompt);
const reply = await stream.completion;
return {
sessionId: stream.sessionId,
reply
};
}
async streamPrompt(sessionId: string, prompt: string, handlers: GatewayPromptStreamHandlers = {}): Promise<GatewayPromptStreamStart> {
await this.ensureConnected();
const result = (await this.request("chat.send", {
sessionKey: sessionId,
message: prompt,
idempotencyKey: randomUUID()
})) as { runId?: string; status?: string };
const runId = result.runId;
......@@ -364,11 +411,9 @@ export class GatewayClient {
throw new Error("Gateway did not return a runId for chat.send.");
}
const reply = await this.waitForChatCompletion(runId, sessionId);
return {
sessionId,
reply
};
const completion = this.trackChatRun(runId, sessionId, handlers);
handlers.onStarted?.({ sessionId, runId, completion });
return { sessionId, runId, completion };
}
private async handleEvent(frame: Record<string, unknown>): Promise<void> {
......@@ -427,28 +472,30 @@ export class GatewayClient {
if (eventName === "chat") {
const payload = (frame.payload ?? frame) as Record<string, unknown>;
const runId = typeof payload.runId === "string" ? payload.runId : undefined;
const state = typeof payload.state === "string" ? payload.state : undefined;
if (runId && state === "final") {
const pending = this.pendingChatRuns.get(runId);
if (pending) {
clearTimeout(pending.timer);
this.pendingChatRuns.delete(runId);
const message = payload.message as Record<string, unknown> | undefined;
pending.resolve({
id: `${pending.sessionKey}:${runId}:final`,
role: "assistant",
content: this.flattenContent(message?.content as Array<{ type?: string; text?: string }> | undefined),
createdAt: new Date(Number(message?.timestamp ?? Date.now())).toISOString()
});
const runId = this.findStringDeep(payload, ["runId", "run_id"]);
const state = this.findStringDeep(payload, ["state", "status"]);
if (runId) {
this.emitChatDelta(runId, payload);
if (state === "final") {
this.completeChatRun(runId, this.buildChatMessage(runId, payload));
}
}
}
if (eventName === "agent") {
const payload = (frame.payload ?? frame) as Record<string, unknown>;
if (payload.stream === "error") {
this.appendLog("warn", `Agent stream error: ${JSON.stringify(payload.data ?? {})}`);
const runId = this.findStringDeep(payload, ["runId", "run_id"]);
const stream = this.findStringDeep(payload, ["stream"]);
if (stream === "error") {
const message = this.extractTextCandidate(payload.data) ?? this.extractTextCandidate(payload) ?? JSON.stringify(payload.data ?? {});
this.appendLog("warn", "Agent stream error: " + message);
if (runId) {
this.failChatRun(runId, new Error(message));
}
} else if (!stream || stream === "assistant") {
if (runId) {
this.emitChatDelta(runId, payload.data ?? payload);
}
}
}
......@@ -543,31 +590,255 @@ export class GatewayClient {
});
}
private async waitForChatCompletion(runId: string, sessionKey: string): Promise<ChatMessage> {
private trackChatRun(runId: string, sessionKey: string, handlers: GatewayPromptStreamHandlers): Promise<ChatMessage> {
return new Promise((resolve, reject) => {
const timer = setTimeout(async () => {
this.pendingChatRuns.set(runId, {
resolve,
reject,
timer: this.createChatRunTimer(runId, sessionKey),
sessionKey,
accumulatedText: "",
onDelta: handlers.onDelta,
onCompleted: handlers.onCompleted,
onError: handlers.onError
});
});
}
private createChatRunTimer(runId: string, sessionKey: string): NodeJS.Timeout {
return setTimeout(async () => {
const pending = this.pendingChatRuns.get(runId);
if (!pending) {
return;
}
this.pendingChatRuns.delete(runId);
try {
const history = await this.listMessages(sessionKey);
const assistant = [...history].reverse().find((message) => message.role === "assistant");
if (assistant) {
resolve(assistant);
pending.onCompleted?.({ sessionId: sessionKey, runId, reply: assistant });
pending.resolve(assistant);
return;
}
} catch {
}
reject(new Error("Timed out waiting for chat completion."));
const error = new Error("Timed out waiting for chat completion.");
pending.onError?.({ sessionId: sessionKey, runId, error });
pending.reject(error);
}, 30000);
}
this.pendingChatRuns.set(runId, {
resolve,
reject,
timer,
sessionKey
});
private refreshChatRunTimer(runId: string): void {
const pending = this.pendingChatRuns.get(runId);
if (!pending) {
return;
}
clearTimeout(pending.timer);
pending.timer = this.createChatRunTimer(runId, pending.sessionKey);
}
private emitChatDelta(runId: string, payload: unknown): void {
const pending = this.pendingChatRuns.get(runId);
if (!pending) {
return;
}
const nextFullText = this.extractTextCandidate(payload);
if (!nextFullText || nextFullText.length < pending.accumulatedText.length || nextFullText === pending.accumulatedText) {
return;
}
const textDelta = nextFullText.startsWith(pending.accumulatedText)
? nextFullText.slice(pending.accumulatedText.length)
: nextFullText;
pending.accumulatedText = nextFullText;
this.refreshChatRunTimer(runId);
pending.onDelta?.({
sessionId: pending.sessionKey,
runId,
textDelta,
fullText: nextFullText
});
}
private completeChatRun(runId: string, reply: ChatMessage): void {
const pending = this.pendingChatRuns.get(runId);
if (!pending) {
return;
}
clearTimeout(pending.timer);
this.pendingChatRuns.delete(runId);
pending.onCompleted?.({ sessionId: pending.sessionKey, runId, reply });
pending.resolve(reply);
}
private failChatRun(runId: string, error: Error): void {
const pending = this.pendingChatRuns.get(runId);
if (!pending) {
return;
}
clearTimeout(pending.timer);
this.pendingChatRuns.delete(runId);
pending.onError?.({ sessionId: pending.sessionKey, runId, error });
pending.reject(error);
}
private buildChatMessage(runId: string, payload: Record<string, unknown>): ChatMessage {
const pending = this.pendingChatRuns.get(runId);
const message = this.findRecordDeep(payload, ["message"]);
const content = this.extractTextCandidate(message) ?? pending?.accumulatedText ?? "";
const timestamp = this.findNumberDeep(message ?? payload, ["timestamp", "createdAt", "created_at"]);
return {
id: `${pending?.sessionKey ?? "session"}:${runId}:final`,
role: "assistant",
content,
createdAt: new Date(timestamp ?? Date.now()).toISOString()
};
}
private extractTextCandidate(value: unknown): string | undefined {
if (typeof value === "string") {
const normalized = value.replace(/\r\n/g, "\n");
return normalized.length ? normalized : undefined;
}
if (Array.isArray(value)) {
const joined = value
.map((item) => this.extractTextCandidate(item))
.filter((item): item is string => Boolean(item))
.join("\n");
return joined || undefined;
}
if (!value || typeof value !== "object") {
return undefined;
}
const record = value as Record<string, unknown>;
if (Array.isArray(record.content)) {
const flattened = this.flattenContent(record.content as Array<{ type?: string; text?: string }>);
if (flattened) {
return flattened;
}
}
for (const key of ["textDelta", "delta", "text", "content", "message", "data", "output", "response"]) {
if (!(key in record)) {
continue;
}
const candidate = this.extractTextCandidate(record[key]);
if (candidate) {
return candidate;
}
}
return undefined;
}
private findStringDeep(value: unknown, keys: string[]): string | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
if (Array.isArray(value)) {
for (const item of value) {
const nested = this.findStringDeep(item, keys);
if (nested) {
return nested;
}
}
return undefined;
}
const record = value as Record<string, unknown>;
for (const key of keys) {
if (typeof record[key] === "string") {
return record[key] as string;
}
}
for (const nestedValue of Object.values(record)) {
const nested = this.findStringDeep(nestedValue, keys);
if (nested) {
return nested;
}
}
return undefined;
}
private findNumberDeep(value: unknown, keys: string[]): number | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
if (Array.isArray(value)) {
for (const item of value) {
const nested = this.findNumberDeep(item, keys);
if (typeof nested === "number") {
return nested;
}
}
return undefined;
}
const record = value as Record<string, unknown>;
for (const key of keys) {
if (typeof record[key] === "number" && Number.isFinite(record[key])) {
return record[key] as number;
}
}
for (const nestedValue of Object.values(record)) {
const nested = this.findNumberDeep(nestedValue, keys);
if (typeof nested === "number") {
return nested;
}
}
return undefined;
}
private findRecordDeep(value: unknown, keys: string[]): Record<string, unknown> | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
if (Array.isArray(value)) {
for (const item of value) {
const nested = this.findRecordDeep(item, keys);
if (nested) {
return nested;
}
}
return undefined;
}
const record = value as Record<string, unknown>;
for (const key of keys) {
const candidate = record[key];
if (candidate && typeof candidate === "object" && !Array.isArray(candidate)) {
return candidate as Record<string, unknown>;
}
}
for (const nestedValue of Object.values(record)) {
const nested = this.findRecordDeep(nestedValue, keys);
if (nested) {
return nested;
}
}
return undefined;
}
private async ensureConnected(): Promise<void> {
if (this.websocket && this.websocket.readyState === WebSocket.OPEN && this.statusSnapshot.state === "connected") {
return;
......@@ -608,6 +879,20 @@ export class GatewayClient {
}
private failConnection(message: string): void {
for (const pending of this.pendingRequests.values()) {
clearTimeout(pending.timer);
pending.reject(new Error(message));
}
this.pendingRequests.clear();
for (const [runId, pending] of this.pendingChatRuns.entries()) {
clearTimeout(pending.timer);
const error = new Error(message);
pending.onError?.({ sessionId: pending.sessionKey, runId, error });
pending.reject(error);
}
this.pendingChatRuns.clear();
this.statusSnapshot = this.createStatus("error", message);
this.appendLog("error", message);
}
......@@ -693,7 +978,7 @@ export class GatewayClient {
private stripStructuredLogPrefix(message: string): string {
return message
.replace(LOG_PREFIX_PATTERN, "")
.replace(/âœ\?/g, "ok ")
.replace(/\?/g, "ok ")
.replace(/[?]{2,}/g, "")
.replace(/\s+/g, " ")
.trim();
......
......@@ -20,6 +20,8 @@ export const IPC_CHANNELS = {
chatListSessions: "chat:list-sessions",
chatListMessages: "chat:list-messages",
chatSendPrompt: "chat:send-prompt",
chatStreamPrompt: "chat:stream-prompt",
chatStreamEvent: "chat:stream-event",
diagnosticsOpenControlUi: "diagnostics:open-control-ui",
diagnosticsExportSnapshot: "diagnostics:export-snapshot",
authGetSession: "auth:get-session",
......@@ -263,6 +265,51 @@ export interface ChatExecutionPolicy {
message: string;
}
export interface ChatStreamPromptResult {
requestId: string;
sessionId: string;
runId?: string;
executionPolicy?: ChatExecutionPolicy;
}
export interface ChatStreamStartedEvent {
type: "started";
requestId: string;
sessionId: string;
runId?: string;
executionPolicy?: ChatExecutionPolicy;
}
export interface ChatStreamDeltaEvent {
type: "delta";
requestId: string;
sessionId: string;
runId: string;
textDelta: string;
fullText?: string;
}
export interface ChatStreamCompletedEvent {
type: "completed";
requestId: string;
sessionId: string;
runId: string;
reply: ChatMessage;
executionPolicy?: ChatExecutionPolicy;
}
export interface ChatStreamErrorEvent {
type: "error";
requestId: string;
sessionId: string;
runId?: string;
message: string;
}
export type ChatStreamEvent = ChatStreamStartedEvent | ChatStreamDeltaEvent | ChatStreamCompletedEvent | ChatStreamErrorEvent;
export type ChatStreamListener = (event: ChatStreamEvent) => void;
export interface PromptResult {
sessionId: string;
reply: ChatMessage;
......@@ -448,6 +495,8 @@ export interface DesktopApi {
listSessions(): Promise<SessionSummary[]>;
listMessages(sessionId: string): Promise<ChatMessage[]>;
sendPrompt(sessionId: string, prompt: string, skillId?: string): Promise<PromptResult>;
streamPrompt(sessionId: string, prompt: string, skillId?: string): Promise<ChatStreamPromptResult>;
onStreamEvent(listener: ChatStreamListener): () => void;
};
diagnostics: {
openControlUi(): Promise<void>;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment