赞
踩
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; /** * @program: qingcheng * @author: XIONG CHUAN * @create: 2019-04-28 15:04 * @description: netty服务启动类 **/ @Slf4j @Component public class NettyServer { public void start(InetSocketAddress address) { //配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) // 绑定线程池 .channel(NioServerSocketChannel.class) .localAddress(address) .childHandler(new NettyServerChannelInitializer())//编码解码 .option(ChannelOption.SO_BACKLOG, 128) //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝 .childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制 // 绑定端口,开始接收进来的连接 ChannelFuture future = bootstrap.bind(address).sync(); log.info("netty服务器开始监听端口:" + address.getPort()); //关闭channel和块,直到它被关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /** * @program: qingcheng * @author: XIONG CHUAN * @create: 2019-04-28 15:10 * @description: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器 **/ public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8)); channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8)); channel.pipeline().addLast(new NettyServerHandler()); } }
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; /** * @program: qingcheng * @author: XIONG CHUAN * @create: 2019-04-28 15:21 * @description: netty服务端处理类 **/ @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** * 管理一个全局map,保存连接进服务端的通道数量 */ private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>(); /** * @param ctx * @author xiongchuan on 2019/4/28 16:10 * @DESCRIPTION: 有客户端连接服务器会触发此函数 * @return: void */ @Override public void channelActive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); int clientPort = insocket.getPort(); //获取连接通道唯一标识 ChannelId channelId = ctx.channel().id(); System.out.println(); //如果map中不包含此连接,就保存连接 if (CHANNEL_MAP.containsKey(channelId)) { log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size()); } else { //保存连接 CHANNEL_MAP.put(channelId, ctx); log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } /** * @param ctx * @author xiongchuan on 2019/4/28 16:10 * @DESCRIPTION: 有客户端终止连接服务器会触发此函数 * @return: void */ @Override public void channelInactive(ChannelHandlerContext ctx) { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); ChannelId channelId = ctx.channel().id(); //包含此客户端才去删除 if (CHANNEL_MAP.containsKey(channelId)) { //删除连接 CHANNEL_MAP.remove(channelId); System.out.println(); log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]"); log.info("连接通道数量: " + CHANNEL_MAP.size()); } } /** * @param ctx * @author xiongchuan on 2019/4/28 16:10 * @DESCRIPTION: 有客户端发消息会触发此函数 * @return: void */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(); log.info("加载客户端报文......"); log.info("【" + ctx.channel().id() + "】" + " :" + msg); /** * 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数 * */ //响应客户端 this.channelWrite(ctx.channel().id(), msg); } /** * @param msg 需要发送的消息内容 * @param channelId 连接通道唯一id * @author xiongchuan on 2019/4/28 16:10 * @DESCRIPTION: 服务端给客户端发送消息 * @return: void */ public void channelWrite(ChannelId channelId, Object msg) throws Exception { ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); if (ctx == null) { log.info("通道【" + channelId + "】不存在"); return; } if (msg == null || msg == "") { log.info("服务端响应空的消息"); return; } //将客户端的信息直接返回写入ctx ctx.write(msg); //刷新缓存区 ctx.flush(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client: " + socketString + " READER_IDLE 读超时"); ctx.disconnect(); } else if (event.state() == IdleState.WRITER_IDLE) { log.info("Client: " + socketString + " WRITER_IDLE 写超时"); ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info("Client: " + socketString + " ALL_IDLE 总超时"); ctx.disconnect(); } } } /** * @param ctx * @author xiongchuan on 2019/4/28 16:10 * @DESCRIPTION: 发生异常会触发此函数 * @return: void */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println(); ctx.close(); log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size()); //cause.printStackTrace(); } }
import com.qingcheng.common.DefaultConstants; import com.qingcheng.common.utils.DateUtils; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.Data; import lombok.extern.slf4j.Slf4j; /** * @program: qingcheng * @author: XIONG CHUAN * @create: 2019-04-28 19:42 * @description: 客户端 **/ @Slf4j @Data public class NettyClient implements Runnable { static final String HOST = System.getProperty("host", DefaultConstants.SOCKET_IP); static final int PORT = Integer.parseInt(System.getProperty("port", "8888")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); private String content; public NettyClient(String content) { this.content = content; } @Override public void run() { // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { int num = 0; boolean boo =true; Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new NettyClientChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("decoder", new StringDecoder()); p.addLast("encoder", new StringEncoder()); p.addLast(new NettyClientHandler()); } }); ChannelFuture future = b.connect(HOST, PORT).sync(); while (boo) { num++; future.channel().writeAndFlush(content + "--" + DateUtils.getDateTime()); try { //休眠一段时间 Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //每一条线程向服务端发送的次数 if (num == 100) { boo = false; } } log.info(content + "-----------------------------" + num); //future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } /** * 下面是不加线程的 */ /*public static void main(String[] args) throws Exception { sendMessage("hhh你好?"); } public static void sendMessage(String content) throws InterruptedException { // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new NettyClientChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("decoder", new StringDecoder()); p.addLast("encoder", new StringEncoder()); p.addLast(new NettyClientHandler()); } }); ChannelFuture future = b.connect(HOST, PORT).sync(); future.channel().writeAndFlush(content); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } }*/ }
/** * @program: qingcheng * @author: XIONG CHUAN * @create: 2019-04-29 16:06 * @description: 模拟多客户端发送报文 **/ public class TestNettyClient { public static void main(String[] args) { //开启10条线程,每条线程就相当于一个客户端 for (int i = 1; i <= 10; i++) { new Thread(new NettyClient("thread" + "--" + i)).start(); } } }
import com.qingcheng.common.netty.server.NettyServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; /** * @program: qingcheng * @author: XIONG CHUAN * @create: 2019-04-28 15:10 * @description: 客户端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器,客户端服务端编解码要一致 **/ public class NettyClientChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8)); channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8)); channel.pipeline().addLast(new NettyServerHandler()); } }
import com.qingcheng.common.utils.DateUtils; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelId; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ConcurrentHashMap; /** * @program: qingcheng * @author: XIONG CHUAN * @create: 2019-04-28 19:37 * @description: 客户端处理类 **/ @Slf4j public class NettyClientHandler extends ChannelInboundHandlerAdapter { /** * 计算有多少客户端接入,第一个string为客户端ip */ private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CLIENT_MAP = new ConcurrentHashMap<>(); @Override public void channelActive(ChannelHandlerContext ctx) { CLIENT_MAP.put(ctx.channel().id(), ctx); log.info("ClientHandler Active"); } /** * @param ctx * @author xiongchuan on 2019/4/28 16:10 * @DESCRIPTION: 有服务端端终止连接服务器会触发此函数 * @return: void */ @Override public void channelInactive(ChannelHandlerContext ctx) { ctx.close(); log.info("服务端终止了服务"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("回写数据:" + msg); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //cause.printStackTrace(); log.info("服务端发生异常【" + cause.getMessage() + "】"); ctx.close(); } /** * @param msg 需要发送的消息内容 * @param channelId 连接通道唯一id * @author xiongchuan on 2019/4/28 16:10 * @DESCRIPTION: 客户端给服务端发送消息 * @return: void */ public void channelWrite(ChannelId channelId, String msg) { ChannelHandlerContext ctx = CLIENT_MAP.get(channelId); if (ctx == null) { log.info("通道【" + channelId + "】不存在"); return; } //将客户端的信息直接返回写入ctx ctx.write(msg + " 时间:" + DateUtils.getDateTime()); //刷新缓存区 ctx.flush(); } }
##### 客户端启动后服务端日志,启动两个客户端向服务端发送消息,服务端直接把消息回写
###### 客户端日志
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。