Streaming responses from OpenAI to Next.js on Vercel

ElfHelp.ai uses GPT-3 to help you brainstorm gift ideas and then search for them on Amazon, Google, and Etsy. A few thousand people tried it out over the holidays, way back when text-davinci-003 was state of the art. (The only reason it's not on gpt-3.5-turbo yet is that the prompt is designed to get properly formatted JSON out of text-davinci-003, but that should be a future update.)

Elf Help

Because GPT-3 models can take many seconds to return, the API route was timing out often. By streaming the response we can keep the edge function alive, and also set the stage for user experience improvements like an accurate progress bar and/or streaming results to the UI.

Original code

/* api/generate.js */

import axios from 'axios'

// inside the default-export API route handler (req, res), after prompt has been built
axios.post('https://api.openai.com/v1/completions', {
  model: "text-davinci-003",
  prompt: prompt, // string defined above with user parameters
  max_tokens: 2048,
  temperature: 0.7
}, {
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${process.env.OPENAI_KEY}`,
  },
}).then(response => {
  try {
    // processResponse is a separate function that reformats the JSON
    const processedResponse = processResponse(response.data.choices[0].text)
    res.status(200).json(processedResponse)
  } catch (e) {
    res.status(500).json({ error: 'Could not process response' })
  }
})

Note: the examples here are mostly in vanilla JavaScript, but most readers will avoid headaches by using TypeScript. Thankfully, Next.js lets you migrate over piecemeal, and later in this post you'll see some TypeScript I copied and pasted into another file.

Moving to streaming

This code works locally, but when it runs in production in a serverless environment like Vercel it may time out. Two steps are required to get things working:

  1. Stream the response from OpenAI to Next.js / Vercel
  2. Forward the response to the client as it streams in

Streaming from OpenAI (Docs)

This looks like the easier of the two steps, but don't copy and paste it just yet! On its own, it turns out to be the wrong approach.

/* api/generate.js */

import axios from 'axios'

// inside the async API route handler, where prompt, processResponse, and res are in scope
let chunks = '' // initialize to build the response
const response = await axios.post('https://api.openai.com/v1/completions', {
  model: "text-davinci-003",
  prompt: prompt,
  max_tokens: 2048,
  temperature: 0.7,
  stream: true, // added
}, {
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${process.env.OPENAI_KEY}`,
  },
  responseType: 'stream', // added
})

response.data.on('data', (chunk) => {
  // strip the leading "data: " prefix from the SSE event (naive: assumes one event per chunk)
  const text = chunk.toString().slice(6)
  if (text !== '[DONE]') {
    try {
      const chunkObj = JSON.parse(text)
      chunks += chunkObj.choices[0].text
      console.log(chunkObj.choices[0].text) // watch the response arrive piece by piece
    } catch (e) {
      // probably just the end of the response: data: [DONE]
      console.error(e)
      console.log(text)
    }
  }
  else {
    console.log('spotted the elusive [DONE] string!')
  }
})

response.data.on('end', () => {
  try {
    const processedResponse = processResponse(chunks)
    res.status(200).json(processedResponse)
  } catch (e) {
    res.status(500).json({ error: 'Could not process response' })
  }
})

This works on the server, and it will print out the response as it comes in.

The problem is that the client is still waiting for a response, and when the server doesn't deliver one in time, the serverless platform shuts the function down. The platform doesn't know that the server is working hard to assemble a response; all it cares about is communication with the client.

To keep that communication going, it helps to start with the open source twitterbio.com project from Hassan El Mghari at Vercel. Coincidentally, it also has an API route at api/generate, but it adds decoupled logic to query OpenAI and return a ReadableStream that can be passed through to the client as chunks of the response stream in.

The only changes I made to the file were switching it to text-davinci-003 and its completions-style payload. Note that it just works despite being in TypeScript.

/* utils/OpenAIStream.ts */

import {
  createParser,
  ParsedEvent,
  ReconnectInterval,
} from "eventsource-parser";

export type ChatGPTAgent = "user" | "system";

export interface ChatGPTMessage {
  role: ChatGPTAgent;
  content: string;
}

export interface OpenAIStreamPayload {
  model: string;
  prompt: string;
  temperature: number;
  top_p?: number;
  frequency_penalty?: number;
  presence_penalty?: number;
  max_tokens: number;
  stream: boolean;
  n?: number;
}

export async function OpenAIStream(payload: OpenAIStreamPayload) {
  const encoder = new TextEncoder();
  const decoder = new TextDecoder();

  let counter = 0;

  const res = await fetch("https://api.openai.com/v1/completions", {
    headers: {
      "Content-Type": "application/json",
      Authorization: `Bearer ${process.env.OPENAI_KEY ?? ""}`,
    },
    method: "POST",
    body: JSON.stringify(payload),
  });

  const stream = new ReadableStream({
    async start(controller) {
      // callback
      function onParse(event: ParsedEvent | ReconnectInterval) {
        if (event.type === "event") {
          const data = event.data;
          // https://beta.openai.com/docs/api-reference/completions/create#completions/create-stream
          if (data === "[DONE]") {
            controller.close();
            return;
          }
          try {
            const json = JSON.parse(data);
            const text = json.choices[0].text || "";
            if (counter < 2 && (text.match(/\n/) || []).length) {
              // this is a prefix character (i.e., "\n\n"), do nothing
              return;
            }
            const queue = encoder.encode(text);
            controller.enqueue(queue);
            counter++;
          } catch (e) {
            // maybe parse error
            controller.error(e);
          }
        }
      }

      // stream response (SSE) from OpenAI may be fragmented into multiple chunks
      // this ensures we properly read chunks and invoke an event for each SSE event stream
      const parser = createParser(onParse);
      // https://web.dev/streams/#asynchronous-iteration
      for await (const chunk of res.body as any) {
        parser.feed(decoder.decode(chunk));
      }
    },
  });

  return stream;
}

Then back in generate.js we can actually make our handler function simpler, since most of the logic has been abstracted out by Hassan.

const payload = {
  model: 'text-davinci-003',
  prompt: prompt,
  temperature: 0.7,
  max_tokens: 2048,
  stream: true,
}

const stream = await OpenAIStream(payload)
return new Response(stream)
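
For this Response to reach the client as it streams, the route has to run on the Edge runtime rather than as a default Node serverless function. Below is a minimal sketch of what the full route might look like, modeled loosely on the twitterbio example; the import path and the placeholder prompt-building line are assumptions for illustration, not ElfHelp's actual code.

/* api/generate.js */

import { OpenAIStream } from '../../utils/OpenAIStream'

// run this route on the Edge runtime so we can return a web Response directly
// (on older Next.js versions this may need to be 'experimental-edge')
export const config = { runtime: 'edge' }

export default async function handler(req) {
  // Edge routes receive a standard Request object instead of (req, res)
  const reqBody = await req.json()

  // placeholder: the real prompt-building from user parameters lives elsewhere
  const prompt = `Suggest gift ideas for: ${JSON.stringify(reqBody)}`

  const payload = {
    model: 'text-davinci-003',
    prompt: prompt,
    temperature: 0.7,
    max_tokens: 2048,
    stream: true,
  }

  const stream = await OpenAIStream(payload)
  return new Response(stream)
}

With the edge config in place, the platform keeps the connection open while chunks flow to the client, which is exactly what the original serverless setup was missing.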

This gives the client the response it needs, and we can see it streaming in if we log it in the browser console. The results are not good in this video, but it demonstrates the technical change.

We also have to move the response-processing logic to the client, and concatenate the response chunks there as well.

/* index.jsx */

async function requestResults(e) {
  /* logic to set reqBody */

  const response = await fetch("/api/generate", {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
    },
    body: reqBody, // set earlier by user input
  });
  if (!response.ok) {
    throw new Error(response.statusText);
  }

  // This data is a ReadableStream
  const data = response.body;
  if (!data) {
    return;
  }

  const reader = data.getReader();
  const decoder = new TextDecoder();
  let done = false;

  let rawIdeas = '';
  while (!done) {
    const { value, done: doneReading } = await reader.read();
    done = doneReading;
    const chunkValue = decoder.decode(value);
    rawIdeas += chunkValue;
  }

  return processResponse(rawIdeas);
}

Thanks for making it this far! A neat follow-on project would be showing an accurate progress bar or streaming results as they come in. We'll probably open source the code soon, so please get in touch if you would be interested in adding those features.
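
If you want to experiment with the streaming-results idea in the meantime, here is a rough sketch of how the requestResults loop above could push chunks into React state as they arrive. The streamedIdeas state name and the surrounding component are illustrative, not ElfHelp's actual code.

/* index.jsx */

import { useState } from 'react'

export default function Home() {
  const [streamedIdeas, setStreamedIdeas] = useState('')

  async function requestResults(reqBody) {
    setStreamedIdeas('')

    const response = await fetch('/api/generate', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: reqBody, // JSON string built from the user's input, as above
    })
    if (!response.ok || !response.body) return

    const reader = response.body.getReader()
    const decoder = new TextDecoder()
    let done = false

    while (!done) {
      const { value, done: doneReading } = await reader.read()
      done = doneReading
      // append each chunk to state so the page re-renders as the text arrives
      setStreamedIdeas((prev) => prev + decoder.decode(value))
    }
  }

  // render a form that calls requestResults(reqBody) on submit,
  // plus something like <pre>{streamedIdeas}</pre> to display the stream
  return null
}

Note that this appends the raw text as-is; ElfHelp would still need to parse the partial output into structured gift ideas as it arrives, which is the genuinely interesting part of the follow-on.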