Streaming model responses to the client in real time

Some generative models can stream their responses. Streaming improves the user experience because the user sees text appear as the model generates it, instead of waiting for the complete response.

Getting a streaming response from a model

Getting a streaming response from a model is as simple as setting the stream parameter to true in the model request. This tells the API to send the response in chunks as soon as the model starts generating it.

Here is an example that uses OpenAI:

import OpenAI from "openai";

const openai = new OpenAI();

async function main() {
  const stream = await openai.chat.completions.create({
    model: "gpt-3.5-turbo",
    messages: [{ role: "user", content: "Say this is a test" }],
    stream: true,
  });
  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta?.content || "");
  }
}

main();

As you can see, it is straightforward to receive a streaming response from a model. The response is streamed in chunks, and you can process each chunk as it arrives by looping over the stream with for await. In this case, we printed each chunk to the console.
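If you want to try the chunk-handling logic without calling a model, a minimal sketch like the one below may help. It assumes chunks shaped like the fields read in the example above; fakeModelStream and collectText are hypothetical names introduced only for illustration.

```typescript
// Minimal chunk shape, modeled on the fields read in the example above.
type Chunk = { choices: { delta: { content?: string } }[] };

// Hypothetical stand-in for a model stream, so the sketch runs without an API key.
async function* fakeModelStream(words: string[]): AsyncGenerator<Chunk> {
  for (const word of words) {
    yield { choices: [{ delta: { content: word } }] };
  }
  // Some chunks may carry no content at all.
  yield { choices: [{ delta: {} }] };
}

// Consume any async iterable of chunks, report each piece of text, and return the full text.
async function collectText(
  stream: AsyncIterable<Chunk>,
  onChunk: (text: string) => void = () => {},
): Promise<string> {
  let full = "";
  for await (const chunk of stream) {
    const text = chunk.choices[0]?.delta?.content ?? "";
    if (text) onChunk(text);
    full += text;
  }
  return full;
}
```

Calling collectText(fakeModelStream(["This ", "is ", "a test"]), console.log) logs each piece as it arrives and resolves to the complete text.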

Streaming the response from the web framework to the client

Most of the time, however, you will want to stream the response to a client. This means converting the iterator to a ReadableStream (part of JavaScript's Streams API), and then sending the stream to the client via JavaScript's Response object.

The conversion can be done with a simple function like this:

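function iteratorToStream(iterator: any) {
  // Response bodies carry bytes, so encode each text chunk before enqueueing it
  const encoder = new TextEncoder();
  return new ReadableStream({
    async pull(controller) {
      const { value, done } = await iterator.next();

      if (done) {
        controller.close();
      } else {
        // read the text the same way as in the OpenAI example above;
        // adjust this line if your chunks have a different shape
        const text = value.choices[0]?.delta?.content ?? "";
        controller.enqueue(encoder.encode(text));
      }
    },
  });
}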

Then you can use this function to send the model response to the client like this:

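const respStream = await openai.chat.completions.create({
  model: "gpt-3.5-turbo",
  messages: [{ role: "user", content: "Say this is a test" }],
  stream: true,
});

// the OpenAI stream is an async iterable, so get its iterator before converting it
const stream = iteratorToStream(respStream[Symbol.asyncIterator]());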
return new Response(stream);
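To check the server side without a model, the round trip can be sketched with a stand-in generator. This is a sketch under stated assumptions, not the only way to do it: fakeChunks, textIteratorToStream, and handler are hypothetical names, the chunks are plain strings, and the runtime is assumed to provide the standard ReadableStream, TextEncoder, and Response globals (for example Node 18 or later).

```typescript
// Hypothetical stand-in for the model's chunk iterator.
async function* fakeChunks(): AsyncGenerator<string> {
  yield "Say ";
  yield "this is ";
  yield "a test";
}

// Typed variant of the conversion: string chunks in, encoded bytes out.
function textIteratorToStream(iterator: AsyncIterator<string>): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      const { value, done } = await iterator.next();
      if (done) {
        controller.close();
      } else {
        controller.enqueue(encoder.encode(value));
      }
    },
    // Runs if the consumer cancels the stream; gives the iterator a chance to clean up.
    async cancel() {
      await iterator.return?.();
    },
  });
}

// Hypothetical route handler that streams plain text to the caller.
function handler(): Response {
  return new Response(textIteratorToStream(fakeChunks()), {
    headers: { "Content-Type": "text/plain; charset=utf-8" },
  });
}
```

Reading the result with await handler().text() should give back the concatenated chunks, which makes the handler easy to exercise in a unit test.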

Reading the streaming response in the client

The last step is for the client to display the response as it arrives. This involves:

  • Calling the backend API that streams the response
  • Reading the stream in chunks
  • Calling a hook to update the UI with the new chunk

This is how you can read a streaming response in the client and call a hook to update the UI:

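const resp = await fetch("/api/ask-question", {
  method: "POST", // a request with a body cannot use the default GET method
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ question: input, tenant_id: tenantid }),
});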

// Reader to process the streamed response
if (resp.body) {
  const reader = resp.body.getReader();
  const decoder = new TextDecoder();
  let done = false; // we need to read the stream until it's done
  while (!done) {
    const { value, done: doneReading } = await reader.read();
    done = doneReading;
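    // stream: true keeps multi-byte characters intact when they are split across chunks
    const chunkValue = decoder.decode(value, { stream: true });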
    // dispatch function updates the UI with the new chunk
    dispatch({ type: "updateAnswer", text: chunkValue });
    if (done) {
      dispatch({ type: "done", text: "" });
    }
  }
}
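The read loop can also be pulled into a small helper that is easy to test on its own. The sketch below assumes a UTF-8 text body; readTextStream is a hypothetical name, and the onChunk callback plays the role of the dispatch call above.

```typescript
// Read a byte stream as UTF-8 text and hand each decoded piece to onChunk.
async function readTextStream(
  body: ReadableStream<Uint8Array>,
  onChunk: (text: string) => void,
): Promise<void> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    // stream: true buffers an incomplete multi-byte character until the next chunk
    const text = decoder.decode(value, { stream: true });
    if (text) onChunk(text);
  }
  // Flush anything the decoder is still holding.
  const rest = decoder.decode();
  if (rest) onChunk(rest);
}
```

With this helper, the client code reduces to something like await readTextStream(resp.body, (text) => dispatch({ type: "updateAnswer", text })), followed by the done action.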

Updating the UI with the new chunk

When using React, you can update the UI as each chunk arrives by updating the state with the new chunk. This will cause the component to re-render and display the new chunk.

In this example, we are using a reducer to update the state with the new chunk. The reducer is a function that takes the current state and an action, and returns the new state. The reducer here has three actions: addQuestion, updateAnswer, and done. The addQuestion action adds a new question to the conversation, the updateAnswer action appends the new chunk to the answer, and the done action marks the end of the last answer. Note that we create a new array containing the updated answer and store that new array in the state. React detects state changes by comparing references, so if you mutate the existing array in place, the component may not re-render.

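// assumed shape of a single conversation entry, matching how the reducer uses it
type MessageType = {
  type: "question" | "answer";
  text: string;
};

type AppActions = {
  type: "addQuestion" | "updateAnswer" | "done";
  text: string;
};

interface AppState {
  messages: MessageType[];
}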

function reducer(state: AppState, action: AppActions): AppState {
  switch (action.type) {
    case "addQuestion":
      return {
        ...state,
        messages: [
          ...state.messages,
          { type: "question", text: action.text },
          { type: "answer", text: "" },
        ],
      };
    case "updateAnswer": {
      // braces give the const declarations their own block scope inside the switch
      const conversationListCopy = [...state.messages];
      const lastIndex = conversationListCopy.length - 1;
      conversationListCopy[lastIndex] = {
        ...conversationListCopy[lastIndex],
        text: conversationListCopy[lastIndex].text + action.text,
      };
      return {
        ...state,
        messages: conversationListCopy,
      };
    }
    case "done":
      return {
        ...state,
      };
    default:
      return state;
  }
}
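The copy-then-replace step in updateAnswer is the part that matters most, so here it is in isolation. This is a minimal sketch: Message and appendToLast are hypothetical names, and the empty-list guard is an addition that the reducer above does not have.

```typescript
type Message = { type: "question" | "answer"; text: string };

// Immutable update: copy the array and replace only the last message.
function appendToLast(messages: Message[], chunk: string): Message[] {
  if (messages.length === 0) return messages; // nothing to append to
  const copy = [...messages];
  const last = copy[copy.length - 1];
  copy[copy.length - 1] = { ...last, text: last.text + chunk };
  return copy;
}

const before: Message[] = [
  { type: "question", text: "Hi" },
  { type: "answer", text: "" },
];
const after = appendToLast(appendToLast(before, "Hel"), "lo");

console.log(after === before); // false: a new array was created
console.log(before[1].text === ""); // true: the original state is untouched
console.log(after[1].text); // "Hello"
```

Only the array and the last message are copied. Earlier messages are shared between the old and new state, which keeps each update cheap even for long conversations.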

Now we need to tie the reducer to the component with the useReducer hook. The hook takes the reducer function and the initial state, and returns the current state and a dispatch function. We use the state to render the current conversation, and we use the dispatch function to send actions to the reducer, which computes the next state. As you saw in an earlier snippet, we call dispatch with an action each time we read a new chunk from the stream.

The snippet below shows how to use the useReducer hook to update the state with the new chunk, and how to use the state in the component to display the conversation.

const Chatbox: React.FC<ChatboxProps> = () => {
  const [state, dispatch] = useReducer(reducer, { messages: [] });

  // ... lots of UI code here....

  // the component has to return the JSX for it to be rendered
  return (
    <List>
      {state.messages.map((msg, index) => (
        <ListItem key={index}>
          <Box
            sx={{
              borderRadius: 20,
              // change the color based on the type of message
              backgroundColor:
                msg.type === "question" ? "primary.500" : "neutral.200",
              color: msg.type === "question" ? "white" : "black",
            }}
          >
            <Typography variant="body1" sx={{ padding: 2 }}>
              {
                // display the message, this will get updated as the response streams
                msg.text
              }
            </Typography>
          </Box>
        </ListItem>
      ))}
    </List>
  );
};

Structured streaming responses

If you are used to writing web applications, you are probably used to sending structured responses like JSON. JSON allows you to package multiple pieces of information in a single response, and easily handle all the information in the client.

However, JSON and streaming responses don't mix well. A JSON document cannot be parsed until the client receives the very last } of the response, so the client can neither parse nor display anything until the stream has finished. That is the opposite of what we want from a streaming response.
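A quick way to see the problem is to try parsing a response that has only partly arrived. In this small sketch, canParse is a hypothetical helper and the payload is made up for illustration.

```typescript
// Returns true only when the text is a complete, valid JSON document.
function canParse(text: string): boolean {
  try {
    JSON.parse(text);
    return true;
  } catch {
    return false;
  }
}

const full = '{"model":"gpt-3.5-turbo","answer":"This is a test"}';
const partial = full.slice(0, 30); // what the client holds partway through the stream

console.log(canParse(partial)); // false
console.log(canParse(full)); // true
```

Every proper prefix of the payload fails to parse, so a client that waits for valid JSON shows nothing until the final character arrives.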

To solve this, you need to use a slightly different format. A common way to handle this is to have structured metadata at the beginning of the response, and then stream the content. You also need a unique delimiter to separate the metadata from the content. This way, the client can parse the metadata and start displaying the content as it arrives.

Here is an example of how you can structure the response that you send from the backend.

First, the iteratorToStream function needs to be updated to send the metadata. We'll modify it to accept a metadata string (we'll stringify the JSON to generate it), and send the metadata and the delimiter when the stream starts, before any of the iterator chunks are streamed:

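function iteratorToStream(iterator: any, metadata: string) {
  // Response bodies carry bytes, so encode every string before enqueueing it
  const encoder = new TextEncoder();
  return new ReadableStream({
    start(controller) {
      controller.enqueue(encoder.encode(metadata));
      // this is our separator. Streaming answer comes next
      controller.enqueue(encoder.encode("EOJSON"));
    },
    async pull(controller) {
      const { value, done } = await iterator.next();

      if (done) {
        controller.close();
      } else {
        // read the text the same way as in the OpenAI example;
        // adjust this line if your chunks have a different shape
        const text = value.choices[0]?.delta?.content ?? "";
        controller.enqueue(encoder.encode(text));
      }
    },
  });
}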

Then, we need to update the way the client parses the response. We need to wait for all the metadata to arrive, parse it, and then start displaying the content:

if (resp.body) {
  const reader = resp.body.getReader();
  const decoder = new TextDecoder();
  let done = false;
  // accumulate the data as it comes in, so we can parse the metadata
  let partialData = "";
  let receivedMetadata = false; // track if we finished with the JSON yet

  while (!done) {
    const { value, done: doneReading } = await reader.read();
    done = doneReading;
    // stream: true keeps multi-byte characters intact when they are split across chunks
    const chunkValue = decoder.decode(value, { stream: true });
    if (!receivedMetadata) {
      partialData += chunkValue;
      // first part of the response is json, second part is the answer
      const delimiterIndex = partialData.indexOf("EOJSON");
      if (delimiterIndex !== -1) {
        const metadata = JSON.parse(partialData.slice(0, delimiterIndex));
        // use the metadata, for example to update the UI with the model name
        handleMetadata(metadata);
        // handle the first chunk of the answer (everything after the delimiter)
        const firstChunk = partialData.slice(delimiterIndex + "EOJSON".length);
        dispatch({ type: "updateAnswer", text: firstChunk });
        receivedMetadata = true;
        partialData = "";
      }
    } else {
      // handle the rest of the answer
      dispatch({ type: "updateAnswer", text: chunkValue });
    }
    // signal completion even if the stream ended right after the metadata
    if (done) {
      dispatch({ type: "done", text: "" });
    }
  }
}
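The metadata handling can also be separated from the read loop, which makes it possible to test the case where the delimiter itself is split across two chunks. The sketch below is one possible shape, not the only one: createFramedParser and its callbacks are hypothetical names, and the delimiter is assumed never to appear inside the metadata JSON.

```typescript
// Build a push-style parser: metadata JSON first, then the delimiter, then free text.
function createFramedParser<M>(
  delimiter: string,
  onMetadata: (metadata: M) => void,
  onText: (text: string) => void,
): (chunk: string) => void {
  let buffer = "";
  let metadataDone = false;

  return function push(chunk: string): void {
    if (metadataDone) {
      if (chunk) onText(chunk);
      return;
    }
    buffer += chunk;
    const at = buffer.indexOf(delimiter);
    if (at === -1) return; // the delimiter has not fully arrived yet
    onMetadata(JSON.parse(buffer.slice(0, at)) as M);
    metadataDone = true;
    const rest = buffer.slice(at + delimiter.length);
    buffer = "";
    if (rest) onText(rest);
  };
}
```

In the client, you would create the parser once before the read loop, for example const push = createFramedParser("EOJSON", handleMetadata, (text) => dispatch({ type: "updateAnswer", text })), and call push with each decoded chunk.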