Skip to content

服务器发送事件

英文为 server sent events 简称 SSE, 是基于HTTP协议的一种通信方式与 WebSocket 不同的是, 这种通信方式是单向的, 只能服务器先客户端发送事件, 客户端不能向服务器发送事件

服务端实现

废话不多说, 直接使用 express 实现一个最简单的支持 sse 通信方式的服务

js
import express from "express";
import cors from "cors";

const app = express();
const port = 3000;

app.use(cors());
app.use(express.static('public'))

// 自增id
let __id = 0;
const uuid = () => ++__id;

// sse
let timer;
app.get("/sse", (_req, res) => {
  res.writeHead(200, {
    "Connection": "keep-alive",
    "Cache-Control": "no-cache",
    "Content-Type": "text/event-stream;charset=utf-8;",
  });

  function formatMessage(event, payload = null) {
    const id = uuid();
    const data = JSON.stringify({
      id,
      event,
      payload,
    });
    // 每条消息格式必须是这样带有两个换行的
    return `data:${data}\n\n`;
  }

  function send(event, payload) {
    res.write(formatMessage(event, payload));
  }

  function disconnect() {
    send("end", null);
    res.end();
  }

  const messages = [
    "稻香",
    "[ar:周杰伦]",
    "对这个世界如果你有太多的抱怨",
    "跌倒了就不敢继续往前走",
    "为什麽人要这麽的脆弱 堕落",
    "请你打开电视看看",
    "多少人为生命在努力勇敢的走下去",
    "我们是不是该知足",
    "珍惜一切 就算没有拥有",
    "还记得你说家是唯一的城堡",
    "随着稻香河流继续奔跑",
    "微微笑 小时候的梦我知道",
    "不要哭让萤火虫带着你逃跑",
    "乡间的歌谣永远的依靠",
    "回家吧 回到最初的美好",
    "不要这麽容易就想放弃 就像我说的",
    "追不到的梦想 换个梦不就得了",
    "为自己的人生鲜艳上色 先把爱涂上喜欢的颜色",
    "笑一个吧 功成名就不是目的",
    "让自己快乐快乐这才叫做意义",
    "童年的纸飞机 现在终於飞回我手里",
    "所谓的那快乐 赤脚在田里追蜻蜓追到累了",
    "偷摘水果被蜜蜂给叮到怕了 谁在偷笑呢",
    "我靠着稻草人吹着风唱着歌睡着了",
    "哦 哦 午后吉它在虫鸣中更清脆",
    "哦 哦 阳光洒在路上就不怕心碎",
    "珍惜一切 就算没有拥有",
    "还记得你说家是唯一的城堡",
    "随着稻香河流继续奔跑",
    "微微笑 小时候的梦我知道",
    "不要哭让萤火虫带着你逃跑",
    "乡间的歌谣永远的依靠",
    "回家吧 回到最初的美好",
    "还记得你说家是唯一的城堡",
    "随着稻香河流继续奔跑",
    "微微笑 小时候的梦我知道",
    "不要哭让萤火虫带着你逃跑",
    "乡间的歌谣永远的依靠",
    "回家吧 回到最初的美好",
  ];

  let i = 0;
  i === 0 && send("start", null);
  timer && clearInterval(timer);
  timer = setInterval(() => {
    if (i >= messages.length) {
      disconnect();
    } else {
      send("message", messages[i]);
      i += 1;
    }

  }, 100);
});

app.listen(port, () => {
  const url = `http://localhost:${port}`;
  console.log(`server started on: ${url}`);
});

启动服务, 打开浏览器访问 http://localhost:3000 点击按钮查看控制台输出

客户端实现

  1. 使用 EventSource WebAPI 实现 直观
  2. 使用 fetch-event-source 第三方包实现 操作简便
  3. 使用 axios + 手动解析二进制流数据实现 兼容性好

这 3 种方式各有优缺点, 但我会尽量封装成一样的API

