当前位置:   article > 正文

Web服务器推送信息SSE/WebSocket_text/event-stream;charset=utf-8

text/event-stream;charset=utf-8

介绍

没有简单,通用的方法来以可接受的性能在Web应用程序中实现服务器到客户端的异步通信。

HTTP是客户端-服务器计算模型中的请求-响应协议。为了开始交换,客户端向服务器提交请求。为了完成交换,服务器将响应返回给客户端。在HTTP协议中,客户端是消息交换的发起者。

在某些情况下,服务器应该是发起者。实现此目的的方法之一是允许服务器将消息推送到发布/订阅计算模型中的客户端。

服务器发送事件(SSE)是一种用于为特定 Web应用程序实现异步服务器到客户端通信的简单技术。

总览

有几种技术可以使客户端从服务器接收有关异步更新的消息。它们可以分为两类:客户端请求  和 服务器请求

客户拉

客户端请求技术中,客户端会定期请求服务器进行更新。服务器可以响应更新,也可以响应尚未更新的特殊响应。客户端请求有两种类型:短轮询长轮询

短轮询

客户端定期向服务器发送请求。如果服务器有更新,它将向客户端发送响应并关闭连接。如果服务器没有更新,它将向客户端发送特殊响应,并关闭连接。

比如客户端使用 ajax 定时执行。

长时间轮询

客户端向服务器发送请求。如果服务器有更新,它将向客户端发送响应并关闭连接。如果服务器没有更新,它将保持连接,直到有可用的更新为止。当有可用更新时,服务器将响应发送到客户端并关闭连接。如果更新在某些超时时间内不可用,则服务器会向客户端发送特殊响应,并关闭连接。

服务器推送

服务器推送技术中,服务器在客户端可用后立即主动向客户端发送消息。其中,服务器推送有两种类型:服务器发送事件(SSE) 和 WebSocket

服务器发送的事件(SSE)

服务器发送的事件是一种仅从服务器向基于浏览器的Web应用程序中的客户端发送文本消息的技术。服务器发送的事件基于HTTP协议中的持久连接。服务器发送的事件具有W3C 标准化的网络协议和EventSource客户端接口,作为HTML5标准套件的一部分。

WebSocket

WebSocket是一种在Web应用程序中实现同时,双向,实时通信的技术。WebSocket基于除HTTP之外的协议,因此它可能需要对网络基础架构(代理服务器,NAT,防火墙等)进行额外的设置。但是,WebSocket可以提供使用基于HTTP的技术难以实现的性能。

SSE网络协议

要订阅服务器事件,客户端应GET使用标头进行请求:

  • Accept: text/event-stream指示标准要求的事件的媒体类型
  • Cache-Control: no-cache 禁用所有事件缓存
  • Connection: keep-alive指示正在使用持久连接
  1. GET /sse HTTP/1.1
  2. Accept: text/event-stream
  3. Cache-Control: no-cache
  4. Connection: keep-alive

服务器应使用以下标头的响应来确认订阅:

  • Content-Type: text/event-stream;charset=UTF-8指示标准要求的媒体类型和事件编码
  • Transfer-Encoding: chunked 表示服务器流式传输动态生成的内容,因此事先不知道内容大小
  1. HTTP/1.1 200
  2. Content-Type: text/event-stream;charset=UTF-8
  3. Transfer-Encoding: chunked

订阅后,服务器将在消息可用后立即发送消息。事件是UTF-8编码的文本消息。事件之间用两个换行符分隔\n\n。每个事件都包含一个或多个name: value字段,以单个换行符分隔\n

在该data字段中,服务器可以发送事件数据。

  1. data: The first event.
  2. data: The second event.

服务器可以data通过一个换行符将字段分成几行\n

  1. data: The third
  2. data: event.

id服务器可以在该字段中发送唯一的事件标识符。如果连接断开,客户端应自动重新连接并发送id带有header 的最后一个接收到的事件Last-Event-ID

  1. id: 1
  2. data: The first event.
  3. id: 2
  4. data: The second event.

