赞
踩
近期在项目中出现了一个需求:对话实现打字机模式,所以学了一点sse,记录一下。
具体情况如下:
1、第三方厂商使用SSE向后端(我)提供数据
2、后端(我)再向前端提供数据(使用sse模式)
主要需要实现在于两个点:1、与第三方厂商建立sse连接。2、与前端建立sse连接。
SpringBoot版本:3.1.5,JDK:21
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.23</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> </dependencies>
import cn.hutool.core.util.RandomUtil; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import reactor.util.function.Tuple2; import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.UUID; @RestController public class FluxServerController { @GetMapping("/server") public Flux<ServerSentEvent<String>> server(){ String kingdom = "东汉末年,山河动荡,刘汉王朝气数将尽。内有十常侍颠倒黑白,祸乱朝纲。外有张氏兄弟高呼“苍天已死,黄巾当立”的口号,掀起浩大的农民起义"; List<String> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { list.add(RandomUtil.randomString(kingdom, 7)); } Flux<ServerSentEvent<String>> serverSentEventFlux = Flux.fromIterable(list) .map(data -> ServerSentEvent.builder(data).id(UUID.randomUUID().toString()).event("msg").build()) .concatWith(Flux.just(ServerSentEvent.<String>builder().id("end").event("complete").data("数据服务提供商,提供数据结束了").build())); Flux<Long> interval = Flux.interval(Duration.ofSeconds(3)); // 模拟一下3秒返回一条数据 return Flux.zip(interval, serverSentEventFlux).map(Tuple2::getT2); } }
SpringBoot:2.7.6;JDK:8
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.23</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> </dependencies>
@GetMapping("/client") public Flux<ServerSentEvent<String>> client(){ // 1、连接服务商: //1.1 创建WebClient WebClient client = WebClient.builder() .baseUrl("http://localhost:8080/server") .build(); //1.2 发送 POST Flux<ServerSentEvent<String>> serverSentEventFlux = client.get() .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .retrieve() .bodyToFlux( new ParameterizedTypeReference<ServerSentEvent<String>>() { } ); // 2、处理接收数据并返回给前端 return Flux.<ServerSentEvent<String>>create(sink -> { serverSentEventFlux.subscribe(new BaseSubscriber<ServerSentEvent<String>>() { @Override protected void hookOnSubscribe(Subscription subscription) { // 如果要后端关闭与服务商的连接或者与前端的连接,这里把subscription和sink放到concurrentHashMap。就可以用sink控制与前端的sse关闭,用subscription控制和服务商的sse关闭 super.hookOnSubscribe(subscription); } @Override protected void hookOnNext(ServerSentEvent<String> value) { // 这里处理所有的数据信息 System.out.println(value); sink.next(ServerSentEvent.builder(value.data()).id(UUID.randomUUID().toString()).event(value.event()).build()); } @Override protected void hookOnComplete() { System.out.println("SSE结束了!!"); super.hookOnComplete(); } @Override protected void hookOnError(Throwable throwable) { super.hookOnError(throwable); } @Override protected void hookOnCancel() { super.hookOnCancel(); } }); }); }
在这里可以设置不少东西,我在实际项目中就用到了超时
我用的时候基本就是对hookOnSubscribe、hookOnNext的改写,一个用来存连接到concurrentHashMap中,一个处理服务商的数据。
!!还需要注意的是如果正式环境走的NGINX代理,需要设置NGINX超时时间,我设置的超时时间是大于后端连接服务商的超时时间,来保证连接不会被NGINX关闭。
location /api/sse/ {
proxy_pass http://1.1.1.1:90/;
proxy_buffering off; # 禁用缓冲,确保实时性
proxy_set_header Connection ''; # 避免Nginx关闭连接
proxy_read_timeout 120s; # 增加超时时间,确保长连接
}
不太懂前端,copy了一份前端连接sse的代码。(亲测可用)
需要npm安装这个:event-source-polyfill
!!!这份前端代码是一个示例和上面代码没有联系
createSSE() { let that = this // qrId let qrId = this.wxLoginParam.qrId let source = new EventSourcePolyfill( `/api/stream/login?qrId=` + qrId) this.eventSource = source source.addEventListener('open', (e) => { console.log('open', e) }) source.addEventListener('message', (e) => { console.log('message', e) }) source.addEventListener('START', (e) => { console.log('start', e) }) source.addEventListener('EXPIRE', (e) => { let res = JSON.parse(e.data); this.$u.toast(res.message); this.changeEmailLogin() console.log('EXPIRE', e) source.close(); }) source.addEventListener('error', (e) => { console.log('error', e) source.close(); }) source.addEventListener('end', (e) => { console.log('end', e) }) source.addEventListener('SUCCESS', (e) => { this.$u.toast('登录成功'); console.log('SUCCESS', e) let res = JSON.parse(e.data); console.log(res) uni.setStorage({ key: 'Wukong-Token', data: res.data }) let redirectUrl = "/pages/sys/book/index" // 获取存储的 redirectUrl uni.getStorage({ key: 'loginRedirect', success: function (res) { // 打印获取到的 redirectUrl console.log(res.data); // 使用 setTimeout 来延迟执行 uni.reLaunch redirectUrl = res.data }, fail: function (err) { // 处理获取失败的情况 redirectUrl = "/pages/sys/book/index" } }); uni.removeStorage('loginRedirect') setTimeout(() => { uni.reLaunch({ url: redirectUrl }); }, 500); }) },
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。