赞
踩
零拷贝
具体来讲,如果要从IO中读取数据,分为两个步骤:
(1)从IO流中读取出来放到缓冲区,程序从缓冲区中读取,再放到堆中,此时数据就会被拷贝两次才能到达堆或者堆内存中。如果数据量很大,那么就会造成资源的浪费
(2)Netty其实就是利用NIO中的零拷贝特性,当Netty需要接收数据或者传输数据的时候,就会新开辟一块堆内存,然后数据就直接通过IO读取到了新开辟的堆内存中,这样也就加快了数据传输的速度。
线程在访问某一个资源的时候,该资源是否准备就绪的一种处理方式,如果说该资源当前没准备就绪,这个时候就会有两种处理方式:阻塞与非阻塞,netty解决了NIO目前发现的所有bug。
(1)阻塞:这个线程会一直持续等待这个资源就绪并处理完毕,直到响应返回一个结果,这个时候线程是一直阻塞状态,不可以去做任何事情
(2)非阻塞:这个线程直接返回结果,不会持续等待这个资源处理完毕才响应,它会去请求别的资源。
同步与异步
这里的 “同步与异步” 指的是访问数据的一种机制,类似于Ajax。
(1)同步:主动请求,并且会等待IO操作完成之后,IO会有一个通知
(2)异步:当一个线程主动请求数据之后,可以继续处理其他任务,发起其他请求,多个请求完成之后再逐一的通过异步形式
1.Reactor线程模型:
(1)单线程模型:所有的IO操作都由同一个NIO线程处理,仅限于一些小型应用场景。但在高负载、高并发等情况下使用单线程肯定就不太合理,主要是因为NIO的一个线程同时要去处理成千上万的请求 的时候,在性能上会支撑不了,即便CPU负载100%,对于海量消息的处理,编码解码以及读取、发送消息等情况,依然满足不了。
(2)多线程模型:由一组NIO线程处理IO操作
(3)主从线程模型:一组线程池接受请求,一组线程池处理IO
2.当NIO的线程负载过重之后,整体服务性能处理就会变慢,结果就是导致客户端在向服务端发起请求、链接就会超时,由于客户端一般都会有一种超时机制,反复地向服务端再次发起请求,此时就相当于陷入了死循环,更加加重了服务器负载。
maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.25.Final</version>
</dependency>
搭建netty服务
package com.imooc.netty; import org.springframework.stereotype.Component; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; @Component public class WSServer { private static class SingletionWSServer { static final WSServer instance = new WSServer(); } public static WSServer getInstance() { return SingletionWSServer.instance; } private EventLoopGroup mainGroup; private EventLoopGroup subGroup; private ServerBootstrap server; private ChannelFuture future; public WSServer() { mainGroup = new NioEventLoopGroup(); subGroup = new NioEventLoopGroup(); server = new ServerBootstrap(); server.group(mainGroup, subGroup) .channel(NioServerSocketChannel.class) .childHandler(new WSServerInitialzer()); } public void start() { this.future = server.bind(8088); System.err.println("netty websocket server 启动完毕..."); } }
启动netty
package com.imooc; import org.springframework.context.ApplicationListener; import org.springframework.context.event.ContextRefreshedEvent; import org.springframework.stereotype.Component; import com.imooc.netty.WSServer; @Component public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> { @Override public void onApplicationEvent(ContextRefreshedEvent event) { if (event.getApplicationContext().getParent() == null) { try { WSServer.getInstance().start(); } catch (Exception e) { e.printStackTrace(); } } } }
Channel
package com.imooc.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; public class WSServerInitialzer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // websocket 基于http协议,所以要有http编解码器 pipeline.addLast(new HttpServerCodec()); // 对写大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); // 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse // 几乎在netty中的编程,都会使用到此hanler pipeline.addLast(new HttpObjectAggregator(1024*64)); // ====================== 以上是用于支持http协议 ====================== // ====================== 增加心跳支持 start ====================== // 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开 // 如果是读空闲或者写空闲,不处理 pipeline.addLast(new IdleStateHandler(50, 52, 54)); // 自定义的空闲状态检测 pipeline.addLast(new HeartBeatHandler()); // ====================== 增加心跳支持 end ====================== // ====================== 以下是支持httpWebsocket ====================== /** * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws * 本handler会帮你处理一些繁重的复杂的事 * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳 * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同 */ pipeline.addLast(new WebSocketServerProtocolHandler("/ws")); // 自定义的handler pipeline.addLast(new ChatHandler()); } }
自定义的空闲状态检测
package com.imooc.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; /** * @Description: 用于检测channel的心跳handler * 继承ChannelInboundHandlerAdapter,从而不需要实现channelRead0方法 */ public class HeartBeatHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 判断evt是否是IdleStateEvent(用于触发用户事件,包含 读空闲/写空闲/读写空闲 ) if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent)evt; // 强制类型转换 if (event.state() == IdleState.READER_IDLE) { System.out.println("进入读空闲..."); } else if (event.state() == IdleState.WRITER_IDLE) { System.out.println("进入写空闲..."); } else if (event.state() == IdleState.ALL_IDLE) { System.out.println("channel关闭前,users的数量为:" + ChatHandler.users.size()); Channel channel = ctx.channel(); // 关闭无用的channel,以防资源浪费 channel.close(); System.out.println("channel关闭后,users的数量为:" + ChatHandler.users.size()); } } } }
自定义处理消息的handler
package com.imooc.netty; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import org.apache.commons.lang3.StringUtils; import com.imooc.SpringUtil; import com.imooc.enums.MsgActionEnum; import com.imooc.service.UserService; import com.imooc.utils.JsonUtils; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; /** * * @Description: 处理消息的handler * TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体 */ public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { // 用于记录和管理所有客户端的channle public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { // 获取客户端传输过来的消息 String content = msg.text(); Channel currentChannel = ctx.channel(); // 1. 获取客户端发来的消息 DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class); Integer action = dataContent.getAction(); // 2. 判断消息类型,根据不同的类型来处理不同的业务 if (action == MsgActionEnum.CONNECT.type) { // 2.1 当websocket 第一次open的时候,初始化channel,把用的channel和userid关联起来 String senderId = dataContent.getChatMsg().getSenderId(); UserChannelRel.put(senderId, currentChannel); // 测试 for (Channel c : users) { System.out.println(c.id().asLongText()); } UserChannelRel.output(); } else if (action == MsgActionEnum.CHAT.type) { // 2.2 聊天类型的消息,把聊天记录保存到数据库,同时标记消息的签收状态[未签收] ChatMsg chatMsg = dataContent.getChatMsg(); String msgText = chatMsg.getMsg(); String receiverId = chatMsg.getReceiverId(); String senderId = chatMsg.getSenderId(); // 保存消息到数据库,并且标记为 未签收 UserService userService = (UserService)SpringUtil.getBean("userServiceImpl"); String msgId = userService.saveMsg(chatMsg); chatMsg.setMsgId(msgId); DataContent dataContentMsg = new DataContent(); dataContentMsg.setChatMsg(chatMsg); // 发送消息 // 从全局用户Channel关系中获取接受方的channel Channel receiverChannel = UserChannelRel.get(receiverId); if (receiverChannel == null) { // TODO channel为空代表用户离线,推送消息(JPush,个推,小米推送) } else { // 当receiverChannel不为空的时候,从ChannelGroup去查找对应的channel是否存在 Channel findChannel = users.find(receiverChannel.id()); if (findChannel != null) { // 用户在线 receiverChannel.writeAndFlush( new TextWebSocketFrame( JsonUtils.objectToJson(dataContentMsg))); } else { // 用户离线 TODO 推送消息 } } } else if (action == MsgActionEnum.SIGNED.type) { // 2.3 签收消息类型,针对具体的消息进行签收,修改数据库中对应消息的签收状态[已签收] UserService userService = (UserService)SpringUtil.getBean("userServiceImpl"); // 扩展字段在signed类型的消息中,代表需要去签收的消息id,逗号间隔 String msgIdsStr = dataContent.getExtand(); String msgIds[] = msgIdsStr.split(","); List<String> msgIdList = new ArrayList<>(); for (String mid : msgIds) { if (StringUtils.isNotBlank(mid)) { msgIdList.add(mid); } } System.out.println(msgIdList.toString()); if (msgIdList != null && !msgIdList.isEmpty() && msgIdList.size() > 0) { // 批量签收 userService.updateMsgSigned(msgIdList); } } else if (action == MsgActionEnum.KEEPALIVE.type) { // 2.4 心跳类型的消息 System.out.println("收到来自channel为[" + currentChannel + "]的心跳包..."); } } /** * 当客户端连接服务端之后(打开连接) * 获取客户端的channle,并且放到ChannelGroup中去进行管理 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { users.add(ctx.channel()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { String channelId = ctx.channel().id().asShortText(); System.out.println("客户端被移除,channelId为:" + channelId); // 当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel users.remove(ctx.channel()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); // 发生异常之后关闭连接(关闭channel),随后从ChannelGroup中移除 ctx.channel().close(); users.remove(ctx.channel()); } }
package com.imooc.enums; /** * * @Description: 发送消息的动作 枚举 */ public enum MsgActionEnum { CONNECT(1, "第一次(或重连)初始化连接"), CHAT(2, "聊天消息"), SIGNED(3, "消息签收"), KEEPALIVE(4, "客户端保持心跳"), PULL_FRIEND(5, "拉取好友"); public final Integer type; public final String content; MsgActionEnum(Integer type, String content){ this.type = type; this.content = content; } public Integer getType() { return type; } }
package com.imooc.netty; import java.io.Serializable; public class DataContent implements Serializable { private static final long serialVersionUID = 8021381444738260454L; private Integer action; // 动作类型 private ChatMsg chatMsg; // 用户的聊天内容entity private String extand; // 扩展字段 public Integer getAction() { return action; } public void setAction(Integer action) { this.action = action; } public ChatMsg getChatMsg() { return chatMsg; } public void setChatMsg(ChatMsg chatMsg) { this.chatMsg = chatMsg; } public String getExtand() { return extand; } public void setExtand(String extand) { this.extand = extand; } }
package com.imooc.netty; import java.io.Serializable; public class ChatMsg implements Serializable { private static final long serialVersionUID = 3611169682695799175L; private String senderId; // 发送者的用户id private String receiverId; // 接受者的用户id private String msg; // 聊天内容 private String msgId; // 用于消息的签收 public String getSenderId() { return senderId; } public void setSenderId(String senderId) { this.senderId = senderId; } public String getReceiverId() { return receiverId; } public void setReceiverId(String receiverId) { this.receiverId = receiverId; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } public String getMsgId() { return msgId; } public void setMsgId(String msgId) { this.msgId = msgId; } }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。