OpenAI popularized a pattern of streaming results from a backend API in realtime with ChatGPT. This approach is useful because the time a language model takes to run inference is often longer than what you want for an API call to feel snappy and fast. By streaming the results as they’re produced, the user can start reading them and the product experience doesn’t feel slow as a result.

OpenAI has a nice example of how to use their client to stream results. This approach makes it straightforward to print each token out as it is returned by the model. Most user facing apps aren’t command line interfaces, so to build our own ChatGPT like experience where the tokens show up in realtime on a user interface, we need to do a bit more work. Using Server-Sent Events (SSE), we can display results to a user on a webpage in realtime.

If you’re short on time, Vercel wrote a nice ai library that handles much of the work we’re about to do here in React, with nice hooks and an easy pattern for hosting a backend (through proprietary). This exploration is to understand SSE more deeply and look at what it takes to build your own API and UI capable of the UX described.

A simple implementation#

We’re going to build a fastapi backend to stream tokens from OpenAI to a frontend, which I will build as well. Let’s start with a simple server that returns at StreamingResponse.

server.py

from openai import AsyncOpenAI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse


app = FastAPI()
client = AsyncOpenAI()

async def generator(msg: str):
    stream = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": msg,
            }
        ],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async for chunk in stream:
        yield chunk.choices[0].delta.content or ''


@app.get("/")
async def main(msg):
    return StreamingResponse(generator(msg))

Run the server

uvicorn server:app --reload

Curl the endpoint shows things are wired together reasonably

โฏ curl "localhost:8000/ask?msg=who%20are%20you"
I am an AI chatbot developed by OpenAI. I am here to provide information and assist with any questions you have. How can I help you today?%

This is almost all we need on the server for an MVP of streaming to a UI. Referencing the same SSE docs as earlier, we see a description of an interface called EventSource. We’ll see a simple html and Javascript frontend with EventSource to build our UI. Looking at the example PHP code, we also see our server code needs some slight modifications. The data we emit needs to be formatted as f"data: {data}\n\n" for EventSource to handle it. We also need to set the content type of text/event-stream.

Here is the server with those changes:

from openai import AsyncOpenAI
from fastapi import FastAPI
from fastapi.responses import StreamingResponse


app = FastAPI()
client = AsyncOpenAI()

async def generator(msg: str):
    stream = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": msg,
            }
        ],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async for chunk in stream:
        yield f"data: {chunk.choices[0].delta.content or ''}\n\n"


@app.get("/ask")
async def main(msg):
    return StreamingResponse(generator(msg), media_type="text/event-stream")

And the curl still works (though now is less human readable).

data:

data: I

data:  am

data:  an

data:  AI

data:  language

data:  model

data:  created

data:  by

data:  Open

data: AI

data: .

data:  I

data:  am

data:  designed

data:  to

data:  assist

data:  and

data:  provide

data:  information

data:  to

data:  the

data:  best

data:  of

data:  my

data:  abilities

data: .

data:

Now let’s create a simple UI and serve it from the server (for simplicity).

index.html

<!DOCTYPE html>
<html>
<head>
    <title>Stream Test</title>
</head>
<body>
    <button id="streambtn">Start Streaming</button>
    <input type="text" id="messageInput" placeholder="Enter message">
    <div id="container"></div>

    <script>
        var source;
        var button = document.getElementById('streambtn');
        var container = document.getElementById('container');
        var input = document.getElementById('messageInput');

        button.addEventListener('click', function() {
            var message = input.value;
            var url = 'ask/?msg=' + encodeURIComponent(message);

            // clear the input field and container div
            input.value = '';
            container.textContent = '';

            // create new EventSource connected to the server endpoint
            source = new EventSource(url);

            source.addEventListener('open', function(e) {
                console.log('Connection was opened');
            }, false);

            source.addEventListener('message', function(e) {

                // Append the new message to the existing text
                container.textContent += e.data;

            }, false);

            source.addEventListener('error', function(e) {
                if (e.readyState == EventSource.CLOSED) {
                    console.log('Connection was closed');
                }
                else {
                    console.log('An error has occurred');
                }
            }, false);
        });

    </script>
</body>
</html>

We also modify the server to render with html file when request the root url.

from openai import AsyncOpenAI
from fastapi import FastAPI
from fastapi.responses import FileResponse, StreamingResponse


app = FastAPI()
client = AsyncOpenAI()

async def generator(msg: str):
    stream = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": msg,
            }
        ],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async for chunk in stream:
        yield f"data: {chunk.choices[0].delta.content or ''}\n\n"


@app.get("/ask")
async def main(msg):
    return StreamingResponse(generator(msg), media_type="text/event-stream")


@app.get("/")
async def get_index():
    return FileResponse("index.html")

