赞
踩
Http协议是无状态的,只能由客户端主动发起,服务端再被动响应,服务端无法向客户端主动推送内容,并且一旦服务器响应结束,链接就会断开(见注解部分),所以无法进行实时通信。WebSocket协议正是为解决客户端与服务端实时通信而产生的技术,现在已经被主流浏览器支持,所以对于Web开发者来说应该比较熟悉了,Flutter也提供了专门的包来支持WebSocket协议。
注意:Http协议中虽然可以通过keep-alive机制使服务器在响应结束后链接会保持一段时间,但最终还是会断开,keep-alive机制主要是用于避免在同一台服务器请求多个资源时频繁创建链接,它本质上是支持链接复用的技术,而并非用于实时通信,读者需要知道这两者的区别。
WebSocket协议本质上是一个基于tcp的协议,它是先通过HTTP协议发起一条特殊的http请求进行握手后,如果服务端支持WebSocket协议,则会进行协议升级。WebSocket会使用http协议握手后创建的tcp链接,和http协议不同的是,WebSocket的tcp链接是个长链接(不会断开),所以服务端与客户端就可以通过此TCP连接进行实时通信。有关WebSocket协议细节,读者可以看RFC文档,下面我们重点看看Flutter中如何使用WebSocket。
话不多说,直接撸代码Permalink
添加依赖:
web_socket_channel: ^1.1.0 # WebSocket
新建web_socket_utility.dart工具类:
- import 'dart:async';
-
- import 'package:web_socket_channel/io.dart';
- import 'package:web_socket_channel/web_socket_channel.dart';
-
- /// WebSocket地址
- const String _SOCKET_URL = 'ws://121.40.165.18:8800';
-
- /// WebSocket状态
- enum SocketStatus {
- SocketStatusConnected, // 已连接
- SocketStatusFailed, // 失败
- SocketStatusClosed, // 连接关闭
- }
-
- class WebSocketUtility {
- /// 单例对象
- static WebSocketUtility _socket;
-
- /// 内部构造方法,可避免外部暴露构造函数,进行实例化
- WebSocketUtility._();
-
- /// 获取单例内部方法
- factory WebSocketUtility() {
- // 只能有一个实例
- if (_socket == null) {
- _socket = new WebSocketUtility._();
- }
- return _socket;
- }
-
- IOWebSocketChannel _webSocket; // WebSocket
- SocketStatus _socketStatus; // socket状态
- Timer _heartBeat; // 心跳定时器
- num _heartTimes = 3000; // 心跳间隔(毫秒)
- num _reconnectCount = 60; // 重连次数,默认60次
- num _reconnectTimes = 0; // 重连计数器
- Timer _reconnectTimer; // 重连定时器
- Function onError; // 连接错误回调
- Function onOpen; // 连接开启回调
- Function onMessage; // 接收消息回调
-
- /// 初始化WebSocket
- void initWebSocket({Function onOpen, Function onMessage, Function onError}) {
- this.onOpen = onOpen;
- this.onMessage = onMessage;
- this.onError = onError;
- openSocket();
- }
-
- /// 开启WebSocket连接
- void openSocket() {
- closeSocket();
- _webSocket = IOWebSocketChannel.connect(_SOCKET_URL);
- print('WebSocket连接成功: $_SOCKET_URL');
- // 连接成功,返回WebSocket实例
- _socketStatus = SocketStatus.SocketStatusConnected;
- // 连接成功,重置重连计数器
- _reconnectTimes = 0;
- if (_reconnectTimer != null) {
- _reconnectTimer.cancel();
- _reconnectTimer = null;
- }
- onOpen();
- // 接收消息
- _webSocket.stream.listen((data) => webSocketOnMessage(data),
- onError: webSocketOnError, onDone: webSocketOnDone);
- }
-
- /// WebSocket接收消息回调
- webSocketOnMessage(data) {
- onMessage(data);
- }
-
- /// WebSocket关闭连接回调
- webSocketOnDone() {
- print('closed');
- reconnect();
- }
-
- /// WebSocket连接错误回调
- webSocketOnError(e) {
- WebSocketChannelException ex = e;
- _socketStatus = SocketStatus.SocketStatusFailed;
- onError(ex.message);
- closeSocket();
- }
-
- /// 初始化心跳
- void initHeartBeat() {
- destroyHeartBeat();
- _heartBeat =
- new Timer.periodic(Duration(milliseconds: _heartTimes), (timer) {
- sentHeart();
- });
- }
-
- /// 心跳
- void sentHeart() {
- sendMessage('{"module": "HEART_CHECK", "message": "请求心跳"}');
- }
-
- /// 销毁心跳
- void destroyHeartBeat() {
- if (_heartBeat != null) {
- _heartBeat.cancel();
- _heartBeat = null;
- }
- }
-
- /// 关闭WebSocket
- void closeSocket() {
- if (_webSocket != null) {
- print('WebSocket连接关闭');
- _webSocket.sink.close();
- destroyHeartBeat();
- _socketStatus = SocketStatus.SocketStatusClosed;
- }
- }
-
- /// 发送WebSocket消息
- void sendMessage(message) {
- if (_webSocket != null) {
- switch (_socketStatus) {
- case SocketStatus.SocketStatusConnected:
- print('发送中:' + message);
- _webSocket.sink.add(message);
- break;
- case SocketStatus.SocketStatusClosed:
- print('连接已关闭');
- break;
- case SocketStatus.SocketStatusFailed:
- print('发送失败');
- break;
- default:
- break;
- }
- }
- }
-
- /// 重连机制
- void reconnect() {
- if (_reconnectTimes < _reconnectCount) {
- _reconnectTimes++;
- _reconnectTimer =
- new Timer.periodic(Duration(milliseconds: _heartTimes), (timer) {
- openSocket();
- });
- } else {
- if (_reconnectTimer != null) {
- print('重连次数超过最大次数');
- _reconnectTimer.cancel();
- _reconnectTimer = null;
- }
- return;
- }
- }
- }
-
- 使用方法Permalink
- import 'package:my_app/utils/web_socket_utility.dart';
-
- WebSocketUtility().initWebSocket(onOpen: () {
- WebSocketUtility().initHeartBeat();
- }, onMessage: (data) {
- print(data);
- }, onError: (e) {
- print(e);
- });

