赞
踩
对于某些需要实时更新的数据(例如Facebook/Twitter 更新、估价更新、新的博文、赛事结果等)来说,有这么几种解决方案:
Polling(轮询)
在客户端重复的向服务端发送新请求。如果服务器没有新的数据更动,关闭本次连接。然后客户端在稍等一段时间之后,再次发起新请求,一直重复这样的步骤。
Long-polling(长轮询)
在长轮询中,客户端发送一个请求到服务端。如果服务端没有新的数据更动,那么本次连接将会被保持,直到等待到更新后的数据,返回给客户端并关闭这个连接。
Server-Sent Events
SSE类似于长轮询的机制,但是它在每一次的连接中,不只等待一次数据的更动。客户端发送一个请求到服务端 ,服务端保持这个请求直到一个新的消息准备好,将消息返回至客户端,此时不关闭连接,仍然保持它,供其它消息使用。SSE的一大特色就是重复利用一个连接来处理每一个消息(又称event)。
WebSocket
WebSocket不同于以上的这些技术,因为它提供了一个真正意义上的双向连接。WebSocket是HTML5中非常强大的新特性,已经得到广泛应用。
一般来说HTTP协议是要客户端先请求服务器,服务器才能响应给客户端,无法做到服务器主动推送信息。但是,有一种变通方法,就是服务器向客户端声明,接下来要发送的是流信息(event-streaming)。
也就是说,发送的不是一次性的数据包,而是一个数据流,会连续不断地发送过来。这时,客户端不会关闭连接,会一直等着服务器发过来的新的数据流,视频播放就是这样的例子。本质上,这种通信就是以流信息的方式,完成一次用时很长的下载。
SSE 就是利用这种机制,使用流信息向客户端推送信息。
event: 事件类型,服务端可以自定义,默认是message事件
Id: 每一条事件流的ID,在失败重传事件流的时候有重要作用
retry: 浏览器连接断开之后重连的间隔时间,单位:毫秒,在自动重新连接的过程中,之前收到的最后一个事件流ID会被发送到服务端。
data: 发送的数据
每个字段K-V后面用"\n"结尾,如:
借助webflux实现定时
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
springMvc支持sse
服务端代码
@GetMapping("/sse") @CrossOrigin public SseEmitter handleSse() { SseEmitter emitter = new SseEmitter(); Flux.interval(Duration.ofSeconds(1)) .map(i -> "Server-Sent Event #" + i) .doOnCancel(() -> emitter.complete()) .subscribe( data -> { emitter.send(data); }, error -> emitter.completeWithError(error), () -> emitter.complete() ); return emitter; }
postMan请求
效果
可以发现与普通http请求不同地方在于请求过程中有SseEmitter这个对象
首先返回这个对象,后续通过SseEmitter对浏览器推送消息
首先介绍返回值的HandlerMethodReturnValueHandler
org.springframework.web.method.support.HandlerMethodReturnValueHandlerComposite#selectHandler
@Nullable
private HandlerMethodReturnValueHandler selectHandler(@Nullable Object value, MethodParameter returnType) {
boolean isAsyncValue = isAsyncReturnValue(value, returnType);
for (HandlerMethodReturnValueHandler handler : this.returnValueHandlers) {
if (isAsyncValue && !(handler instanceof AsyncHandlerMethodReturnValueHandler)) {
continue;
}
if (handler.supportsReturnType(returnType)) {
return handler;
}
}
return null;
}
springMvc内置了很多处理器
简单介绍其中的几种处理器
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {<!-- --> // 返回值为空处理 ... HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class); ServerHttpResponse outputMessage = new ServletServerHttpResponse(response); // 返回值为ResponseEntity<ResponseBodyEmitter> 或 ResponseEntity<SseEmitter>时的处理 ... ServletRequest request = webRequest.getNativeRequest(ServletRequest.class); ResponseBodyEmitter emitter; if (returnValue instanceof ResponseBodyEmitter) {<!-- --> emitter = (ResponseBodyEmitter) returnValue; }else {<!-- --> // 这里是响应式编程解析的部分,暂时不去了解 .... } // 默认空实现,SseEmitter中覆盖重写,设置了响应头类型为MediaType.TEXT_EVENT_STREAM emitter.extendResponse(outputMessage); // 流式场景不需要对响应缓存 ShallowEtagHeaderFilter.disableContentCaching(request); // 包装响应以忽略进一步的头更改,头将在第一次写入时刷新 outputMessage = new StreamingServletServerHttpResponse(outputMessage); HttpMessageConvertingHandler handler; try {<!-- --> // 这里使用了DeferredResult DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout()); //设置异步请求,可以在别的线程进行对response进行恢复 WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer); handler = new HttpMessageConvertingHandler(outputMessage, deferredResult); } catch (Throwable ex) {<!-- --> emitter.initializeWithError(ex); throw ex; } // 这块是主要逻辑 emitter.initialize(handler); }
这里介绍DeferredResult
建立一次连接,让他们等待尽可能长的时间。这样同时如果有新的数据到达服务器,服务器可以直接返回响应。通过这种方式,我们绝对可以减少所涉及的请求和响应周期的数量。
浏览器发起异步请求
请求到达服务端被挂起(使用浏览器查看请求状态,此时为pending)
向浏览器进行响应,分为两种情况:
3.1 调用DeferredResult.setResult(),请求被唤醒,返回结果
3.2 超时,返回一个你设定的结果
浏览得到响应,再次重复1,处理此次响应结果
下面代码逻辑是只要有告警状态的变更,就从数据库中获取所有设备的最新状态,然后和redis中保存的比较,如果有更新,就调setResult方法,把结果返回给客户端。如果没有更新,就在20秒后跳出循环。
@GetMapping("/defferResult") @CrossOrigin @ResponseBody public DeferredResult defferResult() { DeferredResult deferredResult = new DeferredResult(3*1000l); Executors.newSingleThreadExecutor().execute(()->{ try { sleep(4000l); } catch (InterruptedException e) { e.printStackTrace(); } deferredResult.setResult("sucess"); }); return deferredResult; }
如果超时会抛出AsyncRequestTimeoutException异常,并且由springmvc处理,对于返回值,也是由springMvc进行再次处理。
例如抛出超时异常,可以被我们进行处理
因此DeferredResult会通过两次springMvc的解析,一次是返回自身deferredResult,一次是设置是设置结果。
也就是说,利用了DeferredResult对http请求起到挂起作用。
handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
根据前面我们看出来
后续消息的发送都通过这个handler
synchronized void initialize(Handler handler) throws IOException {<!-- --> this.handler = handler; try {<!-- --> // 遍历之前发送的数据 for (DataWithMediaType sendAttempt : this.earlySendAttempts) {<!-- --> // 这里会调用handler的send方法 sendInternal(sendAttempt.getData(), sendAttempt.getMediaType()); } }finally {<!-- --> this.earlySendAttempts.clear(); } // 数据是否已经发完了 if (this.complete) {<!-- --> // 有没有报错 if (this.failure != null) {<!-- --> this.handler.completeWithError(this.failure); }else {<!-- --> // 这里最终会调用DefferedResult.setResult this.handler.complete(); } }else {<!-- --> this.handler.onTimeout(this.timeoutCallback); this.handler.onError(this.errorCallback); this.handler.onCompletion(this.completionCallback); } }
这里需要注意的地方,在handler没有初始化反正的时候,调用SseEmitter的send发送的消息都暂存到earlySendAttempts,当初始化完成后,首先将之前暂存的消息进行发送
this.handler.onTimeout(this.timeoutCallback);
this.handler.onError(this.errorCallback);
this.handler.onCompletion(this.completionCallback);
分别设置DeferredResult的onTimeout,onError,onCompletion方法。
总体来说sse方式,还是利用了DeferredResult异步响应的方式
首先调用emitter的send方法
@Override
public void send(Object object) throws IOException {
send(object, null);
}
@Override
public void send(Object object, @Nullable MediaType mediaType) throws IOException {
send(event().data(object, mediaType));
}
这里我们发现,这种发送消息的方式事件的消息体只有data属性,而没有id,和retry属性。
如果需要id等属性可以这样写
@GetMapping("/sse") @CrossOrigin @ResponseBody public SseEmitter handleSse() { SseEmitter emitter = new SseEmitter(20000l); AtomicInteger atomicInteger = new AtomicInteger(); Flux.interval(Duration.ofSeconds(1)) .map(i -> "Server-Sent Event #" + i) .doOnCancel(() -> emitter.complete()) .subscribe( data -> { try { emitter.send(SseEmitter.event() .id(String.valueOf(atomicInteger.getAndAdd(1))) .name("message") .data("Message " + atomicInteger.get())); // emitter.complete(); } catch (IOException e) { emitter.completeWithError(new RuntimeException("发送错误")); } }, error -> emitter.completeWithError(error), () -> emitter.complete() ); return emitter; }
客户端收到id的话,如果消息未接受完毕连接就断开了,那么可以再次通过最后的id请求服务器,服务器根据id继续发送消息。进行短线重连的操作
构建事件进行发送
public void send(SseEventBuilder builder) throws IOException {
Set<DataWithMediaType> dataToSend = builder.build();
synchronized (this) {
for (DataWithMediaType entry : dataToSend) {
super.send(entry.getData(), entry.getMediaType());
}
}
分别发送三次,正好构成了
data:hello world\n\n一个消息的数据格式,浏览器可以对应的进行解析
private class HttpMessageConvertingHandler implements ResponseBodyEmitter.Handler {<!-- --> ... @SuppressWarnings("unchecked") private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {<!-- --> // RequestMappingHandlerAdapter实例化的时候会设置,例如ByteArrayHttpMessageConverter,StringHttpMessageConverter for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {<!-- --> if (converter.canWrite(data.getClass(), mediaType)) {<!-- --> // 将消息写入输出流 ((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage); this.outputMessage.flush(); return; } } throw new IllegalArgumentException("No suitable converter for " + data.getClass()); } }
在http请求挂起期间,还可以对ServerHttpResponse进行写入操作,不断的发送给浏览器。
发送完成执行emitter.complete(),完成请求,结束对http请求的挂起。
发送中间出现错误,将错误信息发送给浏览器
SseEmitter emitter = new SseEmitter(1000l);
构造SseEmitter时可以指定超时时间,实际就是DeferredResult的超时时间,
当到达超时时间,连接自动释放掉,因此需要设置适当的超时时间
需要借助okhttp3
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp-sse</artifactId>
<version>3.14.9</version>
</dependency>
编写响应监听器
package com.unfbx.chatgpt.sse; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import okhttp3.Response; import okhttp3.ResponseBody; import okhttp3.sse.EventSource; import okhttp3.sse.EventSourceListener; import java.util.Objects; /** * 描述: sse * * @author https:www.unfbx.com * 2023-02-28 */ @Slf4j public class ConsoleEventSourceListener extends EventSourceListener { @Override public void onOpen(EventSource eventSource, Response response) { log.info("OpenAI建立sse连接..."); } @Override public void onEvent(EventSource eventSource, String id, String type, String data) { log.info("OpenAI返回数据:{}", data); if (data.equals("[DONE]")) { log.info("OpenAI返回数据结束了"); return; } } @Override public void onClosed(EventSource eventSource) { log.info("OpenAI关闭sse连接..."); } @SneakyThrows @Override public void onFailure(EventSource eventSource, Throwable t, Response response) { if(Objects.isNull(response)){ log.error("OpenAI sse连接异常:{}", t); eventSource.cancel(); return; } ResponseBody body = response.body(); if (Objects.nonNull(body)) { log.error("OpenAI sse连接异常data:{},异常:{}", body.string(), t); } else { log.error("OpenAI sse连接异常data:{},异常:{}", response, t); } eventSource.cancel(); } }
发送请求,并且设置监听器对sse事件进行监听
public static void main(String[] args) { try { OkHttpClient okHttpClient = new OkHttpClient .Builder() .connectTimeout(30, TimeUnit.SECONDS) .writeTimeout(30, TimeUnit.SECONDS) .readTimeout(30, TimeUnit.SECONDS) .build(); EventSource.Factory factory = EventSources.createFactory(okHttpClient); Request request = new Request.Builder() .url("http://localhost:8062/user/sse") .get() .build(); //创建事件 EventSource eventSource = factory.newEventSource(request, new ConsoleEventSourceListener()); } catch (Exception e) { log.error("请求参数解析异常:{}", e); e.printStackTrace(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。