Fix tool streaming

This commit is contained in:
Saoud Rizwan
2024-09-27 10:45:44 -04:00
parent ec2b9c847e
commit 3b7af5749d

View File

@@ -1725,11 +1725,14 @@ ${this.customInstructions.trim()}
} }
} }
private presentAssistantContentHasPendingUpdates = false
async presentAssistantContent() { async presentAssistantContent() {
if (this.presentAssistantContentLocked) { if (this.presentAssistantContentLocked) {
this.presentAssistantContentHasPendingUpdates = true
return return
} }
this.presentAssistantContentLocked = true this.presentAssistantContentLocked = true
this.presentAssistantContentHasPendingUpdates = false
if (this.currentStreamingContentBlockIndex >= this.assistantContentBlocks.length) { if (this.currentStreamingContentBlockIndex >= this.assistantContentBlocks.length) {
throw new Error("No more content blocks to stream! This shouldn't happen...") // remove and just return after testing throw new Error("No more content blocks to stream! This shouldn't happen...") // remove and just return after testing
@@ -2227,133 +2230,119 @@ ${this.customInstructions.trim()}
// there are already more content blocks to stream, so we'll call this function ourselves // there are already more content blocks to stream, so we'll call this function ourselves
// await this.presentAssistantContent() // await this.presentAssistantContent()
this.presentAssistantContent() this.presentAssistantContent()
return
} }
} }
} }
// block is partial, but the read stream may have finished
if (this.presentAssistantContentHasPendingUpdates) {
this.presentAssistantContent()
}
} }
updateAssistantContentWithPartialJson(chunkIndex: number, partialJson: string): Promise<void> { //
return new Promise((resolve, reject) => {
const timeoutId = setTimeout(() => {
// may happen if the JSON parsing class never calls onToken, which *shouldn't* happen when passing in a non-empty string
reject(new Error("Parsing JSON took too long (> 2 seconds)"))
}, 2_000)
const cleanupAndResolve = () => { private chunkIndexToJsonParser = new Map<number, JSONParser>()
clearTimeout(timeoutId) getJsonParserForChunk(chunkIndex: number): JSONParser {
resolve() if (!this.chunkIndexToJsonParser.has(chunkIndex)) {
} const parser = new JSONParser({ emitPartialTokens: true, emitPartialValues: true })
// this package enforces setting up an onValue listener ("Can't emit data before the "onValue" callback has been set up."), even though we don't need it.
parser.onValue = () => console.log(`onValue for chunk ${chunkIndex}`)
// parser.onError = (error) => console.error(`Error parsing JSON for chunk ${chunkIndex}:`, error);
// parser.onEnd = () => console.log(`JSON parsing ended for chunk ${chunkIndex}`);
const cleanupAndReject = (error: Error) => { let partialObject: Record<string, string> = {}
clearTimeout(timeoutId) let currentKey: string = ""
reject(error) let currentValue: string = ""
} let parsingKey: boolean = false
let parsingValue: boolean = false
if (!this.partialJsonParser) {
this.partialJsonParser = new JSONParser({ emitPartialTokens: true, emitPartialValues: true })
// this package enforces setting up an onValue listener ("Can't emit data before the "onValue" callback has been set up."), even though we don't need it.
this.partialJsonParser.onValue = () => {
console.log("onValue")
}
this.partialJsonParser.onError = (error) => {
console.error("Error parsing input_json_delta", error)
}
this.partialJsonParser.onEnd = () => {
console.log("onEnd")
}
}
// our json will only ever be string to string maps // our json will only ever be string to string maps
// { "key": "value", "key2": "value2" } // { "key": "value", "key2": "value2" }
// so left brace, string, colon, comma, right brace // so left brace, string, colon, comma, right brace
// (need to recreate this listener each time to update the resolve ref) // (need to recreate this listener each time to update the resolve ref)
this.partialJsonParser.onToken = async ({ token, value, offset, partial }) => { parser.onToken = ({ token, value, offset, partial }) => {
console.log("onToken") console.log("onToken")
const state = this.partialJsonParserState
try { try {
switch (token) { switch (token) {
case TokenType.LEFT_BRACE: case TokenType.LEFT_BRACE:
// Start of a new JSON object // Start of a new JSON object
state.partialObject = {} partialObject = {}
state.currentKey = "" currentKey = ""
state.parsingKey = false parsingKey = false
state.parsingValue = false parsingValue = false
break break
case TokenType.RIGHT_BRACE: case TokenType.RIGHT_BRACE:
// End of the current JSON object // End of the current JSON object
state.currentKey = "" currentKey = ""
state.currentValue = "" currentValue = ""
state.parsingKey = false parsingKey = false
state.parsingValue = false parsingValue = false
// Finalize the object once parsing is complete // Finalize the object once parsing is complete
// ;(this.assistantContentBlocks[chunkIndex] as Anthropic.ToolUseBlock).input = this.partialObject // ;(this.assistantContentBlocks[chunkIndex] as Anthropic.ToolUseBlock).input = this.partialObject
// this.assistantContentBlocks[chunkIndex]!.partial = false // this.assistantContentBlocks[chunkIndex]!.partial = false
// await this.presentAssistantContent() // NOTE: only set partial = false and call this once, since doing it several times will create duplicate messages. // await this.presentAssistantContent() // NOTE: only set partial = false and call this once, since doing it several times will create duplicate messages.
console.log("Final parsed object:", state.partialObject) console.log("Final parsed object:", partialObject)
break break
case TokenType.STRING: case TokenType.STRING:
if (!state.parsingValue && !state.parsingKey) { if (!parsingValue && !parsingKey) {
// Starting to parse a key // Starting to parse a key
state.currentKey = value as string currentKey = value as string
state.parsingKey = !!partial // if not partial, we are done parsing key parsingKey = !!partial // if not partial, we are done parsing key
} else if (state.parsingKey) { } else if (parsingKey) {
// Continuing to parse a key // Continuing to parse a key
state.currentKey = value as string currentKey = value as string
state.parsingKey = !!partial parsingKey = !!partial
} else if (state.parsingValue) { } else if (parsingValue) {
// Parsing a value // Parsing a value
// Accumulate partial value and update the object // Accumulate partial value and update the object
state.currentValue = value as string currentValue = value as string
if (state.currentKey) { if (currentKey) {
state.partialObject[state.currentKey] = state.currentValue partialObject[currentKey] = currentValue
} }
state.parsingValue = !!partial // if not partial, complete value parsingValue = !!partial // if not partial, complete value
} }
break break
case TokenType.COLON: case TokenType.COLON:
// After a key and colon, expect a value // After a key and colon, expect a value
if (state.currentKey !== null) { if (currentKey !== null) {
state.parsingValue = true parsingValue = true
} }
break break
case TokenType.COMMA: case TokenType.COMMA:
// Reset for the next key-value pair // Reset for the next key-value pair
state.currentKey = "" currentKey = ""
state.currentValue = "" currentValue = ""
state.parsingKey = false parsingKey = false
state.parsingValue = false parsingValue = false
break break
default: default:
console.error("Unexpected token:", token) console.error("Unexpected token:", token)
} }
// Debugging logs to trace the parsing process // Debugging logs to trace the parsing process
console.log("Partial object:", state.partialObject) console.log("Partial object:", partialObject)
console.log("Offset:", offset, "isPartialToken:", partial) console.log("Offset:", offset, "isPartialToken:", partial)
// Update the contentBlock with the current state of the partial object // Update the contentBlock with the current state of the partial object
// Use spread operator to ensure a new object reference // Use spread operator to ensure a new object reference
;(this.assistantContentBlocks[chunkIndex] as Anthropic.ToolUseBlock).input = { ;(this.assistantContentBlocks[chunkIndex] as Anthropic.ToolUseBlock).input = {
...state.partialObject, ...partialObject,
} }
// right brace indicates the end of the json object // right brace indicates the end of the json object
this.assistantContentBlocks[chunkIndex]!.partial = token !== TokenType.RIGHT_BRACE this.assistantContentBlocks[chunkIndex]!.partial = token !== TokenType.RIGHT_BRACE
cleanupAndResolve()
this.presentAssistantContent()
} catch (error) { } catch (error) {
cleanupAndReject(error) console.error("Error parsing input_json_delta", error)
} }
} }
try { this.chunkIndexToJsonParser.set(chunkIndex, parser)
this.partialJsonParser.write(partialJson) }
} catch (error) { return this.chunkIndexToJsonParser.get(chunkIndex)!
console.error("Error parsing input_json_delta", error)
cleanupAndReject(error)
}
})
} }
async recursivelyMakeClaudeRequests( async recursivelyMakeClaudeRequests(
@@ -2427,7 +2416,7 @@ ${this.customInstructions.trim()}
this.assistantContentBlocks = [] this.assistantContentBlocks = []
this.didCompleteReadingStream = false this.didCompleteReadingStream = false
this.currentStreamingContentBlockIndex = 0 this.currentStreamingContentBlockIndex = 0
this.chunkIndexToJsonParser.clear()
for await (const chunk of stream) { for await (const chunk of stream) {
switch (chunk.type) { switch (chunk.type) {
case "message_start": case "message_start":
@@ -2469,8 +2458,9 @@ ${this.customInstructions.trim()}
this.assistantContentBlocks.push(chunk.content_block) this.assistantContentBlocks.push(chunk.content_block)
this.assistantContentBlocks.at(-1)!.partial = true this.assistantContentBlocks.at(-1)!.partial = true
this.presentAssistantContent() this.presentAssistantContent()
// Initialize the JSON parser with partial tokens enabled // Initialize the JSON parser with partial tokens enabled
// partialJsonParser = // partialJsonParser =
this.getJsonParserForChunk(chunk.index)
} }
break break
case "content_block_delta": case "content_block_delta":
@@ -2484,26 +2474,26 @@ ${this.customInstructions.trim()}
break break
case "input_json_delta": case "input_json_delta":
console.log("input_json_delta", chunk.delta.partial_json) console.log("input_json_delta", chunk.delta.partial_json)
// try {
// partialJsonParser?.write(chunk.delta.partial_json)
// } catch (error) {
// console.error("Error parsing input_json_delta", error)
// }
try { try {
// JSONParser will always give us a token unless we pass in an empty/undefined value (in which case the promise would never resolve) this.getJsonParserForChunk(chunk.index).write(chunk.delta.partial_json)
if (chunk.delta.partial_json) {
// need to await this since we don't want to create multiple jsonparsers in case the read stream comes in faster than the jsonparser can parse
await this.updateAssistantContentWithPartialJson(
chunk.index,
chunk.delta.partial_json
)
}
} catch (error) { } catch (error) {
// may be due to timeout, in which case we can safely ignore
console.error("Error parsing input_json_delta", error) console.error("Error parsing input_json_delta", error)
} }
this.presentAssistantContent()
// try {
// // JSONParser will always give us a token unless we pass in an empty/undefined value (in which case the promise would never resolve)
// if (chunk.delta.partial_json) {
// // need to await this since we don't want to create multiple jsonparsers in case the read stream comes in faster than the jsonparser can parse
// await this.updateAssistantContentWithPartialJson(
// chunk.index,
// chunk.delta.partial_json
// )
// }
// } catch (error) {
// // may be due to timeout, in which case we can safely ignore
// console.error("Error parsing input_json_delta", error)
// }
// this.presentAssistantContent()
break break
} }
break break