更新dart版本后的代码:
- import 'dart:async';
-
- import 'package:web_socket_channel/io.dart';
- import 'package:web_socket_channel/web_socket_channel.dart';
- import 'package:kkview_kuaichuan/config.dart';
- /// WebSocket状态
- enum SocketStatus {
- socketStatusConnected, // 已连接
- socketStatusFailed, // 失败
- socketStatusClosed, // 连接关闭
- }
-
- class WebSocketUtility {
- /// 单例对象
- static final WebSocketUtility _socket = WebSocketUtility._internal();
-
- /// 内部构造方法,可避免外部暴露构造函数,进行实例化
- WebSocketUtility._internal();
-
- /// 获取单例内部方法
- factory WebSocketUtility() {
- return _socket;
- }
-
- late WebSocketChannel _webSocket; // WebSocket
- SocketStatus? _socketStatus; // socket状态
- Timer? _heartBeat; // 心跳定时器
- final int _heartTimes = 30000; // 心跳间隔(毫秒)
- final int _reconnectCount = 2; // 重连次数,默认60次
- int _reconnectTimes = 0; // 重连计数器
- Timer? _reconnectTimer; // 重连定时器
- late Function onError; // 连接错误回调
- late Function onOpen; // 连接开启回调
- late Function onMessage; // 接收消息回调
-
-
-
- /// 初始化WebSocket
- void initWebSocket({required Function onOpen, required Function onMessage, required Function onError}) {
- this.onOpen = onOpen;
- this.onMessage = onMessage;
- this.onError = onError;
- openSocket();
- }
-
- /// 开启WebSocket连接
- void openSocket() {
- // closeSocket();
- _webSocket = WebSocketChannel.connect(Uri.parse(SIGNALSERVERURL));
- print('WebSocket连接成功: $SIGNALSERVERURL');
- // 连接成功,返回WebSocket实例
- _socketStatus = SocketStatus.socketStatusConnected;
- // 连接成功,重置重连计数器
- _reconnectTimes = 0;
- if (_reconnectTimer != null) {
- _reconnectTimer?.cancel();
- _reconnectTimer = null;
- }
- onOpen();
- // 接收消息
- _webSocket.stream.listen((data) => webSocketOnMessage(data),
- onError: webSocketOnError, onDone: webSocketOnDone);
- }
-
- /// WebSocket接收消息回调
- webSocketOnMessage(data) {
- onMessage(data);
- }
-
- /// WebSocket关闭连接回调
- webSocketOnDone() {
- print('webSocketOnDone closed');
- _socketStatus = SocketStatus.socketStatusClosed;
- reconnect();
- }
-
- /// WebSocket连接错误回调
- webSocketOnError(e) {
- WebSocketChannelException ex = e;
- _socketStatus = SocketStatus.socketStatusFailed;
- onError(ex.message);
- closeSocket();
- }
-
- /// 初始化心跳
- void initHeartBeat() {
- destroyHeartBeat();
- _heartBeat =
- Timer.periodic(Duration(milliseconds: _heartTimes), (timer) {
- sentHeart();
- });
- }
-
- /// 心跳
- void sentHeart() {
- sendMessage('{"module": "HEART_CHECK", "message": "请求心跳"}');
- }
-
- /// 销毁心跳
- void destroyHeartBeat() {
- if (_heartBeat != null) {
- _heartBeat?.cancel();
- _heartBeat = null;
- }
- }
-
- /// 关闭WebSocket
- void closeSocket() {
- print('WebSocket连接关闭');
- _webSocket.sink.close();
- destroyHeartBeat();
- _socketStatus = SocketStatus.socketStatusClosed;
- }
-
- /// 发送WebSocket消息
- void sendMessage(message) {
- switch (_socketStatus) {
- case SocketStatus.socketStatusConnected:
- print('发送中:$message');
- _webSocket.sink.add(message);
- break;
- case SocketStatus.socketStatusClosed:
- print('连接已关闭');
- break;
- case SocketStatus.socketStatusFailed:
- print('发送失败');
- break;
- default:
- break;
- }
- }
-
- /// 重连机制
- void reconnect() {
- if (_reconnectTimes < _reconnectCount) {
- _reconnectTimes++;
- _reconnectTimer =
- Timer.periodic(Duration(milliseconds: _heartTimes), (timer) {
- openSocket();
- });
- } else {
- if (_reconnectTimer != null) {
- print('重连次数超过最大次数');
- _reconnectTimer?.cancel();
- _reconnectTimer = null;
- }
- return;
- }
- }
-
- get socketStatus => _socketStatus;
- get webSocketCloseCode => _webSocket.closeCode;
-
-
- }
-