We can open the site at http://localhost:8000 and enter a message to the model. Then click “Start Streaming”.

Language model streaming without close

We see the response from the model showing up in real-time! Something funny happens though. If we wait a few seconds, we see new responses to the message continue coming through, as if the request is being submitted repeated it. In fact, this is exactly what is happening.

We can modify the server to see a [DONE] response when the streaming is complete and the client to close the EventSource connection when it receives this data.

from openai import AsyncOpenAI
from fastapi import FastAPI
from fastapi.responses import FileResponse, StreamingResponse


app = FastAPI()
client = AsyncOpenAI()

async def generator(msg: str):
    stream = await client.chat.completions.create(
        messages=[
            {
                "role": "user",
                "content": msg,
            }
        ],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async for chunk in stream:
        yield f"data: {chunk.choices[0].delta.content or ''}\n\n"
    yield f"data: [DONE]\n\n"


@app.get("/ask")
async def main(msg):
    return StreamingResponse(generator(msg), media_type="text/event-stream")


@app.get("/")
async def get_index():
    return FileResponse("index.html")
<!DOCTYPE html>
<html>
<head>
    <title>Stream Test</title>
</head>
<body>
    <button id="streambtn">Start Streaming</button>
    <input type="text" id="messageInput" placeholder="Enter message">
    <div id="container"></div>

    <script>
        var source;
        var button = document.getElementById('streambtn');
        var container = document.getElementById('container');
        var input = document.getElementById('messageInput');

        button.addEventListener('click', function() {
            // create new EventSource connected to the server endpoint
            var message = input.value;
            var url = 'ask/?msg=' + encodeURIComponent(message);

            // clear the input field and container div
            input.value = '';
            container.textContent = '';

            source = new EventSource(url);

            source.addEventListener('open', function(e) {
                console.log('Connection was opened');
            }, false);

            source.addEventListener('message', function(e) {
                // if the message is "[DONE]", close the connection
                if (e.data === '[DONE]') {
                    source.close();
                    console.log('Connection was closed');
                    return;
                }

                // append the new message to the existing text
                container.textContent += e.data;

            }, false);

            source.addEventListener('error', function(e) {
                if (e.readyState == EventSource.CLOSED) {
                    console.log('Connection was closed');
                }
                else {
                    console.log('An error has occurred');
                }
            }, false);
        });

    </script>
</body>
</html>

Now when we submit a message from the UI, the client closes the connection with the server after it is finished responding. If we submit another request, the client clears the content and streaming the new response. It generally works. The weirdness in the spacing is due to the model returning newlines (\n) which are being consumed by EventSource as the suffix of data. That is a problem we won’t delve into here.

Repurposing the backend#

EventSource works for streaming events from a backend with SSE, but there are other options depending on how you’re building your UI. As mentioned earlier, Vercel’s ai package provides React hooks to build a realtime streaming chat. We can modify our backend to support this library.

Let’s first consider a minimal (as much as possible anyway) Next.js app with the ai library installed, taken straight from the docs

src/app/page.tsx

"use client";

import { useChat } from "ai/react";

export default function Home() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: "http://127.0.0.1:8000/ask"
  });

  return (
    <main className="flex min-h-screen flex-col items-center justify-between p-24">
      <div>
        {messages.map((m) => (
          <div key={m.id}>
            {m.role === "user" ? "User: " : "AI: "}
            {m.content}
          </div>
        ))}

        <form onSubmit={handleSubmit}>
          <label>
            Say something...
            <input value={input} onChange={handleInputChange} />
          </label>
          <button type="submit">Send</button>
        </form>
      </div>
    </main>
  );
}

We’ve setup the useChat hook to point to our backend and wired up an input field to submit the conversation along with a new message. If we inspect the payload the ai sends to the backend, we see it’s a POST request and that the body content contains the OpenAI schema for an array of messages.

{
    "messages": [
        {"role": "user", "content": "..."}
    ]
}

With this knowledge, we can modify our backend to expect this request schema and pass it to the model. Through some quick trial and error (maybe the docs have this somewhere as well), I also learned the library expects the streaming responses to contain the tokens only, not the data: {content}\n\n\ wrapping like EventSource does.

Our server code now looks like this

from openai import AsyncOpenAI

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

app = FastAPI()

# Added because the frontend and this backend run on separate ports
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

client = AsyncOpenAI()

@app.post("/ask")
async def ask(req: dict):
    stream = await client.chat.completions.create(
        messages=req["messages"],
        model="gpt-3.5-turbo",
        stream=True,
    )

    async def generator():
        async for chunk in stream:
            yield chunk.choices[0].delta.content or ""

    response_messages = generator()
    return StreamingResponse(response_messages, media_type="text/event-stream")

Here’s how it looks (sorry, no fancy UI):

Demo using Vercel