event服务器可以在该字段中发送事件类型。服务器可以在同一预订中发送不同类型的事件,也可以不发送任何类型的事件。

  1. event: type1
  2. data: An event of type1.
  3. event: type2
  4. data: An event of type2.
  5. data: An event without any type.

retry服务器可以在该字段中发送超时(以毫秒为单位),在此之后客户端应在连接断开时自动重新连接。如果未指定此字段,则标准值为3000毫秒。

retry: 1000

如果一行以冒号开头:,则客户端应将其忽略。这可用于从服务器发送注释或防止某些代理服务器因超时而关闭连接。

: ping

SSE客户端:EventSource界面

要打开连接,应创建一个EventSource对象。

  1. var eventSource = new EventSource('/sse);

尽管服务器发送事件旨在将事件从服务器发送到客户端,但是仍然可以使用GET查询参数将数据从客户端传递到服务器

  1. var eventSource = new EventSource('/sse?event=type1);
  2. ...
  3. eventSource.close();
  4. eventSource = new EventSource('/sse?event=type1&event=type2);
  5. ...

要关闭连接,应将其称为method close()

eventSource.close();

readyState一个表示连接状态的属性:

  • EventSource.CONNECTING = 0 -尚未建立连接,或者连接已关闭并且客户端正在重新连接
  • EventSource.OPEN = 1 -客户端具有打开的连接,并在接收事件时处理事件
  • EventSource.CLOSED = 2-连接未打开,并且客户端没有尝试重新连接,或者出现致命错误或close()调用了该方法

要处理连接的建立,应将其预订给onopen事件处理程序。

  1. eventSource.onopen = function () {
  2. console.log('connection is established');
  3. };

要处理连接状态中的某些更改致命错误,应将其预订给onerrror事件处理程序。

  1. eventSource.onerror = function (event) {
  2. console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
  3. };

要处理没有该event字段的接收事件,应将其预订给onmessage事件处理程序。

  1. eventSource.onmessage = function (event) {
  2. console.log('id: ' + event.lastEventId + ', data: ' + event.data);
  3. };

要使用该event字段处理接收事件,应为该事件订阅事件处理程序。

  1. eventSource.addEventListener('type1', function (event) {
  2. console.log('id: ' + event.lastEventId + ', data: ' + event.data);
  3. }, false);

 

SSE Java服务器:Spring Web MVC

介绍

Spring Web MVC框架5.2.0基于Servlet 3.1 API,并使用线程池来实现异步Java Web应用程序。此类应用程序可以在Servlet 3.1+容器(例如Tomcat 8.5和Jetty 9.3)上运行。

总览

要使用Spring Web MVC框架实现发送事件:

  1. 创建一个控制器类,并用@RestController注释对其进行标记
  2. 创建一个创建客户端连接的方法,该方法返回SseEmitter,处理GET请求并产生text/event-stream
    1. 创建一个new SseEmitter,以保存它并从方法中返回它
  3. 在另一个线程中异步发送事件,获取已保存的内容,SseEmitterSseEmitter.send根据需要多次调用方法
    1. 完成发送事件,请调用SseEmitter.complete()方法
    2. 要完成特殊的事件发送,请调用SseEmitter.completeWithError()方法

简化的控制器源:

  1. @RestController
  2. public class SseWebMvcController
  3. private SseEmitter emitter;
  4. @GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
  5. SseEmitter createConnection() {
  6. emitter = new SseEmitter();
  7. return emitter;
  8. }
  9. // in another thread
  10. void sendEvents() {
  11. try {
  12. emitter.send("Alpha");
  13. emitter.send("Omega");
  14. emitter.complete();
  15. } catch(Exception e) {
  16. emitter.completeWithError(e);
  17. }
  18. }
  19. }

若要仅使用data字段发送事件,应使用SseEmitter.send(Object object)方法。要发送的事件与领域dataideventretry和意见,应当使用 SseEmitter.send(SseEmitter.SseEventBuilder建设者)方法。

在下面的示例中,为了将相同的事件发送给许多客户端,实现了SseEmitters类。要创建客户端连接,有一种add(SseEmitter emitter)方法将a保存SseEmitter在线程安全的容器中。为了异步发送事件,有一种send(Object obj)方法可以将相同的事件发送到所有连接的客户端。

简化的类源:

  1. class SseEmitters {
  2. private final List<SseEmitter> emitters = new CopyOnWriteArrayList<>();
  3. SseEmitter add(SseEmitter emitter) {
  4. this.emitters.add(emitter);
  5. emitter.onCompletion(() -> {
  6. this.emitters.remove(emitter);
  7. });
  8. emitter.onTimeout(() -> {
  9. emitter.complete();
  10. this.emitters.remove(emitter);
  11. });
  12. return emitter;
  13. }
  14. void send(Object obj) {
  15. List<SseEmitter> failedEmitters = new ArrayList<>();
  16. this.emitters.forEach(emitter -> {
  17. try {
  18. emitter.send(obj);
  19. } catch (Exception e) {
  20. emitter.completeWithError(e);
  21. failedEmitters.add(emitter);
  22. }
  23. });
  24. this.emitters.removeAll(failedEmitters);
  25. }
  26. }

处理持久性周期性事件流

在此示例中,服务器每秒发送一次持续时间短的定期事件流-有限的单词流(快速的棕色狐狸跳过懒惰的狗 pangram),直到单词完成为止。

为了实现这一点,使用了提到的SseEmitters类。为了异步并定期发送事件,已创建了一个缓存的线程池。因为事件流是持久的,所以每个客户端连接都在controller方法内部将单独的任务提交给线程池。

简化的控制器源:

  1. @Controller
  2. @RequestMapping("/sse/mvc")
  3. public class WordsController {
  4. private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
  5. private final ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
  6. @GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  7. SseEmitter getWords() {
  8. SseEmitter emitter = new SseEmitter();
  9. cachedThreadPool.execute(() -> {
  10. try {
  11. for (int i = 0; i < WORDS.length; i++) {
  12. emitter.send(WORDS[i]);
  13. TimeUnit.SECONDS.sleep(1);
  14. }
  15. emitter.complete();
  16. } catch (Exception e) {
  17. emitter.completeWithError(e);
  18. }
  19. });
  20. return emitter;
  21. }
  22. }

 

具有EventSourceJavaScript客户端的事件客户端源。

  1. <!DOCTYPE html>
  2. <html lang="en">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>Server-Sent Events client example with EventSource</title>
  6. </head>
  7. <body>
  8. <script>
  9. if (window.EventSource == null) {
  10. alert('The browser does not support Server-Sent Events');
  11. } else {
  12. var eventSource = new EventSource('/sse/mvc/words');
  13. eventSource.onopen = function () {
  14. console.log('connection is established');
  15. };
  16. eventSource.onerror = function (error) {
  17. console.log('connection state: ' + eventSource.readyState + ', error: ' + event);
  18. };
  19. eventSource.onmessage = function (event) {
  20. console.log('id: ' + event.lastEventId + ', data: ' + event.data);
  21. if (event.data.endsWith('.')) {
  22. eventSource.close();
  23. console.log('connection is closed');
  24. }
  25. };
  26. }
  27. </script>
  28. </body>
  29. </html>

EventSource浏览器中带有JavaScript客户端的事件客户端示例。在客户端使用了自动重新连接,在服务器端使用了已实现的重新连接

 

处理持久的周期性事件

在此示例中,服务器发送持久的周期性事件流-服务器性能信息的每秒潜在无限流:

  • 承诺的虚拟内存大小
  • 交换空间总大小
  • 可用交换空间大小
  • 物理内存总大小
  • 可用物理内存大小
  • 系统CPU负载
  • 处理CPU负载

为了实现此目的,实现了PerformanceService类,该类使用OperatingSystemMXBean类从操作系统读取性能信息。还使用了提到的SseEmitters类。为了异步并定期发送事件,已创建了计划的线程池。因为事件流是持久的,所以将单个任务提交到线程池以将事件同时发送到所有客户端

简化的控制器示例:

  1. @RestController
  2. @RequestMapping("/sse/mvc")
  3. public class PerformanceController {
  4. private final PerformanceService performanceService;
  5. PerformanceController(PerformanceService performanceService) {
  6. this.performanceService = performanceService;
  7. }
  8. private final AtomicInteger id = new AtomicInteger();
  9. private final ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(1);
  10. private final SseEmitters emitters = new SseEmitters();
  11. @PostConstruct
  12. void init() {
  13. scheduledThreadPool.scheduleAtFixedRate(() -> {
  14. emitters.send(performanceService.getPerformance());
  15. }, 0, 1, TimeUnit.SECONDS);
  16. }
  17. @GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  18. SseEmitter getPerformance() {
  19. return emitters.add();
  20. }
  21. }

 

处理非周期性事件

在此示例中,服务器发送有关正在监视的文件夹中的文件更改(创建,修改,删除)的非定期事件流。使用该文件夹后,System.getProperty("user.home")属性将提供当前用户的主文件夹。

为了实现此目的,实现了FolderWatchService类,该类使用Java NIO文件监视功能。还使用了提到的SseEmitters类。为了异步和非周期性地发送事件,FolderWatchService类将生成Spring应用程序事件,这些事件由控制器(通过实现侦听器方法)消耗。

一个简化的服务器示例:

  1. @RestController
  2. @RequestMapping("/sse/mvc")
  3. public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
  4. private final FolderWatchService folderWatchService;
  5. FolderWatchController(FolderWatchService folderWatchService) {
  6. this.folderWatchService = folderWatchService;
  7. }
  8. private final SseEmitters emitters = new SseEmitters();
  9. @PostConstruct
  10. void init() {
  11. folderWatchService.start(System.getProperty("user.home"));
  12. }
  13. @GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  14. SseEmitter getFolderWatch() {
  15. return emitters.add(new SseEmitter());
  16. }
  17. @Override
  18. public void onApplicationEvent(FolderChangeEvent event) {
  19. emitters.send(event.getEvent());
  20. }
  21. }

 

SSE Java服务器:Spring Web Flux

介绍

Spring Web Flux框架5.2.0基于Reactive Streams API,并使用事件循环计算模型来实现异步Java Web应用程序。这样的应用可以在非阻挡的Web服务器等的Netty 4.1和1.4暗流运行上的Servlet容器3.1+如Tomcat 8.5和9.3码头。

总览

要使用Spring Web Flux框架实现发送事件:

  1. 创建一个控制器类,并用@RestController注释对其进行标记
  2. 创建一个创建客户端连接并发送事件的方法,该方法返回Flux,处理GET请求并产生text/event-stream
    1. 创建一个新的Flux并从方法中返回它

简化的控制器源:

  1. @RestController
  2. public class ExampleController
  3. @GetMapping(path="/sse", produces=MediaType.TEXT_EVENT_STREAM_VALUE)
  4. public Flux<String> createConnectionAndSendEvents() {
  5. return Flux.just("Alpha", "Omega");
  6. }
  7. }

