SSE(Server send event)
服务器向浏览器推送消息,除了 WebSocket ,还有一种叫做 SSE 的方法
特点
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向浏览器推送信息。它基于 HTTP 协议。
优点
SSE使用HTTP协议,现有的服务器软件都支持。WebSocket是一个独立协议。SSE属于轻量级,使用简单;WebSocket协议相对复杂。SSE默认支持断线重连,WebSocket需要自己实现。SSE一般只用来传送文本,二进制数据需要编码后传送,WebSocket默认支持传送二进制数据。SSE支持自定义发送的消息类型。
数据格式
上面说到,通信的本质是利用客户端不关闭链接,通过不断接收流消息来接收信息,所有就需要有一种通用的数据格式
响应头
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive消息体
每一次发送的信息,由若干个 message 组成,每个 message 之间用 \n\n 分隔。每个 message 内部由若干行组成,每一行都是如下格式:
[field]: value\n其中 field 可以是:
- id
- retry
- event
- data
客户端例子
<script>
const sse = new EventSource("/stream")
sse.onopen = function (event) {
console.log('open', event.data);
};
sse.onmessage = function (event) {
var data = event.data;
console.log('message', event.data);
};
sse.onerror = function (event) {
console.log('error', enven.data)
};
</script>服务端例子
import { createApp, createRouter, eventHandler, sendStream, setHeaders, toNodeListener } from "h3"
import { createServer } from "http"
import { PassThrough } from "stream"
import fs from 'fs'
const app = createApp()
let id = 0
let stream = new PassThrough()
const router = createRouter()
.get(
'/',
eventHandler(_ => fs.readFileSync('./index.html'))
)
.get(
'/stream',
eventHandler(event => {
setHeaders(event, {
'Content-Type': 'application/json',
'Connection': 'Keep-Alive',
'Cache-Control': 'no-cache'
})
sendStream(event, stream)
stream.write(`id: ${id++}\n`)
stream.write('retry: 10000\n')
stream.write('event: message')
stream.write(`data: "${id}${new Date()}"\n\n`)
stream.write(`data: "${id}${new Date()}"\n\n`)
setInterval(() => {
stream.write(`data: "${id}${new Date()}"\n\n`)
}, 1000)
})
)
app.use(router)
createServer(toNodeListener(app)).listen(3000)完整例子
上面的 stream 应该可以在服务器的任意地方调用,所以让我们进行封装,是之更加容易被使用
在服务器端,对不同的 ip 地址,保留了一个 stream 实例,便于在其他响应方法中去往 stream 中写入消息
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<title>Document</title>
</head>
<body>
<h2>hello</h2>
<script>
const sse = new EventSource("/stream/connect")
sse.addEventListener('open', function() {
console.log('sse open')
})
sse.addEventListener('update', function(event) {
var data = event.data;
console.log('update', event.data);
})
sse.addEventListener('heartbeat', function(event) {
var data = event.data;
console.log('heartbeat', event.data);
})
sse.onerror = function (event) {
console.log('error', event.data)
};
setTimeout(() => {
fetch('/stream/update')
setTimeout(() => {
fetch('/stream/heartbeat')
}, 2000)
}, 2000)
</script>
</body>
</html>import { createApp, createRouter, eventHandler, getRequestIP, getRequestURL, sendNoContent, sendStream, setHeader, setHeaders, toNodeListener } from 'h3'
import { DevClient } from './devClient'
import fs from 'fs'
import { createServer } from 'http'
let clientMap = new Map()
const app = createApp()
const router = createRouter()
.get(
'/',
eventHandler(_ => {
return fs.readFileSync('index.html')
})
)
.get(
'/stream/connect',
eventHandler(event => {
const ip = getRequestIP(event)
const client = new DevClient()
clientMap.set(ip, client)
setHeaders(event, {
'Content-Type': 'text/event-stream',
'Connection': 'Keep-Alive',
'Cache-Control': 'no-cache'
})
sendStream(event, client.getStream())
client.send('update', 'open')
})
)
.get(
'/stream/update',
eventHandler(event => {
const ip = getRequestIP(event)
const client = clientMap.get(ip)
client.send('update', 'update Message')
sendNoContent(event, 200)
})
)
.get(
'/stream/heartbeat',
eventHandler(event => {
const ip = getRequestIP(event)
const client = clientMap.get(ip)
client.send('heartbeat', 'heartbeat Message')
sendNoContent(event, 200)
})
)
app.use(router)
createServer(toNodeListener(app)).listen(3000)import { PassThrough } from 'stream'
export type EventType = 'heartbeat' | 'update'
export class DevClient {
private stream: PassThrough
private id = 1
constructor() {
this.stream = new PassThrough()
}
public send(type: EventType, data: string): void {
this.stream.write(`id: ${this.id++}\n`);
this.stream.write(`event: ${type}\n`);
this.stream.write(`retry: 1500\n`);
data.split('\n').forEach(line => {
this.stream.write(`data: ${line}\n`);
});
this.stream.write('\n');
}
public getStream(): PassThrough {
return this.stream
}
}执行:
$ tsx server.ts打开 localhost:3000,控制台输出:
sse open
update open
update send
update update Message
heartbeat heartbeat Message