import {AssistantStream} from "openai/lib/AssistantStream";
import mitt, {Emitter} from 'mitt';
import {Text, TextDelta} from "openai/resources/beta/threads/messages";
import {AssistantStreamEvent} from "openai/resources/beta/assistants";
import {performToolCall, sendMessageAndRun, submitToolCallResponse} from "@/helper/chat/assistantAPI";

import {ResponseMessageRichResponse} from "@/helper/chat/richResponses";
import {AssistantRunConfig, AssistantRunEvents, AssistantRunState} from "@/helper/chat/assistantRun";
import {isWriting, KapitelToolCall} from "@/helper/chat/toolCalls";
import transitionTo = AssistantRunState.transitionTo;
import {ChatStatus} from "@/helper/chat/chatStatus";
import {AInesAssistantType} from "@/graphql/generated/graphql";
import {consoleErrorChat, consoleLog, consoleLogChat} from "@/helper/console";

import {StreamingThreadMessage, ThreadMessage, ThreadMessageStatus} from "@/helper/chat/threadMessage";

type ExpertAssistantRunEvents = {
    messageStreaming: StreamingThreadMessage
};

export class AssistantRunExpert {
    public mitt : Emitter<ExpertAssistantRunEvents & AssistantRunEvents>;

    private runId : string | undefined

    public message : StreamingThreadMessage

    constructor(
        public runConfig: AssistantRunConfig,
        private userMessage: string,
        private optimisticExpertConfirmedPromise : Promise<boolean> | undefined = undefined
    ) {
        // init event bus
        this.mitt = mitt<ExpertAssistantRunEvents & AssistantRunEvents>();

        // init message
        this.message = new StreamingThreadMessage('', 'assistant')

        // start run with user message
        this.submitMessageAndStream(userMessage)
            .then((stream: AssistantStream) => {
                this.setState(AssistantRunState.REQUESTED);
                // start with first stream
                this.setCurrentStream(stream)
            })

        // if there is a parallel expertChooser running confirming our existence we should react to a negative outcome
        if (this.optimisticExpertConfirmedPromise) {
            this.optimisticExpertConfirmedPromise
                .then(confirmed => {
                    if (!confirmed) {
                        consoleLogChat('%s: optimistic expert choice not confirmed - aborting %O',AInesAssistantType[this.runConfig.assistantType], this.runConfig);
                        this.resetCurrentStream()
                        this.setState(AssistantRunState.ABORTED)
                    }
                })
        }
    }


    /*
    state handling
     */

    private state : AssistantRunState = AssistantRunState.INIT;
    public getState() {
        return this.state;
    }
    private setState(state: AssistantRunState) {
        const nextState = transitionTo(state, this.state)
        if (nextState === this.state) {
            return
        }

        this.state = nextState
        this.mitt.emit('stateChanged', nextState)
    }


    /*
    stream sources
     */

    private submitMessageAndStream(message: string) : Promise<AssistantStream> {
        consoleLogChat('%s: sending user message "' + message + '" to %O', AInesAssistantType[this.runConfig.assistantType], this.runConfig)
        return sendMessageAndRun(
            this.runConfig.assistantId,
            this.runConfig.threadId,
            message
        )
    }

    private submitToolCallOutputsAndStream(toolCallOutputs: string[]) : Promise<AssistantStream> {
        if (!this.runId) {
            throw new Error('runId not set yet - should have happened at run beginning')
        }
        consoleLogChat('%s: sending ' + toolCallOutputs.length + ' tool call output(s) to %O', AInesAssistantType[this.runConfig.assistantType], this.runConfig)
        return submitToolCallResponse(
            toolCallOutputs,
            this.runConfig,
            this.runId
        )
    }


    /*
    multi stream handling
     */

    private streams : AssistantStream[] = []
    private currentStream: AssistantStream | undefined = undefined
    private resetCurrentStream() {
        if (this.currentStream) {
            this.toggleEventListeners(this.currentStream, 'off')
            this.currentStream = undefined
        }
    }
    private setCurrentStream(stream: AssistantStream) {
        if (this.currentStream) {
            throw new Error('already currentStream attached - reset first')
        }

        consoleLogChat('%s: receiving response stream for userMessage "' + this.userMessage + '" on %O', AInesAssistantType[this.runConfig.assistantType], this.runConfig)

        this.toggleEventListeners(stream, 'on')
        this.currentStream = stream

        // collect stream
        this.streams.push(stream)
    }


    /*
    stream event management: attaches & detaches local stream event listeners
     */

    private toggleEventListeners(stream: AssistantStream, toggle: 'on'|'off') {
        stream[toggle]('event', this.onGenericRunEvent)
        stream[toggle]('textCreated', this.onTextCreated)
        stream[toggle]('textDelta', this.onTextDelta)
        stream[toggle]('textDone', this.onTextDone)
    }


    /*
    private event handler

    bound to this by using =>
    referenced by on & off event management
     */

