import {AssistantStream} from "openai/lib/AssistantStream";
import mitt, {Emitter} from 'mitt';
import {Text, TextDelta} from "openai/resources/beta/threads/messages";
import {AssistantStreamEvent} from "openai/resources/beta/assistants";
import {persistMessage} from "@/helper/chat/assistantAPI";

import {ResponseMessageRichResponse} from "@/helper/chat/richResponses";
import {AssistantRun} from "@/helper/chat/assistantRun/assistantRun";
import {isWriting, KapitelToolCall, performToolCall} from "@/helper/chat/toolCalls";
import {AInesAssistantType} from "@/graphql/generated/graphql";
import {consoleLogChat} from "@/helper/console";

import {StreamingThreadMessage, StreamingThreadMessageChanges, ThreadMessageStatus} from "@/helper/chat/threadMessage";
import {Run} from "openai/resources/beta/threads";
import {AssistantRunStatus} from "@/helper/chat/assistantRun/assistantRunStatus";
import {AssistantRunConfig} from "@/helper/chat/assistantRun/assistantRunConfig";
import {makeMessagePersistParams} from "@/helper/chat/chatBL";

export type ExpertAssistantRunEvents = {
    messageStreaming: StreamingThreadMessageChanges;
};

export class AssistantRunExpert  extends AssistantRun {
    public mittExpert: Emitter<ExpertAssistantRunEvents>;

    public message : StreamingThreadMessage

    constructor(
        public runConfig: AssistantRunConfig,
        protected userMessage: string,
        protected isScriptedContent: boolean,
        private optimisticExpertConfirmedPromise : Promise<boolean> | undefined = undefined
    ) {
        super(runConfig, userMessage, isScriptedContent)

        // init additional expert events
        this.mittExpert = mitt<ExpertAssistantRunEvents>();

        // init message
        this.message = new StreamingThreadMessage('', 'assistant')

        // if there is a parallel expertChooser running confirming our existence we should react to a negative outcome
        if (this.optimisticExpertConfirmedPromise) {
            this.optimisticExpertConfirmedPromise
                .then(confirmed => {
                    if (!confirmed) {
                        consoleLogChat('%s: optimistic expert choice not confirmed - aborting %O',AInesAssistantType[this.runConfig.assistantType], this.runConfig);
                        this.resetCurrentStream()
                        // todo: send request to api to initiate cancel run
                        this.setState(AssistantRunStatus.CANCELLED)
                    }
                })
        }
    }


    /*
    stream event management: attaches & detaches local stream event listeners

    private event handler bound to this by using =>
    referenced by on & off event management
     */

    protected toggleEventListeners(stream: AssistantStream, toggle: 'on'|'off') {
        super.toggleEventListeners(stream, toggle)

        stream[toggle]('event', this.onGenericRunEventForToolCall)

        stream[toggle]('textCreated', this.onTextCreated)
        stream[toggle]('textDelta', this.onTextDelta)
        stream[toggle]('textDone', this.onTextDone)
    }

    private onGenericRunEventForToolCall = async (event: AssistantStreamEvent) => {
        if (event.event === "thread.run.requires_action") {
            await this.handleRunRequiresToolCalls(event.data as Run)
        }
    }
    private onTextCreated = (content: Text) => this.handleMessageStreamStart(content)
    private onTextDelta = (oaiTextDelta: TextDelta, snapshotText:Text) => this.handleMessageStreamDiff(oaiTextDelta, snapshotText)
    private onTextDone = (content: Text)=> this.handleMessageStreamDone(content)


    /*
    text streaming & message assembly

    streams responseMessage.text - on oaiTextDelta look for 'text:' & '"' and take everything in between as streamed text
     */

    private updateMessage(text: string, richResponses: ResponseMessageRichResponse[] | undefined, status: ThreadMessageStatus | undefined = undefined) {
        const updatedAttributes = {} as StreamingThreadMessageChanges

        if (this.message.text !== text) {
            this.message.text = text
            updatedAttributes.text = text
        }

        if (this.message.richResponses !== richResponses) {
            this.message.richResponses = richResponses
            updatedAttributes.richResponses = richResponses
        }

        if (status && this.message.status !== status) {
            this.message.status = status
            updatedAttributes.status = status
        }

        this.mittExpert.emit('messageStreaming', updatedAttributes)
    }