要仅使用data字段发送事件,应使用该Flux<T>类型。要发送的事件与领域dataideventretry和意见,应当使用的Flux<ServerSentEvent<T>>类型。

处理持久性周期性事件流

在此示例中,服务器每秒发送一次持续时间短的定期事件流-有限的单词流(快速的棕色狐狸跳过懒惰的狗 pangram),直到单词完成为止。

要实现这一点:

  • 创建类型为a Flux的单词Flux.just(WORDS)Flux<String>
  • 创建一个 类型每秒Flux发出递增longFlux.interval(Duration.ofSeconds(1))Flux<Long>
  • 通过zip方法将它们组合在一起键入Flux<Tuple2<String,Long>>
  • 通过map(Tuple2::getT1)类型提取元组的第一个元素Flux<String>

简化的控制器源:

  1. @RestController
  2. @RequestMapping("/sse/flux")
  3. public class WordsController {
  4. private static final String[] WORDS = "The quick brown fox jumps over the lazy dog.".split(" ");
  5. @GetMapping(path = "/words", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  6. Flux<String> getWords() {
  7. return Flux
  8. .zip(Flux.just(WORDS), Flux.interval(Duration.ofSeconds(1)))
  9. .map(Tuple2::getT1);
  10. }
  11. }

此示例的事件客户端与Web MVC示例中使用的事件客户端相同。

处理持久的周期性事件

在此示例中,服务器发送持久的周期性事件流-服务器性能信息的每秒潜在无限流。

要实现这一点:

  • 创建一个类型每秒Flux发出递增longFlux.interval(Duration.ofSeconds(1))Flux<Long>
  • 通过map(sequence -> performanceService.getPerformance())方法将其转换为类型Flux<Performance>

简化的控制器示例:

  1. @RestController
  2. @RequestMapping("/sse/flux")
  3. public class PerformanceController {
  4. private final PerformanceService performanceService;
  5. PerformanceController(PerformanceService performanceService) {
  6. this.performanceService = performanceService;
  7. }
  8. @GetMapping(path = "/performance", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  9. Flux<Performance> getPerformance() {
  10. return Flux
  11. .interval(Duration.ofSeconds(1))
  12. .map(sequence -> performanceService.getPerformance());
  13. }
  14. }

此示例的事件客户端与Web MVC示例中使用的事件客户端相同。

处理非周期性事件

在此示例中,服务器发送有关正在监视的文件夹中的文件更改(创建,修改,删除)的非定期事件流。使用该文件夹后,System.getProperty("user.home")属性将提供当前用户的主文件夹。

为了实现此目的,实现了FolderWatchService类,该类使用Java NIO文件监视功能。为了异步和非周期性地发送事件,FolderWatchService类将生成Spring应用程序事件,这些事件由控制器(通过实现侦听器方法)消耗。控制器侦听器方法将事件发送到SubscribableChannel,在控制器方法中订阅该Flux事件以产生事件。

简化的控制器示例:

  1. @RestController
  2. @RequestMapping("/sse/flux")
  3. public class FolderWatchController implements ApplicationListener<FolderChangeEvent> {
  4. private final FolderWatchService folderWatchService;
  5. FolderWatchController(FolderWatchService folderWatchService) {
  6. this.folderWatchService = folderWatchService;
  7. }
  8. private final SubscribableChannel subscribableChannel = MessageChannels.publishSubscribe().get();
  9. @PostConstruct
  10. void init() {
  11. folderWatchService.start(System.getProperty("user.home"));
  12. }
  13. @GetMapping(path = "/folder-watch", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  14. Flux<FolderChangeEvent.Event> getFolderWatch() {
  15. return Flux.create(sink -> {
  16. MessageHandler handler = message -> sink.next(FolderChangeEvent.class.cast(message.getPayload()).getEvent());
  17. sink.onCancel(() -> subscribableChannel.unsubscribe(handler));
  18. subscribableChannel.subscribe(handler);
  19. }, FluxSink.OverflowStrategy.LATEST);
  20. }
  21. @Override
  22. public void onApplicationEvent(FolderChangeEvent event) {
  23. subscribableChannel.send(new GenericMessage<>(event));
  24. }
  25. }

此示例的事件客户端与Web MVC示例中使用的事件客户端相同。

SSE限制

SSE 在设计上有局限性:

  • 从服务器到客户端只能在一个方向上发送消息
  • 可以只发送短信;尽管可以使用Base64编码和gzip压缩来发送二进制消息,但效率低下。

但是SSE 在实施方面也存在局限性 :

  • Internet Explorer / Edge和许多移动浏览器不支持SSE。尽管可以使用polyfills,但效率低下
  • 许多浏览器允许打开数量非常有限的SSE连接(对于Chrome,Firefox,每个浏览器最多支持6个连接)
本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号