    private onGenericRunEvent = async (event: AssistantStreamEvent) => {
        // streamEvent: forward general purpose event-event
        // this.mitt.emit('runStreamEvent', event)

        if(event.event === 'thread.run.created'){
            this.handleRunStart(event)
        }

        if(event.event === 'thread.run.cancelled' || event.event === 'thread.run.completed') {
            this.handleRunDone()
        }

        // intercept tool calls
        if (event.event === 'thread.run.requires_action') {
            this.handleRequiresToolCall(event)
        }
    }

    private onTextCreated = (content: Text) => this.handleMessageStreamStart(content)
    private onTextDelta = (oaiTextDelta: TextDelta, snapshotText:Text) => this.handleMessageStreamDiff(oaiTextDelta, snapshotText)
    private onTextDone = (content: Text)=> this.handleMessageStreamDone(content)


    /*
    run handling
     */

    private handleRunStart = (event: AssistantStreamEvent.ThreadRunCreated)=> {
        // store run id on run creation
        this.runId = event.data.id

        // response stream started: emit event
        this.mitt.emit('runResponseStarted')

        //state: progress state from INIT > RUNNING
        this.setState(AssistantRunState.RUNNING)
    }

    private handleRunDone = () => {
        // state: on content complete progress state from streaming > completed
        this.setState(AssistantRunState.COMPLETED)

        // streamComplete: event
        this.mitt.emit('runComplete')
    }


    /*
    text streaming & message assembly

    streams responseMessage.text - on oaiTextDelta look for 'text:' & '"' and take everything in between as streamed text
     */

    private updateMessage(text: string, richResponses: ResponseMessageRichResponse[] | undefined, status: ThreadMessageStatus | undefined = undefined) {
        this.message.update(text, richResponses)
        if (status) {
            this.message.setStreamingStatus(status)
        }
        this.mitt.emit('messageStreaming', this.message)
    }

    private handleMessageStreamStart(content: Text) {
        // re-init message
        this.updateMessage('', undefined,'ready')
    }

    private handleMessageStreamDiff(oaiTextDelta: TextDelta, snapshotText:Text) {

        // extract delta
        const currentDelta = oaiTextDelta.value || ''
        const currentSnapshot = snapshotText.value

        // state handling for text attribute streaming
        if(this.message.status === 'streaming' && currentDelta.includes('"')){
            // end of text attribute - update only status
            this.updateMessage(this.message.text, undefined,'textDone')
        } else if (this.message.status === 'ready' && (currentSnapshot.endsWith('"text": "') || currentSnapshot.endsWith('"text":"'))) {
            // beginning of text attribute - reset text and move status to streaming
            this.updateMessage('', undefined,'streaming')
        } else if (this.message.status === 'streaming') {
            // append delta of text attribute
            this.updateMessage(this.message.text + currentDelta, undefined)
        }
    }

    private handleMessageStreamDone(content: Text) {
        // responseMessage.text, responseMessage.richResponse on content complete set final text & rich response
        let message
        try {
            message = JSON.parse(content.value)
        } catch {
            message = undefined
        }

        consoleLogChat('%s: response message stream completed for userMessage "' + this.userMessage + '" on %O: %O',AInesAssistantType[this.runConfig.assistantType], this.runConfig, message || content)

        const text = message?.text || ''
        const richResponses = message?.richResponses || []

        this.updateMessage(text, richResponses, 'messageDone')
    }


    /*
    tool call handling
     */

    private toolCalls : KapitelToolCall[] = []

    public getMostRecentToolCall() : KapitelToolCall | undefined {
        return this.toolCalls.length > 0 ? this.toolCalls[this.toolCalls.length-1] : undefined
    }

    private async handleRequiresToolCall(event: AssistantStreamEvent.ThreadRunRequiresAction) {
        const toolCalls = event.data.required_action?.submit_tool_outputs?.tool_calls || [];

        const kapitelToolCalls = toolCalls.map(toolCall => ({
            function: toolCall.function.name,
        }))

        kapitelToolCalls.forEach(kapitelToolCall => {
            // collect
            this.toolCalls.push(kapitelToolCall)
            // emit event
            this.mitt.emit('runRequiresToolCall', kapitelToolCall)
        })

        this.setState(AssistantRunState.PENDING_TOOL_CALL)

        // if there is a parallel expertChooser running confirming our existence we should wait for it
        if (
            this.optimisticExpertConfirmedPromise &&
            this.toolCalls.some(isWriting)
        ) {
            consoleLogChat('%s: optimistic expert run requires writing tool calls. Deferring: %O',AInesAssistantType[this.runConfig.assistantType], this.toolCalls.filter(isWriting))
            // wait for choice and don't execute tool calls if choice is negative
            if (!(await this.optimisticExpertConfirmedPromise)) {
                return
            }
        }

        // detach current stream - we are about to switch and in the meantime this run shouldn't do anything
        this.resetCurrentStream()

        // resolve tool calls
        const toolCallResponses: any[] = await Promise.all(
            toolCalls.map(
                async (toolCall: any) =>{
                    return {
                        tool_call_id: toolCall.id,
                        output: await performToolCall(toolCall)
                    }
                }
            )
        )

        const nextStream = await this.submitToolCallOutputsAndStream(toolCallResponses)
        this.setCurrentStream(nextStream)
        this.setState(AssistantRunState.RUNNING)
    }
}
