Skip to content
0

SSE(Server send event)

服务器向浏览器推送消息,除了 WebSocket ,还有一种叫做 SSE 的方法

特点

严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。

也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。

SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议。

优点

  • SSE 使用 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
  • SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
  • SSE 默认支持断线重连,WebSocket 需要自己实现。
  • SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
  • SSE 支持自定义发送的消息类型。

数据格式

上面说到,通信的本质是利用客户端不关闭链接,通过不断接收流消息来接收信息,所有就需要有一种通用的数据格式

响应头

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

消息体

每一次发送的信息,由若干个 message 组成,每个 message 之间用 \n\n 分隔。每个 message 内部由若干行组成,每一行都是如下格式:

[field]: value\n

其中 field 可以是:

  • id
  • retry
  • event
  • data

客户端例子

<script>
    const sse = new EventSource("/stream")
    sse.onopen = function (event) {
        console.log('open', event.data);
    };
    sse.onmessage = function (event) {
        var data = event.data;
        console.log('message', event.data);
    };
    sse.onerror = function (event) {
        console.log('error', enven.data)
    };
</script>

服务端例子

import { createApp, createRouter, eventHandler, sendStream, setHeaders, toNodeListener } from "h3"
import { createServer } from "http"
import { PassThrough } from "stream"
import fs from 'fs'

const app = createApp()

let id = 0
let stream = new PassThrough()

const router = createRouter()
    .get(
        '/',
        eventHandler(_ => fs.readFileSync('./index.html'))
    )
    .get(
        '/stream',
        eventHandler(event => {
            setHeaders(event, {
                'Content-Type': 'application/json',
                'Connection': 'Keep-Alive',
                'Cache-Control': 'no-cache'
            })
            sendStream(event, stream)
            stream.write(`id: ${id++}\n`)
            stream.write('retry: 10000\n')
            stream.write('event: message')
            stream.write(`data: "${id}${new Date()}"\n\n`)
            stream.write(`data: "${id}${new Date()}"\n\n`)

            setInterval(() => {
                stream.write(`data: "${id}${new Date()}"\n\n`)
            }, 1000)
        })
    )

app.use(router)

createServer(toNodeListener(app)).listen(3000)

完整例子

上面的 stream 应该可以在服务器的任意地方调用,所以让我们进行封装,是之更加容易被使用

在服务器端,对不同的 ip 地址,保留了一个 stream 实例,便于在其他响应方法中去往 stream 中写入消息

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width,initial-scale=1.0">
    <title>Document</title>
</head>
<body>
    <h2>hello</h2>
    <script>
        const sse = new EventSource("/stream/connect")
        sse.addEventListener('open', function() {
            console.log('sse open')
        })
        sse.addEventListener('update', function(event) {
            var data = event.data;
            console.log('update', event.data);
        })
        sse.addEventListener('heartbeat', function(event) {
            var data = event.data;
            console.log('heartbeat', event.data);
        })
        sse.onerror = function (event) {
            console.log('error', event.data)
        };
        setTimeout(() => {
            fetch('/stream/update')
            setTimeout(() => {
                fetch('/stream/heartbeat')
            }, 2000)
        }, 2000)
    </script>
</body>
</html>
import { createApp, createRouter, eventHandler, getRequestIP, getRequestURL,   sendNoContent, sendStream, setHeader, setHeaders, toNodeListener } from 'h3'
import { DevClient } from './devClient'
import fs from 'fs'
import { createServer } from 'http'

let clientMap = new Map()

const app = createApp()

const router = createRouter()
.get(
    '/', 
    eventHandler(_ => {
        return fs.readFileSync('index.html')
    })
)
.get(
    '/stream/connect',
    eventHandler(event => {
        const ip = getRequestIP(event)
        const client = new DevClient()
        clientMap.set(ip, client)
        setHeaders(event, {
            'Content-Type': 'text/event-stream',
            'Connection': 'Keep-Alive',
            'Cache-Control': 'no-cache'
        })
        sendStream(event, client.getStream())            
        client.send('update', 'open')
    })
)
.get(
    '/stream/update', 
    eventHandler(event => {
        const ip = getRequestIP(event)
        const client = clientMap.get(ip)
        client.send('update', 'update Message')
        sendNoContent(event, 200)
    })
)
.get(
    '/stream/heartbeat',
    eventHandler(event => {
        const ip = getRequestIP(event)
        const client = clientMap.get(ip)
        client.send('heartbeat', 'heartbeat Message')   
        sendNoContent(event, 200)
    })
)

app.use(router)

createServer(toNodeListener(app)).listen(3000)
import { PassThrough } from 'stream'

export type EventType =  'heartbeat' | 'update'

export class DevClient {
    private stream: PassThrough
    private id = 1
    constructor() {
        this.stream = new PassThrough()
    }
    
    public send(type: EventType, data: string): void {
        this.stream.write(`id: ${this.id++}\n`);
        this.stream.write(`event: ${type}\n`);
        this.stream.write(`retry: 1500\n`);
        data.split('\n').forEach(line => {
            this.stream.write(`data: ${line}\n`);
        });
        this.stream.write('\n');
    }

    public getStream(): PassThrough {
        return this.stream
    }
}

执行:

$ tsx server.ts

打开 localhost:3000,控制台输出:

sse open
update open
update send
update update Message
heartbeat heartbeat Message

Released under the MIT License.