- import 'package:my_app/utils/web_socket_utility.dart';
-
- WebSocketUtility().initWebSocket(onOpen: () {
- WebSocketUtility().initHeartBeat();
- }, onMessage: (data) {
- print(data);
- }, onError: (e) {
- print(e);
- });
Protobuf是google 的一种数据交换的格式,它独立于语言,独立于平台。
优点:
缺点:
在一个需要大量的数据传输的场景中,如果数据量很大,那么选择protobuf可以明显的减少数据量,减少网络IO,从而减少网络传输所消耗的时间。考虑到作为一个主打社交的产品,消息数据量会非常大,同时为了节约流量,所以采用protobuf是一个不错的选择。
1.引入protobuf库
pubspec.yaml
- ...
-
- protobuf: 1.0.1
2.编写proto文件
socket.message.proto
- syntax = "proto3";
- package socket;
-
- // 发送聊天信息
- message Message {
- string eventId = 1;
- string from = 2;
- string to = 3;
- string createAt = 4;
- string type = 5;
- string body = 6;
- }
-
- // 收到聊天消息
- message AckMessage {
- string eventId = 1;
- }

3.生成proto相关Model
Terminal
protoc --dart_out=. socket.message.proto
4.编码、发消息
a.准备protobuf对象
- Message message = Message();
- message.eventId = '####';
- message.type = 'text';
- message.body = 'hello world';
b.ProtobufUtil编码
- const MESSAGE_HEADER_LEN = 2;
- /// 数据编码
- static List<int> encode(int type, var content) {
- ByteData data = ByteData(MESSAGE_HEADER_LEN);
- data.setUint16(0, type, Endian.little);
- List<int> msg = data.buffer.asUint8List() + content.writeToBuffer().buffer.asUint8List();
- return msg;
- }
c.发消息
- /// 发送
- sendSocket(int type, var content) async {
- IOWebSocketChannel channel = await SocketService.getInstance().getChannel();
- if (channel == null) return;
- List<int> msg = ProtobufUtil.encode(type, content);
- channel.sink.add(msg);
- }
-
- sendSocket(11, message)
5.收消息、解码
a.解码
- /// 数据解码
- static DecodedMsg decode(data) {
- Int8List int8Data = Int8List.fromList(data);
- Int8List contentTypeInt8Data = int8Data.sublist(0, MESSAGE_HEADER_LEN);
- Int8List contentInt8Data = int8Data.sublist(MESSAGE_HEADER_LEN, int8Data.length);
- int contentType = contentTypeInt8Data.elementAt(0);
-
-
- GeneratedMessage content;
- switch (contentType) {
- case 10:
- content = AckMessage.fromBuffer(contentInt8Data);
- break;
- case 11:
- content = Message.fromBuffer(contentInt8Data);
- break;
- }
-
- DecodedMsg decodedMsg;
- if (contentType != null && content != null) {
- decodedMsg = DecodedMsg(
- contentType: contentType,
- content: content,
- );
- }
- return decodedMsg;
- }

