Logic Visualizer | Python

Visual Code Logic Representation for OpenAI Chat

This document provides a structured visual flowchart and overview of a Python script that integrates the OpenAI API with Chainlit, detailing the components that handle audio input, user messages, and asynchronous events.


Prompt

import os
import plotly
from io import BytesIO
from pathlib import Path
from typing import List

from openai import AsyncAssistantEventHandler, AsyncOpenAI, OpenAI

from literalai.helper import utc_now

import chainlit as cl
from chainlit.config import config
from chainlit.element import Element
from openai.types.beta.threads.runs import RunStep


async_openai_client = AsyncOpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
sync_openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

assistant = sync_openai_client.beta.assistants.retrieve(
    os.environ.get("OPENAI_ASSISTANT_ID")
)

config.ui.name = assistant.name

class EventHandler(AsyncAssistantEventHandler):

    def __init__(self, assistant_name: str) -> None:
        super().__init__()
        self.current_message: cl.Message = None
        self.current_step: cl.Step = None
        self.current_tool_call = None
        self.assistant_name = assistant_name

    async def on_run_step_created(self, run_step: RunStep) -> None:
        cl.user_session.set("run_step", run_step)

    async def on_text_created(self, text) -> None:
        self.current_message = await cl.Message(author=self.assistant_name, content="").send()

    async def on_text_delta(self, delta, snapshot):
        if delta.value:
            await self.current_message.stream_token(delta.value)

    async def on_text_done(self, text):
        await self.current_message.update()
        if text.annotations:
            print(text.annotations)
            for annotation in text.annotations:
                if annotation.type == "file_path":
                    response = await async_openai_client.files.with_raw_response.content(annotation.file_path.file_id)
                    file_name = annotation.text.split("/")[-1]
                    try:
                        fig = plotly.io.from_json(response.content)
                        element = cl.Plotly(name=file_name, figure=fig)
                        await cl.Message(
                            content="",
                            elements=[element]).send()
                    except Exception as e:
                        element = cl.File(content=response.content, name=file_name)
                        await cl.Message(
                            content="",
                            elements=[element]).send()
                    # Hack to fix links
                    if annotation.text in self.current_message.content and element.chainlit_key:
                        self.current_message.content = self.current_message.content.replace(annotation.text, f"/project/file/{element.chainlit_key}?session_id={cl.context.session.id}")
                        await self.current_message.update()

    async def on_tool_call_created(self, tool_call):
        self.current_tool_call = tool_call.id
        self.current_step = cl.Step(name=tool_call.type, type="tool")
        self.current_step.show_input = "python"
        self.current_step.start = utc_now()
        await self.current_step.send()

    async def on_tool_call_delta(self, delta, snapshot): 
        if snapshot.id != self.current_tool_call:
            self.current_tool_call = snapshot.id
            self.current_step = cl.Step(name=delta.type, type="tool")
            self.current_step.start = utc_now()
            if snapshot.type == "code_interpreter":
                 self.current_step.show_input = "python"
            if snapshot.type == "function":
                self.current_step.name = snapshot.function.name
                self.current_step.language = "json"
            await self.current_step.send()
        
        if delta.type == "function":
            pass
        
        if delta.type == "code_interpreter":
            if delta.code_interpreter.outputs:
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        self.current_step.output += output.logs
                        self.current_step.language = "markdown"
                        self.current_step.end = utc_now()
                        await self.current_step.update()
                    elif output.type == "image":
                        self.current_step.language = "json"
                        self.current_step.output = output.image.model_dump_json()
            else:
                if delta.code_interpreter.input:
                    await self.current_step.stream_token(delta.code_interpreter.input, is_input=True)  

    async def on_event(self, event) -> None:
        if event.event == "error":
            await cl.ErrorMessage(content=str(event.data.message)).send()

    async def on_exception(self, exception: Exception) -> None:
        await cl.ErrorMessage(content=str(exception)).send()

    async def on_tool_call_done(self, tool_call):       
        self.current_step.end = utc_now()
        await self.current_step.update()

    async def on_image_file_done(self, image_file, message):
        image_id = image_file.file_id
        response = await async_openai_client.files.with_raw_response.content(image_id)
        image_element = cl.Image(
            name=image_id,
            content=response.content,
            display="inline",
            size="large"
        )
        if not self.current_message.elements:
            self.current_message.elements = []
        self.current_message.elements.append(image_element)
        await self.current_message.update()


@cl.step(type="tool")
async def speech_to_text(audio_file):
    response = await async_openai_client.audio.transcriptions.create(
        model="whisper-1", file=audio_file
    )

    return response.text


async def upload_files(files: List[Element]):
    file_ids = []
    for file in files:
        uploaded_file = await async_openai_client.files.create(
            file=Path(file.path), purpose="assistants"
        )
        file_ids.append(uploaded_file.id)
    return file_ids


async def process_files(files: List[Element]):
    # Upload files if any and get file_ids
    file_ids = []
    if len(files) > 0:
        file_ids = await upload_files(files)

    doc_mime_types = [
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "text/markdown", "application/pdf", "text/plain",
    ]
    return [
        {
            "file_id": file_id,
            # Document-type files also get file_search; others only code_interpreter.
            "tools": [{"type": "code_interpreter"}, {"type": "file_search"}]
            if file.mime in doc_mime_types
            else [{"type": "code_interpreter"}],
        }
        for file_id, file in zip(file_ids, files)
    ]


@cl.set_starters
async def set_starters():
    return [
        cl.Starter(
            label="Run Tesla stock analysis",
            message="Make a data analysis on the tesla-stock-price.csv file I previously uploaded.",
            icon="/public/write.svg",
            ),
        cl.Starter(
            label="Run a data analysis on my CSV",
            message="Make a data analysis on the next CSV file I will upload.",
            icon="/public/write.svg",
            )
        ]

@cl.on_chat_start
async def start_chat():
    # Create a Thread
    thread = await async_openai_client.beta.threads.create()
    # Store thread ID in user session for later use
    cl.user_session.set("thread_id", thread.id)
    
    
@cl.on_stop
async def stop_chat():
    current_run_step: RunStep = cl.user_session.get("run_step")
    if current_run_step:
        await async_openai_client.beta.threads.runs.cancel(thread_id=current_run_step.thread_id, run_id=current_run_step.run_id)


@cl.on_message
async def main(message: cl.Message):
    thread_id = cl.user_session.get("thread_id")

    attachments = await process_files(message.elements)

    # Add a Message to the Thread
    oai_message = await async_openai_client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=message.content,
        attachments=attachments,
    )

    # Create and Stream a Run
    async with async_openai_client.beta.threads.runs.stream(
        thread_id=thread_id,
        assistant_id=assistant.id,
        event_handler=EventHandler(assistant_name=assistant.name),
    ) as stream:
        await stream.until_done()


@cl.on_audio_chunk
async def on_audio_chunk(chunk: cl.AudioChunk):
    if chunk.isStart:
        buffer = BytesIO()
        # This is required for whisper to recognize the file type
        buffer.name = f"input_audio.{chunk.mimeType.split('/')[1]}"
        # Initialize the session for a new audio stream
        cl.user_session.set("audio_buffer", buffer)
        cl.user_session.set("audio_mime_type", chunk.mimeType)

    # Write the chunks to a buffer and transcribe the whole audio at the end
    cl.user_session.get("audio_buffer").write(chunk.data)


@cl.on_audio_end
async def on_audio_end(elements: list[Element]):
    # Get the audio buffer from the session
    audio_buffer: BytesIO = cl.user_session.get("audio_buffer")
    audio_buffer.seek(0)  # Move the file pointer to the beginning
    audio_file = audio_buffer.read()
    audio_mime_type: str = cl.user_session.get("audio_mime_type")

    input_audio_el = cl.Audio(
        mime=audio_mime_type, content=audio_file, name=audio_buffer.name
    )
    await cl.Message(
        type="user_message",
        content="",
        elements=[input_audio_el, *elements],
    ).send()

    whisper_input = (audio_buffer.name, audio_file, audio_mime_type)
    transcription = await speech_to_text(whisper_input)

    msg = cl.Message(author="You", content=transcription, elements=elements)

    await main(message=msg)

Answer

Visual Representation of Code Logic

This section provides a structured, visual representation of the Python code above, which uses the OpenAI Assistants API and the Chainlit framework to handle asynchronous streaming events, process user messages, and transcribe audio input.

1. Code Overview

Components

  • Libraries Used:

    • os, plotly, BytesIO, Path, List
    • openai (sync and async clients), chainlit, and literalai (utc_now helper)
  • Primary Classes & Functions:

    • EventHandler: Handles the asynchronous streaming events emitted while the assistant runs.
    • speech_to_text: Converts audio input into text using OpenAI's Whisper model.
    • upload_files: Uploads files to OpenAI's servers.
    • process_files: Uploads any attached files and builds the per-file tool configuration.
    • set_starters: Defines the starter prompts shown in the chat UI.
    • start_chat & stop_chat: Manage the start and stop of a chat session.
    • main: Processes each incoming user message and streams the assistant's run.
    • on_audio_chunk & on_audio_end: Buffer streaming audio chunks and transcribe the recording when it ends.
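
The script reads two environment variables at import time: OPENAI_API_KEY and OPENAI_ASSISTANT_ID. The check below is only a sketch (it is not part of the original code) showing a fail-fast guard that could sit above the client setup; the app itself is launched with the Chainlit CLI, e.g. chainlit run app.py (assuming the script is saved as app.py).

import os

# Sketch only (not in the original script): stop early if a required
# environment variable is missing before the OpenAI clients are created.
for var in ("OPENAI_API_KEY", "OPENAI_ASSISTANT_ID"):
    if not os.environ.get(var):
        raise RuntimeError(f"Missing required environment variable: {var}")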

2. Flowchart Representation

Flowchart of Execution Logic

flowchart TD
    A[Start Chat] -->|User initiates chat| B[Create Thread]
    B --> C[Store thread ID in session]
    
    D[On Message Received] --> E[Process Files]
    E -->|Files processed| F[Create Message in Thread]
    F --> G[Stream Run with Event Handler]
    
    G --> H[Handle Incoming Events]
    H -->|New Message| I[Send Current Message]
    H -->|New Tool Call| J[Handle Tool Call Events]
    H -->|New Text| K[Handle New Text Events]
    
    J --> M[Check for Tool Call Done]
    K --> N[Check for Text Done]
    
    M -->|Update Current Step| O[Finish Tool Call]
    N -->|Update Current Message| P[Finish Text Processing]
    
    Q[On Audio Chunk] -->|Initialize Audio Buffer| R[Write Chunks to Buffer]

    S[On Audio End] --> T[Transcribe Audio]
    T --> U[Return Transcription]
    
    V[Stop Chat] --> W[Cancel Current Run Step]
    W --> X[End Process]
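
One detail the audio branch of the flowchart glosses over: on_audio_end assembles a (filename, bytes, mime_type) tuple, which the OpenAI SDK accepts as an in-memory file upload, so no temporary file is written. An illustrative sketch with placeholder data:

from io import BytesIO

# Illustrative only: the tuple shape that on_audio_end() builds and hands to
# speech_to_text(). The file name's extension tells Whisper the audio format.
buffer = BytesIO(b"...")                 # in the app, this holds the streamed chunks
buffer.name = "input_audio.wav"          # placeholder name and format
whisper_input = (buffer.name, buffer.getvalue(), "audio/wav")
# transcription = await speech_to_text(whisper_input)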

3. Key Components and Annotations

EventHandler Class

  • Attributes:
    • current_message: Tracks the Chainlit message currently being streamed.
    • current_step: Captures the state of the tool step currently being executed.
    • current_tool_call: ID of the tool call currently being streamed.
    • assistant_name: Name of the OpenAI Assistant, used as the message author.

Methods:

  1. on_run_step_created: Stores the current run step in the user session (used by stop_chat to cancel the run).
  2. on_text_created / on_text_delta / on_text_done: Create the assistant message, stream tokens into it, and finalize it, resolving any file annotations.
  3. on_tool_call_created / on_tool_call_delta / on_tool_call_done: Open a Chainlit step for each tool call, stream its input and output, and close it with timestamps.
  4. on_image_file_done: Downloads generated images and attaches them to the current message.
  5. on_event / on_exception: Surface run errors and exceptions to the user as error messages.
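
These callbacks do not fire on their own; they are driven by the streaming run opened in main(). The relevant wiring, reproduced from the code above:

async with async_openai_client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant.id,
    event_handler=EventHandler(assistant_name=assistant.name),
) as stream:
    await stream.until_done()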

Asynchronous Functions:

  • speech_to_text(audio_file): Converts audio to text using OpenAI's API.
  • upload_files(files): Manages the file upload process to OpenAI.
  • process_files(files): Uploads any attachments and builds the per-file tool configuration attached to the thread message.
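
To make process_files concrete, this is roughly the attachments list it returns for one PDF and one CSV upload (the file IDs are placeholders); main() passes this list directly to threads.messages.create:

# Illustrative only: placeholder file IDs.
attachments = [
    {   # a PDF is usable by both code interpreter and file search
        "file_id": "file-abc123",
        "tools": [{"type": "code_interpreter"}, {"type": "file_search"}],
    },
    {   # a CSV is handled by the code interpreter only
        "file_id": "file-def456",
        "tools": [{"type": "code_interpreter"}],
    },
]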

Chat Management:

  • start_chat(): Initializes a new chat session by creating a thread.
  • stop_chat(): Cancels the current run step before ending interaction.

Error Handling:

  • on_text_done wraps Plotly parsing in a try-except block, falling back to a plain cl.File element when a generated file is not a Plotly figure.
  • on_event and on_exception surface run errors and exceptions to the user as cl.ErrorMessage elements.

4. Conclusion

This flowchart and component overview capture the logic and flow of the provided code, illustrating how user messages, tool calls, and audio input are handled within the OpenAI and Chainlit environment.

For a deeper exploration of these concepts, it is advisable to consult resources and courses available on the Enterprise DNA Platform.