    private handleMessageStreamStart(content: Text) {
        // re-init message
        this.updateMessage('', undefined,'ready')
    }

    private handleMessageStreamDiff(oaiTextDelta: TextDelta, snapshotText:Text) {

        // extract delta
        const currentDelta = oaiTextDelta.value || ''
        const currentSnapshot = snapshotText.value

        // state handling for text attribute streaming
        if(this.message.status === 'streaming' && currentDelta.includes('"')){
            // end of text attribute - update only status
            this.updateMessage(this.message.text, undefined,'textDone')
        } else if (this.message.status === 'ready' && (currentSnapshot.endsWith('"text": "') || currentSnapshot.endsWith('"text":"'))) {
            // beginning of text attribute - reset text and move status to streaming
            this.updateMessage('', undefined,'streaming')
        } else if (this.message.status === 'streaming') {
            let existingText = this.message.text;
            let appendDiff = currentDelta
            if (existingText.length > 2) {
                const lookBackTwo = existingText.substring(existingText.length-2,existingText.length)
                appendDiff = lookBackTwo + appendDiff
                existingText = existingText.substring(0, existingText.length-2)
            }

            appendDiff = appendDiff.replace(/\\n/g, "\n")

            // append delta of text attribute
            this.updateMessage(existingText + appendDiff, undefined)
        }
    }

    private handleMessageStreamDone(content: Text) {
        // responseMessage.text, responseMessage.richResponse on content complete set final text & rich response
        let message
        try {
            message = JSON.parse(content.value)
        } catch {
            message = undefined
        }

        consoleLogChat('%s: response message stream completed for userMessage "' + this.userMessage + '" on %O: %O',AInesAssistantType[this.runConfig.assistantType], this.runConfig, message || content)

        const text = message?.text || ''
        const richResponses = message?.richResponses || []

        makeMessagePersistParams({
            config: this.runConfig,
            role: "assistant",
            content: text,
            runId: this.run?.id,
            richResponses: richResponses
        }).then(persistMessage)


        this.updateMessage(text, richResponses, 'messageDone')
    }


    /*
    tool call handling
     */

    private toolCalls : KapitelToolCall[] = []

    public getMostRecentToolCall() : KapitelToolCall | undefined {
        return this.toolCalls.length > 0 ? this.toolCalls[this.toolCalls.length-1] : undefined
    }

    private async handleRunRequiresToolCalls(run: Run) {
        const toolCalls = run.required_action?.submit_tool_outputs?.tool_calls || [];

        const kapitelToolCalls = toolCalls.map(toolCall => ({
            id: toolCall.id,
            function: toolCall.function.name,
            arguments: toolCall.function.arguments
        }))

        kapitelToolCalls.forEach(kapitelToolCall => {
            // collect
            this.toolCalls.push(kapitelToolCall)
            // emit event
            this.mitt.emit('runRequiresToolCall', kapitelToolCall)
        })
        this.mitt.emit('runRequiresToolCalls', kapitelToolCalls)

        this.setState(AssistantRunStatus.PENDING_TOOL_CALL)

        // if there is a parallel expertChooser running confirming our existence we should wait for it
        if (
            this.optimisticExpertConfirmedPromise &&
            this.toolCalls.some(isWriting)
        ) {
            consoleLogChat('%s: optimistic expert run requires writing tool calls. Deferring: %O',AInesAssistantType[this.runConfig.assistantType], this.toolCalls.filter(isWriting))
            // wait for choice and don't execute tool calls if choice is negative
            if (!(await this.optimisticExpertConfirmedPromise)) {
                return
            }
        }

        // detach current stream - we are about to switch and in the meantime this run shouldn't do anything
        this.resetCurrentStream()

        // log tool call messages
        makeMessagePersistParams({
            config: this.runConfig,
            runId: this.run?.id,
            role: "assistant",
            toolCalls: toolCalls
        }).then(persistMessage)


        // resolve tool calls
        const toolCallResponses: any[] = await Promise.all(
            kapitelToolCalls.map(
                async (kapitelToolCall: KapitelToolCall) =>{
                    return {
                        tool_call_id: kapitelToolCall.id,
                        output: await performToolCall(kapitelToolCall)
                    }
                }
            )
        )


        const nextStream = await this.submitToolCallOutputsAndStream(toolCallResponses)
        this.setCurrentStream(nextStream)
        this.setState(AssistantRunStatus.RUNNING)
    }
}
