Add API streaming failed error handling

This commit is contained in:
Saoud Rizwan
2024-09-30 19:52:00 -04:00
parent 9b1b9c10a1
commit 42bcc4420d
3 changed files with 110 additions and 62 deletions

View File

@@ -21,7 +21,14 @@ import { ApiConfiguration } from "../shared/api"
import { findLastIndex } from "../shared/array"
import { combineApiRequests } from "../shared/combineApiRequests"
import { combineCommandSequences } from "../shared/combineCommandSequences"
import { ClaudeApiReqInfo, ClaudeAsk, ClaudeMessage, ClaudeSay, ClaudeSayTool } from "../shared/ExtensionMessage"
import {
ClaudeApiReqCancelReason,
ClaudeApiReqInfo,
ClaudeAsk,
ClaudeMessage,
ClaudeSay,
ClaudeSayTool,
} from "../shared/ExtensionMessage"
import { getApiMetrics } from "../shared/getApiMetrics"
import { HistoryItem } from "../shared/HistoryItem"
import { ToolName } from "../shared/Tool"
@@ -1600,7 +1607,7 @@ export class ClaudeDev {
// update api_req_started. we can't use api_req_finished anymore since it's a unique case where it could come after a streaming message (ie in the middle of being updated or executed)
// fortunately api_req_finished was always parsed out for the gui anyways, so it remains solely for legacy purposes to keep track of prices in tasks from history
// (it's worth removing a few months from now)
const updateApiReqMsg = (cancelled?: boolean) => {
const updateApiReqMsg = (cancelReason?: ClaudeApiReqCancelReason, streamingFailedMessage?: string) => {
this.claudeMessages[lastApiReqIndex].text = JSON.stringify({
...JSON.parse(this.claudeMessages[lastApiReqIndex].text || "{}"),
tokensIn: inputTokens,
@@ -1616,10 +1623,51 @@ export class ClaudeDev {
cacheWriteTokens,
cacheReadTokens
),
cancelled,
cancelReason,
streamingFailedMessage,
} satisfies ClaudeApiReqInfo)
}
const abortStream = async (cancelReason: ClaudeApiReqCancelReason, streamingFailedMessage?: string) => {
if (this.diffViewProvider.isEditing) {
await this.diffViewProvider.revertChanges() // closes diff view
}
// if last message is a partial we need to update and save it
const lastMessage = this.claudeMessages.at(-1)
if (lastMessage && lastMessage.partial) {
lastMessage.ts = Date.now()
lastMessage.partial = false
// instead of streaming partialMessage events, we do a save and post like normal to persist to disk
console.log("updating partial message", lastMessage)
// await this.saveClaudeMessages()
}
// Let assistant know their response was interrupted for when task is resumed
await this.addToApiConversationHistory({
role: "assistant",
content: [
{
type: "text",
text:
assistantMessage +
`\n\n[${
cancelReason === "streaming_failed"
? "Response interrupted by API Error"
: "Response interrupted by user"
}]`,
},
],
})
// update api_req_started to have cancelled and cost, so that we can display the cost of the partial stream
updateApiReqMsg(cancelReason, streamingFailedMessage)
await this.saveClaudeMessages()
// signals to provider that it can retrieve the saved messages from disk, as abortTask can not be awaited on in nature
this.didFinishAborting = true
}
// reset streaming state
this.currentStreamingContentIndex = 0
this.assistantMessageContent = []
@@ -1632,52 +1680,36 @@ export class ClaudeDev {
await this.diffViewProvider.reset()
let assistantMessage = ""
// TODO: handle error being thrown in stream
for await (const chunk of stream) {
switch (chunk.type) {
case "usage":
inputTokens += chunk.inputTokens
outputTokens += chunk.outputTokens
cacheWriteTokens += chunk.cacheWriteTokens ?? 0
cacheReadTokens += chunk.cacheReadTokens ?? 0
totalCost = chunk.totalCost
break
case "text":
assistantMessage += chunk.text
this.parseAssistantMessage(assistantMessage)
this.presentAssistantMessage()
break
try {
for await (const chunk of stream) {
switch (chunk.type) {
case "usage":
inputTokens += chunk.inputTokens
outputTokens += chunk.outputTokens
cacheWriteTokens += chunk.cacheWriteTokens ?? 0
cacheReadTokens += chunk.cacheReadTokens ?? 0
totalCost = chunk.totalCost
break
case "text":
assistantMessage += chunk.text
this.parseAssistantMessage(assistantMessage)
this.presentAssistantMessage()
break
}
if (this.abort) {
console.log("aborting stream...")
await abortStream("user_cancelled")
break // aborts the stream
}
}
if (this.abort) {
console.log("aborting stream...")
if (this.diffViewProvider.isEditing) {
await this.diffViewProvider.revertChanges() // closes diff view
}
// if last message is a partial we need to save it
const lastMessage = this.claudeMessages.at(-1)
if (lastMessage && lastMessage.partial) {
lastMessage.ts = Date.now()
lastMessage.partial = false
// instead of streaming partialMessage events, we do a save and post like normal to persist to disk
console.log("saving messages...", lastMessage)
// await this.saveClaudeMessages()
}
//
await this.addToApiConversationHistory({
role: "assistant",
content: [{ type: "text", text: assistantMessage + "\n\n[Response interrupted by user]" }],
})
// update api_req_started to have cancelled and cost, so that we can display the cost of the partial stream
updateApiReqMsg(true)
await this.saveClaudeMessages()
// signals to provider that it can retrieve the saved messages from disk, as abortTask can not be awaited on in nature
this.didFinishAborting = true
break // aborts the stream
} catch (error) {
this.abortTask() // if the stream failed, there's various states the task could be in (i.e. could have streamed some tools the user may have executed), so we just resort to replicating a cancel task
await abortStream("streaming_failed", error.message ?? JSON.stringify(serializeError(error), null, 2))
const history = await this.providerRef.deref()?.getTaskWithId(this.taskId)
if (history) {
await this.providerRef.deref()?.initClaudeDevWithHistoryItem(history.historyItem)
await this.providerRef.deref()?.postStateToWebview()
}
}