b.收消息
- channel.stream.listen((data) {
- DecodedMsg msg = ProtobufUtil.decode(data);
- }
突然有用户反馈,页面卡死,无法操作。这便是全部信息,让排查问题。排查过程是很困难的,直接说结论:前同事socket使用错误,导致内存占用过大,任何事件都得不到响应。
环境
flutter : 2.10.4
dart : 2.16.2
socket_io_client : 0.9.12
原代码
- class SocketIoUtil {
- static bool retryConnect = false;
- static var messDate;
-
- static Future socketIo() async {
- // print("创建isolate");
- retryConnect = true;
- onConnect();
- }
-
- static Future dispose() async {
- retryConnect = false;
- socket?.disconnect();
- socket = null;
- messDate = null;
- return null;
- }
-
- static Future onConnect() async {
- print("socket:onConnect");
- // 随便写的,具体连接是有逻辑判断的
- String connectUrl="http://www.xxx.com:1414";
- socket = IO.io(
- connectUrl, IO.OptionBuilder().setTransports(['websocket']).build());
-
- socket.on(
- "message",
- (data) => {
- onMessage(data.toString()),
- });
-
- socket.onDisconnect((data) => {
- print("socket:连接断开"),
- _retryConnectSocketIo(),
- });
- socket.onConnect((data) => {
- print("socket:连接成功"),
- });
- socket.onConnectError((data) => {
- print("socket:连接出错"),
- _retryConnectSocketIo(),
- });
- }
-
- static onMessage(String string) {
- // do something
- }
-
- static _retryConnectSocketIo() {
- if (retryConnect) {
- print("socket:开启重新连接");
- Future.delayed(Duration(seconds: 10), () {
- onConnect();
- });
- }
- }
- }

分析
大概逻辑就是开启一个socket,连接成功则对接收到的消息进行业务处理,否则10s后重试连接。
看似没啥问题,但实测后打印日志如下:
从原代码可以看出在连接失败后会调用_retryConnectSocketIo方法,而该方法会在延迟10s后调用 onConnect 方法,但日志中显示在这延迟的10s中又多调用了3次 连接出错 ,这样在下一个10s后就会总共调用 4个onConnect 方法,而每个onConnect又会调用4次 连接出错,那么再过10s就会有4*4个 onConnect被调用。这样每个10s就会有4倍的socket连接,最终导致内存占用过大,项目卡死。
然而这些多余的连接出错不是项目触发的,因此怀疑创建的socket自身具有失败重试的功能。因此对代码进行如下修改:
- ...
- static Future onConnect() async {
- print("socket:onConnect");
- // 随便写的,具体连接是有逻辑判断的
- String connectUrl="http://www.xxx.com:1414";
- socket = IO.io(
- connectUrl,
- IO.OptionBuilder().setTransports(['websocket']).disableReconnection().build());
- ...
本以为问题得到解决,结果神奇的一幕发生了,看日志
- 2022-07-14 21:20:30.785 13742-13791/com.acewill.kvs_operation I/flutter: socket:onConnect
- 2022-07-14 21:20:30.914 13742-13791/com.acewill.kvs_operation I/flutter: socket:连接出错
- 2022-07-14 21:20:30.914 13742-13791/com.acewill.kvs_operation I/flutter: socket:开启重新连接
- 2022-07-14 21:20:40.924 13742-13791/com.acewill.kvs_operation I/flutter: socket:onConnect
后面就没日志了,onConnect 后面没有再打印 连接出错,也就是说再次运行至onConnect中创建的socket没有自动连接。
socket_io_client中的socket是自动连接的,而上面修改后的代码第二次进入就不再连接,抱着试一试的想法打印了下socket的hashcode:
- 2022-07-14 21:42:36.112 16057-16129/com.acewill.kvs_operation I/flutter: socket:onConnect
- 2022-07-14 21:42:36.192 16057-16129/com.acewill.kvs_operation I/flutter: socket:hashcode_726189657
- 2022-07-14 21:42:36.242 16057-16129/com.acewill.kvs_operation I/flutter: socket:连接出错
- 2022-07-14 21:42:36.243 16057-16129/com.acewill.kvs_operation I/flutter: socket:开启重新连接
- 2022-07-14 21:42:46.246 16057-16129/com.acewill.kvs_operation I/flutter: socket:onConnect
- 2022-07-14 21:42:46.247 16057-16129/com.acewill.kvs_operation I/flutter: socket:hashcode_726189657
- ...
竟然完全一致,说明虽然socket是在onConnect中创建的但依旧是原来的对象。那么这样就解释的通了:
第一次onConnect创建socket会调用自动连接,当再次进入onConnect后由于之前已经执行过了自动连接,因此这次什么都不做。
为什么socket会是同一个呢,明明是在onConnect中重新创建的?看下socket的创建代码:
- // socket_io_client.dart
- Socket io(uri, [opts]) => _lookup(uri, opts);
-
- Socket _lookup(uri, opts) {
- ...
- if (newConnection) {
- io = Manager(uri: uri, options: opts);
- } else {
- io = cache[id] ??= Manager(uri: uri, options: opts);
- }
- ...
- // 这个方法实际是调用Manager.socket()方法
- return io.socket(parsed.path.isEmpty ? '/' : parsed.path, opts);
- }
-
- // manager.dart
-
- Map<String, Socket> nsps;
-
- // socket_io_client传入的nsp是socket连接的地址+端口号
- Socket socket(String nsp, Map opts) {
- var socket = nsps[nsp];
- }

从上面代码可以看出当地址+端口号不变时,通过IO.io得到的是同一个socket。
原因找到了,解决方案就简单了,只需要将自动连接改为手动触发就好了,代码如下:
- ...
- static Future onConnect() async {
- print("socket:onConnect");
- // 随便写的,具体连接是有逻辑判断的
- String connectUrl="http://www.xxx.com:1414";
- socket = IO.io(connectUrl,
- IO.OptionBuilder().setTransports(['websocket'])
- .disableReconnection().disableAutoConnect().build());
- ...
- socket.connect();
- ...
再试一次:
2022-07-14 22:14:34.384 17786-17877/com.acewill.kvs_operation I/flutter: socket:onConnect 2022-07-14 22:14:34.489 17786-17877/com.acewill.kvs_operation I/flutter: socket:连接出错 2022-07-14 22:14:34.490 17786-17877/com.acewill.kvs_operation I/flutter: socket:开启重新连接 2022-07-14 22:14:44.493 17786-17877/com.acewill.kvs_operation I/flutter: socket:onConnect 2022-07-14 22:14:44.539 17786-17877/com.acewill.kvs_operation I/flutter: socket:连接出错 2022-07-14 22:14:44.540 17786-17877/com.acewill.kvs_operation I/flutter: socket:开启重新连接 2022-07-14 22:14:44.540 17786-17877/com.acewill.kvs_operation I/flutter: socket:连接出错 2022-07-14 22:14:44.541 17786-17877/com.acewill.kvs_operation I/flutter: socket:开启重新连接 2022-07-14 22:14:54.543 17786-17877/com.acewill.kvs_operation I/flutter: socket:onConnect 2022-07-14 22:14:54.553 17786-17877/com.acewill.kvs_operation I/flutter: socket:onConnect 2022-07-14 22:14:54.574 17786-17877/com.acewill.kvs_operation I/flutter: socket:连接出错 2022-07-14 22:14:54.575 17786-17877/com.acewill.kvs_operation I/flutter: socket:开启重新连接 2022-07-14 22:14:54.576 17786-17877/com.acewill.kvs_operation I/flutter: socket:连接出错 2022-07-14 22:14:54.577 17786-17877/com.acewill.kvs_operation I/flutter: socket:开启重新连接 2022-07-14 22:14:54.577 17786-17877/com.acewill.kvs_operation I/flutter: socket:连接出错 2022-07-14 22:14:54.578 17786-17877/com.acewill.kvs_operation I/flutter: socket:开启重新连接 2022-07-14 22:14:54.579 17786-17877/com.acewill.kvs_operation I/flutter: socket:连接出错 2022-07-14 22:14:54.579 17786-17877/com.acewill.kvs_operation I/flutter: socket:开启重新连接 ...
居然还不行!!!
连接出错 几个字的打印频率是1、2、4…呈2的指数增长,这又该怎么解决呢?
虽然上面依旧存在过度重试,但整体的重试时间点比较集中,似乎是有些代码在onConnect中重复执行了,逐行排查也只有socket.onConnectError这个代码重复执行了,看下内部实现:
- // darty.dart
- void onConnectError(EventHandler handler) {
- on('connect_error', handler);
- }
-
- void on(String event, EventHandler handler) {
- this._events.putIfAbsent(event, () => new List<EventHandler>());
- // 罪魁祸首,这里是add,导致了重复添加
- this._events[event].add(handler);
- }
重新整理代码:
- class SocketIoUtil {
- static bool retryConnect = false;
- static var messDate;
-
- static Future socketIo() async {
- // print("创建isolate");
- retryConnect = true;
- onConnect();
- }
-
- static IO.Socket createSocket(String url) {
- var option = IO.OptionBuilder()
- .setTransports(['websocket'])
- .disableReconnection()
- .disableAutoConnect()
- .build();
- IO.Socket socket = IO.io(url, option);
- socket.on(
- "message",
- (data) => {
- onMessage(data.toString()),
- });
-
- socket.onDisconnect((data) => {
- print("连接断开 "),
- EventBus().emit(Event.eventNet, '服务连接断开'),
- _retryConnectSocketIo(),
- });
- socket.onConnect((data) => {
- print("socketIo连接成功"),
- socket.emit("join_group", ["refreshwake"]), // 触发join_group 事件 将加入分组
- EventBus().emit(Event.eventNet, '网络状态良好'),
- });
- socket.onConnectError((data) => {
- print("socket:连接出错"),
- _retryConnectSocketIo(),
- });
- return socket;
- }
-
- static Future dispose() async {
- retryConnect = false;
- socket?.disconnect();
- socket = null;
- messDate = null;
- return null;
- }
-
- static Future onConnect() async {
- print("socket:onConnect");
- // 随便写的,具体连接是有逻辑判断的
- String connectUrl="http://www.xxx.com:1414";
- if (socket != null) {
- if (socket.io.uri != connectUrl) {
- dispose();
- socket = createSocket(connectUrl);
- }
- } else {
- socket = createSocket(connectUrl);
- }
- socket.connect();
- }
-
- static onMessage(String string) {
- // do something
- }
-
- static _retryConnectSocketIo() {
- if (retryConnect) {
- print("socket:开启重新连接");
- Future.delayed(Duration(seconds: 10), () {
- onConnect();
- });
- }
- }
- }

上面代码运行正常,至此终于把这个坑填完。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。