txt
.
├── README.md
├── index.html
├── package.json
├── pnpm-lock.yaml
├── src
│   ├── App.vue                      # 入口文件
│   ├── hooks
│   │   ├── axios-event-source.js    # 使用 axios + 手动解析数据流的方式实现
│   │   ├── event-source.js          # 使用 EventSource webAPI 实现
│   │   ├── fetch-event-source.js    # 使用 fetch-event-source 实现
│   │   └── index.js                 # 导出三个hooks
│   └── main.js
└── vite.config.js
vue
<template>
  <div>
    <button @click="handleEvnetSource">event source</button>
    <button @click="handleFetchEventSource">fetch-event-source</button>
    <button @click="handleAxiosWithParser">axios-with-parser</button>
  </div>
</template>

<script setup>
import {
  useEventSource,
  useFetchEventSource,
  useAxiosEventSource,
} from "@/hooks";

const url = "http://localhost:3000/sse";

// event source
const es = useEventSource();
function handleEvnetSource() {
  es.connect(url, {
    onOpen(e) {
      console.log("服务器链接成功", e);
    },
    onError(e) {
      console.log("发生错误", e);
    },
    onMessage(e) {
      if (e.event === "end") {
        console.log("断开链接:", e);
        es.disconnect();
      } else {
        console.log("收到消息:", e);
      }
    },
  });
}

const fes = useFetchEventSource();
function handleFetchEventSource() {
  fes.setRetryTimes(3);
  fes.connect(url, {
    method: "get",
    params: {
      field: "value"
    },
    headers: {
      Authorization: `Bearer your_token_string`,
    },
    onOpen(e) {
      console.log("服务器链接成功", e);
    },
    onClose(e) {
      console.log("服务器断开链接", e);
    },
    onError(e) {
      console.log("发生错误", e);
    },
    onMessage(e) {
      console.log("收到消息", e);
    }
  })
}

const aes = useAxiosEventSource();
function handleAxiosWithParser() {
  aes.connect(url, {
    method: "get",
    params: {
      field: "value"
    },
    headers: {
      Authorization: `Bearer your_token_string`,
    },
    onOpen(e) {
      console.log("服务器链接成功", e);
    },
    onError(e) {
      console.log("发送错误", e);
    },
    onMessage(e) {
      console.log("接收到数据", e);
    }
  });
}
</script>

使用 EventSource API 客户端

  • 浏览器原生支持, 不需要安装额外的第三方包
  • 无法携带一些参数给到服务端
  • 无法在nodejs端使用
js
const isCallable = (fn) => typeof fn === "function";

export function useEventSource() {
  let eventSource = null;
  let isInitialize = false;

  // 链接服务器
  function connect(url, handlers = {}, opts = {}) {
    eventSource = new EventSource(url, opts);
    isInitialize = true;
    if (typeof url !== "string") {
      throw new TypeError("[useEventSource]url is not a string");
    }

    const { onOpen, onMessage, onError } = handlers;

    eventSource.addEventListener("message", (e) => {
      if (!isCallable(onMessage)) {
        throw new TypeError("[useEventSource]onMessage is not a function");
      }
      try {
        const data = JSON.parse(e.data);
        onMessage(data);
      } catch (e) {
        console.log("[useEventSource connect onmessage]:failed to parse json", e);
      }
    });
    eventSource.addEventListener("open", (e) => {
      isCallable(onOpen) && onOpen(e);
    });

    eventSource.addEventListener("error", (e) => {
      isCallable(onError) && onError(e);
    });
  }

  // 主动断开
  function disconnect() {
    if (isInitialize) {
      eventSource.close();
    }
  }

  // 获取 EventSource 实例
  function getEventSourceInst() {
    if (isInitialize) {
      return eventSource;
    }
  }

  return {
    connect,
    disconnect,
    getEventSourceInst,
  };
}

使用 fetch-event-source

  • 更好的API封装, 需要安装 fetch-event-source
  • 可携带数据给服务端, 自动重试控制
  • 无法在nodejs端使用
js
import { fetchEventSource, EventStreamContentType } from '@microsoft/fetch-event-source';

// 可重试错误, 在 onopen 方法中抛出这个错误就会自动重试
export class RetriableError extends Error { }

// 致命错误, 在 onerror 方法中抛出这个错误就会终止重试
export class FatalError extends Error { }

// 判断一个值是否是一个函数
const isCallable = (fn) => typeof fn === "function";

// 默认自动重试次数(设置为 Infinity 可无上限重试)
export const DefaultMaxRetryTimes = 5;

