From 1cc3546b7e6f7516f58ed53571c0d07b75a83d05 Mon Sep 17 00:00:00 2001 From: Saoud Rizwan <7799382+saoudrizwan@users.noreply.github.com> Date: Thu, 26 Sep 2024 22:40:18 -0400 Subject: [PATCH] Initial streaming refactor --- package-lock.json | 11 +- package.json | 1 + src/api/index.ts | 5 +- src/api/providers/anthropic.ts | 18 +- src/core/ClaudeDev.ts | 743 +++++++++++++++++- src/core/webview/ClaudeDevProvider.ts | 2 +- src/shared/ExtensionMessage.ts | 12 +- src/{utils => shared}/array.ts | 0 webview-ui/src/components/chat/ChatRow.tsx | 9 +- webview-ui/src/components/chat/ChatView.tsx | 17 +- .../src/context/ExtensionStateContext.tsx | 67 +- 11 files changed, 805 insertions(+), 80 deletions(-) rename src/{utils => shared}/array.ts (100%) diff --git a/package-lock.json b/package-lock.json index 87e49a9..537dc79 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,18 +1,19 @@ { "name": "claude-dev", - "version": "1.9.3", + "version": "1.9.7", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "claude-dev", - "version": "1.9.3", + "version": "1.9.7", "license": "MIT", "dependencies": { "@anthropic-ai/bedrock-sdk": "^0.10.2", "@anthropic-ai/sdk": "^0.26.0", "@anthropic-ai/vertex-sdk": "^0.4.1", "@google/generative-ai": "^0.18.0", + "@streamparser/json": "^0.0.21", "@types/clone-deep": "^4.0.4", "@types/pdf-parse": "^1.1.4", "@types/turndown": "^5.0.5", @@ -4492,6 +4493,12 @@ "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.6.3.tgz", "integrity": "sha512-xNvxJEOUiWPGhUuUdQgAJPKOOJfGnIyKySOc09XkKsgdUV/3E2zvwZYdejjmRgPCgcym1juLH3226yA7sEFJKQ==" }, + "node_modules/@streamparser/json": { + "version": "0.0.21", + "resolved": "https://registry.npmjs.org/@streamparser/json/-/json-0.0.21.tgz", + "integrity": "sha512-v+49JBiG1kmc/9Ug79Lz9wyKaRocBgCnpRaLpdy7p0d3ICKtOAfc/H/Epa1j3F6YdnzjnZKKrnJ8xnh/v1P8Aw==", + "license": "MIT" + }, "node_modules/@tootallnate/quickjs-emscripten": { "version": "0.23.0", "resolved": "https://registry.npmjs.org/@tootallnate/quickjs-emscripten/-/quickjs-emscripten-0.23.0.tgz", diff --git a/package.json b/package.json index 486fb0d..72740ac 100644 --- a/package.json +++ b/package.json @@ -149,6 +149,7 @@ "@anthropic-ai/sdk": "^0.26.0", "@anthropic-ai/vertex-sdk": "^0.4.1", "@google/generative-ai": "^0.18.0", + "@streamparser/json": "^0.0.21", "@types/clone-deep": "^4.0.4", "@types/pdf-parse": "^1.1.4", "@types/turndown": "^5.0.5", diff --git a/src/api/index.ts b/src/api/index.ts index ca8e106..1a2d557 100644 --- a/src/api/index.ts +++ b/src/api/index.ts @@ -1,4 +1,5 @@ import { Anthropic } from "@anthropic-ai/sdk" +import { Stream } from "@anthropic-ai/sdk/streaming" import { ApiConfiguration, ModelInfo } from "../shared/api" import { AnthropicHandler } from "./providers/anthropic" import { AwsBedrockHandler } from "./providers/bedrock" @@ -14,12 +15,14 @@ export interface ApiHandlerMessageResponse { userCredits?: number } +export type AnthropicStream = Stream + export interface ApiHandler { createMessage( systemPrompt: string, messages: Anthropic.Messages.MessageParam[], tools: Anthropic.Messages.Tool[] - ): Promise + ): Promise getModel(): { id: string; info: ModelInfo } } diff --git a/src/api/providers/anthropic.ts b/src/api/providers/anthropic.ts index 1b49a89..24ee279 100644 --- a/src/api/providers/anthropic.ts +++ b/src/api/providers/anthropic.ts @@ -1,5 +1,5 @@ import { Anthropic } from "@anthropic-ai/sdk" -import { ApiHandler, ApiHandlerMessageResponse } from "../index" +import { AnthropicStream, ApiHandler, ApiHandlerMessageResponse } 
from "../index" import { anthropicDefaultModelId, AnthropicModelId, @@ -24,7 +24,7 @@ export class AnthropicHandler implements ApiHandler { systemPrompt: string, messages: Anthropic.Messages.MessageParam[], tools: Anthropic.Messages.Tool[] - ): Promise { + ): Promise { const modelId = this.getModel().id switch (modelId) { case "claude-3-5-sonnet-20240620": @@ -39,7 +39,7 @@ export class AnthropicHandler implements ApiHandler { ) const lastUserMsgIndex = userMsgIndices[userMsgIndices.length - 1] ?? -1 const secondLastMsgUserIndex = userMsgIndices[userMsgIndices.length - 2] ?? -1 - const message = await this.client.beta.promptCaching.messages.create( + const stream = this.client.beta.promptCaching.messages.create( { model: modelId, max_tokens: this.getModel().info.maxTokens, @@ -69,6 +69,7 @@ export class AnthropicHandler implements ApiHandler { }), tools, // cache breakpoints go from tools > system > messages, and since tools dont change, we can just set the breakpoint at the end of system (this avoids having to set a breakpoint at the end of tools which by itself does not meet min requirements for haiku caching) tool_choice: { type: "auto" }, + stream: true, }, (() => { // prompt caching: https://x.com/alexalbert__/status/1823751995901272068 @@ -90,10 +91,14 @@ export class AnthropicHandler implements ApiHandler { } })() ) - return { message } + + return stream + + // throw new Error("Not implemented") + // return { message } } default: { - const message = await this.client.messages.create({ + const stream = await this.client.messages.create({ model: modelId, max_tokens: this.getModel().info.maxTokens, temperature: 0.2, @@ -101,8 +106,9 @@ export class AnthropicHandler implements ApiHandler { messages, tools, tool_choice: { type: "auto" }, + stream: true, }) - return { message } + return stream as AnthropicStream } } } diff --git a/src/core/ClaudeDev.ts b/src/core/ClaudeDev.ts index 62dc4a9..e4fbe22 100644 --- a/src/core/ClaudeDev.ts +++ b/src/core/ClaudeDev.ts @@ -1,14 +1,15 @@ import { Anthropic } from "@anthropic-ai/sdk" +import { PromptCachingBetaMessageStream } from "@anthropic-ai/sdk/lib/PromptCachingBetaMessageStream.mjs" +import { JSONParser, TokenType } from "@streamparser/json" import delay from "delay" import * as diff from "diff" import fs from "fs/promises" import os from "os" -import { SYSTEM_PROMPT } from "./prompts/system" import pWaitFor from "p-wait-for" import * as path from "path" import { serializeError } from "serialize-error" import * as vscode from "vscode" -import { ApiHandler, buildApiHandler } from "../api" +import { AnthropicStream, ApiHandler, buildApiHandler } from "../api" import { diagnosticsToProblemsString, getNewDiagnostics } from "../integrations/diagnostics" import { formatContentBlockToMarkdown } from "../integrations/misc/export-markdown" import { extractTextFromFile } from "../integrations/misc/extract-text" @@ -26,12 +27,14 @@ import { getApiMetrics } from "../shared/getApiMetrics" import { HistoryItem } from "../shared/HistoryItem" import { ToolName } from "../shared/Tool" import { ClaudeAskResponse } from "../shared/WebviewMessage" -import { findLast, findLastIndex } from "../utils/array" +import { findLast, findLastIndex } from "../shared/array" import { arePathsEqual } from "../utils/path" import { parseMentions } from "./mentions" +import { SYSTEM_PROMPT } from "./prompts/system" import { TOOLS } from "./prompts/tools" import { truncateHalfConversation } from "./sliding-window" import { ClaudeDevProvider } from "./webview/ClaudeDevProvider" 
+import cloneDeep from "clone-deep" const cwd = vscode.workspace.workspaceFolders?.map((folder) => folder.uri.fsPath).at(0) ?? path.join(os.homedir(), "Desktop") // may or may not exist but fs checking existence would immediately ask for permission which would be bad UX, need to come up with a better solution @@ -41,6 +44,10 @@ type UserContent = Array< Anthropic.TextBlockParam | Anthropic.ImageBlockParam | Anthropic.ToolUseBlockParam | Anthropic.ToolResultBlockParam > +type AnthropicPartialContentBlock = Anthropic.Messages.ContentBlock & { + partial?: boolean +} + export class ClaudeDev { readonly taskId: string private api: ApiHandler @@ -171,6 +178,7 @@ export class ClaudeDev { } private async saveClaudeMessages() { + console.log("Saving claude messages...") try { const filePath = path.join(await this.ensureTaskDirectoryExists(), "claude_messages.json") await fs.writeFile(filePath, JSON.stringify(this.claudeMessages)) @@ -199,21 +207,107 @@ export class ClaudeDev { } } + // partial has three valid states true (partial message), false (completion of partial message), undefined (individual complete message) async ask( type: ClaudeAsk, - question?: string + text?: string, + partial?: boolean ): Promise<{ response: ClaudeAskResponse; text?: string; images?: string[] }> { // If this ClaudeDev instance was aborted by the provider, then the only thing keeping us alive is a promise still running in the background, in which case we don't want to send its result to the webview as it is attached to a new instance of ClaudeDev now. So we can safely ignore the result of any active promises, and this class will be deallocated. (Although we set claudeDev = undefined in provider, that simply removes the reference to this instance, but the instance is still alive until this promise resolves or rejects.) 
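
The three partial states called out in the comment above (the same states apply to both ask() and say()) can be pictured with a hypothetical caller driving say() during a stream. SayFn below only mirrors the signature introduced by this patch; the demo itself is not part of it.

// A stand-in for the updated say() signature: (type, text?, images?, partial?)
type SayFn = (type: string, text?: string, images?: string[], partial?: boolean) => Promise<void>

async function demoPartialStates(say: SayFn) {
	// partial = true: stream growing versions of one message; the UI updates it in place
	await say("text", "Hel", undefined, true)
	await say("text", "Hello, wor", undefined, true)
	// partial = false: the completed version replaces the partial one and is persisted
	await say("text", "Hello, world.", undefined, false)
	// partial omitted: behaves like the pre-streaming say(), a single standalone message
	await say("text", "A separate complete message")
}
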
if (this.abort) { throw new Error("ClaudeDev instance aborted") } - this.askResponse = undefined - this.askResponseText = undefined - this.askResponseImages = undefined - const askTs = Date.now() - this.lastMessageTs = askTs - await this.addToClaudeMessages({ ts: askTs, type: "ask", ask: type, text: question }) - await this.providerRef.deref()?.postStateToWebview() + + // + + let askTs: number + + if (partial !== undefined) { + console.log(13) + const lastMessage = this.claudeMessages.at(-1) + const isUpdatingPreviousPartial = + lastMessage && lastMessage.partial && lastMessage.type === "ask" && lastMessage.ask === type + if (partial) { + console.log(14) + if (isUpdatingPreviousPartial) { + // existing partial message, so update it + lastMessage.text = text + lastMessage.partial = partial + // todo be more efficient about saving and posting only new data or one whole message at a time so ignore partial for saves, and only post parts of partial message instead of whole array in new listener + + // await this.saveClaudeMessages() + // await this.providerRef.deref()?.postStateToWebview() + await this.providerRef + .deref() + ?.postMessageToWebview({ type: "partialMessage", partialMessage: lastMessage }) + throw new Error("Current ask promise was ignored") + } else { + console.log(15) + // this is a new partial message, so add it with partial state + // this.askResponse = undefined + // this.askResponseText = undefined + // this.askResponseImages = undefined + // askTs = Date.now() + // this.lastMessageTs = askTs + await this.addToClaudeMessages({ ts: Date.now(), type: "ask", ask: type, text, partial }) + await this.providerRef.deref()?.postStateToWebview() + throw new Error("Current ask promise was ignored") + } + } else { + console.log(16) + // partial=false means its a complete version of a previously partial message + if (isUpdatingPreviousPartial) { + console.log(17) + // this is the complete version of a previously partial message, so replace the partial with the complete version + this.askResponse = undefined + this.askResponseText = undefined + this.askResponseImages = undefined + askTs = Date.now() + this.lastMessageTs = askTs + lastMessage.ts = askTs + lastMessage.text = text + lastMessage.partial = false + await this.saveClaudeMessages() + await this.providerRef.deref()?.postStateToWebview() + } else { + console.log(18) + // this is a new partial=false message, so add it like normal + this.askResponse = undefined + this.askResponseText = undefined + this.askResponseImages = undefined + askTs = Date.now() + this.lastMessageTs = askTs + await this.addToClaudeMessages({ ts: askTs, type: "ask", ask: type, text }) + await this.providerRef.deref()?.postStateToWebview() + } + } + } else { + // this is a new non-partial message, so add it like normal + console.log(19) + // const lastMessage = this.claudeMessages.at(-1) + this.askResponse = undefined + this.askResponseText = undefined + this.askResponseImages = undefined + askTs = Date.now() + this.lastMessageTs = askTs + await this.addToClaudeMessages({ ts: askTs, type: "ask", ask: type, text }) + await this.providerRef.deref()?.postStateToWebview() + } + + // + + // if (partial) { + // const lastMessage = this.claudeMessages.at(-1) + // if (lastMessage && lastMessage.type === "ask" && lastMessage.ask === type) { + // lastMessage.text = text + // lastMessage.partial = partial + // // todo be more efficient about saving and posting only new data or one whoe message at atime so ignore partial for saves, and only post parts of partial message 
instead of whole array in new listener + // await this.saveClaudeMessages() + // await this.providerRef.deref()?.postStateToWebview() + // throw new Error("Current ask promise was ignored") + // } + // } + await pWaitFor(() => this.askResponse !== undefined || this.lastMessageTs !== askTs, { interval: 100 }) if (this.lastMessageTs !== askTs) { throw new Error("Current ask promise was ignored") // could happen if we send multiple asks in a row i.e. with command_output. It's important that when we know an ask could fail, it is handled gracefully @@ -225,14 +319,61 @@ export class ClaudeDev { return result } - async say(type: ClaudeSay, text?: string, images?: string[]): Promise { + async say(type: ClaudeSay, text?: string, images?: string[], partial?: boolean): Promise { if (this.abort) { throw new Error("ClaudeDev instance aborted") } - const sayTs = Date.now() - this.lastMessageTs = sayTs - await this.addToClaudeMessages({ ts: sayTs, type: "say", say: type, text: text, images }) - await this.providerRef.deref()?.postStateToWebview() + + if (partial !== undefined) { + const lastMessage = this.claudeMessages.at(-1) + const isUpdatingPreviousPartial = + lastMessage && lastMessage.partial && lastMessage.type === "say" && lastMessage.say === type + if (partial) { + if (isUpdatingPreviousPartial) { + // existing partial message, so update it + lastMessage.text = text + lastMessage.images = images + lastMessage.partial = partial + // await this.saveClaudeMessages() + // await this.providerRef.deref()?.postStateToWebview() + await this.providerRef + .deref() + ?.postMessageToWebview({ type: "partialMessage", partialMessage: lastMessage }) + } else { + // this is a new partial message, so add it with partial state + + await this.addToClaudeMessages({ ts: Date.now(), type: "say", say: type, text, images, partial }) + await this.providerRef.deref()?.postStateToWebview() + } + } else { + // partial=false means its a complete version of a previously partial message + if (isUpdatingPreviousPartial) { + // this is the complete version of a previously partial message, so replace the partial with the complete version + const sayTs = Date.now() + this.lastMessageTs = sayTs + lastMessage.ts = sayTs + lastMessage.text = text + lastMessage.images = images + lastMessage.partial = false + + // instead of streaming partialMessage events, we do a save and post like normal to persist to disk + await this.saveClaudeMessages() + await this.providerRef.deref()?.postStateToWebview() + } else { + // this is a new partial=false message, so add it like normal + const sayTs = Date.now() + this.lastMessageTs = sayTs + await this.addToClaudeMessages({ ts: sayTs, type: "say", say: type, text, images }) + await this.providerRef.deref()?.postStateToWebview() + } + } + } else { + // this is a new non-partial message, so add it like normal + const sayTs = Date.now() + this.lastMessageTs = sayTs + await this.addToClaudeMessages({ ts: sayTs, type: "say", say: type, text, images }) + await this.providerRef.deref()?.postStateToWebview() + } } private async startTask(task?: string, images?: string[]): Promise { @@ -1432,7 +1573,7 @@ export class ClaudeDev { ] } - async attemptApiRequest(): Promise { + async attemptApiRequest(): Promise { try { let systemPrompt = await SYSTEM_PROMPT(cwd, this.api.getModel().info.supportsImages) if (this.customInstructions && this.customInstructions.trim()) { @@ -1467,16 +1608,12 @@ ${this.customInstructions.trim()} await this.overwriteApiConversationHistory(truncatedMessages) } } - const { message, 
userCredits } = await this.api.createMessage( + const stream = await this.api.createMessage( systemPrompt, this.apiConversationHistory, TOOLS(cwd, this.api.getModel().info.supportsImages) ) - if (userCredits !== undefined) { - console.log("Updating credits", userCredits) - // TODO: update credits - } - return message + return stream } catch (error) { const { response } = await this.ask( "api_req_failed", @@ -1491,6 +1628,385 @@ ${this.customInstructions.trim()} } } + private currentStreamingContentBlockIndex = 0 + private assistantContentBlocks: AnthropicPartialContentBlock[] = [] + private toolResults: Anthropic.ToolResultBlockParam[] = [] + private toolResultsReady = false + private didRejectTool = false + + // lock so it doesnt get spammed ie pwatifor? + private isLocked = false + async presentAssistantContent() { + if (this.isLocked) { + console.log("isLocked") + return + } + this.isLocked = true + + // when current index finished, then increment and call stream claude content again if contentblocks length has one more. + // otherwise check isStreamingComplete, and set toolResultReady for function to continue + // if length is more than currentstreamingindex, then ignore it since when currentstreaming is finished it will call this func again + + // if (this.currentStreamingContentBlockIndex !== this.assistantContentBlocks.length - 1) { + // console.log(10) + // console.log("currentStreamingContentBlockIndex", this.currentStreamingContentBlockIndex) + // console.log("assistantContentBlocks.length", this.assistantContentBlocks.length) + // // new content past the current streaming index, ignore for now + // // this function will be called one last time for a completed block + // return + // } + + const block = cloneDeep(this.assistantContentBlocks[this.currentStreamingContentBlockIndex]) // need to create copy bc while stream is updating the array, it could be updating the reference block properties too + switch (block.type) { + case "text": + await this.say("text", block.text, undefined, block.partial) + break + case "tool_use": + const toolName = block.name as ToolName + const toolInput = block.input as any + const toolUseId = block.id + + if (this.didRejectTool) { + // ignore any tool content after user has rejected tool once + // we'll fill it in with a rejection message when the message is complete + if (!block.partial) { + this.toolResults.push({ + type: "tool_result", + tool_use_id: toolUseId, + content: "Skipping tool execution due to previous tool user rejection.", + }) + } + break + } + + const askApproval = async (type: ClaudeAsk, partialMessage?: string) => { + const { response, text, images } = await this.ask(type, partialMessage, false) + if (response !== "yesButtonTapped") { + if (response === "messageResponse") { + await this.say("user_feedback", text, images) + // this.toolResults.push() + // const [didUserReject, result] = await this.executeTool(toolName, toolInput) + this.toolResults.push({ + type: "tool_result", + tool_use_id: toolUseId, + content: this.formatToolResponseWithImages( + await this.formatToolDeniedFeedback(text), + images + ), + }) + this.didRejectTool = true + return false + } + + this.toolResults.push({ + type: "tool_result", + tool_use_id: toolUseId, + content: await this.formatToolDenied(), + }) + this.didRejectTool = true + return false + } + return true + } + + const handleError = async (action: string, error: Error) => { + const errorString = `Error ${action}: ${JSON.stringify(serializeError(error))}` + await this.say( + "error", + `Error 
${action}:\n${error.message ?? JSON.stringify(serializeError(error), null, 2)}`
+					)
+					this.toolResults.push({
+						type: "tool_result",
+						tool_use_id: toolUseId,
+						content: await this.formatToolError(errorString),
+					})
+				}
+
+				const pushToolResult = (
+					content: string | Array<Anthropic.TextBlockParam | Anthropic.ImageBlockParam>
+				) => {
+					this.toolResults.push({
+						type: "tool_result",
+						tool_use_id: toolUseId,
+						content,
+					})
+				}
+
+				switch (toolName) {
+					case "read_file": {
+						const relPath: string | undefined = toolInput.path
+						const sharedMessageProps: ClaudeSayTool = {
+							tool: "readFile",
+							path: relPath || "", //this.getReadablePath(relPath || ""),
+						}
+						try {
+							if (block.partial) {
+								const partialMessage = JSON.stringify({
+									...sharedMessageProps,
+									content: undefined,
+								} satisfies ClaudeSayTool)
+								if (this.alwaysAllowReadOnly) {
+									await this.say("tool", partialMessage, undefined, block.partial)
+								} else {
+									await this.ask("tool", partialMessage, block.partial).catch(() => {})
+								}
+								break
+							} else {
+								if (!relPath) {
+									this.consecutiveMistakeCount++
+									pushToolResult(await this.sayAndCreateMissingParamError("read_file", "path"))
+									break
+								}
+								this.consecutiveMistakeCount = 0
+								const absolutePath = path.resolve(cwd, relPath)
+								const completeMessage = JSON.stringify({
+									...sharedMessageProps,
+									content: absolutePath,
+								} satisfies ClaudeSayTool)
+								if (this.alwaysAllowReadOnly) {
+									await this.say("tool", completeMessage, undefined, false) // need to be sending partialValue bool, since undefined has its own purpose in that the message is treated neither as a partial nor as the completion of a partial, but as a single complete message
+								} else {
+									const didApprove = await askApproval("tool", completeMessage)
+									if (!didApprove) {
+										break
+									}
+								}
+								// now execute the tool like normal
+								const content = await extractTextFromFile(absolutePath)
+								pushToolResult(content)
+								break
+							}
+						} catch (error) {
+							await handleError("reading file", error)
+							break
+						}
+					}
+					case "list_files": {
+						const relDirPath: string | undefined = toolInput.path
+						const recursiveRaw: string | undefined = toolInput.recursive
+						const recursive = recursiveRaw?.toLowerCase() === "true"
+						const sharedMessageProps: ClaudeSayTool = {
+							tool: !recursive ?
"listFilesTopLevel" : "listFilesRecursive", + path: relDirPath || "", + } + try { + if (block.partial) { + const partialMessage = JSON.stringify({ + ...sharedMessageProps, + content: "", + } satisfies ClaudeSayTool) + if (this.alwaysAllowReadOnly) { + await this.say("tool", partialMessage, undefined, block.partial) + } else { + await this.ask("tool", partialMessage, block.partial).catch(() => {}) + } + break + } else { + if (!relDirPath) { + this.consecutiveMistakeCount++ + pushToolResult(await this.sayAndCreateMissingParamError("list_files", "path")) + break + } + this.consecutiveMistakeCount = 0 + const absolutePath = path.resolve(cwd, relDirPath) + const [files, didHitLimit] = await listFiles(absolutePath, recursive, 200) + const result = this.formatFilesList(absolutePath, files, didHitLimit) + const completeMessage = JSON.stringify({ + ...sharedMessageProps, + content: result, + } satisfies ClaudeSayTool) + if (this.alwaysAllowReadOnly) { + await this.say("tool", completeMessage, undefined, false) + } else { + const didApprove = await askApproval("tool", completeMessage) + if (!didApprove) { + break + } + } + pushToolResult(await this.formatToolResult(result)) + break + } + } catch (error) { + await handleError("reading file", error) + break + } + } + + // case "write_to_file": + // return this.writeToFile(toolInput.path, toolInput.content) + + // case "list_files": + // return this.listFiles(toolInput.path, toolInput.recursive) + // case "list_code_definition_names": + // return this.listCodeDefinitionNames(toolInput.path) + // case "search_files": + // return this.searchFiles(toolInput.path, toolInput.regex, toolInput.filePattern) + // case "execute_command": + // return this.executeCommand(toolInput.command) + // case "inspect_site": + // return this.inspectSite(toolInput.url) + // case "ask_followup_question": + // return this.askFollowupQuestion(toolInput.question) + // case "attempt_completion": + // return this.attemptCompletion(toolInput.result, toolInput.command) + // default: + // return [false, `Unknown tool: ${toolName}`] + } + + break + } + + console.log("unlocking") + this.isLocked = false + + console.log(4) + if (!block.partial) { + console.log(5) + // content is complete, call next block if it exists (if not then read stream will call it when its ready) + // even if this.didRejectTool, we still need to fill in the tool results with rejection messages + this.currentStreamingContentBlockIndex++ // need to increment regardless, so when read stream calls this functio again it will be streaming the next block + + if (this.currentStreamingContentBlockIndex < this.assistantContentBlocks.length) { + console.log(6) + + // there are already more content blocks to stream, so we'll call this function ourselves + // await this.presentAssistantContent() + this.presentAssistantContent() + } + } + } + + private partialJsonParser: JSONParser | undefined + // object being built incrementally + private partialObject: Record = {} + private currentKey = "" + private currentValue = "" + private parsingKey: boolean = false + private parsingValue: boolean = false + updateAssistantContentWithPartialJson(chunkIndex: number, partialJson: string): Promise { + return new Promise((resolve, reject) => { + const timeoutId = setTimeout(() => { + // may happen if json parsing class does call onToken, which *shouldnt* happen if passing in non-empty string + reject(new Error("Parsing JSON took too long (> 5 seconds)")) + }, 5_000) + + const cleanupAndResolve = () => { + clearTimeout(timeoutId) + resolve() 
+ } + + const cleanupAndReject = (error: Error) => { + clearTimeout(timeoutId) + reject(error) + } + + if (!this.partialJsonParser) { + this.partialJsonParser = new JSONParser({ emitPartialTokens: true, emitPartialValues: true }) + + // this package enforces setting up an onValue listener ("Can't emit data before the "onValue" callback has been set up."), even though we don't need it. + this.partialJsonParser.onValue = () => { + console.log("onValue") + } + + this.partialJsonParser.onError = (error) => { + console.error("Error parsing input_json_delta", error) + } + this.partialJsonParser.onEnd = () => { + console.log("onEnd") + } + } + + // our json will only ever be string to string maps + // { "key": "value", "key2": "value2" } + // so left brace, string, colon, comma, right brace + // Handle each token emitted by the parser + // need to recreate this listener each time to update the resolve ref + this.partialJsonParser.onToken = async ({ token, value, offset, partial }) => { + console.log("onToken") + try { + switch (token) { + case TokenType.LEFT_BRACE: + // Start of a new JSON object + this.partialObject = {} + this.currentKey = "" + this.parsingKey = false + this.parsingValue = false + break + case TokenType.RIGHT_BRACE: + // End of the current JSON object + this.currentKey = "" + this.currentValue = "" + this.parsingKey = false + this.parsingValue = false + + // Finalize the object once parsing is complete + // ;(this.assistantContentBlocks[chunkIndex] as Anthropic.ToolUseBlock).input = this.partialObject + // this.assistantContentBlocks[chunkIndex]!.partial = false + // await this.presentAssistantContent() // NOTE: only set partial = false and call this once, since doing it several times will create duplicate messages. + console.log("Final parsed object:", this.partialObject) + break + case TokenType.STRING: + if (!this.parsingValue && !this.parsingKey) { + // Starting to parse a key + this.currentKey = value as string + this.parsingKey = !!partial // if not partial, we are done parsing key + } else if (this.parsingKey) { + // Continuing to parse a key + this.currentKey = value as string + this.parsingKey = !!partial + } else if (this.parsingValue) { + // Parsing a value + // Accumulate partial value and update the object + this.currentValue = value as string + if (this.currentKey) { + this.partialObject[this.currentKey] = this.currentValue + } + this.parsingValue = !!partial // if not partial, complete value + } + break + case TokenType.COLON: + // After a key and colon, expect a value + if (this.currentKey !== null) { + this.parsingValue = true + } + break + case TokenType.COMMA: + // Reset for the next key-value pair + this.currentKey = "" + this.currentValue = "" + this.parsingKey = false + this.parsingValue = false + break + default: + console.error("Unexpected token:", token) + } + + // Debugging logs to trace the parsing process + console.log("Partial object:", this.partialObject) + console.log("Offset:", offset, "isPartialToken:", partial) + + // Update the contentBlock with the current state of the partial object + // Use spread operator to ensure a new object reference + ;(this.assistantContentBlocks[chunkIndex] as Anthropic.ToolUseBlock).input = { + ...this.partialObject, + } + // right brace indicates the end of the json object + this.assistantContentBlocks[chunkIndex]!.partial = token !== TokenType.RIGHT_BRACE + cleanupAndResolve() + } catch (error) { + cleanupAndReject(error) + } + } + + try { + this.partialJsonParser.write(partialJson) + } catch (error) { + 
console.error("Error parsing input_json_delta", error) + cleanupAndReject(error) + } + }) + } + async recursivelyMakeClaudeRequests( userContent: UserContent, includeFileDetails: boolean = false @@ -1595,23 +2111,182 @@ ${this.customInstructions.trim()} await this.providerRef.deref()?.postStateToWebview() try { - const response = await this.attemptApiRequest() + const stream = await this.attemptApiRequest() + let cacheCreationInputTokens = 0 + let cacheReadInputTokens = 0 + let inputTokens = 0 + let outputTokens = 0 + // todo add error listeners so we can return api error? or wil lfor await handle that below? + + // let contentBlocks: AnthropicPartialContentBlock[] = [] + this.assistantContentBlocks = [] + + this.currentStreamingContentBlockIndex = 0 + + for await (const chunk of stream) { + switch (chunk.type) { + case "message_start": + console.log("message_start", chunk.message.content, chunk.message.usage) + // tells us cache reads/writes/input/output + const usage = chunk.message.usage + cacheCreationInputTokens += usage.cache_creation_input_tokens || 0 + cacheReadInputTokens += usage.cache_read_input_tokens || 0 + inputTokens += usage.input_tokens || 0 + outputTokens += usage.output_tokens || 0 + break + case "message_delta": + // tells us stop_reason, stop_sequence, and output tokens along the way and at the end of the message + console.log("message_delta", chunk.delta, chunk.usage) + outputTokens += chunk.usage.output_tokens || 0 + break + case "message_stop": + // no usage data, just an indicator that the message is done + console.log("message_stop", chunk.type) + break + case "content_block_start": + console.log("content_block_start", chunk.index) + // await delay(4_000) + switch (chunk.content_block.type) { + case "text": + console.log("text", chunk.content_block.text) + this.assistantContentBlocks.push(chunk.content_block) + this.assistantContentBlocks.at(-1)!.partial = true + this.presentAssistantContent() + break + case "tool_use": + console.log( + "tool_use", + chunk.index, + chunk.content_block.id, + chunk.content_block.name, + chunk.content_block.input // input is always object, which will be streamed as partial json in content_block_delta. 
(this initial 'input' will always be an empty object) + ) + this.assistantContentBlocks.push(chunk.content_block) + this.assistantContentBlocks.at(-1)!.partial = true + this.presentAssistantContent() + // Initialize the JSON parser with partial tokens enabled + // partialJsonParser = + } + break + case "content_block_delta": + console.log("content_block_delta", chunk.index) + switch (chunk.delta.type) { + case "text_delta": + console.log("text_delta", chunk.delta.text) + ;(this.assistantContentBlocks[chunk.index] as Anthropic.TextBlock).text += + chunk.delta.text + this.presentAssistantContent() + break + case "input_json_delta": + console.log("input_json_delta", chunk.delta.partial_json) + // try { + // partialJsonParser?.write(chunk.delta.partial_json) + // } catch (error) { + // console.error("Error parsing input_json_delta", error) + // } + + try { + // JSONParser will always give us a token unless we pass in an empty/undefined value (in which case the promise would never resolve) + if (chunk.delta.partial_json) { + // need to await this since we dont want to create multiple jsonparsers in case the read stream comes in faster than the jsonparser can parse + await this.updateAssistantContentWithPartialJson( + chunk.index, + chunk.delta.partial_json + ) + } + } catch (error) { + // may be due to timeout, in which case we can safely ignore + console.error("Error parsing input_json_delta", error) + } + this.presentAssistantContent() + break + } + break + case "content_block_stop": + if (this.assistantContentBlocks[chunk.index]!.type === "text") { + // we only call this for text block since partialJsonParser handles calling this for tool_use blocks (we only eve want to set partial to false and presentAssistantContent once for each block) + console.log(11) + this.assistantContentBlocks[chunk.index]!.partial = false + this.presentAssistantContent() + } + + console.log("content_block_stop", chunk.index) + + // instead of calling .end ourselves, the parser will automatically call it when it sees the end of the json object. (Calling this here can result in "Tokenizer ended in the middle of a token (state: ENDED). Either not all the data was received or the data was invalid." since there is a delay between the last chunk.delta.partial_json and the end of the content_block_delta) + // partialJsonParser?.end() + break + } + } + + console.log("contentBlocks", this.assistantContentBlocks) + + let totalCost: string | undefined + + await this.say( + "api_req_finished", + JSON.stringify({ + tokensIn: inputTokens, + tokensOut: outputTokens, + cacheWrites: cacheCreationInputTokens, + cacheReads: cacheReadInputTokens, + cost: + totalCost || + this.calculateApiCost( + inputTokens, + outputTokens, + cacheCreationInputTokens, + cacheReadInputTokens + ), + }) + ) + + // now add to apiconversationhistory + // need to save assistant responses to file before proceeding to tool use since user can exit at any moment and we wouldn't be able to save the assistant's response + let didEndLoop = false // do we need this + if (this.assistantContentBlocks.length > 0) { + await this.addToApiConversationHistory({ role: "assistant", content: this.assistantContentBlocks }) + + await pWaitFor(() => this.toolResultsReady) + + const { + didEndLoop: recDidEndLoop, + inputTokens: recInputTokens, + outputTokens: recOutputTokens, + } = await this.recursivelyMakeClaudeRequests(this.toolResults) + didEndLoop = recDidEndLoop + inputTokens += recInputTokens + outputTokens += recOutputTokens + } else { + // this should never happen! 
it there's no assistant_responses, that means we got no text or tool_use content blocks from API which we should assume is an error + await this.say( + "error", + "Unexpected API Response: The language model did not provide any assistant messages. This may indicate an issue with the API or the model's output." + ) + await this.addToApiConversationHistory({ + role: "assistant", + content: [{ type: "text", text: "Failure: I did not provide a response." }], + }) + } + + // return { didEndLoop: false, inputTokens: 0, outputTokens: 0 } // fix + + throw new Error("ClaudeDev fail") if (this.abort) { throw new Error("ClaudeDev instance aborted") } let assistantResponses: Anthropic.Messages.ContentBlock[] = [] - let inputTokens = response.usage.input_tokens - let outputTokens = response.usage.output_tokens - let cacheCreationInputTokens = - (response as Anthropic.Beta.PromptCaching.Messages.PromptCachingBetaMessage).usage - .cache_creation_input_tokens || undefined - let cacheReadInputTokens = - (response as Anthropic.Beta.PromptCaching.Messages.PromptCachingBetaMessage).usage - .cache_read_input_tokens || undefined + // let inputTokens = response.usage.input_tokens + // let outputTokens = response.usage.output_tokens + // let cacheCreationInputTokens = + // (response as Anthropic.Beta.PromptCaching.Messages.PromptCachingBetaMessage).usage + // .cache_creation_input_tokens || undefined + // let cacheReadInputTokens = + // (response as Anthropic.Beta.PromptCaching.Messages.PromptCachingBetaMessage).usage + // .cache_read_input_tokens || undefined // @ts-ignore-next-line - let totalCost = response.usage.total_cost + // let totalCost = response.usage.total_cost await this.say( "api_req_finished", @@ -1688,7 +2363,7 @@ ${this.customInstructions.trim()} } } - let didEndLoop = false + // let didEndLoop = false // attempt_completion is always done last, since there might have been other tools that needed to be called first before the job is finished // it's important to note that claude will order the tools logically in most cases, so we don't have to think about which tools make sense calling before others diff --git a/src/core/webview/ClaudeDevProvider.ts b/src/core/webview/ClaudeDevProvider.ts index 7996feb..4cc2d46 100644 --- a/src/core/webview/ClaudeDevProvider.ts +++ b/src/core/webview/ClaudeDevProvider.ts @@ -4,7 +4,7 @@ import { ClaudeDev } from "../ClaudeDev" import { ApiProvider } from "../../shared/api" import { ExtensionMessage } from "../../shared/ExtensionMessage" import { WebviewMessage } from "../../shared/WebviewMessage" -import { findLast } from "../../utils/array" +import { findLast } from "../../shared/array" import { getNonce } from "./getNonce" import { getUri } from "./getUri" import { selectImages } from "../../integrations/misc/process-images" diff --git a/src/shared/ExtensionMessage.ts b/src/shared/ExtensionMessage.ts index 4a92839..072740c 100644 --- a/src/shared/ExtensionMessage.ts +++ b/src/shared/ExtensionMessage.ts @@ -5,7 +5,15 @@ import { HistoryItem } from "./HistoryItem" // webview will hold state export interface ExtensionMessage { - type: "action" | "state" | "selectedImages" | "ollamaModels" | "theme" | "workspaceUpdated" | "invoke" + type: + | "action" + | "state" + | "selectedImages" + | "ollamaModels" + | "theme" + | "workspaceUpdated" + | "invoke" + | "partialMessage" text?: string action?: "chatButtonTapped" | "settingsButtonTapped" | "historyButtonTapped" | "didBecomeVisible" invoke?: "sendMessage" | "primaryButtonClick" | "secondaryButtonClick" @@ -13,6 +21,7 
@@ export interface ExtensionMessage { images?: string[] models?: string[] filePaths?: string[] + partialMessage?: ClaudeMessage } export interface ExtensionState { @@ -33,6 +42,7 @@ export interface ClaudeMessage { say?: ClaudeSay text?: string images?: string[] + partial?: boolean } export type ClaudeAsk = diff --git a/src/utils/array.ts b/src/shared/array.ts similarity index 100% rename from src/utils/array.ts rename to src/shared/array.ts diff --git a/webview-ui/src/components/chat/ChatRow.tsx b/webview-ui/src/components/chat/ChatRow.tsx index e1f179b..d6ce800 100644 --- a/webview-ui/src/components/chat/ChatRow.tsx +++ b/webview-ui/src/components/chat/ChatRow.tsx @@ -741,10 +741,11 @@ const ProgressIndicator = () => ( const Markdown = memo(({ markdown }: { markdown?: string }) => { // react-markdown lets us customize elements, so here we're using their example of replacing code blocks with SyntaxHighlighter. However when there are no language matches (` or ``` without a language specifier) then we default to a normal code element for inline code. Code blocks without a language specifier shouldn't be a common occurrence as we prompt Claude to always use a language specifier. // when claude wraps text in thinking tags, he doesnt use line breaks so we need to insert those ourselves to render markdown correctly - const parsed = markdown?.replace(/([\s\S]*?)<\/thinking>/g, (match, content) => { - return content - // return `__\n\n${content}\n\n__` - }) + // const parsed = markdown?.replace(/([\s\S]*?)<\/thinking>/g, (match, content) => { + // return content + // // return `__\n\n${content}\n\n__` + // }) + const parsed = markdown return (
{ const message: ExtensionMessage = event.data - if (message.type === "state" && message.state) { - setState(message.state) - const config = message.state?.apiConfiguration - const hasKey = config - ? [ - config.apiKey, - config.openRouterApiKey, - config.awsRegion, - config.vertexProjectId, - config.openAiApiKey, - config.ollamaModelId, - config.geminiApiKey, - config.openAiNativeApiKey, - ].some((key) => key !== undefined) - : false - setShowWelcome(!hasKey) - setDidHydrateState(true) - } - if (message.type === "theme" && message.text) { - setTheme(convertTextMateToHljs(JSON.parse(message.text))) - } - if (message.type === "workspaceUpdated" && message.filePaths) { - setFilePaths(message.filePaths) + switch (message.type) { + case "state": { + setState(message.state!) + const config = message.state?.apiConfiguration + const hasKey = config + ? [ + config.apiKey, + config.openRouterApiKey, + config.awsRegion, + config.vertexProjectId, + config.openAiApiKey, + config.ollamaModelId, + config.geminiApiKey, + config.openAiNativeApiKey, + ].some((key) => key !== undefined) + : false + setShowWelcome(!hasKey) + setDidHydrateState(true) + break + } + case "theme": { + if (message.text) { + setTheme(convertTextMateToHljs(JSON.parse(message.text))) + } + break + } + case "workspaceUpdated": { + setFilePaths(message.filePaths ?? []) + break + } + case "partialMessage": { + const partialMessage = message.partialMessage! + setState((prevState) => { + // worth noting it will never be possible for a more up-to-date message to be sent here or in normal messages post since the presentAssistantContent function uses lock + const lastIndex = findLastIndex(prevState.claudeMessages, (msg) => msg.ts === partialMessage.ts) + if (lastIndex !== -1) { + const newClaudeMessages = [...prevState.claudeMessages] + newClaudeMessages[lastIndex] = partialMessage + return { ...prevState, claudeMessages: newClaudeMessages } + } + return prevState + }) + } } }, [])
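
Two standalone sketches may help tie the streaming pieces above together. First, the incremental tool-input parsing: @streamparser/json consuming a JSON object split across chunks, the way input_json_delta fragments arrive. The parser options match the ones used in updateAssistantContentWithPartialJson; the sample chunks and the onValue-based accumulation are illustrative (the patch itself drives onToken), so treat this as a sketch of the idea rather than the implementation.

import { JSONParser } from "@streamparser/json"

// Tool-use input as it might arrive split across input_json_delta chunks (made-up data).
const chunks = ['{"path": "src/ind', 'ex.ts", "recursive": "tr', 'ue"}']

const parser = new JSONParser({ emitPartialTokens: true, emitPartialValues: true })

parser.onValue = ({ value, key }) => {
	// Fires as values complete (and, with emitPartialValues, while string values are still growing).
	// key is undefined for the top-level object, otherwise it is the property being filled in.
	console.log(key === undefined ? "object so far:" : `field ${String(key)}:`, value)
}
parser.onError = (error) => console.error("parse error:", error)

for (const chunk of chunks) {
	parser.write(chunk)
}
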
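Second, the partialMessage round trip set up by the ExtensionMessage and ExtensionStateContext hunks: the extension posts a ClaudeMessage that reuses the ts of the row it is updating, and the webview overwrites that row instead of appending a new one. mergePartialMessage is a hypothetical name for the logic inside the setState call above, and the ClaudeMessage interface is trimmed to just the fields used here.

interface ClaudeMessage {
	ts: number
	type: "ask" | "say"
	text?: string
	partial?: boolean
}

// Same idea as findLastIndex in src/shared/array.ts, inlined so the sketch stands alone.
function findLastIndex<T>(array: T[], predicate: (item: T) => boolean): number {
	for (let i = array.length - 1; i >= 0; i--) {
		if (predicate(array[i])) {
			return i
		}
	}
	return -1
}

// Webview side: a partialMessage update replaces the existing row with the same ts
// instead of appending, so the chat row grows in place as chunks stream in.
function mergePartialMessage(messages: ClaudeMessage[], update: ClaudeMessage): ClaudeMessage[] {
	const index = findLastIndex(messages, (msg) => msg.ts === update.ts)
	if (index === -1) {
		return messages // not found; the next full "state" post will include it
	}
	const next = [...messages]
	next[index] = update
	return next
}

// Example: two updates sharing one ts — a growing partial, then the final version.
const ts = Date.now()
let messages: ClaudeMessage[] = [{ ts, type: "say", text: "Reading fi", partial: true }]
messages = mergePartialMessage(messages, { ts, type: "say", text: "Reading file src/index.ts", partial: false })

Because the growing partial and its final partial: false version share one ts, the chat row updates in place while streaming, and only the completed message is persisted to disk (via saveClaudeMessages) on the extension side.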