export function useFetchEventSource() {
  let retryTimes = 0;
  let maxRetryTimes = DefaultMaxRetryTimes;
  let abortController = new AbortController(); // 控制取消请求
  let isInitialize = false;

  // 连接
  async function connect(url, opts = {}) {
    const { onOpen, onMessage, onError, onClose } = opts;
    const options = Object.assign(opts, {
      signal: abortController.signal,
      async onopen(res) {
        // 注意: EventStreamContentType 常量的值是字符串: "text/event-stream"
        if (res.ok && res.headers.get("content-type").includes(EventStreamContentType)) {
          isCallable(onOpen) && onOpen(res);
          return Promise.resolve();
        }
        throw new RetriableError();
      },
      onerror(e) {
        // 控制重试次数
        if (retryTimes >= maxRetryTimes) {
          throw new FatalError();
        }
        retryTimes++;
        isCallable(onError) && onError(e);
      },
      onclose(e) {
        isCallable(onClose) && onClose(e);
      },
      onmessage(e) {
        // 处理消息
        try {
          const data = JSON.parse(e.data);
          isCallable(onMessage) && onMessage(data);
        } catch (e) {
          console.error(`[fetch-event-source connect onmessage]:JSON parse error:`, e);
        }
      },
    });
    isInitialize = true;
    await fetchEventSource(url, options);
  }

  // 断开链接
  function disconnect() {
    if (isInitialize) {
      abortController.abort();
    }
  }

  // 设置最大重试次数
  function setRetryTimes(times = DefaultMaxRetryTimes) {
    maxRetryTimes = times;
  }

  return {
    connect,
    disconnect,
    setRetryTimes,
  }
}

使用 axios + 手动解析数据

  • 更好的兼容性, 需要安装axios
  • 可携带数据给服务端, 但是需要手动解析数据流
  • 可以在nodejs端使用
  • 没有自动重试, 需要手动实现重试
js
import axios from "axios";

const isCallable = (fn) => typeof fn === "function";
const isObject = (value) => value !== null && typeof value === "object";

// 可读的二进制流解码
async function readableStreamParser(stream, callback) {
  const decoder = new TextDecoder();
  const reader = stream.getReader();
  const prefix = "data:";
  const prefixLength = prefix.length;

  if (!isCallable(callback)) {
    throw new TypeError("[readableStreamParser]callback is not a function");
  }

  while (true) {
    const { done, value } = await reader.read();
    if (done) { // 已解析完
      break;
    }

    const chunk = decoder.decode(value, {
      stream: true,
    });

    // 去除不需要的数据, 因为有些空行是没用的, 不需要触发 onMessage
    // 每行的数据格式 'data:{"event":"start","payload":null}'
    const lines = chunk.split('\n\n');

    for (let i = 0, l = lines.length; i < l; i++) {
      const line = lines[i];
      if (line.startsWith(prefix)) {
        const data = line.substring(prefixLength);
        // 注: 如果服务端给的不是json格式, 这里会抛出异常, 然后执行 onError
        callback(JSON.parse(data));
      }
    }
  }
}

export function useAxiosEventSource() {
  let controller = null;
  let isInitialize = false;

  // 发送请求链接服务器
  async function connect(url, opts = {}) {
    const defaultOpts = {
      headers: {
        Accept: "text/event-stream"
      },
    };

    // 确保会设置 Accept 请求头
    if (isObject(opts.headers)) {
      opts.headers = Object.assign(opts.headers, defaultOpts.headers)
    }

    controller = new AbortController();
    const options = Object.assign(defaultOpts, opts, {
      url,
      signal: controller.signal,
      responseType: "stream",
      adapter: "fetch",
    });

    const { onOpen, onMessage, onError } = opts;

    isInitialize = true;
    return axios(options).then((res) => {
      isCallable(onOpen) && onOpen(res);
      // 手动解码
      readableStreamParser(res.data, onMessage);
    }).catch((err) => {
      // 如果报错
      isCallable(onError) && onError(err);
    });
  }

  // 断开链接
  function disconnect() {
    if (isInitialize) {
      controller.abort();
    }
  }

  return {
    connect,
    disconnect,
  }
}

Released under the MIT License.